aboutsummaryrefslogtreecommitdiff
path: root/backend/hist_data/wikidata/gen_events_data.py
blob: 84dbb5f880d5ad2fa2d5600aaeefcd264c27dc2e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
#!/usr/bin/python3

"""
Reads a wikidata JSON dump, looking for entities usable as historical events.
Writes results into a database.

The JSON dump contains an array of objects, each of which describes a
Wikidata item item1, and takes up it's own line.
- Getting item1's Wikidata ID: item1['id'] (eg: "Q144")
- Checking for a property: item1['claims'][prop1] == array1
- Getting a property statement value: item1['claims'][prop1][idx1]['mainsnak']['datavalue']
	'idx1' indexes an array of statements
"""

# On Linux, running on the full dataset seems to make the processes hang when done. This was resolved by:
# - Using set_start_method('spawn'). Apparently 'fork' can cause unexpected copying of lock/semaphore/etc state.
#   Related: https://bugs.python.org/issue6721
# - Using pool.map() instead of pool.imap_unordered(), which seems to hang in some cases (was using python 3.8).
#   Possibly related: https://github.com/python/cpython/issues/72882

import os, io, re, argparse
import bz2, json, sqlite3
import multiprocessing, indexed_bzip2, pickle, tempfile

WIKIDATA_FILE = 'latest-all.json.bz2'
OFFSETS_FILE = 'offsets.dat'
DB_FILE = 'events.db'
N_PROCS = 6

# For handling Wikidata entity IDs
INSTANCE_OF = 'P31'
EVENT_CTG: dict[str, dict[str, str]] = {
	# Map from event-categories to dicts that map event-indicative entity names to their IDs
		# If the ID starts with 'Q', it expects entities to be an 'instance of' that ID
		# If the ID starts with 'P', it expects entities to have a property with that ID
	'event': {
		'occurrence': 'Q1190554',
		'time interval': 'Q186081',
		'historical period': 'Q11514315',
		'era': 'Q6428674',
		'event': 'Q1656682',
		'recurring event': 'Q15275719',
		'event sequence': 'Q15900616',
		'incident': 'Q18669875',
	},
	'human': {
		'human': 'Q5',
	},
	'country': {
		'country': 'Q6256',
		'state': 'Q7275',
		'sovereign state': 'Q3624078',
	},
	'discovery': {
		'time of discovery or invention': 'P575',
	},
	'media': {
		'work of art': 'Q4502142',
		'literary work': 'Q7725634',
		'comic book series': 'Q14406742',
		'painting': 'Q3305213',
		'musical work/composition': 'Q105543609',
		'film': 'Q11424',
		'animated film': 'Q202866',
		'television series': 'Q16401',
		'anime television series': 'Q63952888',
		'video game': 'Q7889',
		'video game series': 'Q7058673',
	},
}
ID_TO_CTG = {id: ctg for ctg, nmToId in EVENT_CTG.items() for name, id in nmToId.items()}
EVENT_PROP: dict[str, str] = { # Maps event-start/end-indicative property names to their IDs
	'start time': 'P580',
	'end time': 'P582',
	'point in time': 'P585',
	'inception': 'P571',
	'age estimated by a dating method': 'P7584',
	'temporal range start': 'P523',
	'temporal range end': 'P524',
	'earliest date': 'P1319',
	'latest date': 'P1326',
	'date of birth': 'P569',
	'date of death': 'P570',
	'time of discovery or invention': 'P575',
	'publication date': 'P577',
}
PROP_RULES: list[tuple[str] | tuple[str, str] | tuple[str, str, bool]] = [
	# Indicates how event start/end data should be obtained from EVENT_PROP props
		# Each tuple starts with a start-time prop to check for, followed by an optional
		# end-time prop, and an optional 'both props must be present' boolean indicator
	('start time', 'end time'),
	('point in time',),
	('inception',),
	('age estimated by a dating method',),
	('temporal range start', 'temporal range end'),
	('earliest date', 'latest date', True),
	('date of birth', 'date of death'),
	('time of discovery or invention',),
	('publication date',),
]
# For filtering lines before parsing JSON
TYPE_ID_REGEX = re.compile(
	('"id":(?:"' + '"|"'.join([id for id in ID_TO_CTG if id.startswith('Q')]) + '")').encode())
PROP_ID_REGEX = re.compile(
	('(?:"' + '"|"'.join([id for id in ID_TO_CTG if id.startswith('P')]) + '"):\[{"mainsnak"').encode())

