diff options
| | | |
|---|---|---|
| author | Terry Truong <terry06890@gmail.com> | 2022-09-07 11:37:37 +1000 |
| committer | Terry Truong <terry06890@gmail.com> | 2022-09-07 11:37:37 +1000 |
| commit | daccbbd9c73a5292ea9d6746560d7009e5aa666d (patch) | |
| tree | 9156bf011ab6302eb3c0d219d40587d594f51841 /backend/tolData/genOtolData.py | |
| parent | 1a7fe33edafa68a6f759d124bdeee673ff9cf9ff (diff) | |
Add python type annotations
Also use consistent quote symbols
Also use 'is None' instead of '== None'
Also use 'if list1' instead of 'if len(list1) > 0'
Diffstat (limited to 'backend/tolData/genOtolData.py')
| -rwxr-xr-x | backend/tolData/genOtolData.py | 133 |
1 files changed, 66 insertions, 67 deletions
diff --git a/backend/tolData/genOtolData.py b/backend/tolData/genOtolData.py index 6310cc9..d4d6ee8 100755 --- a/backend/tolData/genOtolData.py +++ b/backend/tolData/genOtolData.py @@ -1,6 +1,6 @@ #!/usr/bin/python3 -import sys, re, os +import re, os import json, sqlite3 import argparse @@ -26,16 +26,8 @@ Reads from a picked-names file, if present, which specifies name and node ID pai """, formatter_class=argparse.RawDescriptionHelpFormatter) parser.parse_args() -treeFile = "otol/labelled_supertree_ottnames.tre" # Had about 2.5e9 nodes -annFile = "otol/annotations.json" -dbFile = "data.db" -nodeMap = {} # Maps node IDs to node objects -nameToFirstId = {} # Maps node names to first found ID (names might have multiple IDs) -dupNameToIds = {} # Maps names of nodes with multiple IDs to those IDs -pickedNamesFile = "pickedOtolNames.txt" - class Node: - " Represents a tree-of-life node " + ' Represents a tree-of-life node ' def __init__(self, name, childIds, parentId, tips, pSupport): self.name = name self.childIds = childIds @@ -43,35 +35,43 @@ class Node: self.tips = tips self.pSupport = pSupport -print("Parsing tree file") +treeFile = 'otol/labelled_supertree_ottnames.tre' # Had about 2.5e9 nodes +annFile = 'otol/annotations.json' +dbFile = 'data.db' +nodeMap: dict[str, Node] = {} # Maps node IDs to node objects +nameToFirstId: dict[str, str] = {} # Maps node names to first found ID (names might have multiple IDs) +dupNameToIds: dict[str, list[str]] = {} # Maps names of nodes with multiple IDs to those IDs +pickedNamesFile = 'pickedOtolNames.txt' + +print('Parsing tree file') # Read file -data = None +data: str with open(treeFile) as file: data = file.read() dataIdx = 0 # Parse content iterNum = 0 -def parseNewick(): - " Parses a node using 'data' and 'dataIdx', updates nodeMap accordingly, and returns the node's ID " +def parseNewick() -> str: + """ Parses a node using 'data' and 'dataIdx', updates nodeMap accordingly, and returns the node's ID """ global data, 
dataIdx, iterNum iterNum += 1 if iterNum % 1e5 == 0: - print(f"At iteration {iterNum}") + print(f'At iteration {iterNum}') # Check for EOF if dataIdx == len(data): - raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}") + raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}') # Check for node - if data[dataIdx] == "(": # parse inner node + if data[dataIdx] == '(': # parse inner node dataIdx += 1 - childIds = [] + childIds: list[str] = [] while True: # Read child childId = parseNewick() childIds.append(childId) if (dataIdx == len(data)): - raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}") + raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}') # Check for next child - if (data[dataIdx] == ","): + if (data[dataIdx] == ','): dataIdx += 1 continue else: @@ -94,10 +94,10 @@ def parseNewick(): updateNameMaps(name, id) nodeMap[id] = Node(name, [], None, 1, False) return id -def parseNewickName(): - " Parses a node name using 'data' and 'dataIdx', and returns a (name, id) pair " +def parseNewickName() -> tuple[str, str]: + """ Parses a node name using 'data' and 'dataIdx', and returns a (name, id) pair """ global data, dataIdx - name = None + name: str end = dataIdx # Get name if (end < len(data) and data[end] == "'"): # Check for quoted name @@ -114,33 +114,33 @@ def parseNewickName(): break end += 1 if inQuote: - raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}") + raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}') name = data[dataIdx:end] dataIdx = end else: - while end < len(data) and not re.match(r"[(),]", data[end]): + while end < len(data) and not re.match(r'[(),]', data[end]): end += 1 if (end == dataIdx): - raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}") + raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}') name = data[dataIdx:end].rstrip() if end == len(data): # Ignore trailing input semicolon name = name[:-1] dataIdx = end # Convert to (name, id) name = name.lower() - if 
name.startswith("mrca"): + if name.startswith('mrca'): return (name, name) elif name[0] == "'": - match = re.fullmatch(r"'([^\\\"]+) (ott\d+)'", name) - if match == None: - raise Exception(f"ERROR: invalid name \"{name}\"") + match = re.fullmatch(r"'([^\\\']+) (ott\d+)'", name) + if match is None: + raise Exception(f'ERROR: invalid name \'{name}\'') name = match.group(1).replace("''", "'") return (name, match.group(2)) else: - match = re.fullmatch(r"([^\\\"]+)_(ott\d+)", name) - if match == None: - raise Exception(f"ERROR: invalid name \"{name}\"") - return (match.group(1).replace("_", " "), match.group(2)) + match = re.fullmatch(r"([^\\\']+)_(ott\d+)", name) + if match is None: + raise Exception(f'ERROR: invalid name \'{name}\'') + return (match.group(1).replace('_', ' '), match.group(2)) def updateNameMaps(name, id): global nameToFirstId, dupNameToIds if name not in nameToFirstId: @@ -150,18 +150,18 @@ def updateNameMaps(name, id): dupNameToIds[name] = [nameToFirstId[name], id] else: dupNameToIds[name].append(id) -rootId = parseNewick() +rootId: str = parseNewick() -print("Resolving duplicate names") +print('Resolving duplicate names') # Read picked-names file -nameToPickedId = {} +nameToPickedId: dict[str, str] = {} if os.path.exists(pickedNamesFile): with open(pickedNamesFile) as file: for line in file: - (name, _, otolId) = line.rstrip().partition("|") + name, _, otolId = line.rstrip().partition('|') nameToPickedId[name] = otolId # Resolve duplicates -for (dupName, ids) in dupNameToIds.items(): +for dupName, ids in dupNameToIds.items(): # Check for picked id if dupName in nameToPickedId: idToUse = nameToPickedId[dupName] @@ -174,16 +174,16 @@ for (dupName, ids) in dupNameToIds.items(): counter = 2 for id in ids: if id != idToUse: - nodeMap[id].name += f" [{counter}]" + nodeMap[id].name += f' [{counter}]' counter += 1 -print("Changing mrca* names") -def convertMrcaName(id): +print('Changing mrca* names') +def convertMrcaName(id: str): node = nodeMap[id] name = 
node.name childIds = node.childIds if len(childIds) < 2: - print(f"WARNING: MRCA node \"{name}\" has less than 2 children") + print(f'WARNING: MRCA node \'{name}\' has less than 2 children') return # Get 2 children with most tips childTips = [nodeMap[id].tips for id in childIds] @@ -195,53 +195,52 @@ def convertMrcaName(id): childName1 = nodeMap[childId1].name childName2 = nodeMap[childId2].name # Check for mrca* child names - if childName1.startswith("mrca"): + if childName1.startswith('mrca'): childName1 = convertMrcaName(childId1) - if childName2.startswith("mrca"): + if childName2.startswith('mrca'): childName2 = convertMrcaName(childId2) # Check for composite names - match = re.fullmatch(r"\[(.+) \+ (.+)]", childName1) - if match != None: + match = re.fullmatch(r'\[(.+) \+ (.+)]', childName1) + if match is not None: childName1 = match.group(1) - match = re.fullmatch(r"\[(.+) \+ (.+)]", childName2) - if match != None: + match = re.fullmatch(r'\[(.+) \+ (.+)]', childName2) + if match is not None: childName2 = match.group(1) # Create composite name - node.name = f"[{childName1} + {childName2}]" + node.name = f'[{childName1} + {childName2}]' return childName1 -for (id, node) in nodeMap.items(): - if node.name.startswith("mrca"): +for id, node in nodeMap.items(): + if node.name.startswith('mrca'): convertMrcaName(id) -print("Parsing annotations file") +print('Parsing annotations file') # Read file -data = None with open(annFile) as file: data = file.read() obj = json.loads(data) -nodeAnnsMap = obj["nodes"] +nodeAnnsMap = obj['nodes'] # Find relevant annotations -for (id, node) in nodeMap.items(): +for id, node in nodeMap.items(): # Set has-support value using annotations if id in nodeAnnsMap: nodeAnns = nodeAnnsMap[id] - supportQty = len(nodeAnns["supported_by"]) if "supported_by" in nodeAnns else 0 - conflictQty = len(nodeAnns["conflicts_with"]) if "conflicts_with" in nodeAnns else 0 + supportQty = len(nodeAnns['supported_by']) if 'supported_by' in nodeAnns else 0 
+ conflictQty = len(nodeAnns['conflicts_with']) if 'conflicts_with' in nodeAnns else 0 node.pSupport = supportQty > 0 and conflictQty == 0 -print("Creating nodes and edges tables") +print('Creating nodes and edges tables') dbCon = sqlite3.connect(dbFile) dbCur = dbCon.cursor() -dbCur.execute("CREATE TABLE nodes (name TEXT PRIMARY KEY, id TEXT UNIQUE, tips INT)") -dbCur.execute("CREATE INDEX nodes_idx_nc ON nodes(name COLLATE NOCASE)") -dbCur.execute("CREATE TABLE edges (parent TEXT, child TEXT, p_support INT, PRIMARY KEY (parent, child))") -dbCur.execute("CREATE INDEX edges_child_idx ON edges(child)") -for (otolId, node) in nodeMap.items(): - dbCur.execute("INSERT INTO nodes VALUES (?, ?, ?)", (node.name, otolId, node.tips)) +dbCur.execute('CREATE TABLE nodes (name TEXT PRIMARY KEY, id TEXT UNIQUE, tips INT)') +dbCur.execute('CREATE INDEX nodes_idx_nc ON nodes(name COLLATE NOCASE)') +dbCur.execute('CREATE TABLE edges (parent TEXT, child TEXT, p_support INT, PRIMARY KEY (parent, child))') +dbCur.execute('CREATE INDEX edges_child_idx ON edges(child)') +for otolId, node in nodeMap.items(): + dbCur.execute('INSERT INTO nodes VALUES (?, ?, ?)', (node.name, otolId, node.tips)) for childId in node.childIds: childNode = nodeMap[childId] - dbCur.execute("INSERT INTO edges VALUES (?, ?, ?)", + dbCur.execute('INSERT INTO edges VALUES (?, ?, ?)', (node.name, childNode.name, 1 if childNode.pSupport else 0)) -print("Closing database") +print('Closing database') dbCon.commit() dbCon.close() |
