diff options
| author | Terry Truong <terry06890@gmail.com> | 2023-01-21 12:21:03 +1100 |
|---|---|---|
| committer | Terry Truong <terry06890@gmail.com> | 2023-01-21 12:32:01 +1100 |
| commit | 0a9b2c2e5eca8a04e37fbdd423379882863237c2 (patch) | |
| tree | 1812bdb6bb13e4f76fdd7ef04075b291f775c213 /backend/histplorer.py | |
| parent | 8321e2f92dbc073b8f1de87895d6620a2021b22e (diff) | |
Adjust backend coding style
Increase line spacing, add section comments, etc
Diffstat (limited to 'backend/histplorer.py')
| -rwxr-xr-x | backend/histplorer.py | 124 |
1 files changed, 84 insertions, 40 deletions
diff --git a/backend/histplorer.py b/backend/histplorer.py index 26e131e..ab061d1 100755 --- a/backend/histplorer.py +++ b/backend/histplorer.py @@ -3,28 +3,33 @@ WSGI script that serves historical data. Expected HTTP query parameters: - type: - If 'events', reply with information on events within a date range, for a given scale - If 'info', reply with information about a given event - If 'sugg', reply with search suggestions for an event search string -- range: With type=events, specifies a historical-date range + If 'events', reply with information on events within a date range, for a given scale. + If 'info', reply with information about a given event. + If 'sugg', reply with search suggestions for an event search string. +- range: With type=events, specifies a historical-date range. If absent, the default is 'all of time'. Examples: range=1000.1910-10-09 means '1000 AD up to and excluding 09/10/1910' range=-13000. means '13000 BC onwards' -- scale: With type=events, specifies a date scale -- incl: With type=events, specifies an event to include, as an event ID -- event: With type=info, specifies the event title to get info for -- input: With type=sugg, specifies a search string to suggest for -- limit: With type=events or type=sugg, specifies the max number of results -- ctgs: With type=events|info|sugg, specifies event categories to restrict results to - Interpreted as a period-separated list of category names (eg: person.place). An empty string is ignored. +- scale: With type=events, specifies a date scale (see SCALES in hist_data/cal.py). +- incl: With type=events, specifies an event to include, as an event ID. +- event: With type=info, specifies the title of an event to get info for. +- input: With type=sugg, specifies a search string to suggest for. +- limit: With type=events or type=sugg, specifies the max number of results. +- ctgs: With type=events|info|sugg, specifies event categories to restrict results to. + Interpreted as a period-separated list of category names (eg: person.place). + An empty string is ignored. - imgonly: With type=events|info|sugg, if present, restricts results to events with images. """ from typing import Iterable, cast -import sys, re -import urllib.parse, sqlite3 -import gzip, jsonpickle +import sys +import re +import urllib.parse +import sqlite3 +import gzip +import jsonpickle + from hist_data.cal import HistDate, dbDateToHistDate, dateToUnit DB_FILE = 'hist_data/data.db' @@ -34,7 +39,8 @@ DEFAULT_REQ_EVENTS = 20 MAX_REQ_SUGGS = 50 DEFAULT_REQ_SUGGS = 5 -# Classes for values sent as responses +# ========== Classes for values sent as responses ========== + class Event: """ Represents an historical event """ def __init__( @@ -57,26 +63,30 @@ class Event: self.ctg = ctg self.imgId = imgId self.pop = pop - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, Event) and \ (self.id, self.title, self.start, self.startUpper, self.end, self.endUpper, \ self.ctg, self.pop, self.imgId) == \ (other.id, other.title, other.start, other.startUpper, other.end, other.endUpper, \ other.ctg, other.pop, other.imgId) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) + class EventResponse: """ Used when responding to type=events requests """ def __init__(self, events: list[Event], unitCounts: dict[int, int] | None): self.events = events self.unitCounts = unitCounts # None indicates exceeding MAX_REQ_UNIT_COUNTS - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, EventResponse) and \ (self.events, self.unitCounts) == (other.events, other.unitCounts) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) + class ImgInfo: """ Represents an event's associated image """ def __init__(self, url: str, license: str, artist: str, credit: str): @@ -84,13 +94,15 @@ class ImgInfo: self.license = license self.artist = artist self.credit = credit - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, ImgInfo) and \ (self.url, self.license, self.artist, self.credit) == \ (other.url, other.license, other.artist, other.credit) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) + class EventInfo: """ Used when responding to type=info requests """ def __init__(self, event: Event, desc: str | None, wikiId: int, imgInfo: ImgInfo | None): @@ -98,29 +110,34 @@ class EventInfo: self.desc = desc self.wikiId = wikiId self.imgInfo = imgInfo - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, EventInfo) and \ (self.event, self.desc, self.wikiId, self.imgInfo) == (other.event, other.desc, other.wikiId, other.imgInfo) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) + class SuggResponse: """ Used when responding to type=sugg requests """ def __init__(self, suggs: list[str], hasMore: bool): self.suggs = suggs self.hasMore = hasMore - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, SuggResponse) and \ (self.suggs, self.hasMore) == (other.suggs, other.hasMore) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) -# Entry point +# ========== Entry point ========== + def application(environ: dict[str, str], start_response) -> Iterable[bytes]: """ Entry point for the WSGI script """ # Get response object val = handleReq(DB_FILE, environ) + # Construct response data = jsonpickle.encode(val, unpicklable=False).encode() headers = [('Content-type', 'application/json')] @@ -130,16 +147,20 @@ def application(environ: dict[str, str], start_response) -> Iterable[bytes]: headers.append(('Content-encoding', 'gzip')) headers.append(('Content-Length', str(len(data)))) start_response('200 OK', headers) + return [data] + def handleReq(dbFile: str, environ: dict[str, str]) -> None | EventResponse | EventInfo | SuggResponse: """ Queries the database, and constructs a response object """ # Open db dbCon = sqlite3.connect(dbFile) dbCur = dbCon.cursor() + # Get query params queryStr = environ['QUERY_STRING'] if 'QUERY_STRING' in environ else '' queryDict = urllib.parse.parse_qs(queryStr) params = {k: v[0] for k, v in queryDict.items()} + # Get data of requested type reqType = queryDict['type'][0] if 'type' in queryDict else None if reqType == 'events': @@ -150,7 +171,8 @@ def handleReq(dbFile: str, environ: dict[str, str]) -> None | EventResponse | Ev return handleSuggReq(params, dbCur) return None -# For type=events +# ========== For handling type=events ========== + def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor) -> EventResponse | None: """ Generates a response for a type=events request """ # Get dates @@ -163,6 +185,7 @@ def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor) -> EventRespo except ValueError: print(f'INFO: Invalid date-range value {dateRange}', file=sys.stderr) return None + # Get scale if 'scale' not in params: print('INFO: No scale provided', file=sys.stderr) @@ -172,12 +195,14 @@ def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor) -> EventRespo except ValueError: print('INFO: Invalid scale value', file=sys.stderr) return None + # Get incl value try: incl = int(params['incl']) if 'incl' in params else None except ValueError: print('INFO: Invalid incl value', file=sys.stderr) return None + # Get result set limit try: resultLimit = int(params['limit']) if 'limit' in params else DEFAULT_REQ_EVENTS @@ -187,13 +212,15 @@ def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor) -> EventRespo if resultLimit <= 0 or resultLimit > MAX_REQ_EVENTS: print(f'INFO: Invalid results limit {resultLimit}', file=sys.stderr) return None - # + ctgs = params['ctgs'].split('.') if 'ctgs' in params else None imgonly = 'imgonly' in params - # + events = lookupEvents(start, end, scale, incl, resultLimit, ctgs, imgonly, dbCur) unitCounts = lookupUnitCounts(start, end, scale, imgonly, dbCur) + return EventResponse(events, unitCounts) + def reqParamToHistDate(s: str): """ Produces a HistDate from strings like '2010-10-3', '-8000', and '' (throws ValueError if invalid) """ if not s: @@ -205,6 +232,7 @@ def reqParamToHistDate(s: str): return HistDate(None, int(m.group(1))) else: return HistDate(True, int(m.group(1)), int(m.group(2)), int(m.group(3))) + def lookupEvents( start: HistDate | None, end: HistDate | None, scale: int, incl: int | None, resultLimit: int, ctgs: list[str] | None, imgonly: bool, dbCur: sqlite3.Cursor) -> list[Event]: @@ -219,6 +247,7 @@ def lookupEvents( ' LEFT JOIN images ON event_imgs.img_id = images.id' constraints = [f'{dispTable}.scale = ?'] params: list[str | int] = [scale] + # Constrain by start/end startUnit = dateToUnit(start, scale) if start is not None else None endUnit = dateToUnit(end, scale) if end is not None else None @@ -232,22 +261,26 @@ def lookupEvents( if endUnit is not None: constraints.append(f'{dispTable}.unit < ?') params.append(endUnit) + # Constrain by event category if ctgs is not None: constraints.append('ctg IN (' + ','.join('?' * len(ctgs)) + ')') params.extend(ctgs) + # Add constraints to query query2 = query if constraints: query2 += ' WHERE ' + ' AND '.join(constraints) query2 += ' ORDER BY pop.pop DESC' query2 += f' LIMIT {resultLimit}' + # Run query results: list[Event] = [] for row in dbCur.execute(query2, params): results.append(eventEntryToResults(row)) if incl is not None and incl == row[0]: incl = None + # Get any additional inclusion if incl is not None: row = dbCur.execute(query + ' WHERE events.id = ?', (incl,)).fetchone() @@ -255,8 +288,9 @@ def lookupEvents( if len(results) == resultLimit: results.pop() results.append(eventEntryToResults(row)) - # + return results + def eventEntryToResults( row: tuple[int, str, int, int | None, int | None, int | None, int, str, int | None, int]) -> Event: eventId, title, start, startUpper, end, endUpper, fmt, ctg, imageId, pop = row @@ -267,11 +301,13 @@ def eventEntryToResults( for i, n in enumerate(dateVals): if n is not None: newDates[i] = dbDateToHistDate(n, fmt, i < 2) - # + return Event(eventId, title, cast(HistDate, newDates[0]), newDates[1], newDates[2], newDates[3], ctg, imageId, pop) + def lookupUnitCounts( start: HistDate | None, end: HistDate | None, scale: int, imgonly: bool, dbCur: sqlite3.Cursor) -> dict[int, int] | None: + """ Return list of units with counts given scale and a date range """ # Build query distTable = 'dist' if not imgonly else 'img_dist' query = f'SELECT unit, count FROM {distTable} WHERE scale = ?' @@ -283,13 +319,15 @@ def lookupUnitCounts( query += ' AND unit < ?' params.append(dateToUnit(end, scale)) query += ' ORDER BY unit ASC LIMIT ' + str(MAX_REQ_UNIT_COUNTS + 1) + # Get results unitCounts: dict[int, int] = {} for unit, count in dbCur.execute(query, params): unitCounts[unit] = count return unitCounts if len(unitCounts) <= MAX_REQ_UNIT_COUNTS else None -# For type=info +# ========== For handling type=info ========== + def handleInfoReq(params: dict[str, str], dbCur: sqlite3.Cursor): """ Generates a response for a type=info request """ if 'event' not in params: @@ -298,6 +336,7 @@ def handleInfoReq(params: dict[str, str], dbCur: sqlite3.Cursor): ctgs = params['ctgs'].split('.') if 'ctgs' in params else None imgonly = 'imgonly' in params return lookupEventInfo(params['event'], ctgs, imgonly, dbCur) + def lookupEventInfo(eventTitle: str, ctgs: list[str] | None, imgonly: bool, dbCur: sqlite3.Cursor) -> EventInfo | None: """ Look up an event with given title, and return a descriptive EventInfo """ imgJoin = 'INNER JOIN' if imgonly else 'LEFT JOIN' @@ -320,7 +359,8 @@ def lookupEventInfo(eventTitle: str, ctgs: list[str] | None, imgonly: bool, dbCu else: return None -# For type=sugg +# ========== For handling type=sugg ========== + def handleSuggReq(params: dict[str, str], dbCur: sqlite3.Cursor): """ Generates a response for a type=sugg request """ # Get search string @@ -331,6 +371,7 @@ def handleSuggReq(params: dict[str, str], dbCur: sqlite3.Cursor): if not searchStr: print('INFO: Empty \'input\' parameter for type=sugg request', file=sys.stderr) return None + # Get result limit try: resultLimit = int(params['limit']) if 'limit' in params else DEFAULT_REQ_SUGGS @@ -340,10 +381,11 @@ def handleSuggReq(params: dict[str, str], dbCur: sqlite3.Cursor): if resultLimit <= 0 or resultLimit > MAX_REQ_SUGGS: print(f'INFO: Invalid suggestion limit {resultLimit}', file=sys.stderr) return None - # + ctgs = params['ctgs'].split('.') if 'ctgs' in params else None imgonly = 'imgonly' in params return lookupSuggs(searchStr, resultLimit, ctgs, imgonly, dbCur) + def lookupSuggs( searchStr: str, resultLimit: int, ctgs: list[str] | None, imgonly: bool, dbCur: sqlite3.Cursor) -> SuggResponse: """ For a search string, returns a SuggResponse describing search suggestions """ @@ -355,10 +397,12 @@ def lookupSuggs( query += ' AND ctg IN (' + ','.join('?' * len(ctgs)) + ')' query += f' ORDER BY pop.pop DESC LIMIT {tempLimit}' suggs: list[str] = [] + # Prefix search params = [searchStr + '%'] + (ctgs if ctgs is not None else []) for (title,) in dbCur.execute(query, params): suggs.append(title) + # If insufficient results, try substring search if len(suggs) < tempLimit: existing = set(suggs) @@ -368,5 +412,5 @@ def lookupSuggs( suggs.append(title) if len(suggs) == tempLimit: break - # + return SuggResponse(suggs[:resultLimit], len(suggs) > resultLimit) |
