aboutsummaryrefslogtreecommitdiff
path: root/backend/tolData/genOtolData.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/tolData/genOtolData.py')
-rwxr-xr-xbackend/tolData/genOtolData.py133
1 files changed, 66 insertions, 67 deletions
diff --git a/backend/tolData/genOtolData.py b/backend/tolData/genOtolData.py
index 6310cc9..d4d6ee8 100755
--- a/backend/tolData/genOtolData.py
+++ b/backend/tolData/genOtolData.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
-import sys, re, os
+import re, os
import json, sqlite3
import argparse
@@ -26,16 +26,8 @@ Reads from a picked-names file, if present, which specifies name and node ID pai
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-treeFile = "otol/labelled_supertree_ottnames.tre" # Had about 2.5e9 nodes
-annFile = "otol/annotations.json"
-dbFile = "data.db"
-nodeMap = {} # Maps node IDs to node objects
-nameToFirstId = {} # Maps node names to first found ID (names might have multiple IDs)
-dupNameToIds = {} # Maps names of nodes with multiple IDs to those IDs
-pickedNamesFile = "pickedOtolNames.txt"
-
class Node:
- " Represents a tree-of-life node "
+ ' Represents a tree-of-life node '
def __init__(self, name, childIds, parentId, tips, pSupport):
self.name = name
self.childIds = childIds
@@ -43,35 +35,43 @@ class Node:
self.tips = tips
self.pSupport = pSupport
-print("Parsing tree file")
+treeFile = 'otol/labelled_supertree_ottnames.tre' # Had about 2.5e9 nodes
+annFile = 'otol/annotations.json'
+dbFile = 'data.db'
+nodeMap: dict[str, Node] = {} # Maps node IDs to node objects
+nameToFirstId: dict[str, str] = {} # Maps node names to first found ID (names might have multiple IDs)
+dupNameToIds: dict[str, list[str]] = {} # Maps names of nodes with multiple IDs to those IDs
+pickedNamesFile = 'pickedOtolNames.txt'
+
+print('Parsing tree file')
# Read file
-data = None
+data: str
with open(treeFile) as file:
data = file.read()
dataIdx = 0
# Parse content
iterNum = 0
-def parseNewick():
- " Parses a node using 'data' and 'dataIdx', updates nodeMap accordingly, and returns the node's ID "
+def parseNewick() -> str:
+ """ Parses a node using 'data' and 'dataIdx', updates nodeMap accordingly, and returns the node's ID """
global data, dataIdx, iterNum
iterNum += 1
if iterNum % 1e5 == 0:
- print(f"At iteration {iterNum}")
+ print(f'At iteration {iterNum}')
# Check for EOF
if dataIdx == len(data):
- raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}")
+ raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}')
# Check for node
- if data[dataIdx] == "(": # parse inner node
+ if data[dataIdx] == '(': # parse inner node
dataIdx += 1
- childIds = []
+ childIds: list[str] = []
while True:
# Read child
childId = parseNewick()
childIds.append(childId)
if (dataIdx == len(data)):
- raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}")
+ raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}')
# Check for next child
- if (data[dataIdx] == ","):
+ if (data[dataIdx] == ','):
dataIdx += 1
continue
else:
@@ -94,10 +94,10 @@ def parseNewick():
updateNameMaps(name, id)
nodeMap[id] = Node(name, [], None, 1, False)
return id
-def parseNewickName():
- " Parses a node name using 'data' and 'dataIdx', and returns a (name, id) pair "
+def parseNewickName() -> tuple[str, str]:
+ """ Parses a node name using 'data' and 'dataIdx', and returns a (name, id) pair """
global data, dataIdx
- name = None
+ name: str
end = dataIdx
# Get name
if (end < len(data) and data[end] == "'"): # Check for quoted name
@@ -114,33 +114,33 @@ def parseNewickName():
break
end += 1
if inQuote:
- raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}")
+ raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}')
name = data[dataIdx:end]
dataIdx = end
else:
- while end < len(data) and not re.match(r"[(),]", data[end]):
+ while end < len(data) and not re.match(r'[(),]', data[end]):
end += 1
if (end == dataIdx):
- raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}")
+ raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}')
name = data[dataIdx:end].rstrip()
if end == len(data): # Ignore trailing input semicolon
name = name[:-1]
dataIdx = end
# Convert to (name, id)
name = name.lower()
- if name.startswith("mrca"):
+ if name.startswith('mrca'):
return (name, name)
elif name[0] == "'":
- match = re.fullmatch(r"'([^\\\"]+) (ott\d+)'", name)
- if match == None:
- raise Exception(f"ERROR: invalid name \"{name}\"")
+ match = re.fullmatch(r"'([^\\\']+) (ott\d+)'", name)
+ if match is None:
+ raise Exception(f'ERROR: invalid name \'{name}\'')
name = match.group(1).replace("''", "'")
return (name, match.group(2))
else:
- match = re.fullmatch(r"([^\\\"]+)_(ott\d+)", name)
- if match == None:
- raise Exception(f"ERROR: invalid name \"{name}\"")
- return (match.group(1).replace("_", " "), match.group(2))
+ match = re.fullmatch(r"([^\\\']+)_(ott\d+)", name)
+ if match is None:
+ raise Exception(f'ERROR: invalid name \'{name}\'')
+ return (match.group(1).replace('_', ' '), match.group(2))
def updateNameMaps(name, id):
global nameToFirstId, dupNameToIds
if name not in nameToFirstId:
@@ -150,18 +150,18 @@ def updateNameMaps(name, id):
dupNameToIds[name] = [nameToFirstId[name], id]
else:
dupNameToIds[name].append(id)
-rootId = parseNewick()
+rootId: str = parseNewick()
-print("Resolving duplicate names")
+print('Resolving duplicate names')
# Read picked-names file
-nameToPickedId = {}
+nameToPickedId: dict[str, str] = {}
if os.path.exists(pickedNamesFile):
with open(pickedNamesFile) as file:
for line in file:
- (name, _, otolId) = line.rstrip().partition("|")
+ name, _, otolId = line.rstrip().partition('|')
nameToPickedId[name] = otolId
# Resolve duplicates
-for (dupName, ids) in dupNameToIds.items():
+for dupName, ids in dupNameToIds.items():
# Check for picked id
if dupName in nameToPickedId:
idToUse = nameToPickedId[dupName]
@@ -174,16 +174,16 @@ for (dupName, ids) in dupNameToIds.items():
counter = 2
for id in ids:
if id != idToUse:
- nodeMap[id].name += f" [{counter}]"
+ nodeMap[id].name += f' [{counter}]'
counter += 1
-print("Changing mrca* names")
-def convertMrcaName(id):
+print('Changing mrca* names')
+def convertMrcaName(id: str):
node = nodeMap[id]
name = node.name
childIds = node.childIds
if len(childIds) < 2:
- print(f"WARNING: MRCA node \"{name}\" has less than 2 children")
+ print(f'WARNING: MRCA node \'{name}\' has less than 2 children')
return
# Get 2 children with most tips
childTips = [nodeMap[id].tips for id in childIds]
@@ -195,53 +195,52 @@ def convertMrcaName(id):
childName1 = nodeMap[childId1].name
childName2 = nodeMap[childId2].name
# Check for mrca* child names
- if childName1.startswith("mrca"):
+ if childName1.startswith('mrca'):
childName1 = convertMrcaName(childId1)
- if childName2.startswith("mrca"):
+ if childName2.startswith('mrca'):
childName2 = convertMrcaName(childId2)
# Check for composite names
- match = re.fullmatch(r"\[(.+) \+ (.+)]", childName1)
- if match != None:
+ match = re.fullmatch(r'\[(.+) \+ (.+)]', childName1)
+ if match is not None:
childName1 = match.group(1)
- match = re.fullmatch(r"\[(.+) \+ (.+)]", childName2)
- if match != None:
+ match = re.fullmatch(r'\[(.+) \+ (.+)]', childName2)
+ if match is not None:
childName2 = match.group(1)
# Create composite name
- node.name = f"[{childName1} + {childName2}]"
+ node.name = f'[{childName1} + {childName2}]'
return childName1
-for (id, node) in nodeMap.items():
- if node.name.startswith("mrca"):
+for id, node in nodeMap.items():
+ if node.name.startswith('mrca'):
convertMrcaName(id)
-print("Parsing annotations file")
+print('Parsing annotations file')
# Read file
-data = None
with open(annFile) as file:
data = file.read()
obj = json.loads(data)
-nodeAnnsMap = obj["nodes"]
+nodeAnnsMap = obj['nodes']
# Find relevant annotations
-for (id, node) in nodeMap.items():
+for id, node in nodeMap.items():
# Set has-support value using annotations
if id in nodeAnnsMap:
nodeAnns = nodeAnnsMap[id]
- supportQty = len(nodeAnns["supported_by"]) if "supported_by" in nodeAnns else 0
- conflictQty = len(nodeAnns["conflicts_with"]) if "conflicts_with" in nodeAnns else 0
+ supportQty = len(nodeAnns['supported_by']) if 'supported_by' in nodeAnns else 0
+ conflictQty = len(nodeAnns['conflicts_with']) if 'conflicts_with' in nodeAnns else 0
node.pSupport = supportQty > 0 and conflictQty == 0
-print("Creating nodes and edges tables")
+print('Creating nodes and edges tables')
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
-dbCur.execute("CREATE TABLE nodes (name TEXT PRIMARY KEY, id TEXT UNIQUE, tips INT)")
-dbCur.execute("CREATE INDEX nodes_idx_nc ON nodes(name COLLATE NOCASE)")
-dbCur.execute("CREATE TABLE edges (parent TEXT, child TEXT, p_support INT, PRIMARY KEY (parent, child))")
-dbCur.execute("CREATE INDEX edges_child_idx ON edges(child)")
-for (otolId, node) in nodeMap.items():
- dbCur.execute("INSERT INTO nodes VALUES (?, ?, ?)", (node.name, otolId, node.tips))
+dbCur.execute('CREATE TABLE nodes (name TEXT PRIMARY KEY, id TEXT UNIQUE, tips INT)')
+dbCur.execute('CREATE INDEX nodes_idx_nc ON nodes(name COLLATE NOCASE)')
+dbCur.execute('CREATE TABLE edges (parent TEXT, child TEXT, p_support INT, PRIMARY KEY (parent, child))')
+dbCur.execute('CREATE INDEX edges_child_idx ON edges(child)')
+for otolId, node in nodeMap.items():
+ dbCur.execute('INSERT INTO nodes VALUES (?, ?, ?)', (node.name, otolId, node.tips))
for childId in node.childIds:
childNode = nodeMap[childId]
- dbCur.execute("INSERT INTO edges VALUES (?, ?, ?)",
+ dbCur.execute('INSERT INTO edges VALUES (?, ?, ?)',
(node.name, childNode.name, 1 if childNode.pSupport else 0))
-print("Closing database")
+print('Closing database')
dbCon.commit()
dbCon.close()