1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
|
#!/usr/bin/python3
# Script-level setup: imports, CLI parsing (help text only; no options yet), and config constants.
import sys, os, re, math, io
from collections import defaultdict
import bz2, json, sqlite3
import multiprocessing, indexed_bzip2, pickle, tempfile
import argparse
parser = argparse.ArgumentParser(description='''
Reads a wikidata JSON dump, looking for enwiki taxon items, and associated
IDs from sources like GBIF/etc, and IUCN conservation status. Writes results
into a database.
The JSON dump contains an array of objects, each of which describes a
Wikidata item item1, and takes up it's own line.
- Getting item1's Wikidata ID: item1['id'] (eg: "Q144")
- Checking if item1 is a taxon: item1['claims']['P31'][idx1]['mainsnak']['datavalue']['value']['numeric-id'] == id1
'idx1' indexes an array of statements
'id1' is a Wikidata ID denoting a taxon item type (eg: 310890 means 'monotypic taxon')
- Checking if item1 is a taxon-alt: item1['claims']['P31'][idx1]['mainsnak']['datavalue']['value']['numeric-id'] == id1
'id1' denotes a common-name-alternative item type (eg: 55983715 means 'organisms known by a particular common name')
Getting the ID of the item that item1 is an alternative for:
item1['claims']['P31'][idx1]['qualifiers']['P642'][idx2]['datavalue']['value']['numeric-id']
- Checking for an EOL/NCBI/etc ID: item['claims'][prop1][idx1]['mainsnak']['datavalue']['value'] (eg: "328672")
'prop1' denotes a 'has ID from source *' property (eg: 'P830' means 'has EOL ID')
- Checking for an IUCN status: item['claims']['P141'][idx1]['mainsnak']['datavalue']['value']['id'] (eg: "Q219127")
Based on code from https://github.com/OneZoom/OZtree, located in
OZprivate/ServerScripts/TaxonMappingAndPopularity/ (22 Aug 2022).
''', formatter_class=argparse.RawDescriptionHelpFormatter)
args = parser.parse_args() # Parsing still provides --help and rejects stray arguments
# On Linux, running on the full dataset caused the processes to hang after processing. This was resolved by:
# - Storing subprocess results in temp files. Apparently passing large objects through pipes can cause deadlock.
# - Using set_start_method('spawn'). Apparently 'fork' can cause unexpected copying of lock/semaphore/etc state.
# Related: https://bugs.python.org/issue6721
# - Using pool.map() instead of pool.imap_unordered(), which seems to hang in some cases (was using python 3.8).
# Possibly related: https://github.com/python/cpython/issues/72882
WD_FILE = 'latest-all.json.bz2' # Compressed wikidata JSON dump to read
OFFSETS_FILE = 'offsets.dat' # Pickled bzip2 block offsets, enabling random access into WD_FILE
DB_FILE = 'taxonSrcs.db' # SQLite output database (must not already exist)
N_PROCS = 6 # Took about 3 hours (probably would've taken 6-12 with N_PROCS=1)
# Wikidata entity IDs
TAXON_IDS = ['Q16521', 'Q310890', 'Q23038290', 'Q713623'] # 'taxon', 'monotypic taxon', 'fossil taxon', 'clade'
TAXON_ALT_IDS = ['Q55983715', 'Q502895'] # 'organisms known by a particular common name', 'common name'
SRC_PROP_IDS = {'P830': 'eol', 'P685': 'ncbi', 'P1391': 'if', 'P850': 'worms', 'P5055': 'irmng', 'P846': 'gbif'}
IUCN_STATUS_IDS = {
    'Q211005': 'least concern', 'Q719675': 'near threatened', 'Q278113': 'vulnerable',
    'Q11394': 'endangered', 'Q219127': 'critically endangered', 'Q239509': 'extinct in the wild',
    'Q237350': 'extinct species', 'Q3245245': 'data deficient'
}
# For filtering lines before parsing JSON: matches a "numeric-id" field holding any taxon or
# taxon-alt type ID (the 'Q' prefix is stripped, since numeric-id values are bare integers).
# The trailing \D rejects longer IDs that merely start with a wanted ID (eg: 165210 vs 16521).
# FIX: raw strings, so '\D' isn't an invalid escape sequence (SyntaxWarning on Python 3.12+).
LINE_REGEX = re.compile(
    (r'"numeric-id":(?:' + '|'.join(s[1:] for s in TAXON_IDS + TAXON_ALT_IDS) + r')\D').encode())
def main():
    """Read the wikidata dump (serially, or in parallel via byte-range chunks), collect
    taxon ID/title/IUCN mappings, then write them into a new SQLite database."""
    # Maps to populate
    srcIdToId = defaultdict(dict) # Maps 'eol'/etc to {srcId1: wikidataId1, ...} (IDs are ints)
    idToTitle = {} # Maps wikidata ID to enwiki title
    idToAltId = {} # Maps taxon-item wikidata ID to taxon-alt wikidata ID (eg: 'canis lupus familiaris' and 'dog')
    idToIucnStatus = {} # Maps wikidata ID to iucn-status string ('least concern', etc)
    # Check db
    if os.path.exists(DB_FILE):
        print('ERROR: Database already exists')
        sys.exit(1)
    # Read dump
    if N_PROCS == 1:
        # Serial path: stream the bz2 dump one line (one wikidata item) at a time
        with bz2.open(WD_FILE, mode='rb') as file:
            for lineNum, line in enumerate(file, 1):
                if lineNum % 1e4 == 0:
                    print(f'At line {lineNum}')
                readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus)
    else:
        # Parallel path: needs a block-offsets index so workers can seek into the bz2 file
        if not os.path.exists(OFFSETS_FILE):
            print('Creating offsets file') # For indexed access for multiprocessing (creation took about 6.7 hours)
            with indexed_bzip2.open(WD_FILE) as file:
                with open(OFFSETS_FILE, 'wb') as file2:
                    pickle.dump(file.block_offsets(), file2)
        print('Allocating file into chunks')
        fileSz = None # About 1.4 TB
        with indexed_bzip2.open(WD_FILE) as file:
            with open(OFFSETS_FILE, 'rb') as file2:
                file.set_block_offsets(pickle.load(file2))
            fileSz = file.seek(0, io.SEEK_END) # seek() returns the new position, ie. the uncompressed size
        chunkSz = math.floor(fileSz / N_PROCS)
        chunkIdxs = [None] + [chunkSz * i for i in range(1, N_PROCS)] + [fileSz-1]
        # Each adjacent pair specifies a start+end byte for readDumpChunk()
        # (None start means 'begin at the first line')
        print(f'- Chunk size: {chunkSz:,}')
        print('Starting processes to read dump')
        with tempfile.TemporaryDirectory() as tempDirName:
            # Using maxtasksperchild=1 to free resources on task completion
            with multiprocessing.Pool(processes=N_PROCS, maxtasksperchild=1) as pool:
                for outFilename in pool.map(readDumpChunkOneParam,
                        [(i, chunkIdxs[i], chunkIdxs[i+1], f'{tempDirName}/{i}.pickle') for i in range(N_PROCS)]):
                    # Get map data from subprocess output file
                    with open(outFilename, 'rb') as file:
                        maps = pickle.load(file)
                    # Add to maps
                    for src, idMap in maps[0].items():
                        srcIdToId[src].update(idMap)
                    idToTitle.update(maps[1])
                    idToAltId.update(maps[2])
                    idToIucnStatus.update(maps[3])
    #
    print('Writing to db')
    dbCon = sqlite3.connect(DB_FILE)
    dbCur = dbCon.cursor()
    dbCur.execute('CREATE TABLE src_id_to_title (src TEXT, id INT, title TEXT, PRIMARY KEY(src, id))')
    for src, submap in srcIdToId.items():
        for srcId, wId in submap.items():
            if wId not in idToTitle: # Check for a title, possibly via an alt-taxon
                if wId in idToAltId:
                    wId = idToAltId[wId]
                else:
                    continue # No title reachable; skip this source ID
            dbCur.execute('INSERT INTO src_id_to_title VALUES (?, ?, ?)', (src, srcId, idToTitle[wId]))
    dbCur.execute('CREATE TABLE title_iucn (title TEXT PRIMARY KEY, status TEXT)')
    for wId, status in idToIucnStatus.items():
        if wId not in idToTitle: # Check for a title, possibly via an alt-taxon
            # Only fall back to the alt if the alt doesn't have its own IUCN status entry
            if wId in idToAltId and idToAltId[wId] not in idToIucnStatus:
                wId = idToAltId[wId]
            else:
                continue
        dbCur.execute('INSERT OR IGNORE INTO title_iucn VALUES (?, ?)', (idToTitle[wId], status))
        # The 'OR IGNORE' allows for multiple taxons using the same alt
    #dbCur.execute('CREATE TABLE id_to_alt_title (id TEXT PRIMARY KEY, title TEXT, alt TEXT)')
    #for wId, altId in idToAltId.items():
    #    dbCur.execute('INSERT INTO id_to_alt_title VALUES (?, ?, ?)',
    #        (wId, idToTitle[wId] if wId in idToTitle else None, idToTitle[altId]))
    dbCon.commit()
    dbCon.close()
