diff options
Diffstat (limited to 'backend/tolData/wikidata/genTaxonSrcData.py')
| -rwxr-xr-x | backend/tolData/wikidata/genTaxonSrcData.py | 64 |
1 files changed, 37 insertions, 27 deletions
diff --git a/backend/tolData/wikidata/genTaxonSrcData.py b/backend/tolData/wikidata/genTaxonSrcData.py index bd86172..5d10c71 100755 --- a/backend/tolData/wikidata/genTaxonSrcData.py +++ b/backend/tolData/wikidata/genTaxonSrcData.py @@ -6,7 +6,7 @@ import bz2, json, sqlite3 import multiprocessing, indexed_bzip2, pickle, tempfile import argparse -parser = argparse.ArgumentParser(description=''' +parser = argparse.ArgumentParser(description=""" Reads a wikidata JSON dump, looking for enwiki taxon items, and associated IDs from sources like GBIF/etc, and IUCN conservation status. Writes results into a database. @@ -27,7 +27,7 @@ Wikidata item item1, and takes up it's own line. Based on code from https://github.com/OneZoom/OZtree, located in OZprivate/ServerScripts/TaxonMappingAndPopularity/ (22 Aug 2022). -''', formatter_class=argparse.RawDescriptionHelpFormatter) +""", formatter_class=argparse.RawDescriptionHelpFormatter) args = parser.parse_args() # On Linux, running on the full dataset caused the processes to hang after processing. This was resolved by: @@ -54,12 +54,12 @@ IUCN_STATUS_IDS = { # For filtering lines before parsing JSON LINE_REGEX = re.compile(('"numeric-id":(?:' + '|'.join([s[1:] for s in TAXON_IDS + TAXON_ALT_IDS]) + ')\D').encode()) -def main(): +def main() -> None: # Maps to populate - srcIdToId = defaultdict(dict) # Maps 'eol'/etc to {srcId1: wikidataId1, ...} (IDs are ints) - idToTitle = {} # Maps wikidata ID to enwiki title - idToAltId = {} # Maps taxon-item wikidata ID to taxon-alt wikidata ID (eg: 'canis lupus familiaris' and 'dog') - idToIucnStatus = {} # Maps wikidata ID to iucn-status string ('least concern', etc) + srcIdToId: dict[str, dict[int, int]] = defaultdict(dict) # Maps 'eol'/etc to {srcId1: wikidataId1, ...} + idToTitle: dict[int, str] = {} # Maps wikidata ID to enwiki title + idToAltId: dict[int, int] = {} # Maps taxon-item wikidata ID to taxon-alt ID (eg: 'canis lupus familiaris' -> 'dog') + idToIucnStatus: dict[int, str] = {} # Maps wikidata ID to iucn-status string ('least concern', etc) # Check db if os.path.exists(DB_FILE): print('ERROR: Database already exists') @@ -72,28 +72,27 @@ def main(): print(f'At line {lineNum}') readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus) else: - if not os.path.exists(OFFSETS_FILE): print('Creating offsets file') # For indexed access for multiprocessing (creation took about 6.7 hours) with indexed_bzip2.open(WD_FILE) as file: with open(OFFSETS_FILE, 'wb') as file2: pickle.dump(file.block_offsets(), file2) print('Allocating file into chunks') - fileSz = None # About 1.4 TB + fileSz: int # About 1.4 TB with indexed_bzip2.open(WD_FILE) as file: with open(OFFSETS_FILE, 'rb') as file2: file.set_block_offsets(pickle.load(file2)) fileSz = file.seek(0, io.SEEK_END) chunkSz = math.floor(fileSz / N_PROCS) - chunkIdxs = [None] + [chunkSz * i for i in range(1, N_PROCS)] + [fileSz-1] - # Each adjacent pair specifies a start+end byte for readDumpChunk() + chunkIdxs = [-1] + [chunkSz * i for i in range(1, N_PROCS)] + [fileSz-1] + # Each adjacent pair specifies a start+end byte index for readDumpChunk() print(f'- Chunk size: {chunkSz:,}') print('Starting processes to read dump') with tempfile.TemporaryDirectory() as tempDirName: # Using maxtasksperchild=1 to free resources on task completion with multiprocessing.Pool(processes=N_PROCS, maxtasksperchild=1) as pool: for outFilename in pool.map(readDumpChunkOneParam, - [(i, chunkIdxs[i], chunkIdxs[i+1], f'{tempDirName}/{i}.pickle') for i in range(N_PROCS)]): + ((i, chunkIdxs[i], chunkIdxs[i+1], f'{tempDirName}/{i}.pickle') for i in range(N_PROCS))): # Get map data from subprocess output file with open(outFilename, 'rb') as file: maps = pickle.load(file) @@ -132,23 +131,28 @@ def main(): dbCon.commit() dbCon.close() -def readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus): +def readDumpLine( + lineBytes: bytes, + srcIdToId: dict[str, dict[int, int]], + idToTitle: dict[int, str], + idToAltId: dict[int, int], + idToIucnStatus: dict[int, str]) -> None: # Check if taxon item - if LINE_REGEX.search(line) == None: + if LINE_REGEX.search(lineBytes) is None: return try: - line = line.decode('utf-8').rstrip().rstrip(',') + line = lineBytes.decode('utf-8').rstrip().rstrip(',') jsonItem = json.loads(line) except json.JSONDecodeError: - print(f'Unable to parse Line {lineNum} as JSON') + print(f'Unable to parse line {line} as JSON') return isTaxon = False - altTaxa = [] # For a taxon-alt item, holds associated taxon-item IDs + altTaxa: list[int] = [] # For a taxon-alt item, holds associated taxon-item IDs claims = None try: claims = jsonItem['claims'] for statement in claims['P31']: # Check for 'instance of' statements - typeId = statement['mainsnak']['datavalue']['value']['id'] + typeId: str = statement['mainsnak']['datavalue']['value']['id'] if typeId in TAXON_IDS: isTaxon = True break @@ -161,18 +165,19 @@ def readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus): if not isTaxon and not altTaxa: return # Get wikidata ID and enwiki title - itemId, itemTitle = None, None + itemId: int | None = None + itemTitle: str | None = None try: itemId = int(jsonItem['id'][1:]) # Skips initial 'Q' itemTitle = jsonItem['sitelinks']['enwiki']['title'] except KeyError: # Allow taxon-items without titles (they might get one via a taxon-alt) - if itemId != None and isTaxon: + if itemId is not None and isTaxon: itemTitle = None else: return # Update maps - if itemTitle != None: + if itemTitle is not None: idToTitle[itemId] = itemTitle for altId in altTaxa: idToAltId[altId] = itemId @@ -187,19 +192,24 @@ def readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus): # Check for IUCN status if 'P141' in claims: # Check for 'iucn conservation status' statement try: - iucnStatusId = claims['P141'][0]['mainsnak']['datavalue']['value']['id'] + iucnStatusId: str = claims['P141'][0]['mainsnak']['datavalue']['value']['id'] idToIucnStatus[itemId] = IUCN_STATUS_IDS[iucnStatusId] except KeyError: pass -def readDumpChunkOneParam(params): # Forwards to readDumpChunk(), for use with pool.map() +def readDumpChunkOneParam(params: tuple[int, int, int, str]) -> str: + """ Forwards to readDumpChunk(), for use with pool.map() """ return readDumpChunk(*params) # Reads lines in the dump that begin after a start-byte, and not after an end byte - # If startByte is None, start at the first line -def readDumpChunk(procId, startByte, endByte, outFilename): + # If startByte is -1, start at the first line +def readDumpChunk(procId: int, startByte: int, endByte: int, outFilename: str) -> str: # Maps to populate - maps = [defaultdict(dict), {}, {}, {}] + maps: tuple[ + dict[str, dict[int, int]], + dict[int, str], + dict[int, int], + dict[int, str]] = (defaultdict(dict), {}, {}, {}) # Read dump with indexed_bzip2.open(WD_FILE) as file: # Load offsets file @@ -207,7 +217,7 @@ def readDumpChunk(procId, startByte, endByte, outFilename): offsets = pickle.load(file2) file.set_block_offsets(offsets) # Seek to chunk - if startByte != None: + if startByte != -1: file.seek(startByte) file.readline() else: |
