Diffstat (limited to 'backend/tolData/wikidata/genTaxonSrcData.py')
-rwxr-xr-x  backend/tolData/wikidata/genTaxonSrcData.py | 64
1 file changed, 37 insertions(+), 27 deletions(-)
diff --git a/backend/tolData/wikidata/genTaxonSrcData.py b/backend/tolData/wikidata/genTaxonSrcData.py
index bd86172..5d10c71 100755
--- a/backend/tolData/wikidata/genTaxonSrcData.py
+++ b/backend/tolData/wikidata/genTaxonSrcData.py
@@ -6,7 +6,7 @@ import bz2, json, sqlite3
import multiprocessing, indexed_bzip2, pickle, tempfile
import argparse
-parser = argparse.ArgumentParser(description='''
+parser = argparse.ArgumentParser(description="""
Reads a wikidata JSON dump, looking for enwiki taxon items, and associated
IDs from sources like GBIF/etc, and IUCN conservation status. Writes results
into a database.
@@ -27,7 +27,7 @@ Wikidata item item1, and takes up its own line.
Based on code from https://github.com/OneZoom/OZtree, located in
OZprivate/ServerScripts/TaxonMappingAndPopularity/ (22 Aug 2022).
-''', formatter_class=argparse.RawDescriptionHelpFormatter)
+""", formatter_class=argparse.RawDescriptionHelpFormatter)
args = parser.parse_args()
# On Linux, running on the full dataset caused the processes to hang after processing. This was resolved by:
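For orientation, a minimal standalone sketch of the per-line handling the description refers to: each line of the dump holds one Wikidata entity as JSON followed by a trailing comma, so a line can be parsed on its own once that comma is stripped (the dump filename below is a placeholder):

import bz2, json

with bz2.open('latest-all.json.bz2') as file:
    for lineBytes in file:
        line = lineBytes.decode('utf-8').rstrip().rstrip(',')
        if line in ('[', ']'):  # First and last lines delimit the enclosing JSON array
            continue
        entity = json.loads(line)
        print(entity['id'])  # eg: 'Q12345'
        break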
@@ -54,12 +54,12 @@ IUCN_STATUS_IDS = {
# For filtering lines before parsing JSON
LINE_REGEX = re.compile(('"numeric-id":(?:' + '|'.join([s[1:] for s in TAXON_IDS + TAXON_ALT_IDS]) + ')\D').encode())
-def main():
+def main() -> None:
# Maps to populate
- srcIdToId = defaultdict(dict) # Maps 'eol'/etc to {srcId1: wikidataId1, ...} (IDs are ints)
- idToTitle = {} # Maps wikidata ID to enwiki title
- idToAltId = {} # Maps taxon-item wikidata ID to taxon-alt wikidata ID (eg: 'canis lupus familiaris' and 'dog')
- idToIucnStatus = {} # Maps wikidata ID to iucn-status string ('least concern', etc)
+ srcIdToId: dict[str, dict[int, int]] = defaultdict(dict) # Maps 'eol'/etc to {srcId1: wikidataId1, ...}
+ idToTitle: dict[int, str] = {} # Maps wikidata ID to enwiki title
+ idToAltId: dict[int, int] = {} # Maps taxon-item wikidata ID to taxon-alt ID (eg: 'canis lupus familiaris' -> 'dog')
+ idToIucnStatus: dict[int, str] = {} # Maps wikidata ID to iucn-status string ('least concern', etc)
# Check db
if os.path.exists(DB_FILE):
print('ERROR: Database already exists')
@@ -72,28 +72,27 @@ def main():
print(f'At line {lineNum}')
readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus)
else:
-
if not os.path.exists(OFFSETS_FILE):
print('Creating offsets file') # For indexed access for multiprocessing (creation took about 6.7 hours)
with indexed_bzip2.open(WD_FILE) as file:
with open(OFFSETS_FILE, 'wb') as file2:
pickle.dump(file.block_offsets(), file2)
print('Allocating file into chunks')
- fileSz = None # About 1.4 TB
+ fileSz: int # About 1.4 TB
with indexed_bzip2.open(WD_FILE) as file:
with open(OFFSETS_FILE, 'rb') as file2:
file.set_block_offsets(pickle.load(file2))
fileSz = file.seek(0, io.SEEK_END)
chunkSz = math.floor(fileSz / N_PROCS)
- chunkIdxs = [None] + [chunkSz * i for i in range(1, N_PROCS)] + [fileSz-1]
- # Each adjacent pair specifies a start+end byte for readDumpChunk()
+ chunkIdxs = [-1] + [chunkSz * i for i in range(1, N_PROCS)] + [fileSz-1]
+ # Each adjacent pair specifies a start+end byte index for readDumpChunk()
print(f'- Chunk size: {chunkSz:,}')
print('Starting processes to read dump')
with tempfile.TemporaryDirectory() as tempDirName:
# Using maxtasksperchild=1 to free resources on task completion
with multiprocessing.Pool(processes=N_PROCS, maxtasksperchild=1) as pool:
for outFilename in pool.map(readDumpChunkOneParam,
- [(i, chunkIdxs[i], chunkIdxs[i+1], f'{tempDirName}/{i}.pickle') for i in range(N_PROCS)]):
+ ((i, chunkIdxs[i], chunkIdxs[i+1], f'{tempDirName}/{i}.pickle') for i in range(N_PROCS))):
# Get map data from subprocess output file
with open(outFilename, 'rb') as file:
maps = pickle.load(file)
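As a worked illustration of the chunk allocation above (values made up for brevity): with fileSz = 100 and N_PROCS = 4, adjacent entries of chunkIdxs pair up into the (startByte, endByte) arguments handed to readDumpChunk(), with -1 marking "start at the first line":

import math

N_PROCS = 4
fileSz = 100  # Stand-in for the ~1.4 TB dump size
chunkSz = math.floor(fileSz / N_PROCS)  # 25
chunkIdxs = [-1] + [chunkSz * i for i in range(1, N_PROCS)] + [fileSz - 1]
print(chunkIdxs)  # [-1, 25, 50, 75, 99]
print([(chunkIdxs[i], chunkIdxs[i + 1]) for i in range(N_PROCS)])
# [(-1, 25), (25, 50), (50, 75), (75, 99)]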
@@ -132,23 +131,28 @@ def main():
dbCon.commit()
dbCon.close()
-def readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus):
+def readDumpLine(
+ lineBytes: bytes,
+ srcIdToId: dict[str, dict[int, int]],
+ idToTitle: dict[int, str],
+ idToAltId: dict[int, int],
+ idToIucnStatus: dict[int, str]) -> None:
# Check if taxon item
- if LINE_REGEX.search(line) == None:
+ if LINE_REGEX.search(lineBytes) is None:
return
try:
- line = line.decode('utf-8').rstrip().rstrip(',')
+ line = lineBytes.decode('utf-8').rstrip().rstrip(',')
jsonItem = json.loads(line)
except json.JSONDecodeError:
- print(f'Unable to parse Line {lineNum} as JSON')
+ print(f'Unable to parse line {line} as JSON')
return
isTaxon = False
- altTaxa = [] # For a taxon-alt item, holds associated taxon-item IDs
+ altTaxa: list[int] = [] # For a taxon-alt item, holds associated taxon-item IDs
claims = None
try:
claims = jsonItem['claims']
for statement in claims['P31']: # Check for 'instance of' statements
- typeId = statement['mainsnak']['datavalue']['value']['id']
+ typeId: str = statement['mainsnak']['datavalue']['value']['id']
if typeId in TAXON_IDS:
isTaxon = True
break
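For reference, a rough, abridged sketch of the entity fields this function navigates; the entity below is hypothetical, but the key layout follows the Wikidata JSON dump format, and the 'numeric-id' field is what LINE_REGEX matches against before any JSON parsing:

exampleEntity = {
    'id': 'Q1234567',  # Hypothetical taxon item
    'claims': {
        'P31': [       # 'instance of' statements
            {'mainsnak': {'datavalue': {'value': {'id': 'Q16521', 'numeric-id': 16521}}}},
        ],
    },
    'sitelinks': {'enwiki': {'title': 'Example title'}},
}
typeId = exampleEntity['claims']['P31'][0]['mainsnak']['datavalue']['value']['id']  # 'Q16521' (Wikidata's 'taxon' item)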
@@ -161,18 +165,19 @@ def readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus):
if not isTaxon and not altTaxa:
return
# Get wikidata ID and enwiki title
- itemId, itemTitle = None, None
+ itemId: int | None = None
+ itemTitle: str | None = None
try:
itemId = int(jsonItem['id'][1:]) # Skips initial 'Q'
itemTitle = jsonItem['sitelinks']['enwiki']['title']
except KeyError:
# Allow taxon-items without titles (they might get one via a taxon-alt)
- if itemId != None and isTaxon:
+ if itemId is not None and isTaxon:
itemTitle = None
else:
return
# Update maps
- if itemTitle != None:
+ if itemTitle is not None:
idToTitle[itemId] = itemTitle
for altId in altTaxa:
idToAltId[altId] = itemId
@@ -187,19 +192,24 @@ def readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus):
# Check for IUCN status
if 'P141' in claims: # Check for 'iucn conservation status' statement
try:
- iucnStatusId = claims['P141'][0]['mainsnak']['datavalue']['value']['id']
+ iucnStatusId: str = claims['P141'][0]['mainsnak']['datavalue']['value']['id']
idToIucnStatus[itemId] = IUCN_STATUS_IDS[iucnStatusId]
except KeyError:
pass
-def readDumpChunkOneParam(params): # Forwards to readDumpChunk(), for use with pool.map()
+def readDumpChunkOneParam(params: tuple[int, int, int, str]) -> str:
+ """ Forwards to readDumpChunk(), for use with pool.map() """
return readDumpChunk(*params)
# Reads lines in the dump that begin after a start-byte, and not after an end byte
- # If startByte is None, start at the first line
-def readDumpChunk(procId, startByte, endByte, outFilename):
+ # If startByte is -1, start at the first line
+def readDumpChunk(procId: int, startByte: int, endByte: int, outFilename: str) -> str:
# Maps to populate
- maps = [defaultdict(dict), {}, {}, {}]
+ maps: tuple[
+ dict[str, dict[int, int]],
+ dict[int, str],
+ dict[int, int],
+ dict[int, str]] = (defaultdict(dict), {}, {}, {})
# Read dump
with indexed_bzip2.open(WD_FILE) as file:
# Load offsets file
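For orientation, a minimal standalone sketch of the indexed_bzip2 offsets round-trip used here and when OFFSETS_FILE is created earlier: the block offsets are computed once in a slow full scan, then reloaded so that seek() on the compressed dump becomes cheap (filenames and the offset value are placeholders):

import pickle
import indexed_bzip2

# One-off: scan the archive and record its block offsets
with indexed_bzip2.open('dump.json.bz2') as file:
    with open('offsets.pickle', 'wb') as file2:
        pickle.dump(file.block_offsets(), file2)

# Later, eg: in a worker process: load the offsets and seek directly
with indexed_bzip2.open('dump.json.bz2') as file:
    with open('offsets.pickle', 'rb') as file2:
        file.set_block_offsets(pickle.load(file2))
    file.seek(12345)
    print(file.readline())  # The remainder of whatever line byte 12345 falls in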
@@ -207,7 +217,7 @@ def readDumpChunk(procId, startByte, endByte, outFilename):
offsets = pickle.load(file2)
file.set_block_offsets(offsets)
# Seek to chunk
- if startByte != None:
+ if startByte != -1:
file.seek(startByte)
file.readline()
else: