Diffstat (limited to 'backend/tol_data/wikidata/gen_taxon_src_data.py')
-rwxr-xr-x  backend/tol_data/wikidata/gen_taxon_src_data.py  42
1 file changed, 36 insertions, 6 deletions
diff --git a/backend/tol_data/wikidata/gen_taxon_src_data.py b/backend/tol_data/wikidata/gen_taxon_src_data.py
index 1bddb6e..d2a3811 100755
--- a/backend/tol_data/wikidata/gen_taxon_src_data.py
+++ b/backend/tol_data/wikidata/gen_taxon_src_data.py
@@ -30,10 +30,21 @@ OZprivate/ServerScripts/TaxonMappingAndPopularity/ (22 Aug 2022).
 # - Using pool.map() instead of pool.imap_unordered(), which seems to hang in some cases (was using python 3.8).
 #   Possibly related: https://github.com/python/cpython/issues/72882
 
-import sys, os, re, math, io
+import argparse
+import sys
+import os
+import re
+import math
+import io
 from collections import defaultdict
-import bz2, json, sqlite3
-import multiprocessing, indexed_bzip2, pickle, tempfile
+import bz2
+import json
+import sqlite3
+
+import multiprocessing
+import indexed_bzip2
+import pickle
+import tempfile
 
 WIKIDATA_FILE = 'latest-all.json.bz2'
 OFFSETS_FILE = 'offsets.dat'
@@ -49,9 +60,12 @@ IUCN_STATUS_IDS = {
 	'Q11394': 'endangered', 'Q219127': 'critically endangered', 'Q239509': 'extinct in the wild',
 	'Q237350': 'extinct species', 'Q3245245': 'data deficient'
 }
+
 # For filtering lines before parsing JSON
 LINE_REGEX = re.compile(('"id":(?:"' + '"|"'.join([s for s in TAXON_IDS + TAXON_ALT_IDS]) + '")').encode())
 
+# ========== For data generation ==========
+
 def genData(wikidataFile: str, offsetsFile: str, dbFile: str, nProcs: int) -> None:
 	""" Reads the dump and writes source/iucn info to db """
 	# Maps to populate
@@ -59,10 +73,12 @@ def genData(wikidataFile: str, offsetsFile: str, dbFile: str, nProcs: int) -> No
 	idToTitle: dict[int, str] = {} # Maps wikidata ID to enwiki title
 	idToAltId: dict[int, int] = {} # Maps taxon-item wikidata ID to taxon-alt ID (eg: 'canis lupus familiaris' -> 'dog')
 	idToIucnStatus: dict[int, str] = {} # Maps wikidata ID to iucn-status string ('least concern', etc)
+
 	# Check db
 	if os.path.exists(dbFile):
 		print('ERROR: Database already exists')
 		sys.exit(1)
+
 	# Read dump
 	if nProcs == 1:
 		with bz2.open(wikidataFile, mode='rb') as file:
@@ -76,6 +92,7 @@ def genData(wikidataFile: str, offsetsFile: str, dbFile: str, nProcs: int) -> No
 			with indexed_bzip2.open(wikidataFile) as file:
 				with open(offsetsFile, 'wb') as file2:
 					pickle.dump(file.block_offsets(), file2)
+
 		print('Allocating file into chunks')
 		fileSz: int # About 1.4 TB
 		with indexed_bzip2.open(wikidataFile) as file:
@@ -86,6 +103,7 @@ def genData(wikidataFile: str, offsetsFile: str, dbFile: str, nProcs: int) -> No
 		chunkIdxs = [-1] + [chunkSz * i for i in range(1, nProcs)] + [fileSz-1]
 			# Each adjacent pair specifies a start+end byte index for readDumpChunk()
 		print(f'- Chunk size: {chunkSz:,}')
+
 		print('Starting processes to read dump')
 		with tempfile.TemporaryDirectory() as tempDirName:
 			# Using maxtasksperchild=1 to free resources on task completion
@@ -103,7 +121,7 @@ def genData(wikidataFile: str, offsetsFile: str, dbFile: str, nProcs: int) -> No
 				idToTitle.update(maps[1])
 				idToAltId.update(maps[2])
 				idToIucnStatus.update(maps[3])
-	#
+
 	print('Writing to db')
 	dbCon = sqlite3.connect(dbFile)
 	dbCur = dbCon.cursor()
@@ -127,6 +145,7 @@ def genData(wikidataFile: str, offsetsFile: str, dbFile: str, nProcs: int) -> No
 		# The 'OR IGNORE' allows for multiple taxons using the same alt
 	dbCon.commit()
 	dbCon.close()
+
 def readDumpLine(
 	lineBytes: bytes,
 	srcIdToId: dict[str, dict[int, int]],
@@ -160,6 +179,7 @@ def readDumpLine(
 			return
 	if not isTaxon and not altTaxa:
 		return
+
 	# Get wikidata ID and enwiki title
 	itemId: int | None = None
 	itemTitle: str | None = None
@@ -172,11 +192,13 @@ def readDumpLine(
 			itemTitle = None
 		else:
 			return
+
 	# Update maps
 	if itemTitle is not None:
 		idToTitle[itemId] = itemTitle
 	for altId in altTaxa:
 		idToAltId[altId] = itemId
+
 	# Check for source IDs
 	for srcPropId, src in SRC_PROP_IDS.items():
 		if srcPropId in claims:
@@ -185,6 +207,7 @@ def readDumpLine(
 				srcIdToId[src][srcId] = itemId
 			except (KeyError, ValueError):
 				continue
+
 	# Check for IUCN status
 	if 'P141' in claims: # Check for 'iucn conservation status' statement
 		try:
@@ -192,9 +215,11 @@ def readDumpLine(
 			idToIucnStatus[itemId] = IUCN_STATUS_IDS[iucnStatusId]
 		except KeyError:
 			pass
+
 def readDumpChunkOneParam(params: tuple[int, str, str, int, int, str]) -> str:
 	""" Forwards to readDumpChunk(), for use with pool.map() """
 	return readDumpChunk(*params)
+
 def readDumpChunk(
 		procId: int, wikidataFile: str, offsetsFile: str, startByte: int, endByte: int, outFilename: str) -> str:
 	""" Reads lines in the dump that begin after a start-byte, and not after an end byte.
@@ -205,18 +230,21 @@ def readDumpChunk(
 		dict[int, str], dict[int, int], dict[int, str]] = \
 		(defaultdict(dict), {}, {}, {})
+
 	# Read dump
 	with indexed_bzip2.open(wikidataFile) as file:
 		# Load offsets file
 		with open(offsetsFile, 'rb') as file2:
 			offsets = pickle.load(file2)
 			file.set_block_offsets(offsets)
+
 		# Seek to chunk
 		if startByte != -1:
 			file.seek(startByte)
 			file.readline()
 		else:
 			startByte = 0 # Used for progress calculation
+
 		# Read lines
 		count = 0
 		while file.tell() <= endByte:
@@ -225,15 +253,17 @@ def readDumpChunk(
 				perc = (file.tell() - startByte) / (endByte - startByte) * 100
 				print(f'Thread {procId}: {perc:.2f}%')
 			readDumpLine(file.readline(), *maps)
+
 	# Output results into file
 	with open(outFilename, 'wb') as file:
 		pickle.dump(maps, file)
 	return outFilename
 
+# ========== Main block ==========
+
 if __name__ == '__main__': # Guard needed for multiprocessing
-	import argparse
 	parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
 	args = parser.parse_args()
-	#
+
 	multiprocessing.set_start_method('spawn')
 	genData(WIKIDATA_FILE, OFFSETS_FILE, DB_FILE, N_PROCS)
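For context on the code this diff reorganizes: genData() splits the dump into byte ranges, and each pool worker seeks to its start offset, discards the partial line found there, and reads whole lines until it passes its end offset (the -1 sentinel marks the chunk that starts at byte 0, so a line beginning exactly on a boundary is read by exactly one worker). Below is a minimal, runnable sketch of that pattern; it uses only the standard library and a plain text file in place of the indexed_bzip2-backed dump, and the helper name readChunk and the sample data are illustrative, not from the repo.

import multiprocessing
import os
import tempfile

def readChunk(params: tuple[str, int, int]) -> list[bytes]:
	""" Reads lines that begin after a start byte and not after an end byte """
	filename, startByte, endByte = params
	lines = []
	with open(filename, 'rb') as file:
		if startByte != -1:
			file.seek(startByte)
			file.readline() # Discard the partial line; the previous chunk reads it
		while file.tell() <= endByte:
			lines.append(file.readline())
	return lines

if __name__ == '__main__': # Guard needed for multiprocessing
	multiprocessing.set_start_method('spawn')
	# Create a small sample file standing in for the dump
	with tempfile.NamedTemporaryFile(mode='w', delete=False) as file:
		file.writelines(f'line {i}\n' for i in range(20))
		filename = file.name
	try:
		nProcs = 4
		fileSz = os.path.getsize(filename)
		chunkSz = fileSz // nProcs
		# Each adjacent pair specifies a start+end byte index, as in genData()
		chunkIdxs = [-1] + [chunkSz * i for i in range(1, nProcs)] + [fileSz - 1]
		params = [(filename, chunkIdxs[i], chunkIdxs[i + 1]) for i in range(nProcs)]
		with multiprocessing.Pool(processes=nProcs, maxtasksperchild=1) as pool:
			for chunk in pool.map(readChunk, params):
				print(chunk)
	finally:
		os.remove(filename)

Every line of the sample file appears in exactly one chunk: a worker's readline() after seeking consumes only the tail of a line that the previous worker already read in full.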

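The LINE_REGEX constant that this change annotates ('For filtering lines before parsing JSON') cheaply rejects dump lines that mention no relevant entity ID before paying for json.loads(). A small sketch of that idea, assuming placeholder ID lists in place of the script's TAXON_IDS and TAXON_ALT_IDS, and a hypothetical helper parseIfRelevant():

import json
import re

# Placeholder ID lists standing in for the script's TAXON_IDS and TAXON_ALT_IDS
TAXON_IDS = ['Q16521']
TAXON_ALT_IDS = ['Q55983715']
# Essentially the same construction as in the script: a bytes regex, so lines
# can be tested without decoding them
LINE_REGEX = re.compile(('"id":(?:"' + '"|"'.join(TAXON_IDS + TAXON_ALT_IDS) + '")').encode())

def parseIfRelevant(lineBytes: bytes) -> dict | None:
	""" Returns the parsed entity if the line mentions a relevant ID, else None """
	if LINE_REGEX.search(lineBytes) is None:
		return None
	# Dump lines end with ',' since the dump is one big JSON array
	return json.loads(lineBytes.rstrip(b',\n'))

# Only the first line survives the filter
lines = [
	b'{"type":"item","id":"Q42","claims":{"P31":[{"mainsnak":{"datavalue":{"value":{"id":"Q16521"}}}}]}},\n',
	b'{"type":"item","id":"Q1","claims":{}},\n',
]
for line in lines:
	print(parseIfRelevant(line))

The intent is that only a small fraction of dump lines mention a relevant ID, so skipping json.loads() for the rest saves most of the parsing work.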