diff options
| author | Terry Truong <terry06890@gmail.com> | 2023-01-29 11:30:47 +1100 |
|---|---|---|
| committer | Terry Truong <terry06890@gmail.com> | 2023-01-29 11:30:47 +1100 |
| commit | 8781fdb2b8c530a6c1531ae9e82221eb062e34fb (patch) | |
| tree | ffd824aa9b945d69b47f012617ee13d98764d078 /backend/tilo.py | |
| parent | f5e87ae628bab0eef97b3e3e62f6d71cca9c99c0 (diff) | |
Adjust backend coding style
Add line spacing, section comments, and import consistency
Diffstat (limited to 'backend/tilo.py')
| -rwxr-xr-x | backend/tilo.py | 107 |
1 files changed, 76 insertions, 31 deletions
diff --git a/backend/tilo.py b/backend/tilo.py index 21b5a7f..f33449b 100755 --- a/backend/tilo.py +++ b/backend/tilo.py @@ -18,16 +18,20 @@ Expected HTTP query parameters: """ from typing import Iterable, cast -import sys, re -import urllib.parse, sqlite3 -import gzip, jsonpickle +import sys +import re +import urllib.parse +import sqlite3 +import gzip +import jsonpickle DB_FILE = 'tol_data/data.db' DEFAULT_SUGG_LIM = 5 MAX_SUGG_LIM = 50 ROOT_NAME = 'cellular organisms' -# Classes for objects sent as responses (matches lib.ts types in client-side code) +# ========== Classes for values sent as responses ========== + class TolNode: """ Used when responding to 'node' and 'chain' requests """ def __init__( @@ -48,52 +52,61 @@ class TolNode: self.commonName = commonName self.imgName = imgName self.iucn = iucn - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, TolNode) and \ (self.otolId, set(self.children), self.parent, self.tips, \ self.pSupport, self.commonName, self.imgName, self.iucn) == \ (other.otolId, set(other.children), other.parent, other.tips, \ other.pSupport, other.commonName, other.imgName, other.iucn) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) + class SearchSugg: """ Represents a search suggestion """ def __init__(self, name: str, canonicalName: str | None = None, pop=0): self.name = name self.canonicalName = canonicalName self.pop = pop if pop is not None else 0 - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, SearchSugg) and \ (self.name, self.canonicalName, self.pop) == (other.name, other.canonicalName, other.pop) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) - def __hash__(self): + + def __hash__(self): # Used in unit testing return (self.name, self.canonicalName, self.pop).__hash__() + class SearchSuggResponse: """ Sent as responses to 'sugg' requests """ def __init__(self, searchSuggs: list[SearchSugg], hasMore: bool): self.suggs = searchSuggs self.hasMore = hasMore - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, SearchSuggResponse) and \ (set(self.suggs), self.hasMore) == (set(other.suggs), other.hasMore) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) + class DescInfo: """ Represents a node's associated description """ def __init__(self, text: str, wikiId: int, fromDbp: bool): self.text = text self.wikiId = wikiId self.fromDbp = fromDbp - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, DescInfo) and \ (self.text, self.wikiId, self.fromDbp) == (other.text, other.wikiId, other.fromDbp) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) + class ImgInfo: """ Represents a node's associated image """ def __init__(self, id: int, src: str, url: str, license: str, artist: str, credit: str): @@ -103,38 +116,44 @@ class ImgInfo: self.license = license self.artist = artist self.credit = credit - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, ImgInfo) and \ (self.id, self.src, self.url, self.license, self.artist, self.credit) == \ (other.id, other.src, other.url, other.license, other.artist, other.credit) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) + class NodeInfo: """ Represents info about a node """ def __init__(self, tolNode: TolNode, descInfo: DescInfo | None, imgInfo: ImgInfo | None): self.tolNode = tolNode self.descInfo = descInfo self.imgInfo = imgInfo - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, NodeInfo) and \ (self.tolNode, self.descInfo, self.imgInfo) == (other.tolNode, other.descInfo, other.imgInfo) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) + class InfoResponse: """ Sent as responses to 'info' requests """ def __init__(self, nodeInfo: NodeInfo, subNodesInfo: tuple[()] | tuple[NodeInfo | None, NodeInfo | None]): self.nodeInfo = nodeInfo self.subNodesInfo = subNodesInfo - # Used in unit testing - def __eq__(self, other): + + def __eq__(self, other): # Used in unit testing return isinstance(other, InfoResponse) and \ (self.nodeInfo, self.subNodesInfo) == (other.nodeInfo, other.subNodesInfo) - def __repr__(self): + + def __repr__(self): # Used in unit testing return str(self.__dict__) -# For data lookup +# ========== For data lookup ========== + def lookupNodes(names: list[str], tree: str, dbCur: sqlite3.Cursor) -> dict[str, TolNode]: """ For a set of node names, returns a name-to-TolNode map that describes those nodes """ # Get node info @@ -146,6 +165,7 @@ def lookupNodes(names: list[str], tree: str, dbCur: sqlite3.Cursor) -> dict[str, query = f'SELECT name, id, tips FROM {nodesTable} WHERE name IN ({queryParamStr})' for nodeName, otolId, tips in dbCur.execute(query, names): nameToNodes[nodeName] = TolNode(otolId, [], tips=tips) + # Get child info query = f'SELECT parent, child FROM {edgesTable} WHERE parent IN ({queryParamStr})' for nodeName, childName in dbCur.execute(query, names): @@ -158,11 +178,13 @@ def lookupNodes(names: list[str], tree: str, dbCur: sqlite3.Cursor) -> dict[str, for n, tips in dbCur.execute(query, node.children): childToTips[n] = tips node.children.sort(key=lambda n: childToTips[n], reverse=True) + # Get parent info query = f'SELECT parent, child, p_support FROM {edgesTable} WHERE child IN ({queryParamStr})' for nodeName, childName, pSupport in dbCur.execute(query, names): nameToNodes[childName].parent = nodeName nameToNodes[childName].pSupport = pSupport == 1 + # Get image names idsToNames = {nameToNodes[n].otolId: n for n in nameToNodes.keys()} query = f'SELECT {nodesTable}.id from {nodesTable}' \ @@ -170,6 +192,7 @@ def lookupNodes(names: list[str], tree: str, dbCur: sqlite3.Cursor) -> dict[str, f' WHERE {nodesTable}.id IN ' '({})'.format(','.join(['?'] * len(idsToNames))) for (otolId,) in dbCur.execute(query, list(idsToNames.keys())): nameToNodes[idsToNames[otolId]].imgName = otolId + '.jpg' + # Get 'linked' images for unresolved names unresolvedNames = [n for n in nameToNodes if nameToNodes[n].imgName is None] query = 'SELECT name, otol_ids from linked_imgs WHERE name IN ({})' @@ -183,21 +206,25 @@ def lookupNodes(names: list[str], tree: str, dbCur: sqlite3.Cursor) -> dict[str, id1 + '.jpg' if id1 != '' else None, id2 + '.jpg' if id2 != '' else None, ) + # Get preferred-name info query = f'SELECT name, alt_name FROM names WHERE pref_alt = 1 AND name IN ({queryParamStr})' for name, altName in dbCur.execute(query, names): if name in nameToNodes: nameToNodes[name].commonName = altName + # Get IUCN status query = f'SELECT name, iucn FROM node_iucn WHERE name IN ({queryParamStr})' for name, iucn in dbCur.execute(query, names): if name in nameToNodes: nameToNodes[name].iucn = iucn - # + return nameToNodes + def lookupSuggs(searchStr: str, suggLimit: int, tree: str, dbCur: sqlite3.Cursor) -> SearchSuggResponse: """ For a search string, returns a SearchSuggResponse describing search suggestions """ hasMore = False + # Get node names and alt-names, ordering by popularity nodesTable = f'nodes_{getTableSuffix(tree)}' nameQuery = f'SELECT {nodesTable}.name, node_pop.pop FROM {nodesTable}' \ @@ -210,6 +237,7 @@ def lookupSuggs(searchStr: str, suggLimit: int, tree: str, dbCur: sqlite3.Cursor f' WHERE alt_name LIKE ? ORDER BY node_pop.pop DESC' suggs: dict[str, SearchSugg] = {} tempLimit = suggLimit + 1 # For determining if 'more suggestions exist' + # Prefix search for altName, nodeName, prefAlt, pop in dbCur.execute(altNameQuery, (searchStr + '%',)): if nodeName not in suggs or prefAlt == 1 and suggs[nodeName].canonicalName is not None: @@ -224,6 +252,7 @@ def lookupSuggs(searchStr: str, suggLimit: int, tree: str, dbCur: sqlite3.Cursor if len(suggs) == tempLimit: break suggList = sorted(suggs.values(), key=lambda x: x.pop, reverse=True) + # If insufficient results, try substring-search if len(suggs) < tempLimit: newNames: set[str] = set() @@ -243,18 +272,21 @@ def lookupSuggs(searchStr: str, suggLimit: int, tree: str, dbCur: sqlite3.Cursor if len(suggs) == tempLimit: break suggList.extend(sorted([suggs[n] for n in newNames], key=lambda x: x.pop, reverse=True)) - # + if len(suggList) > suggLimit: hasMore = True return SearchSuggResponse(suggList[:suggLimit], hasMore) + def lookupInfo(name: str, tree: str, dbCur: sqlite3.Cursor) -> InfoResponse | None: """ For a node name, returns a descriptive InfoResponse, or None """ nodesTable = f'nodes_{getTableSuffix(tree)}' + # Get node info nameToNodes = lookupNodes([name], tree, dbCur) tolNode = nameToNodes[name] if name in nameToNodes else None if tolNode is None: return None + # Check for compound node match = re.fullmatch(r'\[(.+) \+ (.+)]', name) subNames = [match.group(1), match.group(2)] if match is not None else [] @@ -264,6 +296,7 @@ def lookupInfo(name: str, tree: str, dbCur: sqlite3.Cursor) -> InfoResponse | No subNames = [n if n in nameToSubNodes else None for n in subNames] nameToNodes.update(nameToSubNodes) namesToLookup = [name] if not subNames else [n for n in subNames if n is not None] + # Get desc info nameToDescInfo: dict[str, DescInfo] = {} query = 'SELECT name, desc, wiki_id, from_dbp FROM' \ @@ -271,6 +304,7 @@ def lookupInfo(name: str, tree: str, dbCur: sqlite3.Cursor) -> InfoResponse | No ' WHERE wiki_ids.name IN ({})'.format(','.join(['?'] * len(namesToLookup))) for nodeName, desc, wikiId, fromDbp in dbCur.execute(query, namesToLookup): nameToDescInfo[nodeName] = DescInfo(desc, wikiId, fromDbp == 1) + # Get image info nameToImgInfo: dict[str, ImgInfo] = {} idsToNames = {cast(str, nameToNodes[n].imgName)[:-4]: n @@ -282,6 +316,7 @@ def lookupInfo(name: str, tree: str, dbCur: sqlite3.Cursor) -> InfoResponse | No f' WHERE {nodesTable}.id IN ' '({})'.format(','.join(['?'] * len(idsToLookup))) for id, imgId, imgSrc, url, license, artist, credit in dbCur.execute(query, idsToLookup): nameToImgInfo[idsToNames[id]] = ImgInfo(imgId, imgSrc, url, license, artist, credit) + # Construct response nodeInfoObjs = [ NodeInfo( @@ -293,15 +328,19 @@ def lookupInfo(name: str, tree: str, dbCur: sqlite3.Cursor) -> InfoResponse | No return InfoResponse( nodeInfoObjs[0], cast(tuple[()] | tuple[NodeInfo | None, NodeInfo | None], nodeInfoObjs[1:])) + def getTableSuffix(tree: str) -> str: - """ converts a reduced-tree descriptor into a sql-table-suffix """ + """ Converts a reduced-tree descriptor into a sql-table-suffix """ return 't' if tree == 'trimmed' else 'i' if tree == 'images' else 'p' +# ========== Entry point ========== + def handleReq(dbFile: str, environ: dict[str, str]) -> None | dict[str, TolNode] | SearchSuggResponse | InfoResponse: """ Queries the database, and constructs a response object """ # Open db dbCon = sqlite3.connect(dbFile) dbCur = dbCon.cursor() + # Get query params queryStr = environ['QUERY_STRING'] if 'QUERY_STRING' in environ else '' queryDict = urllib.parse.parse_qs(queryStr) @@ -313,6 +352,7 @@ def handleReq(dbFile: str, environ: dict[str, str]) -> None | dict[str, TolNode] #(name,) = dbCur.execute(query).fetchone() reqType = queryDict['type'][0] if 'type' in queryDict else None tree = queryDict['tree'][0] if 'tree' in queryDict else 'images' + # Check for valid 'tree' if tree is not None and re.fullmatch(r'trimmed|images|picked', tree) is None: return None @@ -339,7 +379,7 @@ def handleReq(dbFile: str, environ: dict[str, str]) -> None | dict[str, TolNode] parent = row[0] nodesToSkip.add(parent) nodeName = parent - # + results: dict[str, TolNode] = {} ranOnce = False while True: @@ -378,6 +418,7 @@ def handleReq(dbFile: str, environ: dict[str, str]) -> None | dict[str, TolNode] except ValueError: invalidLimit = True print(f'INFO: Invalid limit {suggLimit}', file=sys.stderr) + # Get search suggestions if not invalidLimit: return lookupSuggs(name, suggLimit, tree, dbCur) @@ -385,12 +426,15 @@ def handleReq(dbFile: str, environ: dict[str, str]) -> None | dict[str, TolNode] infoResponse = lookupInfo(name, tree, dbCur) if infoResponse is not None: return infoResponse + # On failure, provide empty response return None + def application(environ: dict[str, str], start_response) -> Iterable[bytes]: """ Entry point for the WSGI script """ # Get response object val = handleReq(DB_FILE, environ) + # Construct response data = jsonpickle.encode(val, unpicklable=False).encode() headers = [('Content-type', 'application/json')] @@ -400,4 +444,5 @@ def application(environ: dict[str, str], start_response) -> Iterable[bytes]: headers.append(('Content-encoding', 'gzip')) headers.append(('Content-Length', str(len(data)))) start_response('200 OK', headers) + return [data] |
