aboutsummaryrefslogtreecommitdiff
path: root/backend/histplorer.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/histplorer.py')
-rwxr-xr-xbackend/histplorer.py124
1 files changed, 84 insertions, 40 deletions
diff --git a/backend/histplorer.py b/backend/histplorer.py
index 26e131e..ab061d1 100755
--- a/backend/histplorer.py
+++ b/backend/histplorer.py
@@ -3,28 +3,33 @@ WSGI script that serves historical data.
Expected HTTP query parameters:
- type:
- If 'events', reply with information on events within a date range, for a given scale
- If 'info', reply with information about a given event
- If 'sugg', reply with search suggestions for an event search string
-- range: With type=events, specifies a historical-date range
+ If 'events', reply with information on events within a date range, for a given scale.
+ If 'info', reply with information about a given event.
+ If 'sugg', reply with search suggestions for an event search string.
+- range: With type=events, specifies a historical-date range.
If absent, the default is 'all of time'.
Examples:
range=1000.1910-10-09 means '1000 AD up to and excluding 09/10/1910'
range=-13000. means '13000 BC onwards'
-- scale: With type=events, specifies a date scale
-- incl: With type=events, specifies an event to include, as an event ID
-- event: With type=info, specifies the event title to get info for
-- input: With type=sugg, specifies a search string to suggest for
-- limit: With type=events or type=sugg, specifies the max number of results
-- ctgs: With type=events|info|sugg, specifies event categories to restrict results to
- Interpreted as a period-separated list of category names (eg: person.place). An empty string is ignored.
+- scale: With type=events, specifies a date scale (see SCALES in hist_data/cal.py).
+- incl: With type=events, specifies an event to include, as an event ID.
+- event: With type=info, specifies the title of an event to get info for.
+- input: With type=sugg, specifies a search string to suggest for.
+- limit: With type=events or type=sugg, specifies the max number of results.
+- ctgs: With type=events|info|sugg, specifies event categories to restrict results to.
+ Interpreted as a period-separated list of category names (eg: person.place).
+ An empty string is ignored.
- imgonly: With type=events|info|sugg, if present, restricts results to events with images.
"""
from typing import Iterable, cast
-import sys, re
-import urllib.parse, sqlite3
-import gzip, jsonpickle
+import sys
+import re
+import urllib.parse
+import sqlite3
+import gzip
+import jsonpickle
+
from hist_data.cal import HistDate, dbDateToHistDate, dateToUnit
DB_FILE = 'hist_data/data.db'
@@ -34,7 +39,8 @@ DEFAULT_REQ_EVENTS = 20
MAX_REQ_SUGGS = 50
DEFAULT_REQ_SUGGS = 5
-# Classes for values sent as responses
+# ========== Classes for values sent as responses ==========
+
class Event:
""" Represents an historical event """
def __init__(
@@ -57,26 +63,30 @@ class Event:
self.ctg = ctg
self.imgId = imgId
self.pop = pop
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, Event) and \
(self.id, self.title, self.start, self.startUpper, self.end, self.endUpper, \
self.ctg, self.pop, self.imgId) == \
(other.id, other.title, other.start, other.startUpper, other.end, other.endUpper, \
other.ctg, other.pop, other.imgId)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
+
class EventResponse:
""" Used when responding to type=events requests """
def __init__(self, events: list[Event], unitCounts: dict[int, int] | None):
self.events = events
self.unitCounts = unitCounts # None indicates exceeding MAX_REQ_UNIT_COUNTS
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, EventResponse) and \
(self.events, self.unitCounts) == (other.events, other.unitCounts)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
+
class ImgInfo:
""" Represents an event's associated image """
def __init__(self, url: str, license: str, artist: str, credit: str):
@@ -84,13 +94,15 @@ class ImgInfo:
self.license = license
self.artist = artist
self.credit = credit
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, ImgInfo) and \
(self.url, self.license, self.artist, self.credit) == \
(other.url, other.license, other.artist, other.credit)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
+
class EventInfo:
""" Used when responding to type=info requests """
def __init__(self, event: Event, desc: str | None, wikiId: int, imgInfo: ImgInfo | None):
@@ -98,29 +110,34 @@ class EventInfo:
self.desc = desc
self.wikiId = wikiId
self.imgInfo = imgInfo
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, EventInfo) and \
(self.event, self.desc, self.wikiId, self.imgInfo) == (other.event, other.desc, other.wikiId, other.imgInfo)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
+
class SuggResponse:
""" Used when responding to type=sugg requests """
def __init__(self, suggs: list[str], hasMore: bool):
self.suggs = suggs
self.hasMore = hasMore
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, SuggResponse) and \
(self.suggs, self.hasMore) == (other.suggs, other.hasMore)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
-# Entry point
+# ========== Entry point ==========
+
def application(environ: dict[str, str], start_response) -> Iterable[bytes]:
""" Entry point for the WSGI script """
# Get response object
val = handleReq(DB_FILE, environ)
+
# Construct response
data = jsonpickle.encode(val, unpicklable=False).encode()
headers = [('Content-type', 'application/json')]
@@ -130,16 +147,20 @@ def application(environ: dict[str, str], start_response) -> Iterable[bytes]:
headers.append(('Content-encoding', 'gzip'))
headers.append(('Content-Length', str(len(data))))
start_response('200 OK', headers)
+
return [data]
+
def handleReq(dbFile: str, environ: dict[str, str]) -> None | EventResponse | EventInfo | SuggResponse:
""" Queries the database, and constructs a response object """
# Open db
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
+
# Get query params
queryStr = environ['QUERY_STRING'] if 'QUERY_STRING' in environ else ''
queryDict = urllib.parse.parse_qs(queryStr)
params = {k: v[0] for k, v in queryDict.items()}
+
# Get data of requested type
reqType = queryDict['type'][0] if 'type' in queryDict else None
if reqType == 'events':
@@ -150,7 +171,8 @@ def handleReq(dbFile: str, environ: dict[str, str]) -> None | EventResponse | Ev
return handleSuggReq(params, dbCur)
return None
-# For type=events
+# ========== For handling type=events ==========
+
def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor) -> EventResponse | None:
""" Generates a response for a type=events request """
# Get dates
@@ -163,6 +185,7 @@ def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor) -> EventRespo
except ValueError:
print(f'INFO: Invalid date-range value {dateRange}', file=sys.stderr)
return None
+
# Get scale
if 'scale' not in params:
print('INFO: No scale provided', file=sys.stderr)
@@ -172,12 +195,14 @@ def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor) -> EventRespo
except ValueError:
print('INFO: Invalid scale value', file=sys.stderr)
return None
+
# Get incl value
try:
incl = int(params['incl']) if 'incl' in params else None
except ValueError:
print('INFO: Invalid incl value', file=sys.stderr)
return None
+
# Get result set limit
try:
resultLimit = int(params['limit']) if 'limit' in params else DEFAULT_REQ_EVENTS
@@ -187,13 +212,15 @@ def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor) -> EventRespo
if resultLimit <= 0 or resultLimit > MAX_REQ_EVENTS:
print(f'INFO: Invalid results limit {resultLimit}', file=sys.stderr)
return None
- #
+
ctgs = params['ctgs'].split('.') if 'ctgs' in params else None
imgonly = 'imgonly' in params
- #
+
events = lookupEvents(start, end, scale, incl, resultLimit, ctgs, imgonly, dbCur)
unitCounts = lookupUnitCounts(start, end, scale, imgonly, dbCur)
+
return EventResponse(events, unitCounts)
+
def reqParamToHistDate(s: str):
""" Produces a HistDate from strings like '2010-10-3', '-8000', and '' (throws ValueError if invalid) """
if not s:
@@ -205,6 +232,7 @@ def reqParamToHistDate(s: str):
return HistDate(None, int(m.group(1)))
else:
return HistDate(True, int(m.group(1)), int(m.group(2)), int(m.group(3)))
+
def lookupEvents(
start: HistDate | None, end: HistDate | None, scale: int, incl: int | None, resultLimit: int,
ctgs: list[str] | None, imgonly: bool, dbCur: sqlite3.Cursor) -> list[Event]:
@@ -219,6 +247,7 @@ def lookupEvents(
' LEFT JOIN images ON event_imgs.img_id = images.id'
constraints = [f'{dispTable}.scale = ?']
params: list[str | int] = [scale]
+
# Constrain by start/end
startUnit = dateToUnit(start, scale) if start is not None else None
endUnit = dateToUnit(end, scale) if end is not None else None
@@ -232,22 +261,26 @@ def lookupEvents(
if endUnit is not None:
constraints.append(f'{dispTable}.unit < ?')
params.append(endUnit)
+
# Constrain by event category
if ctgs is not None:
constraints.append('ctg IN (' + ','.join('?' * len(ctgs)) + ')')
params.extend(ctgs)
+
# Add constraints to query
query2 = query
if constraints:
query2 += ' WHERE ' + ' AND '.join(constraints)
query2 += ' ORDER BY pop.pop DESC'
query2 += f' LIMIT {resultLimit}'
+
# Run query
results: list[Event] = []
for row in dbCur.execute(query2, params):
results.append(eventEntryToResults(row))
if incl is not None and incl == row[0]:
incl = None
+
# Get any additional inclusion
if incl is not None:
row = dbCur.execute(query + ' WHERE events.id = ?', (incl,)).fetchone()
@@ -255,8 +288,9 @@ def lookupEvents(
if len(results) == resultLimit:
results.pop()
results.append(eventEntryToResults(row))
- #
+
return results
+
def eventEntryToResults(
row: tuple[int, str, int, int | None, int | None, int | None, int, str, int | None, int]) -> Event:
eventId, title, start, startUpper, end, endUpper, fmt, ctg, imageId, pop = row
@@ -267,11 +301,13 @@ def eventEntryToResults(
for i, n in enumerate(dateVals):
if n is not None:
newDates[i] = dbDateToHistDate(n, fmt, i < 2)
- #
+
return Event(eventId, title, cast(HistDate, newDates[0]), newDates[1], newDates[2], newDates[3], ctg, imageId, pop)
+
def lookupUnitCounts(
start: HistDate | None, end: HistDate | None, scale: int,
imgonly: bool, dbCur: sqlite3.Cursor) -> dict[int, int] | None:
+ """ Return list of units with counts given scale and a date range """
# Build query
distTable = 'dist' if not imgonly else 'img_dist'
query = f'SELECT unit, count FROM {distTable} WHERE scale = ?'
@@ -283,13 +319,15 @@ def lookupUnitCounts(
query += ' AND unit < ?'
params.append(dateToUnit(end, scale))
query += ' ORDER BY unit ASC LIMIT ' + str(MAX_REQ_UNIT_COUNTS + 1)
+
# Get results
unitCounts: dict[int, int] = {}
for unit, count in dbCur.execute(query, params):
unitCounts[unit] = count
return unitCounts if len(unitCounts) <= MAX_REQ_UNIT_COUNTS else None
-# For type=info
+# ========== For handling type=info ==========
+
def handleInfoReq(params: dict[str, str], dbCur: sqlite3.Cursor):
""" Generates a response for a type=info request """
if 'event' not in params:
@@ -298,6 +336,7 @@ def handleInfoReq(params: dict[str, str], dbCur: sqlite3.Cursor):
ctgs = params['ctgs'].split('.') if 'ctgs' in params else None
imgonly = 'imgonly' in params
return lookupEventInfo(params['event'], ctgs, imgonly, dbCur)
+
def lookupEventInfo(eventTitle: str, ctgs: list[str] | None, imgonly: bool, dbCur: sqlite3.Cursor) -> EventInfo | None:
""" Look up an event with given title, and return a descriptive EventInfo """
imgJoin = 'INNER JOIN' if imgonly else 'LEFT JOIN'
@@ -320,7 +359,8 @@ def lookupEventInfo(eventTitle: str, ctgs: list[str] | None, imgonly: bool, dbCu
else:
return None
-# For type=sugg
+# ========== For handling type=sugg ==========
+
def handleSuggReq(params: dict[str, str], dbCur: sqlite3.Cursor):
""" Generates a response for a type=sugg request """
# Get search string
@@ -331,6 +371,7 @@ def handleSuggReq(params: dict[str, str], dbCur: sqlite3.Cursor):
if not searchStr:
print('INFO: Empty \'input\' parameter for type=sugg request', file=sys.stderr)
return None
+
# Get result limit
try:
resultLimit = int(params['limit']) if 'limit' in params else DEFAULT_REQ_SUGGS
@@ -340,10 +381,11 @@ def handleSuggReq(params: dict[str, str], dbCur: sqlite3.Cursor):
if resultLimit <= 0 or resultLimit > MAX_REQ_SUGGS:
print(f'INFO: Invalid suggestion limit {resultLimit}', file=sys.stderr)
return None
- #
+
ctgs = params['ctgs'].split('.') if 'ctgs' in params else None
imgonly = 'imgonly' in params
return lookupSuggs(searchStr, resultLimit, ctgs, imgonly, dbCur)
+
def lookupSuggs(
searchStr: str, resultLimit: int, ctgs: list[str] | None, imgonly: bool, dbCur: sqlite3.Cursor) -> SuggResponse:
""" For a search string, returns a SuggResponse describing search suggestions """
@@ -355,10 +397,12 @@ def lookupSuggs(
query += ' AND ctg IN (' + ','.join('?' * len(ctgs)) + ')'
query += f' ORDER BY pop.pop DESC LIMIT {tempLimit}'
suggs: list[str] = []
+
# Prefix search
params = [searchStr + '%'] + (ctgs if ctgs is not None else [])
for (title,) in dbCur.execute(query, params):
suggs.append(title)
+
# If insufficient results, try substring search
if len(suggs) < tempLimit:
existing = set(suggs)
@@ -368,5 +412,5 @@ def lookupSuggs(
suggs.append(title)
if len(suggs) == tempLimit:
break
- #
+
return SuggResponse(suggs[:resultLimit], len(suggs) > resultLimit)