aboutsummaryrefslogtreecommitdiff
path: root/backend/histplorer.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/histplorer.py')
-rwxr-xr-xbackend/histplorer.py84
1 files changed, 39 insertions, 45 deletions
diff --git a/backend/histplorer.py b/backend/histplorer.py
index 72ef88e..edd675f 100755
--- a/backend/histplorer.py
+++ b/backend/histplorer.py
@@ -23,39 +23,17 @@ from typing import Iterable
import sys, re
import urllib.parse, sqlite3
import gzip, jsonpickle
-from hist_data.cal import gregorianToJdn, jdnToGregorian, jdnToJulian
+from hist_data.cal import gregorianToJdn, HistDate, dbDateToHistDate, dateToUnit
DB_FILE = 'hist_data/data.db'
MAX_REQ_EVENTS = 500
DEFAULT_REQ_EVENTS = 20
MAX_REQ_SUGGS = 50
DEFAULT_REQ_SUGGS = 5
-MIN_CAL_YEAR = -4713 # Disallow within-year dates before this year
# Classes for objects sent as responses
-class HistDate:
- """
- Represents a historical date
- - 'year' may be negative (-1 means 1 BCE)
- - 'month' and 'day' are at least 1, if given
- - 'gcal' may be:
- - True: Indicates a Gregorian calendar date
- - False: Means the date should, for display, be converted to a Julian calendar date
- - None: 'month' and 'day' are 1 (used for dates before the Julian period starting year 4713 BCE)
- """
- def __init__(self, gcal: bool | None, year: int, month=1, day=1):
- self.gcal = gcal
- self.year = year
- self.month = month
- self.day = day
- # Used in unit testing
- def __eq__(self, other):
- return isinstance(other, HistDate) and \
- (self.gcal, self.year, self.month, self.day) == (other.gcal, other.year, other.month, other.day)
- def __repr__(self):
- return str(self.__dict__)
class Event:
- """ Used when responding to type=events requests """
+ """ Represents an historical event """
def __init__(
self,
id: int,
@@ -85,6 +63,17 @@ class Event:
other.ctg, other.pop, other.imgId)
def __repr__(self):
return str(self.__dict__)
+class EventResponse:
+ """ Used when responding to type=events requests """
+ def __init__(self, events: list[Event], unitCounts: dict[int, int]):
+ self.events = events
+ self.unitCounts = unitCounts
+ # Used in unit testing
+ def __eq__(self, other):
+ return isinstance(other, EventResponse) and \
+ (self.events, self.unitCounts) == (other.events, other.unitCounts)
+ def __repr__(self):
+ return str(self.__dict__)
class ImgInfo:
""" Represents an event's associated image """
def __init__(self, url: str, license: str, artist: str, credit: str):
@@ -138,7 +127,7 @@ def application(environ: dict[str, str], start_response) -> Iterable[bytes]:
headers.append(('Content-Length', str(len(data))))
start_response('200 OK', headers)
return [data]
-def handleReq(dbFile: str, environ: dict[str, str]) -> None | list[Event] | EventInfo | SuggResponse:
+def handleReq(dbFile: str, environ: dict[str, str]) -> None | EventResponse | EventInfo | SuggResponse:
""" Queries the database, and constructs a response object """
# Open db
dbCon = sqlite3.connect(dbFile)
@@ -169,7 +158,7 @@ def reqParamToHistDate(s: str):
return HistDate(True, int(m.group(1)), int(m.group(2)), int(m.group(3)))
# For type=events
-def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor):
+def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor) -> EventResponse | None:
""" Generates a response for a type=events request """
# Get dates
dateRange = params['range'] if 'range' in params else '.'
@@ -208,9 +197,11 @@ def handleEventsReq(params: dict[str, str], dbCur: sqlite3.Cursor):
print(f'INFO: Invalid results limit {resultLimit}', file=sys.stderr)
return None
#
- return lookupEvents(start, end, scale, ctg, incl, resultLimit, dbCur)
+ events = lookupEvents(start, end, scale, ctg, incl, resultLimit, dbCur)
+ unitCounts = lookupUnitCounts(start, end, scale, dbCur)
+ return EventResponse(events, unitCounts)
def lookupEvents(start: HistDate | None, end: HistDate | None, scale: int, ctg: str | None,
- incl: int | None, resultLimit: int, dbCur: sqlite3.Cursor) -> list[Event] | None:
+ incl: int | None, resultLimit: int, dbCur: sqlite3.Cursor) -> list[Event]:
""" Looks for events within a date range, in given scale,
restricted by event category, an optional particular inclusion, and a result limit """
query = \
@@ -270,31 +261,34 @@ def lookupEvents(start: HistDate | None, end: HistDate | None, scale: int, ctg:
#
return results
def eventEntryToResults(
- row: tuple[int, str, int, int | None, int | None, int | None, int, str, int, int | None]) -> Event:
+ row: tuple[int, str, int, int | None, int | None, int | None, int, str, int, int]) -> Event:
eventId, title, start, startUpper, end, endUpper, fmt, ctg, imageId, pop = row
""" Helper for converting an 'events' db entry into an Event object """
# Convert dates
dateVals: list[int | None] = [start, startUpper, end, endUpper]
newDates: list[HistDate | None] = [None for n in dateVals]
for i, n in enumerate(dateVals):
- if n is None:
- continue
- elif fmt == 0:
- if n >= MIN_CAL_YEAR:
- newDates[i] = HistDate(True, n, 1, 1)
- else:
- newDates[i] = HistDate(None, n)
- elif fmt == 1:
- newDates[i] = HistDate(False, *jdnToJulian(n))
- elif fmt == 2:
- newDates[i] = HistDate(True, *jdnToGregorian(n))
- elif fmt == 3:
- if i in [0, 2]:
- newDates[i] = HistDate(False, *jdnToJulian(n))
- else:
- newDates[i] = HistDate(True, *jdnToGregorian(n))
+ if n:
+ newDates[i] = dbDateToHistDate(n, fmt, i < 2)
#
return Event(eventId, title, newDates[0], newDates[1], newDates[2], newDates[3], ctg, imageId, pop)
+def lookupUnitCounts(
+ start: HistDate | None, end: HistDate | None, scale: int, dbCur: sqlite3.Cursor) -> dict[int, int]:
+ # Build query
+ query = 'SELECT unit, count FROM dist WHERE scale = ?'
+ params = [scale]
+ if start:
+ query += ' AND unit >= ?'
+ params.append(dateToUnit(start, scale))
+ if end:
+ query += ' AND unit <= ?'
+ params.append(dateToUnit(end, scale))
+ query += ' ORDER BY unit ASC LIMIT ' + str(MAX_REQ_EVENTS)
+ # Get results
+ unitCounts: dict[int, int] = {}
+ for unit, count in dbCur.execute(query, params):
+ unitCounts[unit] = count
+ return unitCounts
# For type=info
def handleInfoReq(params: dict[str, str], dbCur: sqlite3.Cursor):