aboutsummaryrefslogtreecommitdiff
path: root/backend/tilo.py
diff options
context:
space:
mode:
authorTerry Truong <terry06890@gmail.com>2023-01-29 11:30:47 +1100
committerTerry Truong <terry06890@gmail.com>2023-01-29 11:30:47 +1100
commit8781fdb2b8c530a6c1531ae9e82221eb062e34fb (patch)
treeffd824aa9b945d69b47f012617ee13d98764d078 /backend/tilo.py
parentf5e87ae628bab0eef97b3e3e62f6d71cca9c99c0 (diff)
Adjust backend coding style
Add line spacing, section comments, and import consistency
Diffstat (limited to 'backend/tilo.py')
-rwxr-xr-xbackend/tilo.py107
1 files changed, 76 insertions, 31 deletions
diff --git a/backend/tilo.py b/backend/tilo.py
index 21b5a7f..f33449b 100755
--- a/backend/tilo.py
+++ b/backend/tilo.py
@@ -18,16 +18,20 @@ Expected HTTP query parameters:
"""
from typing import Iterable, cast
-import sys, re
-import urllib.parse, sqlite3
-import gzip, jsonpickle
+import sys
+import re
+import urllib.parse
+import sqlite3
+import gzip
+import jsonpickle
DB_FILE = 'tol_data/data.db'
DEFAULT_SUGG_LIM = 5
MAX_SUGG_LIM = 50
ROOT_NAME = 'cellular organisms'
-# Classes for objects sent as responses (matches lib.ts types in client-side code)
+# ========== Classes for values sent as responses ==========
+
class TolNode:
""" Used when responding to 'node' and 'chain' requests """
def __init__(
@@ -48,52 +52,61 @@ class TolNode:
self.commonName = commonName
self.imgName = imgName
self.iucn = iucn
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, TolNode) and \
(self.otolId, set(self.children), self.parent, self.tips, \
self.pSupport, self.commonName, self.imgName, self.iucn) == \
(other.otolId, set(other.children), other.parent, other.tips, \
other.pSupport, other.commonName, other.imgName, other.iucn)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
+
class SearchSugg:
""" Represents a search suggestion """
def __init__(self, name: str, canonicalName: str | None = None, pop=0):
self.name = name
self.canonicalName = canonicalName
self.pop = pop if pop is not None else 0
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, SearchSugg) and \
(self.name, self.canonicalName, self.pop) == (other.name, other.canonicalName, other.pop)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
- def __hash__(self):
+
+ def __hash__(self): # Used in unit testing
return (self.name, self.canonicalName, self.pop).__hash__()
+
class SearchSuggResponse:
""" Sent as responses to 'sugg' requests """
def __init__(self, searchSuggs: list[SearchSugg], hasMore: bool):
self.suggs = searchSuggs
self.hasMore = hasMore
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, SearchSuggResponse) and \
(set(self.suggs), self.hasMore) == (set(other.suggs), other.hasMore)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
+
class DescInfo:
""" Represents a node's associated description """
def __init__(self, text: str, wikiId: int, fromDbp: bool):
self.text = text
self.wikiId = wikiId
self.fromDbp = fromDbp
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, DescInfo) and \
(self.text, self.wikiId, self.fromDbp) == (other.text, other.wikiId, other.fromDbp)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
+
class ImgInfo:
""" Represents a node's associated image """
def __init__(self, id: int, src: str, url: str, license: str, artist: str, credit: str):
@@ -103,38 +116,44 @@ class ImgInfo:
self.license = license
self.artist = artist
self.credit = credit
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, ImgInfo) and \
(self.id, self.src, self.url, self.license, self.artist, self.credit) == \
(other.id, other.src, other.url, other.license, other.artist, other.credit)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
+
class NodeInfo:
""" Represents info about a node """
def __init__(self, tolNode: TolNode, descInfo: DescInfo | None, imgInfo: ImgInfo | None):
self.tolNode = tolNode
self.descInfo = descInfo
self.imgInfo = imgInfo
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, NodeInfo) and \
(self.tolNode, self.descInfo, self.imgInfo) == (other.tolNode, other.descInfo, other.imgInfo)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
+
class InfoResponse:
""" Sent as responses to 'info' requests """
def __init__(self, nodeInfo: NodeInfo, subNodesInfo: tuple[()] | tuple[NodeInfo | None, NodeInfo | None]):
self.nodeInfo = nodeInfo
self.subNodesInfo = subNodesInfo
- # Used in unit testing
- def __eq__(self, other):
+
+ def __eq__(self, other): # Used in unit testing
return isinstance(other, InfoResponse) and \
(self.nodeInfo, self.subNodesInfo) == (other.nodeInfo, other.subNodesInfo)
- def __repr__(self):
+
+ def __repr__(self): # Used in unit testing
return str(self.__dict__)
-# For data lookup
+# ========== For data lookup ==========
+
def lookupNodes(names: list[str], tree: str, dbCur: sqlite3.Cursor) -> dict[str, TolNode]:
""" For a set of node names, returns a name-to-TolNode map that describes those nodes """
# Get node info
@@ -146,6 +165,7 @@ def lookupNodes(names: list[str], tree: str, dbCur: sqlite3.Cursor) -> dict[str,
query = f'SELECT name, id, tips FROM {nodesTable} WHERE name IN ({queryParamStr})'
for nodeName, otolId, tips in dbCur.execute(query, names):
nameToNodes[nodeName] = TolNode(otolId, [], tips=tips)
+
# Get child info
query = f'SELECT parent, child FROM {edgesTable} WHERE parent IN ({queryParamStr})'
for nodeName, childName in dbCur.execute(query, names):
@@ -158,11 +178,13 @@ def lookupNodes(names: list[str], tree: str, dbCur: sqlite3.Cursor) -> dict[str,
for n, tips in dbCur.execute(query, node.children):
childToTips[n] = tips
node.children.sort(key=lambda n: childToTips[n], reverse=True)
+
# Get parent info
query = f'SELECT parent, child, p_support FROM {edgesTable} WHERE child IN ({queryParamStr})'
for nodeName, childName, pSupport in dbCur.execute(query, names):
nameToNodes[childName].parent = nodeName
nameToNodes[childName].pSupport = pSupport == 1
+
# Get image names
idsToNames = {nameToNodes[n].otolId: n for n in nameToNodes.keys()}
query = f'SELECT {nodesTable}.id from {nodesTable}' \
@@ -170,6 +192,7 @@ def lookupNodes(names: list[str], tree: str, dbCur: sqlite3.Cursor) -> dict[str,
f' WHERE {nodesTable}.id IN ' '({})'.format(','.join(['?'] * len(idsToNames)))
for (otolId,) in dbCur.execute(query, list(idsToNames.keys())):
nameToNodes[idsToNames[otolId]].imgName = otolId + '.jpg'
+
# Get 'linked' images for unresolved names
unresolvedNames = [n for n in nameToNodes if nameToNodes[n].imgName is None]
query = 'SELECT name, otol_ids from linked_imgs WHERE name IN ({})'
@@ -183,21 +206,25 @@ def lookupNodes(names: list[str], tree: str, dbCur: sqlite3.Cursor) -> dict[str,
id1 + '.jpg' if id1 != '' else None,
id2 + '.jpg' if id2 != '' else None,
)
+
# Get preferred-name info
query = f'SELECT name, alt_name FROM names WHERE pref_alt = 1 AND name IN ({queryParamStr})'
for name, altName in dbCur.execute(query, names):
if name in nameToNodes:
nameToNodes[name].commonName = altName
+
# Get IUCN status
query = f'SELECT name, iucn FROM node_iucn WHERE name IN ({queryParamStr})'
for name, iucn in dbCur.execute(query, names):
if name in nameToNodes:
nameToNodes[name].iucn = iucn
- #
+
return nameToNodes
+
def lookupSuggs(searchStr: str, suggLimit: int, tree: str, dbCur: sqlite3.Cursor) -> SearchSuggResponse:
""" For a search string, returns a SearchSuggResponse describing search suggestions """
hasMore = False
+
# Get node names and alt-names, ordering by popularity
nodesTable = f'nodes_{getTableSuffix(tree)}'
nameQuery = f'SELECT {nodesTable}.name, node_pop.pop FROM {nodesTable}' \
@@ -210,6 +237,7 @@ def lookupSuggs(searchStr: str, suggLimit: int, tree: str, dbCur: sqlite3.Cursor
f' WHERE alt_name LIKE ? ORDER BY node_pop.pop DESC'
suggs: dict[str, SearchSugg] = {}
tempLimit = suggLimit + 1 # For determining if 'more suggestions exist'
+
# Prefix search
for altName, nodeName, prefAlt, pop in dbCur.execute(altNameQuery, (searchStr + '%',)):
if nodeName not in suggs or prefAlt == 1 and suggs[nodeName].canonicalName is not None:
@@ -224,6 +252,7 @@ def lookupSuggs(searchStr: str, suggLimit: int, tree: str, dbCur: sqlite3.Cursor
if len(suggs) == tempLimit:
break
suggList = sorted(suggs.values(), key=lambda x: x.pop, reverse=True)
+
# If insufficient results, try substring-search
if len(suggs) < tempLimit:
newNames: set[str] = set()
@@ -243,18 +272,21 @@ def lookupSuggs(searchStr: str, suggLimit: int, tree: str, dbCur: sqlite3.Cursor
if len(suggs) == tempLimit:
break
suggList.extend(sorted([suggs[n] for n in newNames], key=lambda x: x.pop, reverse=True))
- #
+
if len(suggList) > suggLimit:
hasMore = True
return SearchSuggResponse(suggList[:suggLimit], hasMore)
+
def lookupInfo(name: str, tree: str, dbCur: sqlite3.Cursor) -> InfoResponse | None:
""" For a node name, returns a descriptive InfoResponse, or None """
nodesTable = f'nodes_{getTableSuffix(tree)}'
+
# Get node info
nameToNodes = lookupNodes([name], tree, dbCur)
tolNode = nameToNodes[name] if name in nameToNodes else None
if tolNode is None:
return None
+
# Check for compound node
match = re.fullmatch(r'\[(.+) \+ (.+)]', name)
subNames = [match.group(1), match.group(2)] if match is not None else []
@@ -264,6 +296,7 @@ def lookupInfo(name: str, tree: str, dbCur: sqlite3.Cursor) -> InfoResponse | No
subNames = [n if n in nameToSubNodes else None for n in subNames]
nameToNodes.update(nameToSubNodes)
namesToLookup = [name] if not subNames else [n for n in subNames if n is not None]
+
# Get desc info
nameToDescInfo: dict[str, DescInfo] = {}
query = 'SELECT name, desc, wiki_id, from_dbp FROM' \
@@ -271,6 +304,7 @@ def lookupInfo(name: str, tree: str, dbCur: sqlite3.Cursor) -> InfoResponse | No
' WHERE wiki_ids.name IN ({})'.format(','.join(['?'] * len(namesToLookup)))
for nodeName, desc, wikiId, fromDbp in dbCur.execute(query, namesToLookup):
nameToDescInfo[nodeName] = DescInfo(desc, wikiId, fromDbp == 1)
+
# Get image info
nameToImgInfo: dict[str, ImgInfo] = {}
idsToNames = {cast(str, nameToNodes[n].imgName)[:-4]: n
@@ -282,6 +316,7 @@ def lookupInfo(name: str, tree: str, dbCur: sqlite3.Cursor) -> InfoResponse | No
f' WHERE {nodesTable}.id IN ' '({})'.format(','.join(['?'] * len(idsToLookup)))
for id, imgId, imgSrc, url, license, artist, credit in dbCur.execute(query, idsToLookup):
nameToImgInfo[idsToNames[id]] = ImgInfo(imgId, imgSrc, url, license, artist, credit)
+
# Construct response
nodeInfoObjs = [
NodeInfo(
@@ -293,15 +328,19 @@ def lookupInfo(name: str, tree: str, dbCur: sqlite3.Cursor) -> InfoResponse | No
return InfoResponse(
nodeInfoObjs[0],
cast(tuple[()] | tuple[NodeInfo | None, NodeInfo | None], nodeInfoObjs[1:]))
+
def getTableSuffix(tree: str) -> str:
- """ converts a reduced-tree descriptor into a sql-table-suffix """
+ """ Converts a reduced-tree descriptor into a sql-table-suffix """
return 't' if tree == 'trimmed' else 'i' if tree == 'images' else 'p'
+# ========== Entry point ==========
+
def handleReq(dbFile: str, environ: dict[str, str]) -> None | dict[str, TolNode] | SearchSuggResponse | InfoResponse:
""" Queries the database, and constructs a response object """
# Open db
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
+
# Get query params
queryStr = environ['QUERY_STRING'] if 'QUERY_STRING' in environ else ''
queryDict = urllib.parse.parse_qs(queryStr)
@@ -313,6 +352,7 @@ def handleReq(dbFile: str, environ: dict[str, str]) -> None | dict[str, TolNode]
#(name,) = dbCur.execute(query).fetchone()
reqType = queryDict['type'][0] if 'type' in queryDict else None
tree = queryDict['tree'][0] if 'tree' in queryDict else 'images'
+
# Check for valid 'tree'
if tree is not None and re.fullmatch(r'trimmed|images|picked', tree) is None:
return None
@@ -339,7 +379,7 @@ def handleReq(dbFile: str, environ: dict[str, str]) -> None | dict[str, TolNode]
parent = row[0]
nodesToSkip.add(parent)
nodeName = parent
- #
+
results: dict[str, TolNode] = {}
ranOnce = False
while True:
@@ -378,6 +418,7 @@ def handleReq(dbFile: str, environ: dict[str, str]) -> None | dict[str, TolNode]
except ValueError:
invalidLimit = True
print(f'INFO: Invalid limit {suggLimit}', file=sys.stderr)
+
# Get search suggestions
if not invalidLimit:
return lookupSuggs(name, suggLimit, tree, dbCur)
@@ -385,12 +426,15 @@ def handleReq(dbFile: str, environ: dict[str, str]) -> None | dict[str, TolNode]
infoResponse = lookupInfo(name, tree, dbCur)
if infoResponse is not None:
return infoResponse
+
# On failure, provide empty response
return None
+
def application(environ: dict[str, str], start_response) -> Iterable[bytes]:
""" Entry point for the WSGI script """
# Get response object
val = handleReq(DB_FILE, environ)
+
# Construct response
data = jsonpickle.encode(val, unpicklable=False).encode()
headers = [('Content-type', 'application/json')]
@@ -400,4 +444,5 @@ def application(environ: dict[str, str], start_response) -> Iterable[bytes]:
headers.append(('Content-encoding', 'gzip'))
headers.append(('Content-Length', str(len(data))))
start_response('200 OK', headers)
+
return [data]