aboutsummaryrefslogtreecommitdiff
path: root/backend/tolData
diff options
context:
space:
mode:
authorTerry Truong <terry06890@gmail.com>2022-09-07 11:37:37 +1000
committerTerry Truong <terry06890@gmail.com>2022-09-07 11:37:37 +1000
commitdaccbbd9c73a5292ea9d6746560d7009e5aa666d (patch)
tree9156bf011ab6302eb3c0d219d40587d594f51841 /backend/tolData
parent1a7fe33edafa68a6f759d124bdeee673ff9cf9ff (diff)
Add python type annotations
Also use consistent quote symbols Also use 'is None' instead of '== None' Also use 'if list1' instead of 'if len(list1) > 0'
Diffstat (limited to 'backend/tolData')
-rwxr-xr-xbackend/tolData/dbpedia/genDescData.py104
-rwxr-xr-xbackend/tolData/enwiki/downloadImgLicenseInfo.py136
-rwxr-xr-xbackend/tolData/enwiki/downloadImgs.py50
-rwxr-xr-xbackend/tolData/enwiki/genDescData.py100
-rwxr-xr-xbackend/tolData/enwiki/genDumpIndexDb.py39
-rwxr-xr-xbackend/tolData/enwiki/genImgData.py118
-rwxr-xr-xbackend/tolData/enwiki/genPageviewData.py10
-rwxr-xr-xbackend/tolData/enwiki/lookupPage.py34
-rwxr-xr-xbackend/tolData/eol/downloadImgs.py76
-rwxr-xr-xbackend/tolData/eol/genImagesListDb.py28
-rwxr-xr-xbackend/tolData/eol/reviewImgs.py92
-rwxr-xr-xbackend/tolData/genDescData.py19
-rwxr-xr-xbackend/tolData/genImgs.py154
-rwxr-xr-xbackend/tolData/genLinkedImgs.py102
-rwxr-xr-xbackend/tolData/genMappingData.py60
-rwxr-xr-xbackend/tolData/genNameData.py28
-rwxr-xr-xbackend/tolData/genOtolData.py133
-rwxr-xr-xbackend/tolData/genPopData.py9
-rwxr-xr-xbackend/tolData/genReducedTrees.py270
-rwxr-xr-xbackend/tolData/reviewImgsToGen.py118
-rwxr-xr-xbackend/tolData/wikidata/genTaxonSrcData.py64
21 files changed, 886 insertions, 858 deletions
diff --git a/backend/tolData/dbpedia/genDescData.py b/backend/tolData/dbpedia/genDescData.py
index 8756a40..43ed815 100755
--- a/backend/tolData/dbpedia/genDescData.py
+++ b/backend/tolData/dbpedia/genDescData.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
-import sys, re
+import re
import bz2, sqlite3
import argparse
@@ -9,120 +9,120 @@ Adds DBpedia labels/types/abstracts/etc data into a database
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-labelsFile = "labels_lang=en.ttl.bz2" # Had about 16e6 entries
-idsFile = "page_lang=en_ids.ttl.bz2"
-redirectsFile = "redirects_lang=en_transitive.ttl.bz2"
-disambigFile = "disambiguations_lang=en.ttl.bz2"
-typesFile = "instance-types_lang=en_specific.ttl.bz2"
-abstractsFile = "short-abstracts_lang=en.ttl.bz2"
-dbFile = "descData.db"
+labelsFile = 'labels_lang=en.ttl.bz2' # Had about 16e6 entries
+idsFile = 'page_lang=en_ids.ttl.bz2'
+redirectsFile = 'redirects_lang=en_transitive.ttl.bz2'
+disambigFile = 'disambiguations_lang=en.ttl.bz2'
+typesFile = 'instance-types_lang=en_specific.ttl.bz2'
+abstractsFile = 'short-abstracts_lang=en.ttl.bz2'
+dbFile = 'descData.db'
# In testing, this script took a few hours to run, and generated about 10GB
-print("Creating database")
+print('Creating database')
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
-print("Reading/storing label data")
-dbCur.execute("CREATE TABLE labels (iri TEXT PRIMARY KEY, label TEXT)")
-dbCur.execute("CREATE INDEX labels_idx ON labels(label)")
-dbCur.execute("CREATE INDEX labels_idx_nc ON labels(label COLLATE NOCASE)")
+print('Reading/storing label data')
+dbCur.execute('CREATE TABLE labels (iri TEXT PRIMARY KEY, label TEXT)')
+dbCur.execute('CREATE INDEX labels_idx ON labels(label)')
+dbCur.execute('CREATE INDEX labels_idx_nc ON labels(label COLLATE NOCASE)')
labelLineRegex = re.compile(r'<([^>]+)> <[^>]+> "((?:[^"]|\\")+)"@en \.\n')
lineNum = 0
with bz2.open(labelsFile, mode='rt') as file:
for line in file:
lineNum += 1
if lineNum % 1e5 == 0:
- print(f"At line {lineNum}")
+ print(f'At line {lineNum}')
#
match = labelLineRegex.fullmatch(line)
- if match == None:
- raise Exception(f"ERROR: Line {lineNum} has unexpected format")
- dbCur.execute("INSERT INTO labels VALUES (?, ?)", (match.group(1), match.group(2)))
+ if match is None:
+ raise Exception(f'ERROR: Line {lineNum} has unexpected format')
+ dbCur.execute('INSERT INTO labels VALUES (?, ?)', (match.group(1), match.group(2)))
-print("Reading/storing wiki page ids")
-dbCur.execute("CREATE TABLE ids (iri TEXT PRIMARY KEY, id INT)")
-dbCur.execute("CREATE INDEX ids_idx ON ids(id)")
+print('Reading/storing wiki page ids')
+dbCur.execute('CREATE TABLE ids (iri TEXT PRIMARY KEY, id INT)')
+dbCur.execute('CREATE INDEX ids_idx ON ids(id)')
idLineRegex = re.compile(r'<([^>]+)> <[^>]+> "(\d+)".*\n')
lineNum = 0
with bz2.open(idsFile, mode='rt') as file:
for line in file:
lineNum += 1
if lineNum % 1e5 == 0:
- print(f"At line {lineNum}")
+ print(f'At line {lineNum}')
#
match = idLineRegex.fullmatch(line)
- if match == None:
- raise Exception(f"ERROR: Line {lineNum} has unexpected format")
+ if match is None:
+ raise Exception(f'ERROR: Line {lineNum} has unexpected format')
try:
- dbCur.execute("INSERT INTO ids VALUES (?, ?)", (match.group(1), int(match.group(2))))
+ dbCur.execute('INSERT INTO ids VALUES (?, ?)', (match.group(1), int(match.group(2))))
except sqlite3.IntegrityError as e:
# Accounts for certain lines that have the same IRI
- print(f"WARNING: Failed to add entry with IRI \"{match.group(1)}\": {e}")
+ print(f'WARNING: Failed to add entry with IRI "{match.group(1)}": {e}')
-print("Reading/storing redirection data")
-dbCur.execute("CREATE TABLE redirects (iri TEXT PRIMARY KEY, target TEXT)")
+print('Reading/storing redirection data')
+dbCur.execute('CREATE TABLE redirects (iri TEXT PRIMARY KEY, target TEXT)')
redirLineRegex = re.compile(r'<([^>]+)> <[^>]+> <([^>]+)> \.\n')
lineNum = 0
with bz2.open(redirectsFile, mode='rt') as file:
for line in file:
lineNum += 1
if lineNum % 1e5 == 0:
- print(f"At line {lineNum}")
+ print(f'At line {lineNum}')
#
match = redirLineRegex.fullmatch(line)
- if match == None:
- raise Exception(f"ERROR: Line {lineNum} has unexpected format")
- dbCur.execute("INSERT INTO redirects VALUES (?, ?)", (match.group(1), match.group(2)))
+ if match is None:
+ raise Exception(f'ERROR: Line {lineNum} has unexpected format')
+ dbCur.execute('INSERT INTO redirects VALUES (?, ?)', (match.group(1), match.group(2)))
-print("Reading/storing diambiguation-page data")
-dbCur.execute("CREATE TABLE disambiguations (iri TEXT PRIMARY KEY)")
+print('Reading/storing diambiguation-page data')
+dbCur.execute('CREATE TABLE disambiguations (iri TEXT PRIMARY KEY)')
disambigLineRegex = redirLineRegex
lineNum = 0
with bz2.open(disambigFile, mode='rt') as file:
for line in file:
lineNum += 1
if lineNum % 1e5 == 0:
- print(f"At line {lineNum}")
+ print(f'At line {lineNum}')
#
match = disambigLineRegex.fullmatch(line)
- if match == None:
- raise Exception(f"ERROR: Line {lineNum} has unexpected format")
- dbCur.execute("INSERT OR IGNORE INTO disambiguations VALUES (?)", (match.group(1),))
+ if match is None:
+ raise Exception(f'ERROR: Line {lineNum} has unexpected format')
+ dbCur.execute('INSERT OR IGNORE INTO disambiguations VALUES (?)', (match.group(1),))
-print("Reading/storing instance-type data")
-dbCur.execute("CREATE TABLE types (iri TEXT, type TEXT)")
-dbCur.execute("CREATE INDEX types_iri_idx ON types(iri)")
+print('Reading/storing instance-type data')
+dbCur.execute('CREATE TABLE types (iri TEXT, type TEXT)')
+dbCur.execute('CREATE INDEX types_iri_idx ON types(iri)')
typeLineRegex = redirLineRegex
lineNum = 0
with bz2.open(typesFile, mode='rt') as file:
for line in file:
lineNum += 1
if lineNum % 1e5 == 0:
- print(f"At line {lineNum}")
+ print(f'At line {lineNum}')
#
match = typeLineRegex.fullmatch(line)
- if match == None:
- raise Exception(f"ERROR: Line {lineNum} has unexpected format")
- dbCur.execute("INSERT INTO types VALUES (?, ?)", (match.group(1), match.group(2)))
+ if match is None:
+ raise Exception(f'ERROR: Line {lineNum} has unexpected format')
+ dbCur.execute('INSERT INTO types VALUES (?, ?)', (match.group(1), match.group(2)))
-print("Reading/storing abstracts")
-dbCur.execute("CREATE TABLE abstracts (iri TEXT PRIMARY KEY, abstract TEXT)")
+print('Reading/storing abstracts')
+dbCur.execute('CREATE TABLE abstracts (iri TEXT PRIMARY KEY, abstract TEXT)')
descLineRegex = labelLineRegex
lineNum = 0
with bz2.open(abstractsFile, mode='rt') as file:
for line in file:
lineNum += 1
if lineNum % 1e5 == 0:
- print(f"At line {lineNum}")
+ print(f'At line {lineNum}')
#
- if line[0] == "#":
+ if line[0] == '#':
continue
match = descLineRegex.fullmatch(line)
- if match == None:
- raise Exception(f"ERROR: Line {lineNum} has unexpected format")
- dbCur.execute("INSERT INTO abstracts VALUES (?, ?)",
+ if match is None:
+ raise Exception(f'ERROR: Line {lineNum} has unexpected format')
+ dbCur.execute('INSERT INTO abstracts VALUES (?, ?)',
(match.group(1), match.group(2).replace(r'\"', '"')))
-print("Closing database")
+print('Closing database')
dbCon.commit()
dbCon.close()
diff --git a/backend/tolData/enwiki/downloadImgLicenseInfo.py b/backend/tolData/enwiki/downloadImgLicenseInfo.py
index dd39d54..ba6317e 100755
--- a/backend/tolData/enwiki/downloadImgLicenseInfo.py
+++ b/backend/tolData/enwiki/downloadImgLicenseInfo.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
-import sys, re
+import re
import sqlite3, urllib.parse, html
import requests
import time, signal
@@ -16,33 +16,33 @@ at already-processed names to decide what to skip.
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-imgDb = "imgData.db"
-apiUrl = "https://en.wikipedia.org/w/api.php"
-userAgent = "terryt.dev (terry06890@gmail.com)"
+imgDb = 'imgData.db'
+apiUrl = 'https://en.wikipedia.org/w/api.php'
+userAgent = 'terryt.dev (terry06890@gmail.com)'
batchSz = 50 # Max 50
-tagRegex = re.compile(r"<[^<]+>")
-whitespaceRegex = re.compile(r"\s+")
+tagRegex = re.compile(r'<[^<]+>')
+whitespaceRegex = re.compile(r'\s+')
-print("Opening database")
+print('Opening database')
dbCon = sqlite3.connect(imgDb)
dbCur = dbCon.cursor()
dbCur2 = dbCon.cursor()
-print("Checking for table")
-if dbCur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='imgs'").fetchone() == None:
- dbCur.execute("CREATE TABLE imgs(" \
- "name TEXT PRIMARY KEY, license TEXT, artist TEXT, credit TEXT, restrictions TEXT, url TEXT)")
+print('Checking for table')
+if dbCur.execute('SELECT name FROM sqlite_master WHERE type="table" AND name="imgs"').fetchone() is None:
+ dbCur.execute('CREATE TABLE imgs(' \
+ 'name TEXT PRIMARY KEY, license TEXT, artist TEXT, credit TEXT, restrictions TEXT, url TEXT)')
-print("Reading image names")
-imgNames = set()
-for (imgName,) in dbCur.execute("SELECT DISTINCT img_name FROM page_imgs WHERE img_name NOT NULL"):
+print('Reading image names')
+imgNames: set[str] = set()
+for (imgName,) in dbCur.execute('SELECT DISTINCT img_name FROM page_imgs WHERE img_name NOT NULL'):
imgNames.add(imgName)
-print(f"Found {len(imgNames)}")
+print(f'Found {len(imgNames)}')
-print("Checking for already-processed images")
+print('Checking for already-processed images')
oldSz = len(imgNames)
-for (imgName,) in dbCur.execute("SELECT name FROM imgs"):
+for (imgName,) in dbCur.execute('SELECT name FROM imgs'):
imgNames.discard(imgName)
-print(f"Found {oldSz - len(imgNames)}")
+print(f'Found {oldSz - len(imgNames)}')
# Set SIGINT handler
interrupted = False
@@ -53,95 +53,95 @@ def onSigint(sig, frame):
signal.signal(signal.SIGINT, oldHandler)
oldHandler = signal.signal(signal.SIGINT, onSigint)
-print("Iterating through image names")
-imgNames = list(imgNames)
+print('Iterating through image names')
+imgNameList = list(imgNames)
iterNum = 0
-for i in range(0, len(imgNames), batchSz):
+for i in range(0, len(imgNameList), batchSz):
iterNum += 1
if iterNum % 1 == 0:
- print(f"At iteration {iterNum} (after {(iterNum - 1) * batchSz} images)")
+ print(f'At iteration {iterNum} (after {(iterNum - 1) * batchSz} images)')
if interrupted:
- print(f"Exiting loop at iteration {iterNum}")
+ print(f'Exiting loop at iteration {iterNum}')
break
# Get batch
- imgBatch = imgNames[i:i+batchSz]
- imgBatch = ["File:" + x for x in imgBatch]
+ imgBatch = imgNameList[i:i+batchSz]
+ imgBatch = ['File:' + x for x in imgBatch]
# Make request
headers = {
- "user-agent": userAgent,
- "accept-encoding": "gzip",
+ 'user-agent': userAgent,
+ 'accept-encoding': 'gzip',
}
params = {
- "action": "query",
- "format": "json",
- "prop": "imageinfo",
- "iiprop": "extmetadata|url",
- "maxlag": "5",
- "titles": "|".join(imgBatch),
- "iiextmetadatafilter": "Artist|Credit|LicenseShortName|Restrictions",
+ 'action': 'query',
+ 'format': 'json',
+ 'prop': 'imageinfo',
+ 'iiprop': 'extmetadata|url',
+ 'maxlag': '5',
+ 'titles': '|'.join(imgBatch),
+ 'iiextmetadatafilter': 'Artist|Credit|LicenseShortName|Restrictions',
}
responseObj = None
try:
response = requests.get(apiUrl, params=params, headers=headers)
responseObj = response.json()
except Exception as e:
- print(f"ERROR: Exception while downloading info: {e}")
- print(f"\tImage batch: " + "|".join(imgBatch))
+ print(f'ERROR: Exception while downloading info: {e}')
+ print('\tImage batch: ' + '|'.join(imgBatch))
continue
# Parse response-object
- if "query" not in responseObj or "pages" not in responseObj["query"]:
- print("WARNING: Response object for doesn't have page data")
- print("\tImage batch: " + "|".join(imgBatch))
- if "error" in responseObj:
- errorCode = responseObj["error"]["code"]
- print(f"\tError code: {errorCode}")
- if errorCode == "maxlag":
+ if 'query' not in responseObj or 'pages' not in responseObj['query']:
+ print('WARNING: Response object for doesn\'t have page data')
+ print('\tImage batch: ' + '|'.join(imgBatch))
+ if 'error' in responseObj:
+ errorCode = responseObj['error']['code']
+ print(f'\tError code: {errorCode}')
+ if errorCode == 'maxlag':
time.sleep(5)
continue
- pages = responseObj["query"]["pages"]
- normalisedToInput = {}
- if "normalized" in responseObj["query"]:
- for entry in responseObj["query"]["normalized"]:
- normalisedToInput[entry["to"]] = entry["from"]
- for (_, page) in pages.items():
+ pages = responseObj['query']['pages']
+ normalisedToInput: dict[str, str] = {}
+ if 'normalized' in responseObj['query']:
+ for entry in responseObj['query']['normalized']:
+ normalisedToInput[entry['to']] = entry['from']
+ for _, page in pages.items():
# Some fields // More info at https://www.mediawiki.org/wiki/Extension:CommonsMetadata#Returned_data
# LicenseShortName: short human-readable license name, apparently more reliable than 'License',
# Artist: author name (might contain complex html, multiple authors, etc)
# Credit: 'source'
# For image-map-like images, can be quite large/complex html, creditng each sub-image
- # May be <a href="text1">text2</a>, where the text2 might be non-indicative
+ # May be <a href='text1'>text2</a>, where the text2 might be non-indicative
# Restrictions: specifies non-copyright legal restrictions
- title = page["title"]
+ title: str = page['title']
if title in normalisedToInput:
title = normalisedToInput[title]
title = title[5:] # Remove 'File:'
if title not in imgNames:
- print(f"WARNING: Got title \"{title}\" not in image-name list")
+ print(f'WARNING: Got title "{title}" not in image-name list')
continue
- if "imageinfo" not in page:
- print(f"WARNING: No imageinfo section for page \"{title}\"")
+ if 'imageinfo' not in page:
+ print(f'WARNING: No imageinfo section for page "{title}"')
continue
- metadata = page["imageinfo"][0]["extmetadata"]
- url = page["imageinfo"][0]["url"]
- license = metadata['LicenseShortName']['value'] if 'LicenseShortName' in metadata else None
- artist = metadata['Artist']['value'] if 'Artist' in metadata else None
- credit = metadata['Credit']['value'] if 'Credit' in metadata else None
- restrictions = metadata['Restrictions']['value'] if 'Restrictions' in metadata else None
+ metadata = page['imageinfo'][0]['extmetadata']
+ url: str = page['imageinfo'][0]['url']
+ license: str | None = metadata['LicenseShortName']['value'] if 'LicenseShortName' in metadata else None
+ artist: str | None = metadata['Artist']['value'] if 'Artist' in metadata else None
+ credit: str | None = metadata['Credit']['value'] if 'Credit' in metadata else None
+ restrictions: str | None = metadata['Restrictions']['value'] if 'Restrictions' in metadata else None
# Remove markup
- if artist != None:
- artist = tagRegex.sub(" ", artist)
- artist = whitespaceRegex.sub(" ", artist)
+ if artist is not None:
+ artist = tagRegex.sub(' ', artist)
+ artist = whitespaceRegex.sub(' ', artist)
artist = html.unescape(artist)
artist = urllib.parse.unquote(artist)
- if credit != None:
- credit = tagRegex.sub(" ", credit)
- credit = whitespaceRegex.sub(" ", credit)
+ if credit is not None:
+ credit = tagRegex.sub(' ', credit)
+ credit = whitespaceRegex.sub(' ', credit)
credit = html.unescape(credit)
credit = urllib.parse.unquote(credit)
# Add to db
- dbCur2.execute("INSERT INTO imgs VALUES (?, ?, ?, ?, ?, ?)",
+ dbCur2.execute('INSERT INTO imgs VALUES (?, ?, ?, ?, ?, ?)',
(title, license, artist, credit, restrictions, url))
-print("Closing database")
+print('Closing database')
dbCon.commit()
dbCon.close()
diff --git a/backend/tolData/enwiki/downloadImgs.py b/backend/tolData/enwiki/downloadImgs.py
index 520677f..def4714 100755
--- a/backend/tolData/enwiki/downloadImgs.py
+++ b/backend/tolData/enwiki/downloadImgs.py
@@ -16,20 +16,20 @@ in the output directory do decide what to skip.
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-imgDb = "imgData.db" # About 130k image names
-outDir = "imgs"
-licenseRegex = re.compile(r"cc0|cc([ -]by)?([ -]sa)?([ -][1234]\.[05])?( \w\w\w?)?", flags=re.IGNORECASE)
+imgDb = 'imgData.db' # About 130k image names
+outDir = 'imgs'
+licenseRegex = re.compile(r'cc0|cc([ -]by)?([ -]sa)?([ -][1234]\.[05])?( \w\w\w?)?', flags=re.IGNORECASE)
# In testing, this downloaded about 100k images, over several days
if not os.path.exists(outDir):
os.mkdir(outDir)
-print("Checking for already-downloaded images")
+print('Checking for already-downloaded images')
fileList = os.listdir(outDir)
-pageIdsDone = set()
+pageIdsDone: set[int] = set()
for filename in fileList:
- (basename, extension) = os.path.splitext(filename)
+ basename, extension = os.path.splitext(filename)
pageIdsDone.add(int(basename))
-print(f"Found {len(pageIdsDone)}")
+print(f'Found {len(pageIdsDone)}')
# Set SIGINT handler
interrupted = False
@@ -40,49 +40,49 @@ def onSigint(sig, frame):
signal.signal(signal.SIGINT, oldHandler)
oldHandler = signal.signal(signal.SIGINT, onSigint)
-print("Opening database")
+print('Opening database')
dbCon = sqlite3.connect(imgDb)
dbCur = dbCon.cursor()
-print("Starting downloads")
+print('Starting downloads')
iterNum = 0
-query = "SELECT page_id, license, artist, credit, restrictions, url FROM" \
- " imgs INNER JOIN page_imgs ON imgs.name = page_imgs.img_name"
-for (pageId, license, artist, credit, restrictions, url) in dbCur.execute(query):
+query = 'SELECT page_id, license, artist, credit, restrictions, url FROM' \
+ ' imgs INNER JOIN page_imgs ON imgs.name = page_imgs.img_name'
+for pageId, license, artist, credit, restrictions, url in dbCur.execute(query):
if pageId in pageIdsDone:
continue
if interrupted:
- print(f"Exiting loop")
+ print('Exiting loop')
break
# Check for problematic attributes
- if license == None or licenseRegex.fullmatch(license) == None:
+ if license is None or licenseRegex.fullmatch(license) is None:
continue
- if artist == None or artist == "" or len(artist) > 100 or re.match(r"(\d\. )?File:", artist) != None:
+ if artist is None or artist == '' or len(artist) > 100 or re.match(r'(\d\. )?File:', artist) is not None:
continue
- if credit == None or len(credit) > 300 or re.match(r"File:", credit) != None:
+ if credit is None or len(credit) > 300 or re.match(r'File:', credit) is not None:
continue
- if restrictions != None and restrictions != "":
+ if restrictions is not None and restrictions != '':
continue
# Download image
iterNum += 1
- print(f"Iteration {iterNum}: Downloading for page-id {pageId}")
+ print(f'Iteration {iterNum}: Downloading for page-id {pageId}')
urlParts = urllib.parse.urlparse(url)
extension = os.path.splitext(urlParts.path)[1]
if len(extension) <= 1:
- print(f"WARNING: No filename extension found in URL {url}")
+ print(f'WARNING: No filename extension found in URL {url}')
sys.exit(1)
- outFile = f"{outDir}/{pageId}{extension}"
+ outFile = f'{outDir}/{pageId}{extension}'
headers = {
- "user-agent": "terryt.dev (terry06890@gmail.com)",
- "accept-encoding": "gzip",
+ 'user-agent': 'terryt.dev (terry06890@gmail.com)',
+ 'accept-encoding': 'gzip',
}
try:
response = requests.get(url, headers=headers)
with open(outFile, 'wb') as file:
file.write(response.content)
time.sleep(1)
- # https://en.wikipedia.org/wiki/Wikipedia:Database_download says to "throttle self to 1 cache miss per sec"
+ # https://en.wikipedia.org/wiki/Wikipedia:Database_download says to 'throttle self to 1 cache miss per sec'
# It's unclear how to properly check for cache misses, so this just aims for 1 per sec
except Exception as e:
- print(f"Error while downloading to {outFile}: {e}")
-print("Closing database")
+ print(f'Error while downloading to {outFile}: {e}')
+print('Closing database')
dbCon.close()
diff --git a/backend/tolData/enwiki/genDescData.py b/backend/tolData/enwiki/genDescData.py
index 0085d70..1698f5c 100755
--- a/backend/tolData/enwiki/genDescData.py
+++ b/backend/tolData/enwiki/genDescData.py
@@ -12,46 +12,46 @@ and add them to a database
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-dumpFile = "enwiki-20220501-pages-articles-multistream.xml.bz2" # Had about 22e6 pages
-enwikiDb = "descData.db"
+dumpFile = 'enwiki-20220501-pages-articles-multistream.xml.bz2' # Had about 22e6 pages
+enwikiDb = 'descData.db'
# In testing, this script took over 10 hours to run, and generated about 5GB
-descLineRegex = re.compile("^ *[A-Z'\"]")
-embeddedHtmlRegex = re.compile(r"<[^<]+/>|<!--[^<]+-->|<[^</]+>([^<]*|[^<]*<[^<]+>[^<]*)</[^<]+>|<[^<]+$")
+descLineRegex = re.compile('^ *[A-Z\'"]')
+embeddedHtmlRegex = re.compile(r'<[^<]+/>|<!--[^<]+-->|<[^</]+>([^<]*|[^<]*<[^<]+>[^<]*)</[^<]+>|<[^<]+$')
# Recognises a self-closing HTML tag, a tag with 0 children, tag with 1 child with 0 children, or unclosed tag
-convertTemplateRegex = re.compile(r"{{convert\|(\d[^|]*)\|(?:(to|-)\|(\d[^|]*)\|)?([a-z][^|}]*)[^}]*}}")
+convertTemplateRegex = re.compile(r'{{convert\|(\d[^|]*)\|(?:(to|-)\|(\d[^|]*)\|)?([a-z][^|}]*)[^}]*}}')
def convertTemplateReplace(match):
- if match.group(2) == None:
- return f"{match.group(1)} {match.group(4)}"
+ if match.group(2) is None:
+ return f'{match.group(1)} {match.group(4)}'
else:
- return f"{match.group(1)} {match.group(2)} {match.group(3)} {match.group(4)}"
-parensGroupRegex = re.compile(r" \([^()]*\)")
-leftoverBraceRegex = re.compile(r"(?:{\||{{).*")
+ return f'{match.group(1)} {match.group(2)} {match.group(3)} {match.group(4)}'
+parensGroupRegex = re.compile(r' \([^()]*\)')
+leftoverBraceRegex = re.compile(r'(?:{\||{{).*')
-def parseDesc(text):
+def parseDesc(text: str) -> str | None:
# Find first matching line outside {{...}}, [[...]], and block-html-comment constructs,
# and then accumulate lines until a blank one.
# Some cases not accounted for include: disambiguation pages, abstracts with sentences split-across-lines,
# nested embedded html, 'content significant' embedded-html, markup not removable with mwparsefromhell,
- lines = []
+ lines: list[str] = []
openBraceCount = 0
openBracketCount = 0
inComment = False
skip = False
for line in text.splitlines():
line = line.strip()
- if len(lines) == 0:
- if len(line) > 0:
- if openBraceCount > 0 or line[0] == "{":
- openBraceCount += line.count("{")
- openBraceCount -= line.count("}")
+ if not lines:
+ if line:
+ if openBraceCount > 0 or line[0] == '{':
+ openBraceCount += line.count('{')
+ openBraceCount -= line.count('}')
skip = True
- if openBracketCount > 0 or line[0] == "[":
- openBracketCount += line.count("[")
- openBracketCount -= line.count("]")
+ if openBracketCount > 0 or line[0] == '[':
+ openBracketCount += line.count('[')
+ openBracketCount -= line.count(']')
skip = True
- if inComment or line.find("<!--") != -1:
- if line.find("-->") != -1:
+ if inComment or line.find('<!--') != -1:
+ if line.find('-->') != -1:
if inComment:
inComment = False
skip = True
@@ -61,64 +61,64 @@ def parseDesc(text):
if skip:
skip = False
continue
- if line[-1] == ":": # Seems to help avoid disambiguation pages
+ if line[-1] == ':': # Seems to help avoid disambiguation pages
return None
- if descLineRegex.match(line) != None:
+ if descLineRegex.match(line) is not None:
lines.append(line)
else:
- if len(line) == 0:
- return removeMarkup(" ".join(lines))
+ if not line:
+ return removeMarkup(' '.join(lines))
lines.append(line)
- if len(lines) > 0:
- return removeMarkup(" ".join(lines))
+ if lines:
+ return removeMarkup(' '.join(lines))
return None
-def removeMarkup(content):
- content = embeddedHtmlRegex.sub("", content)
+def removeMarkup(content: str) -> str:
+ content = embeddedHtmlRegex.sub('', content)
content = convertTemplateRegex.sub(convertTemplateReplace, content)
content = mwparserfromhell.parse(content).strip_code() # Remove wikitext markup
- content = parensGroupRegex.sub("", content)
- content = leftoverBraceRegex.sub("", content)
+ content = parensGroupRegex.sub('', content)
+ content = leftoverBraceRegex.sub('', content)
return content
-def convertTitle(title):
- return html.unescape(title).replace("_", " ")
+def convertTitle(title: str) -> str:
+ return html.unescape(title).replace('_', ' ')
-print("Creating database")
+print('Creating database')
if os.path.exists(enwikiDb):
- raise Exception(f"ERROR: Existing {enwikiDb}")
+ raise Exception(f'ERROR: Existing {enwikiDb}')
dbCon = sqlite3.connect(enwikiDb)
dbCur = dbCon.cursor()
-dbCur.execute("CREATE TABLE pages (id INT PRIMARY KEY, title TEXT UNIQUE)")
-dbCur.execute("CREATE INDEX pages_title_idx ON pages(title COLLATE NOCASE)")
-dbCur.execute("CREATE TABLE redirects (id INT PRIMARY KEY, target TEXT)")
-dbCur.execute("CREATE INDEX redirects_idx ON redirects(target)")
-dbCur.execute("CREATE TABLE descs (id INT PRIMARY KEY, desc TEXT)")
+dbCur.execute('CREATE TABLE pages (id INT PRIMARY KEY, title TEXT UNIQUE)')
+dbCur.execute('CREATE INDEX pages_title_idx ON pages(title COLLATE NOCASE)')
+dbCur.execute('CREATE TABLE redirects (id INT PRIMARY KEY, target TEXT)')
+dbCur.execute('CREATE INDEX redirects_idx ON redirects(target)')
+dbCur.execute('CREATE TABLE descs (id INT PRIMARY KEY, desc TEXT)')
-print("Iterating through dump file")
+print('Iterating through dump file')
with bz2.open(dumpFile, mode='rt') as file:
dump = mwxml.Dump.from_file(file)
pageNum = 0
for page in dump:
pageNum += 1
if pageNum % 1e4 == 0:
- print(f"At page {pageNum}")
+ print(f'At page {pageNum}')
if pageNum > 3e4:
break
# Parse page
if page.namespace == 0:
try:
- dbCur.execute("INSERT INTO pages VALUES (?, ?)", (page.id, convertTitle(page.title)))
+ dbCur.execute('INSERT INTO pages VALUES (?, ?)', (page.id, convertTitle(page.title)))
except sqlite3.IntegrityError as e:
# Accounts for certain pages that have the same title
- print(f"Failed to add page with title \"{page.title}\": {e}", file=sys.stderr)
+ print(f'Failed to add page with title "{page.title}": {e}', file=sys.stderr)
continue
- if page.redirect != None:
- dbCur.execute("INSERT INTO redirects VALUES (?, ?)", (page.id, convertTitle(page.redirect)))
+ if page.redirect is not None:
+ dbCur.execute('INSERT INTO redirects VALUES (?, ?)', (page.id, convertTitle(page.redirect)))
else:
revision = next(page)
desc = parseDesc(revision.text)
- if desc != None:
- dbCur.execute("INSERT INTO descs VALUES (?, ?)", (page.id, desc))
+ if desc is not None:
+ dbCur.execute('INSERT INTO descs VALUES (?, ?)', (page.id, desc))
-print("Closing database")
+print('Closing database')
dbCon.commit()
dbCon.close()
diff --git a/backend/tolData/enwiki/genDumpIndexDb.py b/backend/tolData/enwiki/genDumpIndexDb.py
index 1bffb27..3bd129f 100755
--- a/backend/tolData/enwiki/genDumpIndexDb.py
+++ b/backend/tolData/enwiki/genDumpIndexDb.py
@@ -10,46 +10,47 @@ Adds data from the wiki dump index-file into a database
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-indexFile = "enwiki-20220501-pages-articles-multistream-index.txt.bz2" # Had about 22e6 lines
-indexDb = "dumpIndex.db"
+indexFile = 'enwiki-20220501-pages-articles-multistream-index.txt.bz2' # Had about 22e6 lines
+indexDb = 'dumpIndex.db'
if os.path.exists(indexDb):
- raise Exception(f"ERROR: Existing {indexDb}")
-print("Creating database")
+ raise Exception(f'ERROR: Existing {indexDb}')
+print('Creating database')
dbCon = sqlite3.connect(indexDb)
dbCur = dbCon.cursor()
-dbCur.execute("CREATE TABLE offsets (title TEXT PRIMARY KEY, id INT UNIQUE, offset INT, next_offset INT)")
+dbCur.execute('CREATE TABLE offsets (title TEXT PRIMARY KEY, id INT UNIQUE, offset INT, next_offset INT)')
-print("Iterating through index file")
-lineRegex = re.compile(r"([^:]+):([^:]+):(.*)")
+print('Iterating through index file')
+lineRegex = re.compile(r'([^:]+):([^:]+):(.*)')
lastOffset = 0
lineNum = 0
-entriesToAdd = []
+entriesToAdd: list[tuple[str, str]] = []
with bz2.open(indexFile, mode='rt') as file:
for line in file:
lineNum += 1
if lineNum % 1e5 == 0:
- print(f"At line {lineNum}")
+ print(f'At line {lineNum}')
#
match = lineRegex.fullmatch(line.rstrip())
- (offset, pageId, title) = match.group(1,2,3)
- offset = int(offset)
+ assert match is not None
+ offsetStr, pageId, title = match.group(1,2,3)
+ offset = int(offsetStr)
if offset > lastOffset:
- for (t, p) in entriesToAdd:
+ for t, p in entriesToAdd:
try:
- dbCur.execute("INSERT INTO offsets VALUES (?, ?, ?, ?)", (t, p, lastOffset, offset))
+ dbCur.execute('INSERT INTO offsets VALUES (?, ?, ?, ?)', (t, int(p), lastOffset, offset))
except sqlite3.IntegrityError as e:
# Accounts for certain entries in the file that have the same title
- print(f"Failed on title \"{t}\": {e}", file=sys.stderr)
+ print(f'Failed on title "{t}": {e}', file=sys.stderr)
entriesToAdd = []
lastOffset = offset
- entriesToAdd.append([title, pageId])
-for (title, pageId) in entriesToAdd:
+ entriesToAdd.append((title, pageId))
+for title, pageId in entriesToAdd:
try:
- dbCur.execute("INSERT INTO offsets VALUES (?, ?, ?, ?)", (title, pageId, lastOffset, -1))
+ dbCur.execute('INSERT INTO offsets VALUES (?, ?, ?, ?)', (title, int(pageId), lastOffset, -1))
except sqlite3.IntegrityError as e:
- print(f"Failed on title \"{t}\": {e}", file=sys.stderr)
+ print(f'Failed on title "{t}": {e}', file=sys.stderr)
-print("Closing database")
+print('Closing database')
dbCon.commit()
dbCon.close()
diff --git a/backend/tolData/enwiki/genImgData.py b/backend/tolData/enwiki/genImgData.py
index b5d546d..00140f6 100755
--- a/backend/tolData/enwiki/genImgData.py
+++ b/backend/tolData/enwiki/genImgData.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
-import sys, re
+import re
import bz2, html, urllib.parse
import sqlite3
@@ -15,117 +15,117 @@ will skip already-processed page IDs.
parser.parse_args()
def getInputPageIds():
- pageIds = set()
- dbCon = sqlite3.connect("../data.db")
+ pageIds: set[int] = set()
+ dbCon = sqlite3.connect('../data.db')
dbCur = dbCon.cursor()
- for (pageId,) in dbCur.execute("SELECT id from wiki_ids"):
+ for (pageId,) in dbCur.execute('SELECT id from wiki_ids'):
pageIds.add(pageId)
dbCon.close()
return pageIds
-dumpFile = "enwiki-20220501-pages-articles-multistream.xml.bz2"
-indexDb = "dumpIndex.db"
-imgDb = "imgData.db" # The database to create
-idLineRegex = re.compile(r"<id>(.*)</id>")
-imageLineRegex = re.compile(r".*\| *image *= *([^|]*)")
-bracketImageRegex = re.compile(r"\[\[(File:[^|]*).*]]")
-imageNameRegex = re.compile(r".*\.(jpg|jpeg|png|gif|tiff|tif)", flags=re.IGNORECASE)
-cssImgCropRegex = re.compile(r"{{css image crop\|image *= *(.*)", flags=re.IGNORECASE)
+dumpFile = 'enwiki-20220501-pages-articles-multistream.xml.bz2'
+indexDb = 'dumpIndex.db'
+imgDb = 'imgData.db' # The database to create
+idLineRegex = re.compile(r'<id>(.*)</id>')
+imageLineRegex = re.compile(r'.*\| *image *= *([^|]*)')
+bracketImageRegex = re.compile(r'\[\[(File:[^|]*).*]]')
+imageNameRegex = re.compile(r'.*\.(jpg|jpeg|png|gif|tiff|tif)', flags=re.IGNORECASE)
+cssImgCropRegex = re.compile(r'{{css image crop\|image *= *(.*)', flags=re.IGNORECASE)
-print("Getting input page-ids")
+print('Getting input page-ids')
pageIds = getInputPageIds()
-print(f"Found {len(pageIds)}")
+print(f'Found {len(pageIds)}')
-print("Opening databases")
+print('Opening databases')
indexDbCon = sqlite3.connect(indexDb)
indexDbCur = indexDbCon.cursor()
imgDbCon = sqlite3.connect(imgDb)
imgDbCur = imgDbCon.cursor()
-print("Checking tables")
-if imgDbCur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='page_imgs'").fetchone() == None:
+print('Checking tables')
+if imgDbCur.execute('SELECT name FROM sqlite_master WHERE type="table" AND name="page_imgs"').fetchone() is None:
# Create tables if not present
- imgDbCur.execute("CREATE TABLE page_imgs (page_id INT PRIMARY KEY, img_name TEXT)") # img_name may be NULL
- imgDbCur.execute("CREATE INDEX page_imgs_idx ON page_imgs(img_name)")
+ imgDbCur.execute('CREATE TABLE page_imgs (page_id INT PRIMARY KEY, img_name TEXT)') # img_name may be NULL
+ imgDbCur.execute('CREATE INDEX page_imgs_idx ON page_imgs(img_name)')
else:
# Check for already-processed page IDs
numSkipped = 0
- for (pid,) in imgDbCur.execute("SELECT page_id FROM page_imgs"):
+ for (pid,) in imgDbCur.execute('SELECT page_id FROM page_imgs'):
if pid in pageIds:
pageIds.remove(pid)
numSkipped += 1
else:
- print(f"WARNING: Found already-processed page ID {pid} which was not in input set")
- print(f"Will skip {numSkipped} already-processed page IDs")
+ print(f'WARNING: Found already-processed page ID {pid} which was not in input set')
+ print(f'Will skip {numSkipped} already-processed page IDs')
-print("Getting dump-file offsets")
-offsetToPageids = {}
-offsetToEnd = {} # Maps chunk-start offsets to their chunk-end offsets
+print('Getting dump-file offsets')
+offsetToPageids: dict[int, list[int]] = {}
+offsetToEnd: dict[int, int] = {} # Maps chunk-start offsets to their chunk-end offsets
iterNum = 0
for pageId in pageIds:
iterNum += 1
if iterNum % 1e4 == 0:
- print(f"At iteration {iterNum}")
+ print(f'At iteration {iterNum}')
#
- query = "SELECT offset, next_offset FROM offsets WHERE id = ?"
- row = indexDbCur.execute(query, (pageId,)).fetchone()
- if row == None:
- print(f"WARNING: Page ID {pageId} not found")
+ query = 'SELECT offset, next_offset FROM offsets WHERE id = ?'
+ row: tuple[int, int] | None = indexDbCur.execute(query, (pageId,)).fetchone()
+ if row is None:
+ print(f'WARNING: Page ID {pageId} not found')
continue
- (chunkOffset, endOffset) = row
+ chunkOffset, endOffset = row
offsetToEnd[chunkOffset] = endOffset
if chunkOffset not in offsetToPageids:
offsetToPageids[chunkOffset] = []
offsetToPageids[chunkOffset].append(pageId)
-print(f"Found {len(offsetToEnd)} chunks to check")
+print(f'Found {len(offsetToEnd)} chunks to check')
-print("Iterating through chunks in dump file")
-def getImageName(content):
- " Given an array of text-content lines, tries to return an infoxbox image name, or None "
+print('Iterating through chunks in dump file')
+def getImageName(content: list[str]) -> str | None:
+ """ Given an array of text-content lines, tries to return an infoxbox image name, or None """
# Doesn't try and find images in outside-infobox [[File:...]] and <imagemap> sections
for line in content:
match = imageLineRegex.match(line)
- if match != None:
+ if match is not None:
imageName = match.group(1).strip()
- if imageName == "":
+ if imageName == '':
return None
imageName = html.unescape(imageName)
# Account for {{...
- if imageName.startswith("{"):
+ if imageName.startswith('{'):
match = cssImgCropRegex.match(imageName)
- if match == None:
+ if match is None:
return None
imageName = match.group(1)
# Account for [[File:...|...]]
- if imageName.startswith("["):
+ if imageName.startswith('['):
match = bracketImageRegex.match(imageName)
- if match == None:
+ if match is None:
return None
imageName = match.group(1)
# Account for <!--
- if imageName.find("<!--") != -1:
+ if imageName.find('<!--') != -1:
return None
# Remove an initial 'File:'
- if imageName.startswith("File:"):
+ if imageName.startswith('File:'):
imageName = imageName[5:]
# Remove an initial 'Image:'
- if imageName.startswith("Image:"):
+ if imageName.startswith('Image:'):
imageName = imageName[6:]
# Check for extension
match = imageNameRegex.match(imageName)
- if match != None:
+ if match is not None:
imageName = match.group(0)
imageName = urllib.parse.unquote(imageName)
imageName = html.unescape(imageName) # Intentionally unescaping again (handles some odd cases)
- imageName = imageName.replace("_", " ")
+ imageName = imageName.replace('_', ' ')
return imageName
# Exclude lines like: | image = &lt;imagemap&gt;
return None
return None
with open(dumpFile, mode='rb') as file:
iterNum = 0
- for (pageOffset, endOffset) in offsetToEnd.items():
+ for pageOffset, endOffset in offsetToEnd.items():
iterNum += 1
if iterNum % 100 == 0:
- print(f"At iteration {iterNum}")
+ print(f'At iteration {iterNum}')
#
pageIds = offsetToPageids[pageOffset]
# Jump to chunk
@@ -137,14 +137,14 @@ with open(dumpFile, mode='rb') as file:
lineIdx = 0
while lineIdx < len(lines):
# Look for <page>
- if lines[lineIdx].lstrip() != "<page>":
+ if lines[lineIdx].lstrip() != '<page>':
lineIdx += 1
continue
# Check page id
lineIdx += 3
idLine = lines[lineIdx].lstrip()
match = idLineRegex.fullmatch(idLine)
- if match == None or int(match.group(1)) not in pageIds:
+ if match is None or int(match.group(1)) not in pageIds:
lineIdx += 1
continue
pageId = int(match.group(1))
@@ -152,35 +152,35 @@ with open(dumpFile, mode='rb') as file:
# Look for <text> in <page>
foundText = False
while lineIdx < len(lines):
- if not lines[lineIdx].lstrip().startswith("<text "):
+ if not lines[lineIdx].lstrip().startswith('<text '):
lineIdx += 1
continue
foundText = True
# Get text content
- content = []
+ content: list[str] = []
line = lines[lineIdx]
- content.append(line[line.find(">") + 1:])
+ content.append(line[line.find('>') + 1:])
lineIdx += 1
foundTextEnd = False
while lineIdx < len(lines):
line = lines[lineIdx]
- if not line.endswith("</text>"):
+ if not line.endswith('</text>'):
content.append(line)
lineIdx += 1
continue
foundTextEnd = True
- content.append(line[:line.rfind("</text>")])
+ content.append(line[:line.rfind('</text>')])
# Look for image-filename
imageName = getImageName(content)
- imgDbCur.execute("INSERT into page_imgs VALUES (?, ?)", (pageId, imageName))
+ imgDbCur.execute('INSERT into page_imgs VALUES (?, ?)', (pageId, imageName))
break
if not foundTextEnd:
- print(f"WARNING: Did not find </text> for page id {pageId}")
+ print(f'WARNING: Did not find </text> for page id {pageId}')
break
if not foundText:
- print(f"WARNING: Did not find <text> for page id {pageId}")
+ print(f'WARNING: Did not find <text> for page id {pageId}')
-print("Closing databases")
+print('Closing databases')
indexDbCon.close()
imgDbCon.commit()
imgDbCon.close()
diff --git a/backend/tolData/enwiki/genPageviewData.py b/backend/tolData/enwiki/genPageviewData.py
index f0901b2..6a5d79c 100755
--- a/backend/tolData/enwiki/genPageviewData.py
+++ b/backend/tolData/enwiki/genPageviewData.py
@@ -5,10 +5,10 @@ from collections import defaultdict
import bz2, sqlite3
import argparse
-parser = argparse.ArgumentParser(description='''
+parser = argparse.ArgumentParser(description="""
Reads through wikimedia files containing pageview counts,
computes average counts, and adds them to a database
-''', formatter_class=argparse.RawDescriptionHelpFormatter)
+""", formatter_class=argparse.RawDescriptionHelpFormatter)
args = parser.parse_args()
pageviewFiles = glob.glob('./pageviews/pageviews-*-user.bz2')
@@ -26,7 +26,7 @@ if os.path.exists(dbFile):
# platform (eg: mobile-web), monthly view count,
# hourly count string (eg: A1B2 means 1 view on day 1 and 2 views on day 2)
namespaceRegex = re.compile(r'[a-zA-Z]+:')
-titleToViews = defaultdict(int)
+titleToViews: dict[str, int] = defaultdict(int)
linePrefix = b'en.wikipedia '
for filename in pageviewFiles:
print(f'Reading from {filename}')
@@ -40,7 +40,7 @@ for filename in pageviewFiles:
line = line[len(linePrefix):line.rfind(b' ')] # Remove first and last fields
title = line[:line.find(b' ')].decode('utf-8')
viewCount = int(line[line.rfind(b' ')+1:])
- if namespaceRegex.match(title) != None:
+ if namespaceRegex.match(title) is not None:
continue
# Update map
titleToViews[title] += viewCount
@@ -54,7 +54,7 @@ idbCur = idbCon.cursor()
dbCur.execute('CREATE TABLE views (title TEXT PRIMARY KEY, id INT, views INT)')
for title, views in titleToViews.items():
row = idbCur.execute('SELECT id FROM offsets WHERE title = ?', (title,)).fetchone()
- if row != None:
+ if row is not None:
wikiId = int(row[0])
dbCur.execute('INSERT INTO views VALUES (?, ?, ?)', (title, wikiId, math.floor(views / len(pageviewFiles))))
dbCon.commit()
diff --git a/backend/tolData/enwiki/lookupPage.py b/backend/tolData/enwiki/lookupPage.py
index e7b95f0..427aa7a 100755
--- a/backend/tolData/enwiki/lookupPage.py
+++ b/backend/tolData/enwiki/lookupPage.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
-import sys, re
+import sys
import bz2
import sqlite3
@@ -12,24 +12,24 @@ db, and prints the corresponding <page>.
parser.add_argument("title", help="The title to look up")
args = parser.parse_args()
-dumpFile = "enwiki-20220501-pages-articles-multistream.xml.bz2"
-indexDb = "dumpIndex.db"
-pageTitle = args.title.replace("_", " ")
+dumpFile = 'enwiki-20220501-pages-articles-multistream.xml.bz2'
+indexDb = 'dumpIndex.db'
+pageTitle = args.title.replace('_', ' ')
-print("Looking up offset in index db")
+print('Looking up offset in index db')
dbCon = sqlite3.connect(indexDb)
dbCur = dbCon.cursor()
-query = "SELECT title, offset, next_offset FROM offsets WHERE title = ?"
+query = 'SELECT title, offset, next_offset FROM offsets WHERE title = ?'
row = dbCur.execute(query, (pageTitle,)).fetchone()
-if row == None:
- print("Title not found")
+if row is None:
+ print('Title not found')
sys.exit(0)
_, pageOffset, endOffset = row
dbCon.close()
-print(f"Found chunk at offset {pageOffset}")
+print(f'Found chunk at offset {pageOffset}')
-print("Reading from wiki dump")
-content = []
+print('Reading from wiki dump')
+content: list[str] = []
with open(dumpFile, mode='rb') as file:
# Get uncompressed chunk
file.seek(pageOffset)
@@ -42,25 +42,25 @@ with open(dumpFile, mode='rb') as file:
pageNum = 0
while not found:
line = lines[lineIdx]
- if line.lstrip() == "<page>":
+ if line.lstrip() == '<page>':
pageNum += 1
if pageNum > 100:
- print("ERROR: Did not find title after 100 pages")
+ print('ERROR: Did not find title after 100 pages')
break
lineIdx += 1
titleLine = lines[lineIdx]
if titleLine.lstrip() == '<title>' + pageTitle + '</title>':
found = True
- print(f"Found title in chunk as page {pageNum}")
+ print(f'Found title in chunk as page {pageNum}')
content.append(line)
content.append(titleLine)
while True:
lineIdx += 1
line = lines[lineIdx]
content.append(line)
- if line.lstrip() == "</page>":
+ if line.lstrip() == '</page>':
break
lineIdx += 1
-print("Content: ")
-print("\n".join(content))
+print('Content: ')
+print('\n'.join(content))
diff --git a/backend/tolData/eol/downloadImgs.py b/backend/tolData/eol/downloadImgs.py
index 4d658e7..5213aaf 100755
--- a/backend/tolData/eol/downloadImgs.py
+++ b/backend/tolData/eol/downloadImgs.py
@@ -22,53 +22,53 @@ highest EOL ID.
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-imagesListDb = "imagesList.db"
-def getInputEolIds():
- eolIds = set()
- dbCon = sqlite3.connect("../data.db")
+imagesListDb = 'imagesList.db'
+def getInputEolIds() -> set[int]:
+ eolIds: set[int] = set()
+ dbCon = sqlite3.connect('../data.db')
dbCur = dbCon.cursor()
- for (id,) in dbCur.execute("SELECT id FROM eol_ids"):
+ for (id,) in dbCur.execute('SELECT id FROM eol_ids'):
eolIds.add(id)
dbCon.close()
return eolIds
-outDir = "imgsForReview/"
+outDir = 'imgsForReview/'
MAX_IMGS_PER_ID = 3
MAX_THREADS = 5
POST_DL_DELAY_MIN = 2 # Minimum delay in seconds to pause after download before starting another (for each thread)
POST_DL_DELAY_MAX = 3
-LICENSE_REGEX = r"cc-by((-nc)?(-sa)?(-[234]\.[05])?)|cc-publicdomain|cc-0-1\.0|public domain"
+LICENSE_REGEX = r'cc-by((-nc)?(-sa)?(-[234]\.[05])?)|cc-publicdomain|cc-0-1\.0|public domain'
-print("Getting input EOL IDs")
+print('Getting input EOL IDs')
eolIds = getInputEolIds()
-print("Getting EOL IDs to download for")
+print('Getting EOL IDs to download for')
# Get IDs from images-list db
imgDbCon = sqlite3.connect(imagesListDb)
imgCur = imgDbCon.cursor()
-imgListIds = set()
-for (pageId,) in imgCur.execute("SELECT DISTINCT page_id FROM images"):
+imgListIds: set[int] = set()
+for (pageId,) in imgCur.execute('SELECT DISTINCT page_id FROM images'):
imgListIds.add(pageId)
# Get set intersection, and sort into list
eolIds = eolIds.intersection(imgListIds)
-eolIds = sorted(eolIds)
-print(f"Result: {len(eolIds)} EOL IDs")
+eolIdList = sorted(eolIds)
+print(f'Result: {len(eolIdList)} EOL IDs')
-print("Checking output directory")
+print('Checking output directory')
if not os.path.exists(outDir):
os.mkdir(outDir)
-print("Finding next ID to download for")
+print('Finding next ID to download for')
nextIdx = 0
fileList = os.listdir(outDir)
-ids = [int(filename.split(" ")[0]) for filename in fileList]
-if len(ids) > 0:
+ids = [int(filename.split(' ')[0]) for filename in fileList]
+if ids:
ids.sort()
- nextIdx = eolIds.index(ids[-1]) + 1
-if nextIdx == len(eolIds):
- print("No IDs left. Exiting...")
+ nextIdx = eolIdList.index(ids[-1]) + 1
+if nextIdx == len(eolIdList):
+ print('No IDs left. Exiting...')
sys.exit(0)
-print("Starting download threads")
+print('Starting download threads')
numThreads = 0
-threadException = None # Used for ending main thread after a non-main thread exception
+threadException: Exception | None = None # Used for ending main thread after a non-main thread exception
# Handle SIGINT signals
interrupted = False
oldHandler = None
@@ -86,29 +86,29 @@ def downloadImg(url, outFile):
file.write(data.content)
time.sleep(random.random() * (POST_DL_DELAY_MAX - POST_DL_DELAY_MIN) + POST_DL_DELAY_MIN)
except Exception as e:
- print(f"Error while downloading to {outFile}: {str(e)}", file=sys.stderr)
+ print(f'Error while downloading to {outFile}: {str(e)}', file=sys.stderr)
threadException = e
numThreads -= 1
# Manage downloading
-for idx in range(nextIdx, len(eolIds)):
- eolId = eolIds[idx]
+for idx in range(nextIdx, len(eolIdList)):
+ eolId = eolIdList[idx]
# Get image urls
- ownerSet = set() # Used to get images from different owners, for variety
+ ownerSet: set[str] = set() # Used to get images from different owners, for variety
exitLoop = False
- query = "SELECT content_id, copy_url, license, copyright_owner FROM images WHERE page_id = ?"
- for (contentId, url, license, copyrightOwner) in imgCur.execute(query, (eolId,)):
- if url.startswith("data/"):
- url = "https://content.eol.org/" + url
+ query = 'SELECT content_id, copy_url, license, copyright_owner FROM images WHERE page_id = ?'
+ for contentId, url, license, copyrightOwner in imgCur.execute(query, (eolId,)):
+ if url.startswith('data/'):
+ url = 'https://content.eol.org/' + url
urlParts = urllib.parse.urlparse(url)
extension = os.path.splitext(urlParts.path)[1]
if len(extension) <= 1:
- print(f"WARNING: No filename extension found in URL {url}", file=sys.stderr)
+ print(f'WARNING: No filename extension found in URL {url}', file=sys.stderr)
continue
# Check image-quantity limit
if len(ownerSet) == MAX_IMGS_PER_ID:
break
# Check for skip conditions
- if re.fullmatch(LICENSE_REGEX, license) == None:
+ if re.fullmatch(LICENSE_REGEX, license) is None:
continue
if len(copyrightOwner) > 100: # Avoid certain copyrightOwner fields that seem long and problematic
continue
@@ -116,27 +116,27 @@ for idx in range(nextIdx, len(eolIds)):
continue
ownerSet.add(copyrightOwner)
# Determine output filename
- outPath = f"{outDir}{eolId} {contentId}{extension}"
+ outPath = f'{outDir}{eolId} {contentId}{extension}'
if os.path.exists(outPath):
- print(f"WARNING: {outPath} already exists. Skipping download.")
+ print(f'WARNING: {outPath} already exists. Skipping download.')
continue
# Check thread limit
while numThreads == MAX_THREADS:
time.sleep(1)
# Wait for threads after an interrupt or thread-exception
- if interrupted or threadException != None:
- print("Waiting for existing threads to end")
+ if interrupted or threadException is not None:
+ print('Waiting for existing threads to end')
while numThreads > 0:
time.sleep(1)
exitLoop = True
break
# Perform download
- print(f"Downloading image to {outPath}")
+ print(f'Downloading image to {outPath}')
numThreads += 1
thread = Thread(target=downloadImg, args=(url, outPath), daemon=True)
thread.start()
if exitLoop:
break
# Close images-list db
-print("Finished downloading")
+print('Finished downloading')
imgDbCon.close()
diff --git a/backend/tolData/eol/genImagesListDb.py b/backend/tolData/eol/genImagesListDb.py
index 4dcb6d9..808292d 100755
--- a/backend/tolData/eol/genImagesListDb.py
+++ b/backend/tolData/eol/genImagesListDb.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
-import sys, os, re
+import os, re
import csv
import sqlite3
@@ -10,25 +10,25 @@ Generates a sqlite db from a directory of CSV files holding EOL image data
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-imagesListDir = "imagesList/"
-dbFile = "imagesList.db"
+imagesListDir = 'imagesList/'
+dbFile = 'imagesList.db'
-print("Creating database")
+print('Creating database')
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
-dbCur.execute("CREATE TABLE images" \
- " (content_id INT PRIMARY KEY, page_id INT, source_url TEXT, copy_url TEXT, license TEXT, copyright_owner TEXT)")
-dbCur.execute("CREATE INDEX images_pid_idx ON images(page_id)")
-print("Reading CSV files")
+dbCur.execute('CREATE TABLE images' \
+ ' (content_id INT PRIMARY KEY, page_id INT, source_url TEXT, copy_url TEXT, license TEXT, copyright_owner TEXT)')
+dbCur.execute('CREATE INDEX images_pid_idx ON images(page_id)')
+print('Reading CSV files')
csvFilenames = os.listdir(imagesListDir)
for filename in csvFilenames:
- print(f"Processing {imagesListDir}{filename}")
- with open(imagesListDir + filename, newline="") as file:
- for (contentId, pageId, sourceUrl, copyUrl, license, owner) in csv.reader(file):
- if re.match(r"^[a-zA-Z]", contentId): # Skip header line
+ print(f'Processing {imagesListDir}{filename}')
+ with open(imagesListDir + filename, newline='') as file:
+ for contentId, pageId, sourceUrl, copyUrl, license, owner in csv.reader(file):
+ if re.match(r'^[a-zA-Z]', contentId): # Skip header line
continue
- dbCur.execute("INSERT INTO images VALUES (?, ?, ?, ?, ?, ?)",
+ dbCur.execute('INSERT INTO images VALUES (?, ?, ?, ?, ?, ?)',
(int(contentId), int(pageId), sourceUrl, copyUrl, license, owner))
-print("Closing database")
+print('Closing database')
dbCon.commit()
dbCon.close()
diff --git a/backend/tolData/eol/reviewImgs.py b/backend/tolData/eol/reviewImgs.py
index 979ed0e..e44fb3d 100755
--- a/backend/tolData/eol/reviewImgs.py
+++ b/backend/tolData/eol/reviewImgs.py
@@ -16,42 +16,42 @@ Chosen images are placed in another directory, and rejected ones are deleted.
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-imgDir = "imgsForReview/"
-outDir = "imgs/"
-extraInfoDbCon = sqlite3.connect("../data.db")
+imgDir = 'imgsForReview/'
+outDir = 'imgs/'
+extraInfoDbCon = sqlite3.connect('../data.db')
extraInfoDbCur = extraInfoDbCon.cursor()
-def getExtraInfo(eolId):
+def getExtraInfo(eolId: int) -> str:
global extraInfoDbCur
- query = "SELECT names.alt_name FROM" \
- " names INNER JOIN eol_ids ON eol_ids.name = names.name" \
- " WHERE id = ? and pref_alt = 1"
+ query = 'SELECT names.alt_name FROM' \
+ ' names INNER JOIN eol_ids ON eol_ids.name = names.name' \
+ ' WHERE id = ? and pref_alt = 1'
row = extraInfoDbCur.execute(query, (eolId,)).fetchone()
- if row != None:
- return f"Reviewing EOL ID {eolId}, aka \"{row[0]}\""
+ if row is not None:
+ return f'Reviewing EOL ID {eolId}, aka "{row[0]}"'
else:
- return f"Reviewing EOL ID {eolId}"
+ return f'Reviewing EOL ID {eolId}'
IMG_DISPLAY_SZ = 400
MAX_IMGS_PER_ID = 3
IMG_BG_COLOR = (88, 28, 135)
-PLACEHOLDER_IMG = Image.new("RGB", (IMG_DISPLAY_SZ, IMG_DISPLAY_SZ), IMG_BG_COLOR)
+PLACEHOLDER_IMG = Image.new('RGB', (IMG_DISPLAY_SZ, IMG_DISPLAY_SZ), IMG_BG_COLOR)
-print("Checking output directory")
+print('Checking output directory')
if not os.path.exists(outDir):
os.mkdir(outDir)
-print("Getting input image list")
+print('Getting input image list')
imgList = os.listdir(imgDir)
-imgList.sort(key=lambda s: int(s.split(" ")[0]))
-if len(imgList) == 0:
- print("No input images found")
+imgList.sort(key=lambda s: int(s.split(' ')[0]))
+if not imgList:
+ print('No input images found')
sys.exit(0)
class EolImgReviewer:
- " Provides the GUI for reviewing images "
+ """ Provides the GUI for reviewing images """
def __init__(self, root, imgList):
self.root = root
- root.title("EOL Image Reviewer")
+ root.title('EOL Image Reviewer')
# Setup main frame
- mainFrame = ttk.Frame(root, padding="5 5 5 5")
+ mainFrame = ttk.Frame(root, padding='5 5 5 5')
mainFrame.grid(column=0, row=0, sticky=(tki.N, tki.W, tki.E, tki.S))
root.columnconfigure(0, weight=1)
root.rowconfigure(0, weight=1)
@@ -59,7 +59,7 @@ class EolImgReviewer:
self.imgs = [PLACEHOLDER_IMG] * MAX_IMGS_PER_ID # Stored as fields for use in rotation
self.photoImgs = list(map(lambda img: ImageTk.PhotoImage(img), self.imgs)) # Image objects usable by tkinter
# These need a persistent reference for some reason (doesn't display otherwise)
- self.labels = []
+ self.labels: list[ttk.Label] = []
for i in range(MAX_IMGS_PER_ID):
frame = ttk.Frame(mainFrame, width=IMG_DISPLAY_SZ, height=IMG_DISPLAY_SZ)
frame.grid(column=i, row=0)
@@ -70,29 +70,29 @@ class EolImgReviewer:
for child in mainFrame.winfo_children():
child.grid_configure(padx=5, pady=5)
# Add keyboard bindings
- root.bind("<q>", self.quit)
- root.bind("<Key-j>", lambda evt: self.accept(0))
- root.bind("<Key-k>", lambda evt: self.accept(1))
- root.bind("<Key-l>", lambda evt: self.accept(2))
- root.bind("<Key-i>", lambda evt: self.reject())
- root.bind("<Key-a>", lambda evt: self.rotate(0))
- root.bind("<Key-s>", lambda evt: self.rotate(1))
- root.bind("<Key-d>", lambda evt: self.rotate(2))
- root.bind("<Key-A>", lambda evt: self.rotate(0, True))
- root.bind("<Key-S>", lambda evt: self.rotate(1, True))
- root.bind("<Key-D>", lambda evt: self.rotate(2, True))
+ root.bind('<q>', self.quit)
+ root.bind('<Key-j>', lambda evt: self.accept(0))
+ root.bind('<Key-k>', lambda evt: self.accept(1))
+ root.bind('<Key-l>', lambda evt: self.accept(2))
+ root.bind('<Key-i>', lambda evt: self.reject())
+ root.bind('<Key-a>', lambda evt: self.rotate(0))
+ root.bind('<Key-s>', lambda evt: self.rotate(1))
+ root.bind('<Key-d>', lambda evt: self.rotate(2))
+ root.bind('<Key-A>', lambda evt: self.rotate(0, True))
+ root.bind('<Key-S>', lambda evt: self.rotate(1, True))
+ root.bind('<Key-D>', lambda evt: self.rotate(2, True))
# Initialise images to review
self.imgList = imgList
self.imgListIdx = 0
self.nextEolId = 0
- self.nextImgNames = []
- self.rotations = []
+ self.nextImgNames: list[str] = []
+ self.rotations: list[int] = []
self.getNextImgs()
# For displaying extra info
self.numReviewed = 0
self.startTime = time.time()
def getNextImgs(self):
- " Updates display with new images to review, or ends program "
+ """ Updates display with new images to review, or ends program """
# Gather names of next images to review
for i in range(MAX_IMGS_PER_ID):
if self.imgListIdx == len(self.imgList):
@@ -101,7 +101,7 @@ class EolImgReviewer:
return
break
imgName = self.imgList[self.imgListIdx]
- eolId = int(re.match(r"(\d+) (\d+)", imgName).group(1))
+ eolId = int(re.match(r'(\d+) (\d+)', imgName).group(1))
if i == 0:
self.nextEolId = eolId
self.nextImgNames = [imgName]
@@ -131,19 +131,19 @@ class EolImgReviewer:
self.labels[idx].config(image=self.photoImgs[idx])
idx += 1
# Restart if all image files non-recognisable
- if len(self.nextImgNames) == 0:
+ if not self.nextImgNames:
self.getNextImgs()
return
# Update title
firstImgIdx = self.imgListIdx - len(self.nextImgNames) + 1
lastImgIdx = self.imgListIdx
title = getExtraInfo(self.nextEolId)
- title += f" (imgs {firstImgIdx} to {lastImgIdx} out of {len(self.imgList)})"
+ title += f' (imgs {firstImgIdx} to {lastImgIdx} out of {len(self.imgList)})'
self.root.title(title)
def accept(self, imgIdx):
- " React to a user selecting an image "
+ """ React to a user selecting an image """
if imgIdx >= len(self.nextImgNames):
- print("Invalid selection")
+ print('Invalid selection')
return
for i in range(len(self.nextImgNames)):
inFile = imgDir + self.nextImgNames[i]
@@ -160,13 +160,13 @@ class EolImgReviewer:
self.numReviewed += 1
self.getNextImgs()
def reject(self):
- " React to a user rejecting all images of a set "
+ """ React to a user rejecting all images of a set """
for i in range(len(self.nextImgNames)):
os.remove(imgDir + self.nextImgNames[i])
self.numReviewed += 1
self.getNextImgs()
def rotate(self, imgIdx, anticlockwise = False):
- " Respond to a user rotating an image "
+ """ Respond to a user rotating an image """
deg = -90 if not anticlockwise else 90
self.imgs[imgIdx] = self.imgs[imgIdx].rotate(deg)
self.photoImgs[imgIdx] = ImageTk.PhotoImage(self.imgs[imgIdx])
@@ -174,15 +174,15 @@ class EolImgReviewer:
self.rotations[imgIdx] = (self.rotations[imgIdx] + deg) % 360
def quit(self, e = None):
global extraInfoDbCon
- print(f"Number reviewed: {self.numReviewed}")
+ print(f'Number reviewed: {self.numReviewed}')
timeElapsed = time.time() - self.startTime
- print(f"Time elapsed: {timeElapsed:.2f} seconds")
+ print(f'Time elapsed: {timeElapsed:.2f} seconds')
if self.numReviewed > 0:
- print(f"Avg time per review: {timeElapsed/self.numReviewed:.2f} seconds")
+ print(f'Avg time per review: {timeElapsed/self.numReviewed:.2f} seconds')
extraInfoDbCon.close()
self.root.destroy()
def resizeImgForDisplay(self, img):
- " Returns a copy of an image, shrunk to fit in it's frame (keeps aspect ratio), and with a background "
+ """ Returns a copy of an image, shrunk to fit in it's frame (keeps aspect ratio), and with a background """
if max(img.width, img.height) > IMG_DISPLAY_SZ:
if (img.width > img.height):
newHeight = int(img.height * IMG_DISPLAY_SZ/img.width)
@@ -196,7 +196,7 @@ class EolImgReviewer:
int((IMG_DISPLAY_SZ - img.height) / 2)))
return bgImg
# Create GUI and defer control
-print("Starting GUI")
+print('Starting GUI')
root = tki.Tk()
EolImgReviewer(root, imgList)
root.mainloop()
diff --git a/backend/tolData/genDescData.py b/backend/tolData/genDescData.py
index 28971f4..bb1cbc8 100755
--- a/backend/tolData/genDescData.py
+++ b/backend/tolData/genDescData.py
@@ -1,13 +1,12 @@
#!/usr/bin/python3
-import sys, os, re
import sqlite3
import argparse
-parser = argparse.ArgumentParser(description='''
+parser = argparse.ArgumentParser(description="""
Maps nodes to short descriptions, using data from DBpedia and
Wikipedia, and stores results in the database.
-''', formatter_class=argparse.RawDescriptionHelpFormatter)
+""", formatter_class=argparse.RawDescriptionHelpFormatter)
args = parser.parse_args()
dbpediaDb = 'dbpedia/descData.db'
@@ -20,7 +19,7 @@ dbCur = dbCon.cursor()
dbCur.execute('CREATE TABLE descs (wiki_id INT PRIMARY KEY, desc TEXT, from_dbp INT)')
print('Getting node mappings')
-nodeToWikiId = {}
+nodeToWikiId: dict[str, int] = {}
for name, wikiId in dbCur.execute('SELECT name, id from wiki_ids'):
nodeToWikiId[name] = wikiId
@@ -28,7 +27,7 @@ print('Reading data from DBpedia')
dbpCon = sqlite3.connect(dbpediaDb)
dbpCur = dbpCon.cursor()
print('Getting node IRIs')
-nodeToIri = {}
+nodeToIri: dict[str, str] = {}
iterNum = 0
for name, wikiId in nodeToWikiId.items():
iterNum += 1
@@ -36,7 +35,7 @@ for name, wikiId in nodeToWikiId.items():
print(f'At iteration {iterNum}')
#
row = dbpCur.execute('SELECT iri FROM ids where id = ?', (wikiId,)).fetchone()
- if row != None:
+ if row is not None:
nodeToIri[name] = row[0]
print('Resolving redirects')
iterNum = 0
@@ -46,7 +45,7 @@ for name, iri in nodeToIri.items():
print(f'At iteration {iterNum}')
#
row = dbpCur.execute('SELECT target FROM redirects where iri = ?', (iri,)).fetchone()
- if row != None:
+ if row is not None:
nodeToIri[name] = row[0]
print('Adding descriptions')
iterNum = 0
@@ -56,7 +55,7 @@ for name, iri in nodeToIri.items():
print(f'At iteration {iterNum}')
#
row = dbpCur.execute('SELECT abstract FROM abstracts WHERE iri = ?', (iri,)).fetchone()
- if row != None:
+ if row is not None:
dbCur.execute('INSERT OR IGNORE INTO descs VALUES (?, ?, ?)', (nodeToWikiId[name], row[0], 1))
del nodeToWikiId[name]
dbpCon.close()
@@ -73,7 +72,7 @@ for name, wikiId in nodeToWikiId.items():
#
query = 'SELECT pages.id FROM redirects INNER JOIN pages ON redirects.target = pages.title WHERE redirects.id = ?'
row = enwikiCur.execute(query, (wikiId,)).fetchone()
- if row != None:
+ if row is not None:
nodeToWikiId[name] = row[0]
print('Adding descriptions')
iterNum = 0
@@ -83,7 +82,7 @@ for name, wikiId in nodeToWikiId.items():
print(f'At iteration {iterNum}')
#
row = enwikiCur.execute('SELECT desc FROM descs where id = ?', (wikiId,)).fetchone()
- if row != None:
+ if row is not None:
dbCur.execute('INSERT OR IGNORE INTO descs VALUES (?, ?, ?)', (wikiId, row[0], 0))
print('Closing databases')
diff --git a/backend/tolData/genImgs.py b/backend/tolData/genImgs.py
index 930990b..6f72b49 100755
--- a/backend/tolData/genImgs.py
+++ b/backend/tolData/genImgs.py
@@ -17,57 +17,65 @@ to skip.
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-imgListFile = "imgList.txt"
-outDir = "img/"
-eolImgDb = "eol/imagesList.db"
-enwikiImgDb = "enwiki/imgData.db"
-pickedImgsDir = "pickedImgs/"
-pickedImgsFilename = "imgData.txt"
-dbFile = "data.db"
+imgListFile = 'imgList.txt'
+outDir = 'img/'
+eolImgDb = 'eol/imagesList.db'
+enwikiImgDb = 'enwiki/imgData.db'
+pickedImgsDir = 'pickedImgs/'
+pickedImgsFilename = 'imgData.txt'
+dbFile = 'data.db'
IMG_OUT_SZ = 200
genImgFiles = True # Usable for debugging
+class PickedImg:
+ """ Represents a picked-image from pickedImgsDir """
+ def __init__(self, nodeName: str, id: int, filename: str, url: str, license: str, artist: str, credit: str):
+ self.nodeName = nodeName
+ self.id = id
+ self.filename = filename
+ self.url = url
+ self.license = license
+ self.artist = artist
+ self.credit = credit
+
if not os.path.exists(outDir):
os.mkdir(outDir)
-print("Opening databases")
+print('Opening databases')
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
eolCon = sqlite3.connect(eolImgDb)
eolCur = eolCon.cursor()
enwikiCon = sqlite3.connect(enwikiImgDb)
enwikiCur = enwikiCon.cursor()
-print("Checking for picked-images")
-nodeToPickedImg = {}
+print('Checking for picked-images')
+nodeToPickedImg: dict[str, PickedImg] = {}
if os.path.exists(pickedImgsDir + pickedImgsFilename):
lineNum = 0
with open(pickedImgsDir + pickedImgsFilename) as file:
for line in file:
lineNum += 1
- (filename, url, license, artist, credit) = line.rstrip().split("|")
+ filename, url, license, artist, credit = line.rstrip().split('|')
nodeName = os.path.splitext(filename)[0] # Remove extension
- (otolId,) = dbCur.execute("SELECT id FROM nodes WHERE name = ?", (nodeName,)).fetchone()
- nodeToPickedImg[otolId] = {
- "nodeName": nodeName, "id": lineNum,
- "filename": filename, "url": url, "license": license, "artist": artist, "credit": credit,
- }
+ (otolId,) = dbCur.execute('SELECT id FROM nodes WHERE name = ?', (nodeName,)).fetchone()
+ nodeToPickedImg[otolId] = PickedImg(nodeName, lineNum, filename, url, license, artist, credit)
-print("Checking for image tables")
-nodesDone = set()
-imgsDone = set()
-if dbCur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='node_imgs'").fetchone() == None:
+print('Checking for image tables')
+nodesDone: set[str] = set()
+imgsDone: set[tuple[int, str]] = set()
+if dbCur.execute('SELECT name FROM sqlite_master WHERE type="table" AND name="node_imgs"').fetchone() is None:
# Add image tables if not present
- dbCur.execute("CREATE TABLE node_imgs (name TEXT PRIMARY KEY, img_id INT, src TEXT)")
- dbCur.execute("CREATE TABLE images" \
- " (id INT, src TEXT, url TEXT, license TEXT, artist TEXT, credit TEXT, PRIMARY KEY (id, src))")
+ dbCur.execute('CREATE TABLE node_imgs (name TEXT PRIMARY KEY, img_id INT, src TEXT)')
+ dbCur.execute('CREATE TABLE images' \
+ ' (id INT, src TEXT, url TEXT, license TEXT, artist TEXT, credit TEXT, PRIMARY KEY (id, src))')
else:
# Get existing image-associated nodes
- for (otolId,) in dbCur.execute("SELECT nodes.id FROM node_imgs INNER JOIN nodes ON node_imgs.name = nodes.name"):
+ for (otolId,) in dbCur.execute('SELECT nodes.id FROM node_imgs INNER JOIN nodes ON node_imgs.name = nodes.name'):
nodesDone.add(otolId)
# Get existing node-associated images
- for (imgId, imgSrc) in dbCur.execute("SELECT id, src from images"):
+ for imgId, imgSrc in dbCur.execute('SELECT id, src from images'):
imgsDone.add((imgId, imgSrc))
- print(f"Found {len(nodesDone)} nodes and {len(imgsDone)} images to skip")
+ print(f'Found {len(nodesDone)} nodes and {len(imgsDone)} images to skip')
# Set SIGINT handler
interrupted = False
@@ -76,18 +84,18 @@ def onSigint(sig, frame):
interrupted = True
signal.signal(signal.SIGINT, onSigint)
-print("Iterating through input images")
+print('Iterating through input images')
def quit():
- print("Closing databases")
+ print('Closing databases')
dbCon.commit()
dbCon.close()
eolCon.close()
enwikiCon.close()
sys.exit(0)
def convertImage(imgPath, outPath):
- print(f"Converting {imgPath} to {outPath}")
+ print(f'Converting {imgPath} to {outPath}')
if os.path.exists(outPath):
- print(f"ERROR: Output image already exists")
+ print('ERROR: Output image already exists')
return False
try:
completedProcess = subprocess.run(
@@ -95,94 +103,94 @@ def convertImage(imgPath, outPath):
stdout=subprocess.DEVNULL
)
except Exception as e:
- print(f"ERROR: Exception while attempting to run smartcrop: {e}")
+ print(f'ERROR: Exception while attempting to run smartcrop: {e}')
return False
if completedProcess.returncode != 0:
- print(f"ERROR: smartcrop had exit status {completedProcess.returncode}")
+ print(f'ERROR: smartcrop had exit status {completedProcess.returncode}')
return False
return True
-print("Processing picked-images")
-for (otolId, imgData) in nodeToPickedImg.items():
+print('Processing picked-images')
+for otolId, imgData in nodeToPickedImg.items():
# Check for SIGINT event
if interrupted:
- print("Exiting")
+ print('Exiting')
quit()
# Skip if already processed
if otolId in nodesDone:
continue
# Convert image
if genImgFiles:
- success = convertImage(pickedImgsDir + imgData["filename"], outDir + otolId + ".jpg")
+ success = convertImage(pickedImgsDir + imgData.filename, outDir + otolId + '.jpg')
if not success:
quit()
else:
- print(f"Processing {imgData['nodeName']}: {otolId}.jpg")
+ print(f'Processing {imgData.nodeName}: {otolId}.jpg')
# Add entry to db
- if (imgData["id"], "picked") not in imgsDone:
- dbCur.execute("INSERT INTO images VALUES (?, ?, ?, ?, ?, ?)",
- (imgData["id"], "picked", imgData["url"], imgData["license"], imgData["artist"], imgData["credit"]))
- imgsDone.add((imgData["id"], "picked"))
- dbCur.execute("INSERT INTO node_imgs VALUES (?, ?, ?)", (imgData["nodeName"], imgData["id"], "picked"))
+ if (imgData.id, 'picked') not in imgsDone:
+ dbCur.execute('INSERT INTO images VALUES (?, ?, ?, ?, ?, ?)',
+ (imgData.id, 'picked', imgData.url, imgData.license, imgData.artist, imgData.credit))
+ imgsDone.add((imgData.id, 'picked'))
+ dbCur.execute('INSERT INTO node_imgs VALUES (?, ?, ?)', (imgData.nodeName, imgData.id, 'picked'))
nodesDone.add(otolId)
-print("Processing images from eol and enwiki")
+print('Processing images from eol and enwiki')
iterNum = 0
with open(imgListFile) as file:
for line in file:
iterNum += 1
# Check for SIGINT event
if interrupted:
- print("Exiting")
+ print('Exiting')
break
# Skip lines without an image path
- if line.find(" ") == -1:
+ if line.find(' ') == -1:
continue
# Get filenames
- (otolId, _, imgPath) = line.rstrip().partition(" ")
+ otolId, _, imgPath = line.rstrip().partition(' ')
# Skip if already processed
if otolId in nodesDone:
continue
# Convert image
if genImgFiles:
- success = convertImage(imgPath, outDir + otolId + ".jpg")
+ success = convertImage(imgPath, outDir + otolId + '.jpg')
if not success:
break
else:
if iterNum % 1e4 == 0:
- print(f"At iteration {iterNum}")
+ print(f'At iteration {iterNum}')
# Add entry to db
- (nodeName,) = dbCur.execute("SELECT name FROM nodes WHERE id = ?", (otolId,)).fetchone()
- fromEol = imgPath.startswith("eol/")
+ (nodeName,) = dbCur.execute('SELECT name FROM nodes WHERE id = ?', (otolId,)).fetchone()
+ fromEol = imgPath.startswith('eol/')
imgName = os.path.basename(os.path.normpath(imgPath)) # Get last path component
imgName = os.path.splitext(imgName)[0] # Remove extension
if fromEol:
- eolId, _, contentId = imgName.partition(" ")
- eolId, contentId = (int(eolId), int(contentId))
- if (eolId, "eol") not in imgsDone:
- query = "SELECT source_url, license, copyright_owner FROM images WHERE content_id = ?"
+ eolIdStr, _, contentIdStr = imgName.partition(' ')
+ eolId, contentId = (int(eolIdStr), int(contentIdStr))
+ if (eolId, 'eol') not in imgsDone:
+ query = 'SELECT source_url, license, copyright_owner FROM images WHERE content_id = ?'
row = eolCur.execute(query, (contentId,)).fetchone()
- if row == None:
- print(f"ERROR: No image record for EOL ID {eolId}, content ID {contentId}")
+ if row is None:
+ print(f'ERROR: No image record for EOL ID {eolId}, content ID {contentId}')
break
- (url, license, owner) = row
- dbCur.execute("INSERT INTO images VALUES (?, ?, ?, ?, ?, ?)",
- (eolId, "eol", url, license, owner, ""))
- imgsDone.add((eolId, "eol"))
- dbCur.execute("INSERT INTO node_imgs VALUES (?, ?, ?)", (nodeName, eolId, "eol"))
+ url, license, owner = row
+ dbCur.execute('INSERT INTO images VALUES (?, ?, ?, ?, ?, ?)',
+ (eolId, 'eol', url, license, owner, ''))
+ imgsDone.add((eolId, 'eol'))
+ dbCur.execute('INSERT INTO node_imgs VALUES (?, ?, ?)', (nodeName, eolId, 'eol'))
else:
enwikiId = int(imgName)
- if (enwikiId, "enwiki") not in imgsDone:
- query = "SELECT name, license, artist, credit FROM" \
- " page_imgs INNER JOIN imgs ON page_imgs.img_name = imgs.name" \
- " WHERE page_imgs.page_id = ?"
+ if (enwikiId, 'enwiki') not in imgsDone:
+ query = 'SELECT name, license, artist, credit FROM' \
+ ' page_imgs INNER JOIN imgs ON page_imgs.img_name = imgs.name' \
+ ' WHERE page_imgs.page_id = ?'
row = enwikiCur.execute(query, (enwikiId,)).fetchone()
- if row == None:
- print(f"ERROR: No image record for enwiki ID {enwikiId}")
+ if row is None:
+ print(f'ERROR: No image record for enwiki ID {enwikiId}')
break
- (name, license, artist, credit) = row
- url = "https://en.wikipedia.org/wiki/File:" + urllib.parse.quote(name)
- dbCur.execute("INSERT INTO images VALUES (?, ?, ?, ?, ?, ?)",
- (enwikiId, "enwiki", url, license, artist, credit))
- imgsDone.add((enwikiId, "enwiki"))
- dbCur.execute("INSERT INTO node_imgs VALUES (?, ?, ?)", (nodeName, enwikiId, "enwiki"))
+ name, license, artist, credit = row
+ url = 'https://en.wikipedia.org/wiki/File:' + urllib.parse.quote(name)
+ dbCur.execute('INSERT INTO images VALUES (?, ?, ?, ?, ?, ?)',
+ (enwikiId, 'enwiki', url, license, artist, credit))
+ imgsDone.add((enwikiId, 'enwiki'))
+ dbCur.execute('INSERT INTO node_imgs VALUES (?, ?, ?)', (nodeName, enwikiId, 'enwiki'))
# Close dbs
quit()
diff --git a/backend/tolData/genLinkedImgs.py b/backend/tolData/genLinkedImgs.py
index eb991b9..6d2feff 100755
--- a/backend/tolData/genLinkedImgs.py
+++ b/backend/tolData/genLinkedImgs.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
-import sys, re
+import re
import sqlite3
import argparse
@@ -10,113 +10,115 @@ associate them with images from their children
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-dbFile = "data.db"
-compoundNameRegex = re.compile(r"\[(.+) \+ (.+)]")
+dbFile = 'data.db'
+compoundNameRegex = re.compile(r'\[(.+) \+ (.+)]')
upPropagateCompoundImgs = False
-print("Opening databases")
+print('Opening databases')
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
-dbCur.execute("CREATE TABLE linked_imgs (name TEXT PRIMARY KEY, otol_ids TEXT)")
+dbCur.execute('CREATE TABLE linked_imgs (name TEXT PRIMARY KEY, otol_ids TEXT)')
-print("Getting nodes with images")
-resolvedNodes = {} # Will map node names to otol IDs with a usable image
-query = "SELECT nodes.name, nodes.id FROM nodes INNER JOIN node_imgs ON nodes.name = node_imgs.name"
-for (name, otolId) in dbCur.execute(query):
+print('Getting nodes with images')
+resolvedNodes: dict[str, str] = {} # Will map node names to otol IDs with a usable image
+query = 'SELECT nodes.name, nodes.id FROM nodes INNER JOIN node_imgs ON nodes.name = node_imgs.name'
+for name, otolId in dbCur.execute(query):
resolvedNodes[name] = otolId
-print(f"Found {len(resolvedNodes)}")
+print(f'Found {len(resolvedNodes)}')
-print("Iterating through nodes, trying to resolve images for ancestors")
-nodesToResolve = {} # Maps a node name to a list of objects that represent possible child images
-processedNodes = {} # Map a node name to an OTOL ID, representing a child node whose image is to be used
-parentToChosenTips = {} # Used to prefer images from children with more tips
+print('Iterating through nodes, trying to resolve images for ancestors')
+nodesToResolve: dict[str, list[dict[str, str | int | None]]] = {}
+ # Maps a node name to a list of objects that represent possible child images
+processedNodes: dict[str, str] = {} # Map a node name to an OTOL ID, representing a child node whose image is to be used
+parentToChosenTips: dict[str, int] = {} # Used to prefer images from children with more tips
iterNum = 0
-while len(resolvedNodes) > 0:
+while resolvedNodes:
iterNum += 1
if iterNum % 1e3 == 0:
- print(f"At iteration {iterNum}")
+ print(f'At iteration {iterNum}')
# Get next node
- (nodeName, otolId) = resolvedNodes.popitem()
+ nodeName, otolId = resolvedNodes.popitem()
processedNodes[nodeName] = otolId
# Traverse upwards, resolving ancestors if able
while True:
# Get parent
- row = dbCur.execute("SELECT parent FROM edges WHERE child = ?", (nodeName,)).fetchone()
- if row == None or row[0] in processedNodes or row[0] in resolvedNodes:
+ row = dbCur.execute('SELECT parent FROM edges WHERE child = ?', (nodeName,)).fetchone()
+ if row is None or row[0] in processedNodes or row[0] in resolvedNodes:
break
- parent = row[0]
+ parent: str = row[0]
# Get parent data
if parent not in nodesToResolve:
- childNames = [row[0] for row in dbCur.execute("SELECT child FROM edges WHERE parent = ?", (parent,))]
- query = "SELECT name, tips FROM nodes WHERE name IN ({})".format(",".join(["?"] * len(childNames)))
- childObjs = [{"name": row[0], "tips": row[1], "otolId": None} for row in dbCur.execute(query, childNames)]
- childObjs.sort(key=lambda x: x["tips"], reverse=True)
+ childNames: list[str] = [
+ row[0] for row in dbCur.execute('SELECT child FROM edges WHERE parent = ?', (parent,))]
+ query = 'SELECT name, tips FROM nodes WHERE name IN ({})'.format(','.join(['?'] * len(childNames)))
+ childObjs = [{'name': row[0], 'tips': row[1], 'otolId': None} for row in dbCur.execute(query, childNames)]
+ childObjs.sort(key=lambda x: x['tips'], reverse=True)
nodesToResolve[parent] = childObjs
else:
childObjs = nodesToResolve[parent]
# Check if highest-tips child
- if (childObjs[0]["name"] == nodeName):
+ if childObjs[0]['name'] == nodeName:
# Resolve parent, and continue from it
- dbCur.execute("INSERT INTO linked_imgs VALUES (?, ?)", (parent, otolId))
+ dbCur.execute('INSERT INTO linked_imgs VALUES (?, ?)', (parent, otolId))
del nodesToResolve[parent]
processedNodes[parent] = otolId
- parentToChosenTips[parent] = childObjs[0]["tips"]
+ parentToChosenTips[parent] = childObjs[0]['tips']
nodeName = parent
continue
else:
# Mark child as a potential choice
- childObj = next(c for c in childObjs if c["name"] == nodeName)
- childObj["otolId"] = otolId
+ childObj = next(c for c in childObjs if c['name'] == nodeName)
+ childObj['otolId'] = otolId
break
# When out of resolved nodes, resolve nodesToResolve nodes, possibly adding more nodes to resolve
- if len(resolvedNodes) == 0:
- for (name, childObjs) in nodesToResolve.items():
- childObj = next(c for c in childObjs if c["otolId"] != None)
- resolvedNodes[name] = childObj["otolId"]
- parentToChosenTips[name] = childObj["tips"]
- dbCur.execute("INSERT INTO linked_imgs VALUES (?, ?)", (name, childObj["otolId"]))
+ if not resolvedNodes:
+ for name, childObjs in nodesToResolve.items():
+ childObj = next(c for c in childObjs if c['otolId'] is not None)
+ resolvedNodes[name] = childObj['otolId']
+ parentToChosenTips[name] = childObj['tips']
+ dbCur.execute('INSERT INTO linked_imgs VALUES (?, ?)', (name, childObj['otolId']))
nodesToResolve.clear()
-print("Replacing linked-images for compound nodes")
+print('Replacing linked-images for compound nodes')
iterNum = 0
for nodeName in processedNodes.keys():
iterNum += 1
if iterNum % 1e4 == 0:
- print(f"At iteration {iterNum}")
+ print(f'At iteration {iterNum}')
#
match = compoundNameRegex.fullmatch(nodeName)
- if match != None:
+ if match is not None:
# Replace associated image with subname images
- (subName1, subName2) = match.group(1,2)
- otolIdPair = ["", ""]
+ subName1, subName2 = match.group(1,2)
+ otolIdPair = ['', '']
if subName1 in processedNodes:
otolIdPair[0] = processedNodes[subName1]
if subName2 in processedNodes:
otolIdPair[1] = processedNodes[subName2]
# Use no image if both subimages not found
- if otolIdPair[0] == "" and otolIdPair[1] == "":
- dbCur.execute("DELETE FROM linked_imgs WHERE name = ?", (nodeName,))
+ if otolIdPair[0] == '' and otolIdPair[1] == '':
+ dbCur.execute('DELETE FROM linked_imgs WHERE name = ?', (nodeName,))
continue
# Add to db
- dbCur.execute("UPDATE linked_imgs SET otol_ids = ? WHERE name = ?",
- (otolIdPair[0] + "," + otolIdPair[1], nodeName))
+ dbCur.execute('UPDATE linked_imgs SET otol_ids = ? WHERE name = ?',
+ (otolIdPair[0] + ',' + otolIdPair[1], nodeName))
# Possibly repeat operation upon parent/ancestors
if upPropagateCompoundImgs:
while True:
# Get parent
- row = dbCur.execute("SELECT parent FROM edges WHERE child = ?", (nodeName,)).fetchone()
- if row != None:
+ row = dbCur.execute('SELECT parent FROM edges WHERE child = ?', (nodeName,)).fetchone()
+ if row is not None:
parent = row[0]
# Check num tips
- (numTips,) = dbCur.execute("SELECT tips from nodes WHERE name = ?", (nodeName,)).fetchone()
+ (numTips,) = dbCur.execute('SELECT tips from nodes WHERE name = ?', (nodeName,)).fetchone()
if parent in parentToChosenTips and parentToChosenTips[parent] <= numTips:
# Replace associated image
- dbCur.execute("UPDATE linked_imgs SET otol_ids = ? WHERE name = ?",
- (otolIdPair[0] + "," + otolIdPair[1], parent))
+ dbCur.execute('UPDATE linked_imgs SET otol_ids = ? WHERE name = ?',
+ (otolIdPair[0] + ',' + otolIdPair[1], parent))
nodeName = parent
continue
break
-print("Closing databases")
+print('Closing databases')
dbCon.commit()
dbCon.close()
diff --git a/backend/tolData/genMappingData.py b/backend/tolData/genMappingData.py
index d562d7e..5339c4e 100755
--- a/backend/tolData/genMappingData.py
+++ b/backend/tolData/genMappingData.py
@@ -1,11 +1,11 @@
#!/usr/bin/python3
-import sys, re, os
+import os
from collections import defaultdict
-import gzip, bz2, csv, sqlite3
+import gzip, csv, sqlite3
import argparse
-parser = argparse.ArgumentParser(description='''
+parser = argparse.ArgumentParser(description="""
Maps otol IDs to EOL and enwiki titles, using IDs from various
other sources (like NCBI).
@@ -15,7 +15,7 @@ and in a wikidata dump, and stores results in the database.
Based on code from https://github.com/OneZoom/OZtree, located in
OZprivate/ServerScripts/TaxonMappingAndPopularity/ (22 Aug 2022).
-''', formatter_class=argparse.RawDescriptionHelpFormatter)
+""", formatter_class=argparse.RawDescriptionHelpFormatter)
args = parser.parse_args()
taxonomyFile = 'otol/taxonomy.tsv'
@@ -33,8 +33,8 @@ print('Reading taxonomy file')
# uid (otol-id, eg: 93302), parent_uid, name, rank,
# sourceinfo (comma-separated source specifiers, eg: ncbi:2952,gbif:3207147), uniqueName, flags
OTOL_SRCS = ['ncbi', 'if', 'worms', 'irmng', 'gbif'] # Earlier sources will get higher priority
-nodeToSrcIds = defaultdict(dict) # Maps otol ID to {src1: id1, src2: id2, ...}
-usedSrcIds = set() # {(src1, id1), ...} (used to avoid storing IDs that won't be used)
+nodeToSrcIds: dict[int, dict[str, int]] = defaultdict(dict) # Maps otol ID to {src1: id1, src2: id2, ...}
+usedSrcIds: set[tuple[str, int]] = set() # {(src1, id1), ...} (used to avoid storing IDs that won't be used)
with open(taxonomyFile) as file: # Had about 4.5e6 lines
lineNum = 0
for line in file:
@@ -51,12 +51,12 @@ with open(taxonomyFile) as file: # Had about 4.5e6 lines
except ValueError:
print(f'Skipping non-integral ID {fields[0]} on line {lineNum}')
continue
- srcInfo = fields[4]
+ srcsField = fields[4]
# Add source IDs
- for srcPair in srcInfo.split(','):
- src, srcId = srcPair.split(':', 1)
- if srcId.isdecimal() and src in OTOL_SRCS and src not in nodeToSrcIds[otolId]:
- srcId = int(srcId)
+ for srcPair in srcsField.split(','):
+ src, srcIdStr = srcPair.split(':', 1)
+ if srcIdStr.isdecimal() and src in OTOL_SRCS and src not in nodeToSrcIds[otolId]:
+ srcId = int(srcIdStr)
nodeToSrcIds[otolId][src] = srcId
usedSrcIds.add((src, srcId))
print(f'- Result has {sum([len(v) for v in nodeToSrcIds.values()]):,} entries') # Was about 6.7e6
@@ -66,7 +66,7 @@ print('Reading EOL provider_ids file')
# node_id, resource_pk (ID from external source), resource_id (int denoting external-source),
# page_id (eol ID), preferred_canonical_for_page
EOL_SRCS = {676: 'ncbi', 459: 'worms', 767: 'gbif'} # Maps ints to external-source names
-srcToEolId = {src: {} for src in EOL_SRCS.values()} # Maps src1 to {id1: eolId1, ...}
+srcToEolId: dict[str, dict[int, int]] = {src: {} for src in EOL_SRCS.values()} # Maps src1 to {id1: eolId1, ...}
with gzip.open(eolIdsFile, mode='rt') as file: # Had about 13e6 lines
for lineNum, row in enumerate(csv.reader(file), 1):
if lineNum % 1e6 == 0:
@@ -77,9 +77,9 @@ with gzip.open(eolIdsFile, mode='rt') as file: # Had about 13e6 lines
# Parse line
eolId = int(row[3])
srcVal = int(row[2])
- srcId = row[1]
- if srcId.isdecimal() and srcVal in EOL_SRCS:
- srcId = int(srcId)
+ srcIdStr = row[1]
+ if srcIdStr.isdecimal() and srcVal in EOL_SRCS:
+ srcId = int(srcIdStr)
src = EOL_SRCS[srcVal]
if (src, srcId) not in usedSrcIds:
continue
@@ -92,9 +92,9 @@ print(f'- Result has {sum([len(v) for v in srcToEolId.values()]):,} entries')
print('Resolving candidate EOL IDs')
# For each otol ID, find eol IDs with matching sources, and choose the 'best' one
-nodeToEolId = {} # Maps otol ID to eol ID
+nodeToEolId: dict[int, int] = {} # Maps otol ID to eol ID
for otolId, srcInfo in nodeToSrcIds.items():
- eolIdToCount = defaultdict(int)
+ eolIdToCount: dict[int, int] = defaultdict(int)
for src, srcId in srcInfo.items():
if src in srcToEolId and srcId in srcToEolId[src]:
eolId = srcToEolId[src][srcId]
@@ -109,9 +109,9 @@ for otolId, srcInfo in nodeToSrcIds.items():
print(f'- Result has {len(nodeToEolId):,} entries') # Was about 2.7e6
print('Reading from Wikidata db')
-srcToWikiTitle = defaultdict(dict) # Maps 'eol'/etc to {srcId1: title1, ...}
+srcToWikiTitle: dict[str, dict[int, str]] = defaultdict(dict) # Maps 'eol'/etc to {srcId1: title1, ...}
wikiTitles = set()
-titleToIucnStatus = {}
+titleToIucnStatus: dict[str, str] = {}
dbCon = sqlite3.connect(wikidataDb)
dbCur = dbCon.cursor()
for src, srcId, title in dbCur.execute('SELECT src, id, title from src_id_to_title'):
@@ -129,9 +129,9 @@ dbCon.close()
print('Resolving candidate Wikidata items')
# For each otol ID, find wikidata titles with matching sources, and choose the 'best' one
-nodeToWikiTitle = {}
+nodeToWikiTitle: dict[int, str] = {}
for otolId, srcInfo in nodeToSrcIds.items():
- titleToSrcs = defaultdict(list) # Maps candidate titles to {src1: srcId1, ...}
+ titleToSrcs: dict[str, list[str]] = defaultdict(list) # Maps candidate titles to list of sources
for src, srcId in srcInfo.items():
if src in srcToWikiTitle and srcId in srcToWikiTitle[src]:
title = srcToWikiTitle[src][srcId]
@@ -157,7 +157,7 @@ print(f'- Result has {len(nodeToWikiTitle):,} entries') # Was about 4e5
print('Adding extra EOL mappings from Wikidata')
eolIdToNode = {eolId: node for node, eolId in nodeToEolId.items()}
wikiTitleToNode = {title: node for node, title in nodeToWikiTitle.items()}
-addedEntries = {}
+addedEntries: dict[int, int] = {}
for eolId, title in srcToWikiTitle['eol'].items():
if title in wikiTitleToNode:
otolId = wikiTitleToNode[title]
@@ -173,8 +173,8 @@ for src in pickedMappings:
continue
with open(filename) as file:
for line in file:
- otolId, mappedVal = line.rstrip().split('|')
- otolId = int(otolId)
+ otolIdStr, mappedVal = line.rstrip().split('|')
+ otolId = int(otolIdStr)
if src == 'eol':
if mappedVal:
nodeToEolId[otolId] = int(mappedVal)
@@ -188,15 +188,15 @@ for src in pickedMappings:
if otolId in nodeToWikiTitle:
del nodeToWikiTitle[otolId]
-print(f'Getting enwiki page IDs')
-titleToPageId = {}
+print('Getting enwiki page IDs')
+titleToPageId: dict[str, int] = {}
numNotFound = 0
dbCon = sqlite3.connect(enwikiDumpIndexDb)
dbCur = dbCon.cursor()
for title in nodeToWikiTitle.values():
- row = dbCur.execute('SELECT id FROM offsets WHERE title = ?', (title,)).fetchone()
- if row != None:
- titleToPageId[title] = row[0]
+ record = dbCur.execute('SELECT id FROM offsets WHERE title = ?', (title,)).fetchone()
+ if record != None:
+ titleToPageId[title] = record[0]
else:
numNotFound += 1
dbCon.close()
@@ -206,7 +206,7 @@ print('Writing to db')
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
# Get otol id-to-name map
-otolIdToName = {}
+otolIdToName: dict[int, str] = {}
for nodeName, nodeId in dbCur.execute('SELECT name, id from nodes'):
if nodeId.startswith('ott'):
otolIdToName[int(nodeId[3:])] = nodeName
diff --git a/backend/tolData/genNameData.py b/backend/tolData/genNameData.py
index 7e6c025..2df144d 100755
--- a/backend/tolData/genNameData.py
+++ b/backend/tolData/genNameData.py
@@ -1,13 +1,13 @@
#!/usr/bin/python3
-import sys, re, os
+import re, os
import html, csv, sqlite3
import argparse
-parser = argparse.ArgumentParser(description='''
+parser = argparse.ArgumentParser(description="""
Maps nodes to vernacular names, using data from EOL, enwiki, and a
picked-names file, and stores results in the database.
-''', formatter_class=argparse.RawDescriptionHelpFormatter)
+""", formatter_class=argparse.RawDescriptionHelpFormatter)
args = parser.parse_args()
eolNamesFile = 'eol/vernacularNames.csv'
@@ -19,9 +19,9 @@ dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
print('Getting node mappings')
-nodeToTips = {}
-eolIdToNode = {} # Maps eol ID to node name (if there are multiple, choose one with most tips)
-wikiIdToNode = {}
+nodeToTips: dict[str, int] = {}
+eolIdToNode: dict[int, str] = {} # Maps eol ID to node name (if there are multiple, choose one with most tips)
+wikiIdToNode: dict[int, str] = {}
for name, tips in dbCur.execute('SELECT name, tips from nodes'):
nodeToTips[name] = tips
for name, eolId in dbCur.execute('SELECT name, id from eol_ids'):
@@ -58,7 +58,8 @@ with open(eolNamesFile, newline='') as file:
# Add to db
if eolId in eolIdToNode and name not in namesToSkip and name not in nodeToTips \
and lang == 'eng' and len(name.split(' ')) <= 3: # Ignore names with >3 words
- cmd = 'INSERT OR IGNORE INTO names VALUES (?, ?, ?, \'eol\')' # The 'OR IGNORE' accounts for duplicate lines
+ cmd = 'INSERT OR IGNORE INTO names VALUES (?, ?, ?, \'eol\')'
+ # The 'OR IGNORE' accounts for duplicate lines
dbCur.execute(cmd, (eolIdToNode[eolId], name, isPreferred))
print('Getting names from Wikipedia')
@@ -76,7 +77,7 @@ for wikiId, nodeName in wikiIdToNode.items():
' INNER JOIN pages p2 ON r1.target = p2.title WHERE p2.id = ?'
for (name,) in enwikiCur.execute(query, (wikiId,)):
name = name.lower()
- if altNameRegex.fullmatch(name) != None and name != nodeName and name not in nodeToTips:
+ if altNameRegex.fullmatch(name) is not None and name != nodeName and name not in nodeToTips:
dbCur.execute('INSERT OR IGNORE INTO names VALUES (?, ?, ?, \'enwiki\')', (nodeName, name, 0))
print('Getting picked names')
@@ -84,16 +85,15 @@ print('Getting picked names')
# nodename1|altName1|isPreferred1 -> Add an alt-name
# nodename1|altName1| -> Remove an alt-name
# nodename1|nodeName1| -> Remove any preferred-alt status
-altNamesToSkip = {} # Maps node names to alt-names to exclude
if os.path.exists(pickedNamesFile):
with open(pickedNamesFile) as file:
for line in file:
- nodeName, altName, isPreferred = line.lower().rstrip().split('|')
+ nodeName, altName, isPreferredStr = line.lower().rstrip().split('|')
if nodeName not in nodeToTips:
- print(f"Skipping \"{nodeName}\", as no such node exists")
+ print(f'Skipping "{nodeName}", as no such node exists')
continue
- if isPreferred:
- isPreferred = 1 if isPreferred == '1' else 0
+ if isPreferredStr:
+ isPreferred = 1 if isPreferredStr == '1' else 0
if isPreferred == 1:
# Remove any existing preferred-alt status
cmd = 'UPDATE names SET pref_alt = 0 WHERE name = ? AND alt_name = ? AND pref_alt = 1'
@@ -101,7 +101,7 @@ if os.path.exists(pickedNamesFile):
# Remove any existing record
dbCur.execute('DELETE FROM names WHERE name = ? AND alt_name = ?', (nodeName, altName))
# Add record
- dbCur.execute("INSERT INTO names VALUES (?, ?, ?, 'picked')", (nodeName, altName, isPreferred))
+ dbCur.execute('INSERT INTO names VALUES (?, ?, ?, "picked")', (nodeName, altName, isPreferred))
elif nodeName != altName: # Remove any matching record
dbCur.execute('DELETE FROM names WHERE name = ? AND alt_name = ?', (nodeName, altName))
else: # Remove any preferred-alt status
diff --git a/backend/tolData/genOtolData.py b/backend/tolData/genOtolData.py
index 6310cc9..d4d6ee8 100755
--- a/backend/tolData/genOtolData.py
+++ b/backend/tolData/genOtolData.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
-import sys, re, os
+import re, os
import json, sqlite3
import argparse
@@ -26,16 +26,8 @@ Reads from a picked-names file, if present, which specifies name and node ID pai
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-treeFile = "otol/labelled_supertree_ottnames.tre" # Had about 2.5e9 nodes
-annFile = "otol/annotations.json"
-dbFile = "data.db"
-nodeMap = {} # Maps node IDs to node objects
-nameToFirstId = {} # Maps node names to first found ID (names might have multiple IDs)
-dupNameToIds = {} # Maps names of nodes with multiple IDs to those IDs
-pickedNamesFile = "pickedOtolNames.txt"
-
class Node:
- " Represents a tree-of-life node "
+ ' Represents a tree-of-life node '
def __init__(self, name, childIds, parentId, tips, pSupport):
self.name = name
self.childIds = childIds
@@ -43,35 +35,43 @@ class Node:
self.tips = tips
self.pSupport = pSupport
-print("Parsing tree file")
+treeFile = 'otol/labelled_supertree_ottnames.tre' # Had about 2.5e9 nodes
+annFile = 'otol/annotations.json'
+dbFile = 'data.db'
+nodeMap: dict[str, Node] = {} # Maps node IDs to node objects
+nameToFirstId: dict[str, str] = {} # Maps node names to first found ID (names might have multiple IDs)
+dupNameToIds: dict[str, list[str]] = {} # Maps names of nodes with multiple IDs to those IDs
+pickedNamesFile = 'pickedOtolNames.txt'
+
+print('Parsing tree file')
# Read file
-data = None
+data: str
with open(treeFile) as file:
data = file.read()
dataIdx = 0
# Parse content
iterNum = 0
-def parseNewick():
- " Parses a node using 'data' and 'dataIdx', updates nodeMap accordingly, and returns the node's ID "
+def parseNewick() -> str:
+ """ Parses a node using 'data' and 'dataIdx', updates nodeMap accordingly, and returns the node's ID """
global data, dataIdx, iterNum
iterNum += 1
if iterNum % 1e5 == 0:
- print(f"At iteration {iterNum}")
+ print(f'At iteration {iterNum}')
# Check for EOF
if dataIdx == len(data):
- raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}")
+ raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}')
# Check for node
- if data[dataIdx] == "(": # parse inner node
+ if data[dataIdx] == '(': # parse inner node
dataIdx += 1
- childIds = []
+ childIds: list[str] = []
while True:
# Read child
childId = parseNewick()
childIds.append(childId)
if (dataIdx == len(data)):
- raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}")
+ raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}')
# Check for next child
- if (data[dataIdx] == ","):
+ if (data[dataIdx] == ','):
dataIdx += 1
continue
else:
@@ -94,10 +94,10 @@ def parseNewick():
updateNameMaps(name, id)
nodeMap[id] = Node(name, [], None, 1, False)
return id
-def parseNewickName():
- " Parses a node name using 'data' and 'dataIdx', and returns a (name, id) pair "
+def parseNewickName() -> tuple[str, str]:
+ """ Parses a node name using 'data' and 'dataIdx', and returns a (name, id) pair """
global data, dataIdx
- name = None
+ name: str
end = dataIdx
# Get name
if (end < len(data) and data[end] == "'"): # Check for quoted name
@@ -114,33 +114,33 @@ def parseNewickName():
break
end += 1
if inQuote:
- raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}")
+ raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}')
name = data[dataIdx:end]
dataIdx = end
else:
- while end < len(data) and not re.match(r"[(),]", data[end]):
+ while end < len(data) and not re.match(r'[(),]', data[end]):
end += 1
if (end == dataIdx):
- raise Exception(f"ERROR: Unexpected EOF at index {dataIdx}")
+ raise Exception(f'ERROR: Unexpected EOF at index {dataIdx}')
name = data[dataIdx:end].rstrip()
if end == len(data): # Ignore trailing input semicolon
name = name[:-1]
dataIdx = end
# Convert to (name, id)
name = name.lower()
- if name.startswith("mrca"):
+ if name.startswith('mrca'):
return (name, name)
elif name[0] == "'":
- match = re.fullmatch(r"'([^\\\"]+) (ott\d+)'", name)
- if match == None:
- raise Exception(f"ERROR: invalid name \"{name}\"")
+ match = re.fullmatch(r"'([^\\\']+) (ott\d+)'", name)
+ if match is None:
+ raise Exception(f'ERROR: invalid name \'{name}\'')
name = match.group(1).replace("''", "'")
return (name, match.group(2))
else:
- match = re.fullmatch(r"([^\\\"]+)_(ott\d+)", name)
- if match == None:
- raise Exception(f"ERROR: invalid name \"{name}\"")
- return (match.group(1).replace("_", " "), match.group(2))
+ match = re.fullmatch(r"([^\\\']+)_(ott\d+)", name)
+ if match is None:
+ raise Exception(f'ERROR: invalid name \'{name}\'')
+ return (match.group(1).replace('_', ' '), match.group(2))
def updateNameMaps(name, id):
global nameToFirstId, dupNameToIds
if name not in nameToFirstId:
@@ -150,18 +150,18 @@ def updateNameMaps(name, id):
dupNameToIds[name] = [nameToFirstId[name], id]
else:
dupNameToIds[name].append(id)
-rootId = parseNewick()
+rootId: str = parseNewick()
-print("Resolving duplicate names")
+print('Resolving duplicate names')
# Read picked-names file
-nameToPickedId = {}
+nameToPickedId: dict[str, str] = {}
if os.path.exists(pickedNamesFile):
with open(pickedNamesFile) as file:
for line in file:
- (name, _, otolId) = line.rstrip().partition("|")
+ name, _, otolId = line.rstrip().partition('|')
nameToPickedId[name] = otolId
# Resolve duplicates
-for (dupName, ids) in dupNameToIds.items():
+for dupName, ids in dupNameToIds.items():
# Check for picked id
if dupName in nameToPickedId:
idToUse = nameToPickedId[dupName]
@@ -174,16 +174,16 @@ for (dupName, ids) in dupNameToIds.items():
counter = 2
for id in ids:
if id != idToUse:
- nodeMap[id].name += f" [{counter}]"
+ nodeMap[id].name += f' [{counter}]'
counter += 1
-print("Changing mrca* names")
-def convertMrcaName(id):
+print('Changing mrca* names')
+def convertMrcaName(id: str):
node = nodeMap[id]
name = node.name
childIds = node.childIds
if len(childIds) < 2:
- print(f"WARNING: MRCA node \"{name}\" has less than 2 children")
+ print(f'WARNING: MRCA node \'{name}\' has less than 2 children')
return
# Get 2 children with most tips
childTips = [nodeMap[id].tips for id in childIds]
@@ -195,53 +195,52 @@ def convertMrcaName(id):
childName1 = nodeMap[childId1].name
childName2 = nodeMap[childId2].name
# Check for mrca* child names
- if childName1.startswith("mrca"):
+ if childName1.startswith('mrca'):
childName1 = convertMrcaName(childId1)
- if childName2.startswith("mrca"):
+ if childName2.startswith('mrca'):
childName2 = convertMrcaName(childId2)
# Check for composite names
- match = re.fullmatch(r"\[(.+) \+ (.+)]", childName1)
- if match != None:
+ match = re.fullmatch(r'\[(.+) \+ (.+)]', childName1)
+ if match is not None:
childName1 = match.group(1)
- match = re.fullmatch(r"\[(.+) \+ (.+)]", childName2)
- if match != None:
+ match = re.fullmatch(r'\[(.+) \+ (.+)]', childName2)
+ if match is not None:
childName2 = match.group(1)
# Create composite name
- node.name = f"[{childName1} + {childName2}]"
+ node.name = f'[{childName1} + {childName2}]'
return childName1
-for (id, node) in nodeMap.items():
- if node.name.startswith("mrca"):
+for id, node in nodeMap.items():
+ if node.name.startswith('mrca'):
convertMrcaName(id)
-print("Parsing annotations file")
+print('Parsing annotations file')
# Read file
-data = None
with open(annFile) as file:
data = file.read()
obj = json.loads(data)
-nodeAnnsMap = obj["nodes"]
+nodeAnnsMap = obj['nodes']
# Find relevant annotations
-for (id, node) in nodeMap.items():
+for id, node in nodeMap.items():
# Set has-support value using annotations
if id in nodeAnnsMap:
nodeAnns = nodeAnnsMap[id]
- supportQty = len(nodeAnns["supported_by"]) if "supported_by" in nodeAnns else 0
- conflictQty = len(nodeAnns["conflicts_with"]) if "conflicts_with" in nodeAnns else 0
+ supportQty = len(nodeAnns['supported_by']) if 'supported_by' in nodeAnns else 0
+ conflictQty = len(nodeAnns['conflicts_with']) if 'conflicts_with' in nodeAnns else 0
node.pSupport = supportQty > 0 and conflictQty == 0
-print("Creating nodes and edges tables")
+print('Creating nodes and edges tables')
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
-dbCur.execute("CREATE TABLE nodes (name TEXT PRIMARY KEY, id TEXT UNIQUE, tips INT)")
-dbCur.execute("CREATE INDEX nodes_idx_nc ON nodes(name COLLATE NOCASE)")
-dbCur.execute("CREATE TABLE edges (parent TEXT, child TEXT, p_support INT, PRIMARY KEY (parent, child))")
-dbCur.execute("CREATE INDEX edges_child_idx ON edges(child)")
-for (otolId, node) in nodeMap.items():
- dbCur.execute("INSERT INTO nodes VALUES (?, ?, ?)", (node.name, otolId, node.tips))
+dbCur.execute('CREATE TABLE nodes (name TEXT PRIMARY KEY, id TEXT UNIQUE, tips INT)')
+dbCur.execute('CREATE INDEX nodes_idx_nc ON nodes(name COLLATE NOCASE)')
+dbCur.execute('CREATE TABLE edges (parent TEXT, child TEXT, p_support INT, PRIMARY KEY (parent, child))')
+dbCur.execute('CREATE INDEX edges_child_idx ON edges(child)')
+for otolId, node in nodeMap.items():
+ dbCur.execute('INSERT INTO nodes VALUES (?, ?, ?)', (node.name, otolId, node.tips))
for childId in node.childIds:
childNode = nodeMap[childId]
- dbCur.execute("INSERT INTO edges VALUES (?, ?, ?)",
+ dbCur.execute('INSERT INTO edges VALUES (?, ?, ?)',
(node.name, childNode.name, 1 if childNode.pSupport else 0))
-print("Closing database")
+print('Closing database')
dbCon.commit()
dbCon.close()
diff --git a/backend/tolData/genPopData.py b/backend/tolData/genPopData.py
index 9c5382c..3bb1325 100755
--- a/backend/tolData/genPopData.py
+++ b/backend/tolData/genPopData.py
@@ -1,13 +1,12 @@
#!/usr/bin/python3
-import sys
import sqlite3
import argparse
-parser = argparse.ArgumentParser(description='''
+parser = argparse.ArgumentParser(description="""
Reads enwiki page view info from a database, and stores it
as node popularity values in the database.
-''', formatter_class=argparse.RawDescriptionHelpFormatter)
+""", formatter_class=argparse.RawDescriptionHelpFormatter)
args = parser.parse_args()
pageviewsDb = 'enwiki/pageviewData.db'
@@ -19,7 +18,7 @@ dbCur = dbCon.cursor()
print('Getting view counts')
pdbCon = sqlite3.connect(pageviewsDb)
pdbCur = pdbCon.cursor()
-nodeToViews = {} # Maps node names to counts
+nodeToViews: dict[str, int] = {} # Maps node names to counts
iterNum = 0
for wikiId, views in pdbCur.execute('SELECT id, views from views'):
iterNum += 1
@@ -27,7 +26,7 @@ for wikiId, views in pdbCur.execute('SELECT id, views from views'):
print(f'At iteration {iterNum}') # Reached 1.6e6
#
row = dbCur.execute('SELECT name FROM wiki_ids WHERE id = ?', (wikiId,)).fetchone()
- if row != None:
+ if row is not None:
nodeToViews[row[0]] = views
pdbCon.close()
diff --git a/backend/tolData/genReducedTrees.py b/backend/tolData/genReducedTrees.py
index a954fd3..66fef40 100755
--- a/backend/tolData/genReducedTrees.py
+++ b/backend/tolData/genReducedTrees.py
@@ -1,7 +1,7 @@
#!/usr/bin/python3
-import sys, os.path, re
-import json, sqlite3
+import sys, re
+import sqlite3
import argparse
parser = argparse.ArgumentParser(description="""
@@ -17,13 +17,13 @@ Creates reduced versions of the tree in the database:
presence in the 'picked' tree. And, for nodes with 'many' children,
removing some more, despite any node descriptions.
""", formatter_class=argparse.RawDescriptionHelpFormatter)
-parser.add_argument("--tree", choices=["picked", "images", "trimmed"], help="Only generate the specified tree")
+parser.add_argument('--tree', choices=['picked', 'images', 'trimmed'], help='Only generate the specified tree')
args = parser.parse_args()
tree = args.tree
-dbFile = "data.db"
-pickedNodesFile = "pickedNodes.txt"
-COMP_NAME_REGEX = re.compile(r"\[.+ \+ .+]") # Used to recognise composite nodes
+dbFile = 'data.db'
+pickedNodesFile = 'pickedNodes.txt'
+COMP_NAME_REGEX = re.compile(r'\[.+ \+ .+]') # Used to recognise composite nodes
class Node:
def __init__(self, id, children, parent, tips, pSupport):
@@ -33,144 +33,153 @@ class Node:
self.tips = tips
self.pSupport = pSupport
-print("Opening database")
+print('Opening database')
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
-def genPickedNodeTree(dbCur, pickedNames, rootName):
+def genPickedNodeTree(dbCur: sqlite3.Cursor, pickedNames: set[str], rootName: str) -> None:
global COMP_NAME_REGEX
PREF_NUM_CHILDREN = 3 # Include extra children up to this limit
- nodeMap = {} # Maps node names to Nodes
- print("Getting ancestors")
+ print('Getting ancestors')
nodeMap = genNodeMap(dbCur, pickedNames, 100)
- print(f"Result has {len(nodeMap)} nodes")
- print("Removing composite nodes")
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Removing composite nodes')
removedNames = removeCompositeNodes(nodeMap)
- print(f"Result has {len(nodeMap)} nodes")
- print("Removing 'collapsible' nodes")
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Removing \'collapsible\' nodes')
temp = removeCollapsibleNodes(nodeMap, pickedNames)
removedNames.update(temp)
- print(f"Result has {len(nodeMap)} nodes")
- print("Adding some additional nearby children")
- namesToAdd = []
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Adding some additional nearby children')
+ namesToAdd: list[str] = []
iterNum = 0
- for (name, node) in nodeMap.items():
+ for name, node in nodeMap.items():
iterNum += 1
if iterNum % 100 == 0:
- print(f"At iteration {iterNum}")
+ print(f'At iteration {iterNum}')
#
numChildren = len(node.children)
if numChildren < PREF_NUM_CHILDREN:
- children = [row[0] for row in dbCur.execute("SELECT child FROM edges where parent = ?", (name,))]
- newChildren = []
+ children = [row[0] for row in dbCur.execute('SELECT child FROM edges where parent = ?', (name,))]
+ newChildren: list[str] = []
for n in children:
if n in nodeMap or n in removedNames:
continue
- if COMP_NAME_REGEX.fullmatch(n) != None:
+ if COMP_NAME_REGEX.fullmatch(n) is not None:
continue
- if dbCur.execute("SELECT name from node_imgs WHERE name = ?", (n,)).fetchone() == None and \
- dbCur.execute("SELECT name from linked_imgs WHERE name = ?", (n,)).fetchone() == None:
+ if dbCur.execute('SELECT name from node_imgs WHERE name = ?', (n,)).fetchone() is None and \
+ dbCur.execute('SELECT name from linked_imgs WHERE name = ?', (n,)).fetchone() is None:
continue
newChildren.append(n)
newChildNames = newChildren[:(PREF_NUM_CHILDREN - numChildren)]
node.children.extend(newChildNames)
namesToAdd.extend(newChildNames)
for name in namesToAdd:
- parent, pSupport = dbCur.execute("SELECT parent, p_support from edges WHERE child = ?", (name,)).fetchone()
- (id,) = dbCur.execute("SELECT id FROM nodes WHERE name = ?", (name,)).fetchone()
- parent = None if parent == "" else parent
+ parent, pSupport = dbCur.execute('SELECT parent, p_support from edges WHERE child = ?', (name,)).fetchone()
+ (id,) = dbCur.execute('SELECT id FROM nodes WHERE name = ?', (name,)).fetchone()
+ parent = None if parent == '' else parent
nodeMap[name] = Node(id, [], parent, 0, pSupport == 1)
- print(f"Result has {len(nodeMap)} nodes")
- print("Updating 'tips' values")
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Updating \'tips\' values')
updateTips(rootName, nodeMap)
- print("Creating table")
- addTreeTables(nodeMap, dbCur, "p")
-def genImagesOnlyTree(dbCur, nodesWithImgOrPicked, pickedNames, rootName):
- print("Getting ancestors")
+ print('Creating table')
+ addTreeTables(nodeMap, dbCur, 'p')
+def genImagesOnlyTree(
+ dbCur: sqlite3.Cursor,
+ nodesWithImgOrPicked: set[str],
+ pickedNames: set[str],
+ rootName: str) -> None:
+ print('Getting ancestors')
nodeMap = genNodeMap(dbCur, nodesWithImgOrPicked, 1e4)
- print(f"Result has {len(nodeMap)} nodes")
- print("Removing composite nodes")
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Removing composite nodes')
removeCompositeNodes(nodeMap)
- print(f"Result has {len(nodeMap)} nodes")
- print("Removing 'collapsible' nodes")
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Removing \'collapsible\' nodes')
removeCollapsibleNodes(nodeMap, pickedNames)
- print(f"Result has {len(nodeMap)} nodes")
- print(f"Updating 'tips' values") # Needed for next trimming step
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Updating \'tips\' values') # Needed for next trimming step
updateTips(rootName, nodeMap)
- print(f"Trimming from nodes with 'many' children")
+ print('Trimming from nodes with \'many\' children')
trimIfManyChildren(nodeMap, rootName, 300, pickedNames)
- print(f"Result has {len(nodeMap)} nodes")
- print(f"Updating 'tips' values")
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Updating \'tips\' values')
updateTips(rootName, nodeMap)
- print("Creating table")
- addTreeTables(nodeMap, dbCur, "i")
-def genWeaklyTrimmedTree(dbCur, nodesWithImgDescOrPicked, nodesWithImgOrPicked, rootName):
- print("Getting ancestors")
+ print('Creating table')
+ addTreeTables(nodeMap, dbCur, 'i')
+def genWeaklyTrimmedTree(
+ dbCur: sqlite3.Cursor,
+ nodesWithImgDescOrPicked: set[str],
+ nodesWithImgOrPicked: set[str],
+ rootName: str) -> None:
+ print('Getting ancestors')
nodeMap = genNodeMap(dbCur, nodesWithImgDescOrPicked, 1e5)
- print(f"Result has {len(nodeMap)} nodes")
- print("Getting nodes to 'strongly keep'")
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Getting nodes to \'strongly keep\'')
iterNum = 0
- nodesFromImgOrPicked = set()
+ nodesFromImgOrPicked: set[str] = set()
for name in nodesWithImgOrPicked:
iterNum += 1
if iterNum % 1e4 == 0:
- print(f"At iteration {iterNum}")
+ print(f'At iteration {iterNum}')
#
- while name != None:
+ while name is not None:
if name not in nodesFromImgOrPicked:
nodesFromImgOrPicked.add(name)
name = nodeMap[name].parent
else:
break
- print(f"Node set has {len(nodesFromImgOrPicked)} nodes")
- print("Removing 'collapsible' nodes")
+ print(f'Node set has {len(nodesFromImgOrPicked)} nodes')
+ print('Removing \'collapsible\' nodes')
removeCollapsibleNodes(nodeMap, nodesWithImgDescOrPicked)
- print(f"Result has {len(nodeMap)} nodes")
- print(f"Updating 'tips' values") # Needed for next trimming step
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Updating \'tips\' values') # Needed for next trimming step
updateTips(rootName, nodeMap)
- print(f"Trimming from nodes with 'many' children")
+ print('Trimming from nodes with \'many\' children')
trimIfManyChildren(nodeMap, rootName, 600, nodesFromImgOrPicked)
- print(f"Result has {len(nodeMap)} nodes")
- print(f"Updating 'tips' values")
+ print(f'Result has {len(nodeMap)} nodes')
+ print('Updating \'tips\' values')
updateTips(rootName, nodeMap)
- print("Creating table")
- addTreeTables(nodeMap, dbCur, "t")
+ print('Creating table')
+ addTreeTables(nodeMap, dbCur, 't')
# Helper functions
-def genNodeMap(dbCur, nameSet, itersBeforePrint = 1):
- " Returns a subtree that includes nodes in 'nameSet', as a name-to-Node map "
- nodeMap = {}
+def genNodeMap(dbCur: sqlite3.Cursor, nameSet: set[str], itersBeforePrint = 1) -> dict[str, Node]:
+ """ Returns a subtree that includes nodes in 'nameSet', as a name-to-Node map """
+ nodeMap: dict[str, Node] = {}
iterNum = 0
+ name: str | None
for name in nameSet:
iterNum += 1
if iterNum % itersBeforePrint == 0:
- print(f"At iteration {iterNum}")
+ print(f'At iteration {iterNum}')
#
- prevName = None
- while name != None:
+ prevName: str | None = None
+ while name is not None:
if name not in nodeMap:
# Add node
- (id, tips) = dbCur.execute("SELECT id, tips from nodes where name = ?", (name,)).fetchone()
- row = dbCur.execute("SELECT parent, p_support from edges where child = ?", (name,)).fetchone()
- parent = None if row == None or row[0] == "" else row[0]
- pSupport = row == None or row[1] == 1
- children = [] if prevName == None else [prevName]
+ id, tips = dbCur.execute('SELECT id, tips from nodes where name = ?', (name,)).fetchone()
+ row: None | tuple[str, int] = dbCur.execute(
+ 'SELECT parent, p_support from edges where child = ?', (name,)).fetchone()
+ parent = None if row is None or row[0] == '' else row[0]
+ pSupport = row is None or row[1] == 1
+ children = [] if prevName is None else [prevName]
nodeMap[name] = Node(id, children, parent, 0, pSupport)
# Iterate to parent
prevName = name
name = parent
else:
# Just add as child
- if prevName != None:
+ if prevName is not None:
nodeMap[name].children.append(prevName)
break
return nodeMap
-def removeCompositeNodes(nodeMap):
- " Given a tree, removes composite-name nodes, and returns the removed nodes' names "
+def removeCompositeNodes(nodeMap: dict[str, Node]) -> set[str]:
+ """ Given a tree, removes composite-name nodes, and returns the removed nodes' names """
global COMP_NAME_REGEX
- namesToRemove = set()
- for (name, node) in nodeMap.items():
+ namesToRemove: set[str] = set()
+ for name, node in nodeMap.items():
parent = node.parent
- if parent != None and COMP_NAME_REGEX.fullmatch(name) != None:
+ if parent is not None and COMP_NAME_REGEX.fullmatch(name) is not None:
# Connect children to parent
nodeMap[parent].children.remove(name)
nodeMap[parent].children.extend(node.children)
@@ -182,13 +191,13 @@ def removeCompositeNodes(nodeMap):
for name in namesToRemove:
del nodeMap[name]
return namesToRemove
-def removeCollapsibleNodes(nodeMap, nodesToKeep = {}):
+def removeCollapsibleNodes(nodeMap: dict[str, Node], nodesToKeep: set[str] = set()) -> set[str]:
""" Given a tree, removes single-child parents, then only-childs,
with given exceptions, and returns the set of removed nodes' names """
- namesToRemove = set()
+ namesToRemove: set[str] = set()
# Remove single-child parents
- for (name, node) in nodeMap.items():
- if len(node.children) == 1 and node.parent != None and name not in nodesToKeep:
+ for name, node in nodeMap.items():
+ if len(node.children) == 1 and node.parent is not None and name not in nodesToKeep:
# Connect parent and children
parent = node.parent
child = node.children[0]
@@ -202,8 +211,8 @@ def removeCollapsibleNodes(nodeMap, nodesToKeep = {}):
del nodeMap[name]
# Remove only-childs (not redundant because 'nodesToKeep' can cause single-child parents to be kept)
namesToRemove.clear()
- for (name, node) in nodeMap.items():
- isOnlyChild = node.parent != None and len(nodeMap[node.parent].children) == 1
+ for name, node in nodeMap.items():
+ isOnlyChild = node.parent is not None and len(nodeMap[node.parent].children) == 1
if isOnlyChild and name not in nodesToKeep:
# Connect parent and children
parent = node.parent
@@ -217,9 +226,10 @@ def removeCollapsibleNodes(nodeMap, nodesToKeep = {}):
del nodeMap[name]
#
return namesToRemove
-def trimIfManyChildren(nodeMap, rootName, childThreshold, nodesToKeep = {}):
- namesToRemove = set()
- def findTrimmables(nodeName):
+def trimIfManyChildren(
+ nodeMap: dict[str, Node], rootName: str, childThreshold: int, nodesToKeep: set[str] = set()) -> None:
+ namesToRemove: set[str] = set()
+ def findTrimmables(nodeName: str) -> None:
nonlocal nodeMap, nodesToKeep
node = nodeMap[nodeName]
if len(node.children) > childThreshold:
@@ -236,7 +246,7 @@ def trimIfManyChildren(nodeMap, rootName, childThreshold, nodesToKeep = {}):
# Recurse on children
for n in node.children:
findTrimmables(n)
- def markForRemoval(nodeName):
+ def markForRemoval(nodeName: str) -> None:
nonlocal nodeMap, namesToRemove
namesToRemove.add(nodeName)
for child in nodeMap[nodeName].children:
@@ -244,81 +254,81 @@ def trimIfManyChildren(nodeMap, rootName, childThreshold, nodesToKeep = {}):
findTrimmables(rootName)
for nodeName in namesToRemove:
del nodeMap[nodeName]
-def updateTips(nodeName, nodeMap):
- " Updates the 'tips' values for a node and it's descendants, returning the node's new 'tips' value "
+def updateTips(nodeName: str, nodeMap: dict[str, Node]) -> int:
+ """ Updates the 'tips' values for a node and it's descendants, returning the node's new 'tips' value """
node = nodeMap[nodeName]
tips = sum([updateTips(childName, nodeMap) for childName in node.children])
tips = max(1, tips)
node.tips = tips
return tips
-def addTreeTables(nodeMap, dbCur, suffix):
- " Adds a tree to the database, as tables nodes_X and edges_X, where X is the given suffix "
- nodesTbl = f"nodes_{suffix}"
- edgesTbl = f"edges_{suffix}"
- dbCur.execute(f"CREATE TABLE {nodesTbl} (name TEXT PRIMARY KEY, id TEXT UNIQUE, tips INT)")
- dbCur.execute(f"CREATE INDEX {nodesTbl}_idx_nc ON {nodesTbl}(name COLLATE NOCASE)")
- dbCur.execute(f"CREATE TABLE {edgesTbl} (parent TEXT, child TEXT, p_support INT, PRIMARY KEY (parent, child))")
- dbCur.execute(f"CREATE INDEX {edgesTbl}_child_idx ON {edgesTbl}(child)")
- for (name, node) in nodeMap.items():
- dbCur.execute(f"INSERT INTO {nodesTbl} VALUES (?, ?, ?)", (name, node.id, node.tips))
+def addTreeTables(nodeMap: dict[str, Node], dbCur: sqlite3.Cursor, suffix: str):
+ """ Adds a tree to the database, as tables nodes_X and edges_X, where X is the given suffix """
+ nodesTbl = f'nodes_{suffix}'
+ edgesTbl = f'edges_{suffix}'
+ dbCur.execute(f'CREATE TABLE {nodesTbl} (name TEXT PRIMARY KEY, id TEXT UNIQUE, tips INT)')
+ dbCur.execute(f'CREATE INDEX {nodesTbl}_idx_nc ON {nodesTbl}(name COLLATE NOCASE)')
+ dbCur.execute(f'CREATE TABLE {edgesTbl} (parent TEXT, child TEXT, p_support INT, PRIMARY KEY (parent, child))')
+ dbCur.execute(f'CREATE INDEX {edgesTbl}_child_idx ON {edgesTbl}(child)')
+ for name, node in nodeMap.items():
+ dbCur.execute(f'INSERT INTO {nodesTbl} VALUES (?, ?, ?)', (name, node.id, node.tips))
for childName in node.children:
pSupport = 1 if nodeMap[childName].pSupport else 0
- dbCur.execute(f"INSERT INTO {edgesTbl} VALUES (?, ?, ?)", (name, childName, pSupport))
+ dbCur.execute(f'INSERT INTO {edgesTbl} VALUES (?, ?, ?)', (name, childName, pSupport))
-print(f"Finding root node")
-query = "SELECT name FROM nodes LEFT JOIN edges ON nodes.name = edges.child WHERE edges.parent IS NULL LIMIT 1"
+print('Finding root node')
+query = 'SELECT name FROM nodes LEFT JOIN edges ON nodes.name = edges.child WHERE edges.parent IS NULL LIMIT 1'
(rootName,) = dbCur.execute(query).fetchone()
-print(f"Found \"{rootName}\"")
+print(f'Found \'{rootName}\'')
print('=== Getting picked-nodes ===')
-pickedNames = set()
+pickedNames: set[str] = set()
pickedTreeExists = False
-if dbCur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='nodes_p'").fetchone() == None:
- print(f"Reading from {pickedNodesFile}")
+if dbCur.execute('SELECT name FROM sqlite_master WHERE type="table" AND name="nodes_p"').fetchone() is None:
+ print(f'Reading from {pickedNodesFile}')
with open(pickedNodesFile) as file:
for line in file:
name = line.rstrip()
- row = dbCur.execute("SELECT name from nodes WHERE name = ?", (name,)).fetchone()
- if row == None:
- row = dbCur.execute("SELECT name from names WHERE alt_name = ?", (name,)).fetchone()
- if row != None:
+ row = dbCur.execute('SELECT name from nodes WHERE name = ?', (name,)).fetchone()
+ if row is None:
+ row = dbCur.execute('SELECT name from names WHERE alt_name = ?', (name,)).fetchone()
+ if row is not None:
pickedNames.add(row[0])
- if len(pickedNames) == 0:
- raise Exception("ERROR: No picked names found")
+ if not pickedNames:
+ raise Exception('ERROR: No picked names found')
else:
pickedTreeExists = True
- print("Picked-node tree already exists")
+ print('Picked-node tree already exists')
if tree == 'picked':
sys.exit()
- for (name,) in dbCur.execute("SELECT name FROM nodes_p"):
+ for (name,) in dbCur.execute('SELECT name FROM nodes_p'):
pickedNames.add(name)
-print(f"Found {len(pickedNames)} names")
+print(f'Found {len(pickedNames)} names')
-if (tree == 'picked' or tree == None) and not pickedTreeExists:
- print("=== Generating picked-nodes tree ===")
+if (tree == 'picked' or tree is None) and not pickedTreeExists:
+ print('=== Generating picked-nodes tree ===')
genPickedNodeTree(dbCur, pickedNames, rootName)
if tree != 'picked':
- print("=== Finding 'non-low significance' nodes ===")
- nodesWithImgOrPicked = set()
- nodesWithImgDescOrPicked = set()
- print("Finding nodes with descs")
- for (name,) in dbCur.execute("SELECT name FROM wiki_ids"): # Can assume the wiki_id has a desc
+ print('=== Finding \'non-low significance\' nodes ===')
+ nodesWithImgOrPicked: set[str] = set()
+ nodesWithImgDescOrPicked: set[str] = set()
+ print('Finding nodes with descs')
+ for (name,) in dbCur.execute('SELECT name FROM wiki_ids'): # Can assume the wiki_id has a desc
nodesWithImgDescOrPicked.add(name)
- print("Finding nodes with images")
- for (name,) in dbCur.execute("SELECT name FROM node_imgs"):
+ print('Finding nodes with images')
+ for (name,) in dbCur.execute('SELECT name FROM node_imgs'):
nodesWithImgDescOrPicked.add(name)
nodesWithImgOrPicked.add(name)
- print("Adding picked nodes")
+ print('Adding picked nodes')
for name in pickedNames:
nodesWithImgDescOrPicked.add(name)
nodesWithImgOrPicked.add(name)
- if tree == 'images' or tree == None:
- print("=== Generating images-only tree ===")
+ if tree == 'images' or tree is None:
+ print('=== Generating images-only tree ===')
genImagesOnlyTree(dbCur, nodesWithImgOrPicked, pickedNames, rootName)
- if tree == 'trimmed' or tree == None:
- print("=== Generating weakly-trimmed tree ===")
+ if tree == 'trimmed' or tree is None:
+ print('=== Generating weakly-trimmed tree ===')
genWeaklyTrimmedTree(dbCur, nodesWithImgDescOrPicked, nodesWithImgOrPicked, rootName)
-print("Closing database")
+print('Closing database')
dbCon.commit()
dbCon.close()
diff --git a/backend/tolData/reviewImgsToGen.py b/backend/tolData/reviewImgsToGen.py
index dcf18bc..f3791bc 100755
--- a/backend/tolData/reviewImgsToGen.py
+++ b/backend/tolData/reviewImgsToGen.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
-import sys, re, os, time
+import os, time
import sqlite3
import tkinter as tki
from tkinter import ttk
@@ -20,25 +20,25 @@ have already been made.
""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.parse_args()
-eolImgDir = "eol/imgs/"
-enwikiImgDir = "enwiki/imgs/"
-dbFile = "data.db"
-outFile = "imgList.txt"
+eolImgDir = 'eol/imgs/'
+enwikiImgDir = 'enwiki/imgs/'
+dbFile = 'data.db'
+outFile = 'imgList.txt'
IMG_DISPLAY_SZ = 400
-PLACEHOLDER_IMG = Image.new("RGB", (IMG_DISPLAY_SZ, IMG_DISPLAY_SZ), (88, 28, 135))
+PLACEHOLDER_IMG = Image.new('RGB', (IMG_DISPLAY_SZ, IMG_DISPLAY_SZ), (88, 28, 135))
onlyReviewPairs = True
-print("Opening database")
+print('Opening database')
dbCon = sqlite3.connect(dbFile)
dbCur = dbCon.cursor()
-nodeToImgs = {} # Maps otol-ids to arrays of image paths
-print("Iterating through images from EOL")
+nodeToImgs: dict[str, list[str]] = {} # Maps otol-ids to arrays of image paths
+print('Iterating through images from EOL')
if os.path.exists(eolImgDir):
for filename in os.listdir(eolImgDir):
# Get associated EOL ID
- eolId, _, _ = filename.partition(" ")
- query = "SELECT nodes.id FROM nodes INNER JOIN eol_ids ON nodes.name = eol_ids.name WHERE eol_ids.id = ?"
+ eolId, _, _ = filename.partition(' ')
+ query = 'SELECT nodes.id FROM nodes INNER JOIN eol_ids ON nodes.name = eol_ids.name WHERE eol_ids.id = ?'
# Get associated node IDs
found = False
for (otolId,) in dbCur.execute(query, (int(eolId),)):
@@ -47,15 +47,15 @@ if os.path.exists(eolImgDir):
nodeToImgs[otolId].append(eolImgDir + filename)
found = True
if not found:
- print(f"WARNING: No node found for {eolImgDir}{filename}")
-print(f"Result: {len(nodeToImgs)} nodes with images")
-print("Iterating through images from Wikipedia")
+ print(f'WARNING: No node found for {eolImgDir}{filename}')
+print(f'Result: {len(nodeToImgs)} nodes with images')
+print('Iterating through images from Wikipedia')
if os.path.exists(enwikiImgDir):
for filename in os.listdir(enwikiImgDir):
# Get associated page ID
- (wikiId, _, _) = filename.partition(".")
+ wikiId, _, _ = filename.partition('.')
# Get associated node IDs
- query = "SELECT nodes.id FROM nodes INNER JOIN wiki_ids ON nodes.name = wiki_ids.name WHERE wiki_ids.id = ?"
+ query = 'SELECT nodes.id FROM nodes INNER JOIN wiki_ids ON nodes.name = wiki_ids.name WHERE wiki_ids.id = ?'
found = False
for (otolId,) in dbCur.execute(query, (int(wikiId),)):
if otolId not in nodeToImgs:
@@ -63,34 +63,34 @@ if os.path.exists(enwikiImgDir):
nodeToImgs[otolId].append(enwikiImgDir + filename)
found = True
if not found:
- print(f"WARNING: No node found for {enwikiImgDir}{filename}")
-print(f"Result: {len(nodeToImgs)} nodes with images")
-print("Filtering out already-made image choices")
+ print(f'WARNING: No node found for {enwikiImgDir}{filename}')
+print(f'Result: {len(nodeToImgs)} nodes with images')
+print('Filtering out already-made image choices')
oldSz = len(nodeToImgs)
if os.path.exists(outFile):
with open(outFile) as file:
for line in file:
line = line.rstrip()
- if " " in line:
- line = line[:line.find(" ")]
+ if ' ' in line:
+ line = line[:line.find(' ')]
del nodeToImgs[line]
-print(f"Filtered out {oldSz - len(nodeToImgs)} entries")
+print(f'Filtered out {oldSz - len(nodeToImgs)} entries')
class ImgReviewer:
- " Provides the GUI for reviewing images "
+ """ Provides the GUI for reviewing images """
def __init__(self, root, nodeToImgs):
self.root = root
- root.title("Image Reviewer")
+ root.title('Image Reviewer')
# Setup main frame
- mainFrame = ttk.Frame(root, padding="5 5 5 5")
+ mainFrame = ttk.Frame(root, padding='5 5 5 5')
mainFrame.grid(column=0, row=0, sticky=(tki.N, tki.W, tki.E, tki.S))
root.columnconfigure(0, weight=1)
root.rowconfigure(0, weight=1)
# Set up images-to-be-reviewed frames
self.eolImg = ImageTk.PhotoImage(PLACEHOLDER_IMG)
self.enwikiImg = ImageTk.PhotoImage(PLACEHOLDER_IMG)
- self.labels = []
+ self.labels: list[ttk.Label] = []
for i in (0, 1):
frame = ttk.Frame(mainFrame, width=IMG_DISPLAY_SZ, height=IMG_DISPLAY_SZ)
frame.grid(column=i, row=0)
@@ -101,10 +101,10 @@ class ImgReviewer:
for child in mainFrame.winfo_children():
child.grid_configure(padx=5, pady=5)
# Add keyboard bindings
- root.bind("<q>", self.quit)
- root.bind("<Key-j>", lambda evt: self.accept(0))
- root.bind("<Key-k>", lambda evt: self.accept(1))
- root.bind("<Key-l>", lambda evt: self.reject())
+ root.bind('<q>', self.quit)
+ root.bind('<Key-j>', lambda evt: self.accept(0))
+ root.bind('<Key-k>', lambda evt: self.accept(1))
+ root.bind('<Key-l>', lambda evt: self.reject())
# Set fields
self.nodeImgsList = list(nodeToImgs.items())
self.listIdx = -1
@@ -116,94 +116,94 @@ class ImgReviewer:
# Initialise images to review
self.getNextImgs()
def getNextImgs(self):
- " Updates display with new images to review, or ends program "
+ """ Updates display with new images to review, or ends program """
# Get next image paths
while True:
self.listIdx += 1
if self.listIdx == len(self.nodeImgsList):
- print("No more images to review. Exiting program.")
+ print('No more images to review. Exiting program.')
self.quit()
return
self.otolId, imgPaths = self.nodeImgsList[self.listIdx]
# Potentially skip user choice
if onlyReviewPairs and len(imgPaths) == 1:
with open(outFile, 'a') as file:
- file.write(f"{self.otolId} {imgPaths[0]}\n")
+ file.write(f'{self.otolId} {imgPaths[0]}\n')
continue
break
# Update displayed images
self.eolImgPath = self.enwikiImgPath = None
imageOpenError = False
for imgPath in imgPaths:
- img = None
+ img: Image
try:
img = Image.open(imgPath)
img = ImageOps.exif_transpose(img)
except PIL.UnidentifiedImageError:
- print(f"UnidentifiedImageError for {imgPath}")
+ print(f'UnidentifiedImageError for {imgPath}')
imageOpenError = True
continue
- if imgPath.startswith("eol/"):
+ if imgPath.startswith('eol/'):
self.eolImgPath = imgPath
self.eolImg = ImageTk.PhotoImage(self.resizeImgForDisplay(img))
- elif imgPath.startswith("enwiki/"):
+ elif imgPath.startswith('enwiki/'):
self.enwikiImgPath = imgPath
self.enwikiImg = ImageTk.PhotoImage(self.resizeImgForDisplay(img))
else:
- print(f"Unexpected image path {imgPath}")
+ print(f'Unexpected image path {imgPath}')
self.quit()
return
# Re-iterate if all image paths invalid
- if self.eolImgPath == None and self.enwikiImgPath == None:
+ if self.eolImgPath is None and self.enwikiImgPath is None:
if imageOpenError:
self.reject()
self.getNextImgs()
return
# Add placeholder images
- if self.eolImgPath == None:
+ if self.eolImgPath is None:
self.eolImg = ImageTk.PhotoImage(self.resizeImgForDisplay(PLACEHOLDER_IMG))
- elif self.enwikiImgPath == None:
+ elif self.enwikiImgPath is None:
self.enwikiImg = ImageTk.PhotoImage(self.resizeImgForDisplay(PLACEHOLDER_IMG))
# Update image-frames
self.labels[0].config(image=self.eolImg)
self.labels[1].config(image=self.enwikiImg)
# Update title
- title = f"Images for otol ID {self.otolId}"
- query = "SELECT names.alt_name FROM" \
- " nodes INNER JOIN names ON nodes.name = names.name" \
- " WHERE nodes.id = ? and pref_alt = 1"
+ title = f'Images for otol ID {self.otolId}'
+ query = 'SELECT names.alt_name FROM' \
+ ' nodes INNER JOIN names ON nodes.name = names.name' \
+ ' WHERE nodes.id = ? and pref_alt = 1'
row = dbCur.execute(query, (self.otolId,)).fetchone()
- if row != None:
- title += f", aka {row[0]}"
- title += f" ({self.listIdx + 1} out of {len(self.nodeImgsList)})"
+ if row is not None:
+ title += f', aka {row[0]}'
+ title += f' ({self.listIdx + 1} out of {len(self.nodeImgsList)})'
self.root.title(title)
def accept(self, imgIdx):
- " React to a user selecting an image "
+ """ React to a user selecting an image """
imgPath = self.eolImgPath if imgIdx == 0 else self.enwikiImgPath
- if imgPath == None:
- print("Invalid selection")
+ if imgPath is None:
+ print('Invalid selection')
return
with open(outFile, 'a') as file:
- file.write(f"{self.otolId} {imgPath}\n")
+ file.write(f'{self.otolId} {imgPath}\n')
self.numReviewed += 1
self.getNextImgs()
def reject(self):
- " React to a user rejecting all images of a set "
+ """"" React to a user rejecting all images of a set """
with open(outFile, 'a') as file:
- file.write(f"{self.otolId}\n")
+ file.write(f'{self.otolId}\n')
self.numReviewed += 1
self.getNextImgs()
def quit(self, e = None):
global dbCon
- print(f"Number reviewed: {self.numReviewed}")
+ print(f'Number reviewed: {self.numReviewed}')
timeElapsed = time.time() - self.startTime
- print(f"Time elapsed: {timeElapsed:.2f} seconds")
+ print(f'Time elapsed: {timeElapsed:.2f} seconds')
if self.numReviewed > 0:
- print(f"Avg time per review: {timeElapsed/self.numReviewed:.2f} seconds")
+ print(f'Avg time per review: {timeElapsed/self.numReviewed:.2f} seconds')
dbCon.close()
self.root.destroy()
def resizeImgForDisplay(self, img):
- " Returns a copy of an image, shrunk to fit it's frame (keeps aspect ratio), and with a background "
+ """ Returns a copy of an image, shrunk to fit it's frame (keeps aspect ratio), and with a background """
if max(img.width, img.height) > IMG_DISPLAY_SZ:
if (img.width > img.height):
newHeight = int(img.height * IMG_DISPLAY_SZ/img.width)
@@ -217,7 +217,7 @@ class ImgReviewer:
int((IMG_DISPLAY_SZ - img.height) / 2)))
return bgImg
# Create GUI and defer control
-print("Starting GUI")
+print('Starting GUI')
root = tki.Tk()
ImgReviewer(root, nodeToImgs)
root.mainloop()
diff --git a/backend/tolData/wikidata/genTaxonSrcData.py b/backend/tolData/wikidata/genTaxonSrcData.py
index bd86172..5d10c71 100755
--- a/backend/tolData/wikidata/genTaxonSrcData.py
+++ b/backend/tolData/wikidata/genTaxonSrcData.py
@@ -6,7 +6,7 @@ import bz2, json, sqlite3
import multiprocessing, indexed_bzip2, pickle, tempfile
import argparse
-parser = argparse.ArgumentParser(description='''
+parser = argparse.ArgumentParser(description="""
Reads a wikidata JSON dump, looking for enwiki taxon items, and associated
IDs from sources like GBIF/etc, and IUCN conservation status. Writes results
into a database.
@@ -27,7 +27,7 @@ Wikidata item item1, and takes up it's own line.
Based on code from https://github.com/OneZoom/OZtree, located in
OZprivate/ServerScripts/TaxonMappingAndPopularity/ (22 Aug 2022).
-''', formatter_class=argparse.RawDescriptionHelpFormatter)
+""", formatter_class=argparse.RawDescriptionHelpFormatter)
args = parser.parse_args()
# On Linux, running on the full dataset caused the processes to hang after processing. This was resolved by:
@@ -54,12 +54,12 @@ IUCN_STATUS_IDS = {
# For filtering lines before parsing JSON
LINE_REGEX = re.compile(('"numeric-id":(?:' + '|'.join([s[1:] for s in TAXON_IDS + TAXON_ALT_IDS]) + ')\D').encode())
-def main():
+def main() -> None:
# Maps to populate
- srcIdToId = defaultdict(dict) # Maps 'eol'/etc to {srcId1: wikidataId1, ...} (IDs are ints)
- idToTitle = {} # Maps wikidata ID to enwiki title
- idToAltId = {} # Maps taxon-item wikidata ID to taxon-alt wikidata ID (eg: 'canis lupus familiaris' and 'dog')
- idToIucnStatus = {} # Maps wikidata ID to iucn-status string ('least concern', etc)
+ srcIdToId: dict[str, dict[int, int]] = defaultdict(dict) # Maps 'eol'/etc to {srcId1: wikidataId1, ...}
+ idToTitle: dict[int, str] = {} # Maps wikidata ID to enwiki title
+ idToAltId: dict[int, int] = {} # Maps taxon-item wikidata ID to taxon-alt ID (eg: 'canis lupus familiaris' -> 'dog')
+ idToIucnStatus: dict[int, str] = {} # Maps wikidata ID to iucn-status string ('least concern', etc)
# Check db
if os.path.exists(DB_FILE):
print('ERROR: Database already exists')
@@ -72,28 +72,27 @@ def main():
print(f'At line {lineNum}')
readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus)
else:
-
if not os.path.exists(OFFSETS_FILE):
print('Creating offsets file') # For indexed access for multiprocessing (creation took about 6.7 hours)
with indexed_bzip2.open(WD_FILE) as file:
with open(OFFSETS_FILE, 'wb') as file2:
pickle.dump(file.block_offsets(), file2)
print('Allocating file into chunks')
- fileSz = None # About 1.4 TB
+ fileSz: int # About 1.4 TB
with indexed_bzip2.open(WD_FILE) as file:
with open(OFFSETS_FILE, 'rb') as file2:
file.set_block_offsets(pickle.load(file2))
fileSz = file.seek(0, io.SEEK_END)
chunkSz = math.floor(fileSz / N_PROCS)
- chunkIdxs = [None] + [chunkSz * i for i in range(1, N_PROCS)] + [fileSz-1]
- # Each adjacent pair specifies a start+end byte for readDumpChunk()
+ chunkIdxs = [-1] + [chunkSz * i for i in range(1, N_PROCS)] + [fileSz-1]
+ # Each adjacent pair specifies a start+end byte index for readDumpChunk()
print(f'- Chunk size: {chunkSz:,}')
print('Starting processes to read dump')
with tempfile.TemporaryDirectory() as tempDirName:
# Using maxtasksperchild=1 to free resources on task completion
with multiprocessing.Pool(processes=N_PROCS, maxtasksperchild=1) as pool:
for outFilename in pool.map(readDumpChunkOneParam,
- [(i, chunkIdxs[i], chunkIdxs[i+1], f'{tempDirName}/{i}.pickle') for i in range(N_PROCS)]):
+ ((i, chunkIdxs[i], chunkIdxs[i+1], f'{tempDirName}/{i}.pickle') for i in range(N_PROCS))):
# Get map data from subprocess output file
with open(outFilename, 'rb') as file:
maps = pickle.load(file)
@@ -132,23 +131,28 @@ def main():
dbCon.commit()
dbCon.close()
-def readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus):
+def readDumpLine(
+ lineBytes: bytes,
+ srcIdToId: dict[str, dict[int, int]],
+ idToTitle: dict[int, str],
+ idToAltId: dict[int, int],
+ idToIucnStatus: dict[int, str]) -> None:
# Check if taxon item
- if LINE_REGEX.search(line) == None:
+ if LINE_REGEX.search(lineBytes) is None:
return
try:
- line = line.decode('utf-8').rstrip().rstrip(',')
+ line = lineBytes.decode('utf-8').rstrip().rstrip(',')
jsonItem = json.loads(line)
except json.JSONDecodeError:
- print(f'Unable to parse Line {lineNum} as JSON')
+ print(f'Unable to parse line {line} as JSON')
return
isTaxon = False
- altTaxa = [] # For a taxon-alt item, holds associated taxon-item IDs
+ altTaxa: list[int] = [] # For a taxon-alt item, holds associated taxon-item IDs
claims = None
try:
claims = jsonItem['claims']
for statement in claims['P31']: # Check for 'instance of' statements
- typeId = statement['mainsnak']['datavalue']['value']['id']
+ typeId: str = statement['mainsnak']['datavalue']['value']['id']
if typeId in TAXON_IDS:
isTaxon = True
break
@@ -161,18 +165,19 @@ def readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus):
if not isTaxon and not altTaxa:
return
# Get wikidata ID and enwiki title
- itemId, itemTitle = None, None
+ itemId: int | None = None
+ itemTitle: str | None = None
try:
itemId = int(jsonItem['id'][1:]) # Skips initial 'Q'
itemTitle = jsonItem['sitelinks']['enwiki']['title']
except KeyError:
# Allow taxon-items without titles (they might get one via a taxon-alt)
- if itemId != None and isTaxon:
+ if itemId is not None and isTaxon:
itemTitle = None
else:
return
# Update maps
- if itemTitle != None:
+ if itemTitle is not None:
idToTitle[itemId] = itemTitle
for altId in altTaxa:
idToAltId[altId] = itemId
@@ -187,19 +192,24 @@ def readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus):
# Check for IUCN status
if 'P141' in claims: # Check for 'iucn conservation status' statement
try:
- iucnStatusId = claims['P141'][0]['mainsnak']['datavalue']['value']['id']
+ iucnStatusId: str = claims['P141'][0]['mainsnak']['datavalue']['value']['id']
idToIucnStatus[itemId] = IUCN_STATUS_IDS[iucnStatusId]
except KeyError:
pass
-def readDumpChunkOneParam(params): # Forwards to readDumpChunk(), for use with pool.map()
+def readDumpChunkOneParam(params: tuple[int, int, int, str]) -> str:
+ """ Forwards to readDumpChunk(), for use with pool.map() """
return readDumpChunk(*params)
# Reads lines in the dump that begin after a start-byte, and not after an end byte
- # If startByte is None, start at the first line
-def readDumpChunk(procId, startByte, endByte, outFilename):
+ # If startByte is -1, start at the first line
+def readDumpChunk(procId: int, startByte: int, endByte: int, outFilename: str) -> str:
# Maps to populate
- maps = [defaultdict(dict), {}, {}, {}]
+ maps: tuple[
+ dict[str, dict[int, int]],
+ dict[int, str],
+ dict[int, int],
+ dict[int, str]] = (defaultdict(dict), {}, {}, {})
# Read dump
with indexed_bzip2.open(WD_FILE) as file:
# Load offsets file
@@ -207,7 +217,7 @@ def readDumpChunk(procId, startByte, endByte, outFilename):
offsets = pickle.load(file2)
file.set_block_offsets(offsets)
# Seek to chunk
- if startByte != None:
+ if startByte != -1:
file.seek(startByte)
file.readline()
else: