diff options
Diffstat (limited to 'backend/tol_data/wikidata')
| -rw-r--r-- | backend/tol_data/wikidata/README.md | 18 | ||||
| -rw-r--r-- | backend/tol_data/wikidata/__init__.py | 0 | ||||
| -rwxr-xr-x | backend/tol_data/wikidata/gen_taxon_src_data.py | 239 |
3 files changed, 257 insertions, 0 deletions
diff --git a/backend/tol_data/wikidata/README.md b/backend/tol_data/wikidata/README.md new file mode 100644 index 0000000..7b3105e --- /dev/null +++ b/backend/tol_data/wikidata/README.md @@ -0,0 +1,18 @@ +This directory holds files obtained via [Wikidata](https://www.wikidata.org/). + +# Downloaded Files +- `latest-all.json.bz2` <br> + Obtained from <https://dumps.wikimedia.org/wikidatawiki/entities/> (on 23/08/22). + Format info can be found at <https://doc.wikimedia.org/Wikibase/master/php/md_docs_topics_json.html>. + +# Other Files +- `gen_taxon_src_data.py` <br> + Used to generate a database holding taxon information from the dump. +- `offsets.dat` <br> + Holds bzip2 block offsets for the dump. Generated and used by + `gen_taxon_src_data.py` for parallel processing of the dump. +- `taxon_srcs.db` <br> + Generated by `gen_taxon_src_data.py`. <br> + Tables: <br> + - `src_id_to_title`: `src TEXT, id INT, title TEXT, PRIMARY KEY(src, id)` + - `title_iucn`: `title TEXT PRIMARY KEY, status TEXT` diff --git a/backend/tol_data/wikidata/__init__.py b/backend/tol_data/wikidata/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/backend/tol_data/wikidata/__init__.py diff --git a/backend/tol_data/wikidata/gen_taxon_src_data.py b/backend/tol_data/wikidata/gen_taxon_src_data.py new file mode 100755 index 0000000..50ed917 --- /dev/null +++ b/backend/tol_data/wikidata/gen_taxon_src_data.py @@ -0,0 +1,239 @@ +#!/usr/bin/python3 + +""" +Reads a wikidata JSON dump, looking for enwiki taxon items, and associated +IDs from sources like GBIF/etc, and IUCN conservation status. Writes results +into a database. + +The JSON dump contains an array of objects, each of which describes a +Wikidata item item1, and takes up its own line.
+- Getting item1's Wikidata ID: item1['id'] (eg: "Q144") +- Checking if item1 is a taxon: item1['claims']['P31'][idx1]['mainsnak']['datavalue']['value']['id'] == id1 + 'idx1' indexes an array of statements + 'id1' is a Wikidata ID denoting a taxon item type (eg: Q310890 means 'monotypic taxon') +- Checking if item1 is a taxon-alt: item1['claims']['P31'][idx1]['mainsnak']['datavalue']['value']['id'] == id1 + 'id1' denotes a common-name-alternative item type (eg: Q55983715 means 'organisms known by a particular common name') + Getting the ID of the item that item1 is an alternative for: + item1['claims']['P31'][idx1]['qualifiers']['P642'][idx2]['datavalue']['value']['numeric-id'] +- Checking for an EOL/NCBI/etc ID: item1['claims'][prop1][idx1]['mainsnak']['datavalue']['value'] (eg: "328672") + 'prop1' denotes a 'has ID from source *' property (eg: 'P830' means 'has EOL ID') +- Checking for an IUCN status: item1['claims']['P141'][idx1]['mainsnak']['datavalue']['value']['id'] (eg: "Q219127") + +Based on code from https://github.com/OneZoom/OZtree, located in +OZprivate/ServerScripts/TaxonMappingAndPopularity/ (22 Aug 2022). +""" + +# On Linux, running on the full dataset caused the processes to hang after processing. This was resolved by: +# - Storing subprocess results in temp files. Apparently passing large objects through pipes can cause deadlock. +# - Using set_start_method('spawn'). Apparently 'fork' can cause unexpected copying of lock/semaphore/etc state. +# Related: https://bugs.python.org/issue6721 +# - Using pool.map() instead of pool.imap_unordered(), which seems to hang in some cases (was using python 3.8).
#     Possibly related: https://github.com/python/cpython/issues/72882

import sys, os, re, math, io
from collections import defaultdict
import bz2, json, sqlite3
import multiprocessing, indexed_bzip2, pickle, tempfile

WIKIDATA_FILE = 'latest-all.json.bz2'
OFFSETS_FILE = 'offsets.dat'
DB_FILE = 'taxon_srcs.db'
N_PROCS = 6 # Took about 3 hours with N_PROCS=6

# Wikidata entity IDs
TAXON_IDS = ['Q16521', 'Q310890', 'Q23038290', 'Q713623'] # 'taxon', 'monotypic taxon', 'fossil taxon', 'clade'
TAXON_ALT_IDS = ['Q55983715', 'Q502895'] # 'organisms known by a particular common name', 'common name'
SRC_PROP_IDS = {'P830': 'eol', 'P685': 'ncbi', 'P1391': 'if', 'P850': 'worms', 'P5055': 'irmng', 'P846': 'gbif'}
IUCN_STATUS_IDS = {
    'Q211005': 'least concern', 'Q719675': 'near threatened', 'Q278113': 'vulnerable',
    'Q11394': 'endangered', 'Q219127': 'critically endangered', 'Q239509': 'extinct in the wild',
    'Q237350': 'extinct species', 'Q3245245': 'data deficient'
}
# For filtering lines before parsing JSON
# (The trailing part is a raw string, so that '\D' is a regex escape rather than an invalid string escape)
LINE_REGEX = re.compile(('"id":(?:"' + '"|"'.join(TAXON_IDS + TAXON_ALT_IDS) + r'")\D').encode())

def genData(wikidataFile: str, offsetsFile: str, dbFile: str, nProcs: int) -> None:
    """ Reads the dump and writes source/iucn info to db.

    wikidataFile: path of the bzip2-compressed JSON dump
    offsetsFile: path of a pickled list of bzip2 block offsets (created if missing, when nProcs is not 1)
    dbFile: path of the sqlite database to create (the program exits with status 1 if it already exists)
    nProcs: number of processes used to read the dump (1 means reading sequentially in this process)
    """
    # Maps to populate
    srcIdToId: dict[str, dict[int, int]] = defaultdict(dict) # Maps 'eol'/etc to {srcId1: wikidataId1, ...}
    idToTitle: dict[int, str] = {} # Maps wikidata ID to enwiki title
    idToAltId: dict[int, int] = {} # Maps taxon-item wikidata ID to taxon-alt ID (eg: 'canis lupus familiaris' -> 'dog')
    idToIucnStatus: dict[int, str] = {} # Maps wikidata ID to iucn-status string ('least concern', etc)
    # Check db
    if os.path.exists(dbFile):
        print('ERROR: Database already exists')
        sys.exit(1)
    # Read dump
    if nProcs == 1:
        with bz2.open(wikidataFile, mode='rb') as file:
            for lineNum, line in enumerate(file, 1):
                if lineNum % 10_000 == 0:
                    print(f'At line {lineNum}')
                readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus)
    else:
        if not os.path.exists(offsetsFile):
            print('Creating offsets file') # For indexed access for multiprocessing (creation took about 6.7 hours)
            with indexed_bzip2.open(wikidataFile) as file:
                with open(offsetsFile, 'wb') as file2:
                    pickle.dump(file.block_offsets(), file2)
        print('Allocating file into chunks')
        fileSz: int # About 1.4 TB
        with indexed_bzip2.open(wikidataFile) as file:
            with open(offsetsFile, 'rb') as file2:
                file.set_block_offsets(pickle.load(file2))
            fileSz = file.seek(0, io.SEEK_END)
        chunkSz = math.floor(fileSz / nProcs)
        chunkIdxs = [-1] + [chunkSz * i for i in range(1, nProcs)] + [fileSz-1]
            # Each adjacent pair specifies a start+end byte index for readDumpChunk()
        print(f'- Chunk size: {chunkSz:,}')
        print('Starting processes to read dump')
        with tempfile.TemporaryDirectory() as tempDirName:
            # Using maxtasksperchild=1 to free resources on task completion
            with multiprocessing.Pool(processes=nProcs, maxtasksperchild=1) as pool:
                for outFilename in pool.map(
                    readDumpChunkOneParam,
                    ((i, wikidataFile, offsetsFile, chunkIdxs[i], chunkIdxs[i+1],
                        os.path.join(tempDirName, f'{i}.pickle')) for i in range(nProcs))):
                    # Get map data from subprocess output file
                    # (The pickle files are ones written by readDumpChunk() into our own temp directory)
                    with open(outFilename, 'rb') as file:
                        maps = pickle.load(file)
                    # Add to maps
                    for src, idMap in maps[0].items():
                        srcIdToId[src].update(idMap)
                    idToTitle.update(maps[1])
                    idToAltId.update(maps[2])
                    idToIucnStatus.update(maps[3])
    #
    print('Writing to db')
    dbCon = sqlite3.connect(dbFile)
    try: # Ensures the connection gets closed, even if an insertion fails
        dbCur = dbCon.cursor()
        dbCur.execute('CREATE TABLE src_id_to_title (src TEXT, id INT, title TEXT, PRIMARY KEY(src, id))')
        for src, submap in srcIdToId.items():
            for srcId, wId in submap.items():
                if wId not in idToTitle: # Check for a title, possibly via an alt-taxon
                    if wId in idToAltId:
                        wId = idToAltId[wId]
                    else:
                        continue
                dbCur.execute('INSERT INTO src_id_to_title VALUES (?, ?, ?)', (src, srcId, idToTitle[wId]))
        dbCur.execute('CREATE TABLE title_iucn (title TEXT PRIMARY KEY, status TEXT)')
        for wId, status in idToIucnStatus.items():
            if wId not in idToTitle: # Check for a title, possibly via an alt-taxon
                if wId in idToAltId and idToAltId[wId] not in idToIucnStatus:
                    wId = idToAltId[wId]
                else:
                    continue
            dbCur.execute('INSERT OR IGNORE INTO title_iucn VALUES (?, ?)', (idToTitle[wId], status))
                # The 'OR IGNORE' allows for multiple taxons using the same alt
        dbCon.commit()
    finally:
        dbCon.close()

def readDumpLine(
        lineBytes: bytes,
        srcIdToId: dict[str, dict[int, int]],
        idToTitle: dict[int, str],
        idToAltId: dict[int, int],
        idToIucnStatus: dict[int, str]) -> None:
    """ Parses one line of the dump, and, if it describes a taxon or taxon-alt item, adds its info to the maps.

    lineBytes: a raw line from the dump (a JSON object, possibly followed by a comma)
    srcIdToId, idToTitle, idToAltId, idToIucnStatus: maps to update in-place (described in genData())
    Lines that don't match LINE_REGEX, can't be parsed, or lack required fields, are skipped.
    """
    # Check if taxon item
    if LINE_REGEX.search(lineBytes) is None:
        return
    try:
        line = lineBytes.decode('utf-8').rstrip().rstrip(',')
    except UnicodeDecodeError:
        print('Unable to decode line as UTF-8')
        return
    try:
        jsonItem = json.loads(line)
    except json.JSONDecodeError:
        print(f'Unable to parse line {line} as JSON')
        return
    isTaxon = False
    altTaxa: list[int] = [] # For a taxon-alt item, holds associated taxon-item IDs
    try:
        claims = jsonItem['claims']
        for statement in claims['P31']: # Check for 'instance of' statements
            typeId: str = statement['mainsnak']['datavalue']['value']['id']
            if typeId in TAXON_IDS:
                isTaxon = True
                break
            elif typeId in TAXON_ALT_IDS:
                snaks = statement['qualifiers']['P642'] # Check for 'of' qualifiers
                altTaxa.extend([int(s['datavalue']['value']['numeric-id']) for s in snaks])
                break
    except (KeyError, ValueError):
        return
    if not isTaxon and not altTaxa:
        return
    # Get wikidata ID
    try:
        itemId = int(jsonItem['id'][1:]) # Skips initial 'Q'
    except (KeyError, ValueError):
        return
    # Get enwiki title
    itemTitle: str | None
    try:
        itemTitle = jsonItem['sitelinks']['enwiki']['title']
    except KeyError:
        # Allow taxon-items without titles (they might get one via a taxon-alt)
        if not isTaxon:
            return
        itemTitle = None
    # Update maps
    if itemTitle is not None:
        idToTitle[itemId] = itemTitle
    for altId in altTaxa:
        idToAltId[altId] = itemId
    # Check for source IDs
    for srcPropId, src in SRC_PROP_IDS.items():
        if srcPropId in claims:
            try:
                srcId = int(claims[srcPropId][0]['mainsnak']['datavalue']['value'])
                srcIdToId[src][srcId] = itemId
            except (KeyError, ValueError):
                continue
    # Check for IUCN status
    if 'P141' in claims: # Check for 'iucn conservation status' statement
        try:
            iucnStatusId: str = claims['P141'][0]['mainsnak']['datavalue']['value']['id']
            idToIucnStatus[itemId] = IUCN_STATUS_IDS[iucnStatusId]
        except KeyError:
            pass # No usable value, or a status that isn't in IUCN_STATUS_IDS