def readDumpLine(line, srcIdToId, idToTitle, idToAltId, idToIucnStatus):
    """Parse one dump line (one wikidata item, as JSON bytes) and update the given maps.

    Lines that don't match LINE_REGEX, fail to parse, or don't describe a
    taxon/taxon-alt item are silently skipped. The four map parameters are
    mutated in place; nothing is returned.
    """
    # Cheap pre-filter: skip lines that can't contain a taxon/taxon-alt type ID
    if LINE_REGEX.search(line) is None:
        return
    try:
        line = line.decode('utf-8').rstrip().rstrip(',') # Lines are array elements, so may end with ','
        jsonItem = json.loads(line)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # BUGFIX: this handler previously printed an undefined 'lineNum' (a local of main()),
        # raising NameError instead of reporting the failure; also catch bad UTF-8 from decode().
        print('Unable to parse line as JSON')
        return
    isTaxon = False
    altTaxa = [] # For a taxon-alt item, holds associated taxon-item IDs
    claims = None
    try:
        claims = jsonItem['claims']
        for statement in claims['P31']: # Check for 'instance of' statements
            typeId = statement['mainsnak']['datavalue']['value']['id']
            if typeId in TAXON_IDS:
                isTaxon = True
                break
            elif typeId in TAXON_ALT_IDS:
                snaks = statement['qualifiers']['P642'] # Check for 'of' qualifiers
                altTaxa.extend([int(s['datavalue']['value']['numeric-id']) for s in snaks])
                break
    except (KeyError, ValueError):
        return
    if not isTaxon and not altTaxa:
        return
    # Get wikidata ID and enwiki title
    itemId, itemTitle = None, None
    try:
        itemId = int(jsonItem['id'][1:]) # Skips initial 'Q'
        itemTitle = jsonItem['sitelinks']['enwiki']['title']
    except KeyError:
        # Allow taxon-items without titles (they might get one via a taxon-alt)
        if itemId is not None and isTaxon:
            itemTitle = None
        else:
            return
    # Update maps
    if itemTitle is not None:
        idToTitle[itemId] = itemTitle
    for altId in altTaxa:
        idToAltId[altId] = itemId
    # Check for source IDs
    for srcPropId, src in SRC_PROP_IDS.items():
        if srcPropId in claims:
            try:
                srcId = int(claims[srcPropId][0]['mainsnak']['datavalue']['value'])
                srcIdToId[src][srcId] = itemId
            except (KeyError, ValueError):
                continue
    # Check for IUCN status
    if 'P141' in claims: # Check for 'iucn conservation status' statement
        try:
            iucnStatusId = claims['P141'][0]['mainsnak']['datavalue']['value']['id']
            idToIucnStatus[itemId] = IUCN_STATUS_IDS[iucnStatusId]
        except KeyError:
            pass # Status item not in IUCN_STATUS_IDS, or claim lacks a datavalue
def readDumpChunkOneParam(params):
    """Single-argument adapter for readDumpChunk(), usable with pool.map()."""
    procId, startByte, endByte, outFilename = params
    return readDumpChunk(procId, startByte, endByte, outFilename)
# Reads lines in the dump that begin after a start-byte, and not after an end byte
# If startByte is None, start at the first line
def readDumpChunk(procId, startByte, endByte, outFilename):
    """Worker-process task: parse one byte-range chunk of the dump via readDumpLine().

    Pickles the four result maps (as a list, in readDumpLine() parameter order) into
    outFilename, and returns outFilename so the parent process can load the results.
    procId is only used to label progress output.
    """
    # Maps to populate
    maps = [defaultdict(dict), {}, {}, {}]
    # Read dump
    with indexed_bzip2.open(WD_FILE) as file:
        # Load offsets file
        with open(OFFSETS_FILE, 'rb') as file2:
            offsets = pickle.load(file2)
        file.set_block_offsets(offsets)
        # Seek to chunk
        if startByte != None:
            file.seek(startByte)
            file.readline() # Discard the partial line at the seek point (the previous chunk reads it)
        else:
            startByte = 0 # Used for progress calculation
        # Read lines
        count = 0
        while file.tell() <= endByte:
            count += 1
            if count % 1e4 == 0:
                perc = (file.tell() - startByte) / (endByte - startByte) * 100
                print(f'Thread {procId}: {perc:.2f}%')
            readDumpLine(file.readline(), *maps)
    # Output results into file
    with open(outFilename, 'wb') as file:
        pickle.dump(maps, file)
    return outFilename
if __name__ == '__main__': # Guard needed for multiprocessing
    # 'spawn' avoids the fork-related lock/semaphore state copying issues noted near the top of the file
    multiprocessing.set_start_method('spawn')
    main()
|