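"""run-merge.py

Merge reconciled source records into the final "merged" record cache.

For each requested source (--all, or one or more --<source> flags, with
optional --recid <id> selections and an optional "my_slice max_slice"
numeric pair for parallel slicing), each record is reidentified, merged
with its equivalents from the idmap, passed through the final mapper
transform, and stored. Unless --norefs is given, referenced records are
then built the same way, using the most preferred external source as the
base record. --profile wraps the main loop in cProfile, and --onlyrefs
skips the per-source loop entirely.
"""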
import os
import sys
import json
import datetime
import io
import cProfile
import pstats
from pstats import SortKey

from dotenv import load_dotenv
from pipeline.config import Config
from pipeline.process.reidentifier import Reidentifier
from pipeline.process.merger import MergeHandler
from pipeline.process.reference_manager import ReferenceManager
from pipeline.storage.cache.postgres import poolman

load_dotenv()
basepath = os.getenv("LUX_BASEPATH", "")
cfgs = Config(basepath=basepath)
idmap = cfgs.get_idmap()
cfgs.cache_globals()
cfgs.instantiate_all()

with open(os.path.join(cfgs.data_dir, "idmap_update_token.txt")) as fh:
    token = fh.read()
token = token.strip()
if not token.startswith("__") or not token.endswith("__"):
    print("Idmap update token is badly formed; it should be an 8 character date with leading/trailing __")
    raise ValueError("update token")
else:
    idmap.update_token = token
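# Note: judging by the check above, a well-formed token presumably looks
# like "__20240131__" (an 8 character date wrapped in double underscores).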
if "--profile" in sys.argv:
sys.argv.remove("--profile")
profiling = True
else:
profiling = False
if "--norefs" in sys.argv:
sys.argv.remove("--norefs")
DO_REFERENCES = False
else:
DO_REFERENCES = True
NAME = None
max_slice = -1
my_slice = -1
recids = []
if "--all" in sys.argv:
to_do = list(cfgs.internal.items())
elif "--onlyrefs" in sys.argv:
to_do = []
else:
to_do = []
for src, scfg in cfgs.internal.items():
if f"--{src}" in sys.argv:
to_do.append((src, scfg))
for src, scfg in cfgs.external.items():
if f"--{src}" in sys.argv:
to_do.append((src, scfg))
while "--recid" in sys.argv:
idx = sys.argv.index("--recid")
recid = sys.argv[idx + 1]
recids.append(recid)
sys.argv.pop(idx)
sys.argv.pop(idx)
if len(sys.argv) > 2 and sys.argv[1].isnumeric() and sys.argv[2].isnumeric():
my_slice = int(sys.argv[1])
max_slice = int(sys.argv[2])
if len(to_do) == 1:
NAME = to_do[0][0]
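# Example invocations (the source names here are hypothetical; note that
# the slice pair must come directly after the script name to be parsed):
#   python run-merge.py 0 8 --all
#   python run-merge.py --wikidata --recid Q937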
MAX_DISTANCE = cfgs.max_distance
order = sorted([(x["namespace"], x.get("merge_order", -1)) for x in cfgs.external.values()], key=lambda x: x[1])
PREF_ORDER = [x[0] for x in order if x[1] >= 0]
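# PREF_ORDER ranks the external namespaces by ascending merge_order;
# sources with no merge_order (or a negative one) are never used as the
# base record when building references below.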
reider = Reidentifier(cfgs, idmap)
ref_mgr = ReferenceManager(cfgs, idmap)
merger = MergeHandler(cfgs, idmap, ref_mgr)
merged_cache = cfgs.results["merged"]["recordcache"]
merged_cache.config["overwrite"] = True
final = cfgs.results["merged"]["mapper"]
# If merged is not empty, we only want to write a record if it hasn't
# already been written during this build; conversely, if merged starts
# off empty, then anything already in it must be from this build.
start_time = datetime.datetime.now()
# merge only reads, so enable AAT memory cache
idmap.enable_memory_cache()
# -------------------------------------------------
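# First pass: merge and store the records of each requested source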
if profiling:
    pr = cProfile.Profile()
    pr.enable()
for src_name, src in to_do:
    rcache = src["recordcache"]
    if not recids:
        if my_slice > -1:
            print(f"*** {src['name']}: slice {my_slice} ***")
            recids = rcache.iter_keys_slice(my_slice, max_slice)
        else:
            print(f"*** {src['name']} ***")
            recids = rcache.iter_keys()
    for recid in recids:
        distance = 0
        rec = rcache[recid]
        if not rec:
            print(f"Couldn't find {src['name']} / {recid}")
            continue
        recuri = f"{src['namespace']}{recid}"
        qrecid = cfgs.make_qua(recuri, rec["data"]["type"])
        yuid = idmap[qrecid]
        if not yuid:
            print(f" !!! Couldn't find YUID for internal record: {qrecid}")
            continue
        yuid = yuid.rsplit("/", 1)[1]
        ins_time = merged_cache.metadata(yuid, "insert_time")
        if ins_time is not None and ins_time["insert_time"] > start_time:
            # Already processed this record during this build
            continue
        else:
            # Always reidentify, in case the UUIDs have changed during
            # subsequent reconcile phases
            rec2 = reider.reidentify(rec)
            src["recordcache2"][rec2["yuid"]] = rec2["data"]
        if NAME is not None and ins_time is not None:
            # The record was in merged previously
            curr_name = merged_cache.metadata(yuid, "change")["change"]
            if curr_name.lower() in ["create", "update"]:
                curr_name = ""
        else:
            curr_name = ""
        equivs = idmap[rec2["data"]["id"]]
        if equivs:
            # Drop self-references and the update token from the equivalents
            if qrecid in equivs:
                equivs.remove(qrecid)
            if recuri in equivs:
                equivs.remove(recuri)
            if idmap.update_token in equivs:
                equivs.remove(idmap.update_token)
        else:
            equivs = []
        sys.stdout.write(".")
        sys.stdout.flush()
        rec3 = merger.merge(rec2, equivs)
        # Final tidy up after merges
        try:
            rec3 = final.transform(rec3, rec3["data"]["type"])
        except Exception:
            print(f"*** Final transform raised exception for {rec2['identifier']}")
            raise
        # Store it
        if rec3 is not None:
            rec3.pop("identifier", None)
            merged_cache[rec3["yuid"]] = rec3
            if NAME:
                if curr_name:
                    if NAME in curr_name:
                        # Already recorded; don't make name|name
                        new_name = None
                    else:
                        new_name = f"{curr_name}|{NAME}"
                else:
                    new_name = NAME
                if new_name:
                    merged_cache.set_metadata(yuid, "change", new_name)
        else:
            print("*** Final transform returned None")
    # Reset so the next source iterates its own keys; any --recid ids
    # apply only to the first source processed
    recids = []
if profiling:
    pr.disable()
    s = io.StringIO()
    sortby = SortKey.CUMULATIVE
    # sortby = SortKey.TIME
    ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    ps.print_stats()
    print(s.getvalue())
    # Deliberately abort here so a profiling run stops after the stats
    raise ValueError()
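# Second pass: build merged records for the references recorded as done
# by the ReferenceManager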
if DO_REFERENCES:
    item = 1
    for dist, ext_uri in ref_mgr.iter_done_refs(my_slice, max_slice):
        uri = idmap[ext_uri]
        if not uri:
            print(f" *** No YUID for reference {ext_uri} from done_refs")
            continue
        yuid = uri.rsplit("/", 1)[-1]
        ins_time = merged_cache.metadata(yuid, "insert_time")
        if ins_time is not None and ins_time["insert_time"] > start_time:
            # Already processed this record during this build
            continue
        if NAME is not None and ins_time is not None:
            # The record was in merged previously
            curr_name = merged_cache.metadata(yuid, "change")["change"]
            if curr_name in ["create", "update"]:
                curr_name = ""
        else:
            # No record in merged
            curr_name = ""
        equivs = idmap[uri]
        if not equivs:
            print(f"FAILED TO BUILD: {uri}")
            continue
        # Get a base record from the most preferred source that has one
        rec2 = None
        stop = False
        for pref in PREF_ORDER:
            for eq in equivs:
                if pref in eq:
                    baseUri = eq
                    (src, recid) = cfgs.split_uri(baseUri)
                    if recid in src["recordcache"]:
                        rec = src["recordcache"][recid]
                        if rec is not None:
                            rec2 = reider.reidentify(rec)
                            if rec2:
                                equivs.remove(baseUri)
                                del rec2["identifier"]
                                src["recordcache2"][rec2["yuid"]] = rec2
                                stop = True
                                break
                            else:
                                print(f" *** Could not reidentify {src['name']} {recid}")
            if stop:
                break
        if rec2 is None:
            print(f" *** Could not find ANY record for {uri} in {equivs}")
            # raise ValueError()
        else:
            # print(f" ... Processing equivs for {recid}")
            sys.stdout.write("+")
            sys.stdout.flush()
            rec3 = merger.merge(rec2, equivs)
            # Final tidy up
            try:
                rec3 = final.transform(rec3, rec3["data"]["type"])
            except Exception:
                # rec2["identifier"] was deleted above, so report the yuid instead
                print(f"*** Final transform raised exception for {rec2['yuid']}")
            # Store it
            if rec3 is not None:
                rec3.pop("identifier", None)
                merged_cache[rec3["yuid"]] = rec3
                if NAME:
                    if curr_name:
                        if NAME in curr_name:
                            # Already added; don't make name|name
                            new_name = None
                        else:
                            new_name = f"{curr_name}|{NAME}"
                    else:
                        new_name = NAME
                    if new_name:
                        merged_cache.set_metadata(yuid, "change", new_name)
            else:
                print("*** Final transform returned None")
# force all postgres connections to close
poolman.put_all("localsocket")
with open(os.path.join(cfgs.log_dir, "flags", f"merge_is_done-{my_slice}.txt"), "w") as fh:
    fh.write("1\n")