def readDumpChunkOneParam(params: tuple[int, str, str, int, int, str]) -> str:
    """ Forwards to readDumpChunk(), for use with pool.map() """
    return readDumpChunk(*params)

def readDumpChunk(
        procId: int, wikidataFile: str, offsetsFile: str, startByte: int, endByte: int, outFilename: str) -> str:
    """ Reads lines in the dump that begin after a start-byte, and not after an end byte.
        If startByte is -1, start at the first line.
        The resulting maps are pickled into outFilename, which is also the return value.
        procId is only used for labelling progress output. """
    # Maps to populate
    maps: tuple[
        dict[str, dict[int, int]],
        dict[int, str],
        dict[int, int],
        dict[int, str]] = (defaultdict(dict), {}, {}, {})
    # Read dump
    with indexed_bzip2.open(wikidataFile) as file:
        # Load offsets file
        with open(offsetsFile, 'rb') as file2:
            offsets = pickle.load(file2)
            file.set_block_offsets(offsets)
        # Seek to chunk
        if startByte != -1:
            file.seek(startByte)
            file.readline() # Skips the (possibly partial) line that contains the start-byte
        else:
            startByte = 0 # Used for progress calculation
        # Read lines
        count = 0
        while file.tell() <= endByte:
            count += 1
            if count % 10_000 == 0:
                perc = (file.tell() - startByte) / (endByte - startByte) * 100
                print(f'Thread {procId}: {perc:.2f}%')
            line = file.readline()
            if not line: # End of file (guards against looping forever if endByte is past the end)
                break
            readDumpLine(line, *maps)
    # Output results into file
    with open(outFilename, 'wb') as file:
        pickle.dump(maps, file)
    return outFilename

if __name__ == '__main__': # Guard needed for multiprocessing
    import argparse
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    args = parser.parse_args() # No options are defined, but this provides '--help'
    #
    multiprocessing.set_start_method('spawn')
    genData(WIKIDATA_FILE, OFFSETS_FILE, DB_FILE, N_PROCS)