def genData(wikidataFile: str, offsetsFile: str, dbFile: str, nProcs: int) -> None:
	""" Reads the dump and writes info to db """
	# Check db
	if os.path.exists(dbFile):
		print('ERROR: Database already exists')
		return
	# Read dump, and write to db
	print('Writing to db')
	dbCon = sqlite3.connect(dbFile)
	dbCon.execute('CREATE TABLE events (' \
		'id INT PRIMARY KEY, title TEXT, start TEXT, end TEXT, time_type TEXT, ctg TEXT)')
	dbCon.commit()
	dbCon.close()
	if nProcs == 1:
		dbCon = sqlite3.connect(dbFile)
		dbCur = dbCon.cursor()
		with bz2.open(wikidataFile, mode='rb') as file:
			for lineNum, line in enumerate(file, 1):
				if lineNum % 1e4 == 0:
					print(f'At line {lineNum}')
				entry = readDumpLine(line)
				if entry:
					dbCur.execute('INSERT INTO events VALUES (?, ?, ?, ?, ?, ?)', entry)
		dbCon.commit()
		dbCon.close()
	else:
		if not os.path.exists(offsetsFile):
			print('Creating offsets file') # For indexed access for multiprocessing (creation took about 6.7 hours)
			with indexed_bzip2.open(wikidataFile) as file:
				with open(offsetsFile, 'wb') as file2:
					pickle.dump(file.block_offsets(), file2)
		print('Allocating file into chunks')
		fileSz: int # About 1.4 TB
		with indexed_bzip2.open(wikidataFile) as file:
			with open(offsetsFile, 'rb') as file2:
				file.set_block_offsets(pickle.load(file2))
				fileSz = file.seek(0, io.SEEK_END)
		chunkSz = fileSz // nProcs
		chunkIdxs = [-1] + [chunkSz * i for i in range(1, nProcs)] + [fileSz-1]
			# Each adjacent pair specifies a start+end byte index for readDumpChunk()
		print(f'- Chunk size: {chunkSz:,}')
		print('Starting processes to read dump')
		dbCon = sqlite3.connect(dbFile)
		dbCur = dbCon.cursor()
		with tempfile.TemporaryDirectory() as tempDirName:
			with multiprocessing.Pool(processes=nProcs, maxtasksperchild=1) as pool:
				# Used maxtasksperchild=1 to free resources on task completion
				for outFile in pool.map(readDumpChunkOneParam,
					((i, wikidataFile, offsetsFile, os.path.join(tempDirName, f'{i}.pickle'),
						chunkIdxs[i], chunkIdxs[i+1]) for i in range(nProcs))):
					# Add entries from subprocess output file
					with open(outFile, 'rb') as file:
						for e in pickle.load(file):
							dbCur.execute('INSERT INTO events VALUES (?, ?, ?, ?, ?, ?)', e)
		dbCon.commit()
		dbCon.close()
def readDumpLine(lineBytes: bytes) -> tuple[int, str, str, str, str, str] | None:
	# Check with regex
	if TYPE_ID_REGEX.search(lineBytes) is None and PROP_ID_REGEX.search(lineBytes) is None:
		return None
	# Decode
	try:
		line = lineBytes.decode('utf-8').rstrip().rstrip(',')
		jsonItem = json.loads(line)
	except json.JSONDecodeError:
		print(f'Unable to parse line {line} as JSON')
		return None
	if 'claims' not in jsonItem:
		return None
	claims = jsonItem['claims']
	# Get event category
	eventCtg: str | None = None
	if INSTANCE_OF not in claims:
		return None
	for statement in claims[INSTANCE_OF]:
		try:
			itemType = statement['mainsnak']['datavalue']['value']['id']
		except KeyError:
			return None
		if itemType in ID_TO_CTG:
			eventCtg = ID_TO_CTG[itemType]
			break
	if not eventCtg:
		for prop in claims:
			if prop in ID_TO_CTG:
				eventCtg = ID_TO_CTG[prop]
		if not eventCtg:
			return None
	# Check for event props
	start: str
	end: str | None
	timeType: str
	found = False
	for props in PROP_RULES:
		startProp: str = EVENT_PROP[props[0]]
		endProp = None if len(props) < 2 else EVENT_PROP[props[1]] # type: ignore
		needBoth = False if len(props) < 3 else props[2] # type: ignore
		if startProp not in claims:
			continue
		try:
			start = json.dumps(claims[startProp][0]['mainsnak']['datavalue'], separators=(',', ':'))
			end = None
			if endProp and endProp in claims:
				end = json.dumps(claims[endProp][0]['mainsnak']['datavalue'], separators=(',', ':'))
			if needBoth and end == None:
				continue
		except (KeyError, ValueError):
			continue
		timeType = props[0]
		found = True
		break
	if not found:
		return None
	# Get wikidata ID, enwiki title
	try:
		itemId = int(jsonItem['id'][1:]) # Skip initial 'Q'
		itemTitle: str = jsonItem['sitelinks']['enwiki']['title']
	except (KeyError, ValueError):
		return None
	# Return result
	return (itemId, itemTitle, start, end, timeType, eventCtg) # type: ignore
def readDumpChunkOneParam(params: tuple[int, str, str, str, int, int]) -> str:
	""" Forwards to readDumpChunk() (for use with pool.map()) """
	return readDumpChunk(*params)
def readDumpChunk(procId: int, wikidataFile: str, offsetsFile: str, outFile: str, startByte: int, endByte: int) -> str:
	""" Reads lines in the dump that begin after a start-byte, and not after an end byte.
		If startByte is -1, start at the first line. """
	# Read dump
	entries = []
	with indexed_bzip2.open(wikidataFile) as file:
		# Load offsets file
		with open(offsetsFile, 'rb') as file2:
			offsets = pickle.load(file2)
			file.set_block_offsets(offsets)
		# Seek to chunk
		if startByte != -1:
			file.seek(startByte)
			file.readline()
		else:
			startByte = 0 # Used for progress calculation
		# Read lines
		count = 0
		while file.tell() <= endByte:
			count += 1
			if count % 1e4 == 0:
				perc = (file.tell() - startByte) / (endByte - startByte) * 100
				print(f'Thread {procId}: {perc:.2f}%')
			entry = readDumpLine(file.readline())
			if entry:
				entries.append(entry)
		# Output results into file
		with open(outFile, 'wb') as file:
			pickle.dump(entries, file)
		return outFile

if __name__ == '__main__':
	parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
	args = parser.parse_args()
	#
	multiprocessing.set_start_method('spawn')
	genData(WIKIDATA_FILE, OFFSETS_FILE, DB_FILE, N_PROCS)