"""
A Grouped Physics Object manages the data through a dataframe that is populated based on a join table
and the data table. It is a base class so child classes should populate self data during init
"""
__author__ = "William DeShazer"
__version__ = "0.1.0"
__license__ = "MIT"
from warnings import warn
from typing import NewType, List, NamedTuple, Optional
from pandas import DataFrame, Series, read_sql
from psycopg2.sql import SQL, Identifier, Placeholder, Composed
from psycopg2 import OperationalError
from psycopg2.extras import NamedTupleCursor
from db_utils import my_connect
from latex_data import LatexData
from time_logging import TimeLogger
class RecordIDTypeError(UserWarning):
    """UserWarning raised when a record ID has an unexpected type"""


class ImageWithoutTemplateIDError(UserWarning):
    """UserWarning raised when an image is supplied without a template ID"""


class NoRecordIDError(UserWarning):
    """UserWarning raised when no record ID is specified"""


class NoGroupRecordAssociationsError(UserWarning):
    """UserWarning raised when a group has no associated records"""
Record = NewType("Record", NamedTuple)
Records = NewType("Records", List[Record])
def generic_pull_grouped_data(table_name: str = None, parent_table_name: str = None,
verbose: bool = False, my_conn: Optional[dict] = None,
t_log: Optional[TimeLogger] = None) -> DataFrame:
"""Multi-index Extract DataFrame DB"""
table_id_name: str = table_name + '_id'
parent_id_name: str = parent_table_name + '_id'
join_table: str = table_name + '_' + parent_table_name
if verbose is True and t_log is None:
t_log = TimeLogger()
my_conn = my_connect(my_conn=my_conn, t_log=t_log, verbose=verbose)
conn = my_conn['conn']
sql = 'SELECT * FROM {join_table} RIGHT JOIN {table} USING({table_id})'
query = SQL(sql).format(
table=Identifier(table_name),
join_table=Identifier(join_table),
table_id=Identifier(table_id_name)
)
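    # Illustrative rendering of the composed query for table_name='equation' and
    # parent_table_name='eqn_group' (identifiers are quoted by psycopg2.sql):
    #   SELECT * FROM "equation_eqn_group" RIGHT JOIN "equation" USING("equation_id")
    # The resulting DataFrame is indexed by (eqn_group_id, equation_id).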
if verbose is True:
t_log.new_event('Loading Database: ' + table_name)
data_df = read_sql(query, con=conn, index_col=[parent_id_name, table_id_name])
    # Earlier approach, kept as an example of deserializing stored objects directly from the column:
# data_df['latex'] = data_df['latex'].apply(loads)
data_df['latex_obj'] = None
for row in data_df.itertuples():
data_df.loc[row.Index, 'latex_obj'] = \
LatexData(my_conn=my_conn, latex=row.latex, template_id=row.template_id,
image=row.image, compiled_at=row.compiled_at)
data_df.sort_values([parent_id_name, 'insertion_order', 'created_at'], inplace=True)
if verbose is True:
t_log.new_event('Database Loaded: ' + table_name)
return data_df
def generic_associate_parent(parent_id: int = None, child_id: int = None,
table_name: str = None, parent_table_name: str = None,
insertion_order: int = None, inserted_by: str = None,
new_record: dict = None, verbose: bool = False,
my_conn: Optional[dict] = None, t_log: Optional[TimeLogger] = None):
"""Associate the parent and child tables using parent id. Insertion_order and inserted_by are optional"""
parent_key = parent_table_name + '_id'
self_key = table_name + '_id'
join_table = table_name + '_' + parent_table_name
if verbose is True and t_log is None:
t_log = TimeLogger()
my_conn = my_connect(my_conn=my_conn, t_log=t_log, verbose=verbose)
conn = my_conn['conn']
db_params = my_conn['db_params']
if new_record is None:
new_record = {}
if inserted_by is None:
inserted_by = db_params['user']
new_record.update(
{parent_key: parent_id, self_key: child_id},
insertion_order=insertion_order,
inserted_by=inserted_by
)
sql = 'INSERT INTO {table} ({fields}) VALUES ({values})'
keys = new_record.keys()
query = SQL(sql).format(table=Identifier(join_table),
fields=SQL(', ').join(map(Identifier, keys)),
values=SQL(', ').join(map(Placeholder, keys)))
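    # Illustrative rendering for table_name='equation', parent_table_name='eqn_group':
    #   INSERT INTO "equation_eqn_group" ("eqn_group_id", "equation_id", "insertion_order", "inserted_by")
    #   VALUES (%(eqn_group_id)s, %(equation_id)s, %(insertion_order)s, %(inserted_by)s)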
cur = conn.cursor(cursor_factory=NamedTupleCursor)
if verbose is True:
t_log.new_event('Associating Tables: ' + join_table)
try:
cur.execute(query, new_record)
except OperationalError as error:
print(error)
conn.commit()
cur.close()
data_df = \
generic_pull_grouped_data(table_name=table_name, parent_table_name=parent_table_name,
my_conn=my_conn, t_log=t_log, verbose=verbose)
if verbose is True:
t_log.new_event('Finished Associating: ' + join_table)
return data_df
def generic_disassociate_parent(parent_id: int = None, child_id: int = None,
table_name: str = None, parent_table_name: str = None,
my_conn: Optional[dict] = None, t_log: Optional[TimeLogger] = None,
verbose: bool = False):
"""Associate the parent and child tables using parent id. Insertion_order and inserted_by are optional"""
parent_key = parent_table_name + '_id'
self_key = table_name + '_id'
join_table = table_name + '_' + parent_table_name
if verbose is True and t_log is None:
t_log = TimeLogger()
my_conn = my_connect(my_conn=my_conn, t_log=t_log, verbose=verbose)
conn = my_conn['conn']
sql = 'DELETE FROM {table} WHERE ({self_id}, {parent_id}) = (%s, %s)'
query = SQL(sql).format(table=Identifier(join_table),
self_id=Identifier(self_key),
parent_id=Identifier(parent_key))
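    # Illustrative rendering for table_name='equation', parent_table_name='eqn_group':
    #   DELETE FROM "equation_eqn_group" WHERE ("equation_id", "eqn_group_id") = (%s, %s)
    # executed below with the positional parameters (child_id, parent_id).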
cur = conn.cursor(cursor_factory=NamedTupleCursor)
if verbose is True:
print(query.as_string(conn))
print(cur.mogrify(query, (child_id, parent_id)))
t_log.new_event('Disassociating Tables: ' + join_table)
try:
cur.execute(query, (child_id, parent_id))
except OperationalError as error:
print(error)
conn.commit()
data_df = \
generic_pull_grouped_data(table_name=table_name, parent_table_name=parent_table_name,
my_conn=my_conn, t_log=t_log, verbose=verbose)
if verbose is True:
t_log.new_event('Finished disassociating: ' + join_table)
return data_df
def generic_last_equation_number(table_data: DataFrame) -> int:
"""Record Count using dataframe"""
if len(table_data.index) == 0:
rcount = 1 # Starts at 1, because Genesis & Revelation are Equation and Variable 1, respectively
else:
numbers: Series = table_data.loc[slice(None), 'name'].str.extract(r'^[a-zA-Z]+\s([0-9]+)$')
rcount_s: Series = numbers.fillna(0).astype(int).max()
rcount = int(rcount_s.iloc[0])
return rcount
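# Example for generic_last_equation_number (illustrative): names such as
# ['Equation 3', 'Equation 12', 'Total Energy'] extract to [3, 12, NaN];
# NaN is treated as 0, so the function returns 12.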
def generic_new_record_db(parent_id: int = None, table_name: str = None, parent_table_name: str = None,
data_df: Optional[DataFrame] = None, name: str = None, new_record=None,
latex: LatexData = None, notes: str = None,
dimensions: int = 1, insertion_order: int = None, created_by: str = None,
unit_id: int = 1, verbose: bool = None,
my_conn: Optional[dict] = None, t_log: Optional[TimeLogger] = None
) -> DataFrame:
"""Insert New Record Into math_object"""
if verbose is True and t_log is None:
t_log = TimeLogger()
if new_record is None:
new_record = {}
my_conn = my_connect(my_conn=my_conn, t_log=t_log, verbose=verbose)
conn = my_conn['conn']
db_params = my_conn['db_params']
table_id = table_name + '_id'
next_id: int = generic_last_equation_number(data_df) + 1
if name is None:
name = "{aTable} {ID:d}".format(ID=next_id, aTable=table_name)
if created_by is None:
created_by = db_params['user']
if latex is None:
latex = LatexData()
new_record.update(name=name, notes=notes, dimensions=dimensions, unit_id=unit_id, type_name='Unassigned',
latex=latex.latex, image=latex.image, compiled_at=latex.compiled_at,
template_id=latex.template_id, created_by=created_by)
query = SQL('INSERT INTO {table} ({fields}) VALUES ({values}) RETURNING *'
).format(table=Identifier(table_name),
fields=SQL(', ').join(map(Identifier, new_record.keys())),
values=SQL(', ').join(map(Placeholder, new_record.keys())))
if verbose:
print(query.as_string(conn))
cur = conn.cursor(cursor_factory=NamedTupleCursor)
if verbose:
print('Adding new record to Table: {aTable}'.format(aTable=table_name))
try:
cur.execute(query, new_record)
except OperationalError as error:
print(error)
new_records = cur.fetchall()
conn.commit()
cur.close()
updated_df: Optional[DataFrame] = None
if parent_id is not None:
for record in new_records:
updated_df = generic_associate_parent(my_conn=my_conn, t_log=t_log,
parent_id=parent_id, child_id=getattr(record, table_id),
insertion_order=insertion_order,
table_name=table_name, parent_table_name=parent_table_name,
inserted_by=created_by, verbose=verbose
)
else:
updated_df = \
generic_pull_grouped_data(table_name=table_name, parent_table_name=parent_table_name,
my_conn=my_conn, t_log=t_log, verbose=verbose)
return updated_df
def add_field(key: str = None, value=None):
"""Convenience Wrapper to Beautify If value is none"""
data = {}
if value is not None:
data = {key: value}
return data
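# For example, add_field('notes', None) returns {} while add_field('notes', 'text') returns
# {'notes': 'text'}, so callers can merge in only the fields that were actually supplied.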
class GroupedPhysicsObject:
"""Base class for Equations, Variables, and Units"""
def __init__(self, table_name: str, parent_table_name: str,
my_conn: Optional[dict] = None, t_log: Optional[TimeLogger] = None,
verbose: bool = False,):
"""Constructor for MathObject"""
self.table_name = table_name
self.parent_table_name = parent_table_name # For equations it is eqn_group. For variables it is equations
self.grouped_data: Optional[DataFrame] = None
self.all_records: Optional[DataFrame] = None
self.selected_parent_id: Optional[int] = None
self.selected_data_records: Optional[Records] = None
self.records_not_selected_unique: Optional[Records] = None
self.my_conn = my_connect(my_conn=my_conn, t_log=t_log, verbose=verbose)
self.pull_grouped_data(my_conn=my_conn, t_log=t_log, verbose=verbose)
def id_name(self):
"""Convenience method to return id_name"""
return self.table_name + '_id'
def parent_table_id_name(self):
"""Convenenience method to return parent_table_id_name"""
return self.parent_table_name + '_id'
def pull_grouped_data(self, my_conn: Optional[dict] = None, t_log: Optional[TimeLogger] = None,
verbose: bool = False):
"""Extract grouped data from database"""
if my_conn is None:
my_conn = self.my_conn
else:
self.my_conn = my_conn
self.grouped_data = \
generic_pull_grouped_data(table_name=self.table_name, parent_table_name=self.parent_table_name,
my_conn=my_conn, t_log=t_log, verbose=verbose)
self._set_all_records()
def associate_parent(self, parent_id: int = None, child_id: int = None, new_record: dict = None,
insertion_order: int = None, inserted_by: str = None,
my_conn: Optional[dict] = None, t_log: Optional[TimeLogger] = None, verbose: bool = False):
"""Associate a record with a group"""
table_name = self.table_name
parent_table_name = self.parent_table_name
if my_conn is None:
my_conn = self.my_conn
else:
self.my_conn = my_conn
self.grouped_data = \
generic_associate_parent(parent_id=parent_id, child_id=child_id, new_record=new_record,
table_name=table_name, parent_table_name=parent_table_name,
insertion_order=insertion_order, inserted_by=inserted_by,
my_conn=my_conn, t_log=t_log, verbose=verbose)
if self.selected_parent_id is not None:
self.set_records_for_parent(parent_id=int(self.selected_parent_id))
def disassociate_parent(self, parent_id: int = None, child_id: int = None,
my_conn: Optional[dict] = None, t_log: Optional[TimeLogger] = None,
verbose: bool = False):
"""Associate a record with a group"""
table_name = self.table_name
parent_table_name = self.parent_table_name
if my_conn is None:
my_conn = self.my_conn
else:
self.my_conn = my_conn
self.grouped_data = \
generic_disassociate_parent(parent_id=parent_id, child_id=child_id,
table_name=table_name, parent_table_name=parent_table_name,
my_conn=my_conn, t_log=t_log, verbose=verbose)
if self.selected_parent_id is not None:
self.set_records_for_parent(parent_id=int(self.selected_parent_id))
def new_record(self, parent_id: int = None, name: str = None, latex: LatexData = None,
new_record: dict = None, notes: str = None, dimensions: int = 1,
insertion_order: int = None, created_by: str = None,
my_conn: Optional[dict] = None, t_log: Optional[TimeLogger] = None,
unit_id: int = 1, verbose: bool = None):
"""Insert a new_record Into """
table_name = self.table_name
parent_table_name = self.parent_table_name
if my_conn is None:
my_conn = self.my_conn
else:
self.my_conn = my_conn
self.grouped_data = \
generic_new_record_db(
parent_id=parent_id, table_name=table_name, parent_table_name=parent_table_name,
name=name, latex=latex, notes=notes, new_record=new_record,
data_df=self.grouped_data, dimensions=dimensions, unit_id=unit_id,
insertion_order=insertion_order, created_by=created_by,
my_conn=my_conn, t_log=t_log, verbose=verbose
)
def _set_all_records(self):
self.all_records = self.grouped_data.drop_duplicates('name').droplevel(self.parent_table_id_name()).sort_index()
    def update(self, an_id: int = None, where_key: str = None, name: str = None,
data=None, latex: LatexData = None, notes: str = None, unit_id: int = None,
dimensions: int = None, modified_by: str = None, created_by: str = None,
my_conn: Optional[dict] = None, t_log: Optional[TimeLogger] = None, verbose: bool = None):
"""Insert New Record Into grouped_physics_object"""
if where_key is None:
where_key = self.id_name()
if an_id is None:
warn("No Record ID Specified", NoRecordIDError)
else:
if data is None:
data = {}
if my_conn is None:
my_conn = self.my_conn
else:
self.my_conn = my_conn
my_conn = my_connect(my_conn=my_conn, t_log=t_log, verbose=verbose)
conn = my_conn['conn']
db_params = my_conn['db_params']
data.update(add_field('name', name))
data.update(add_field('notes', notes))
if latex is not None:
data.update(latex=latex.latex)
data.update(image=latex.image)
data.update(template_id=latex.template_id)
data.update(compiled_at=latex.compiled_at)
data.update(add_field('dimensions', dimensions))
data.update(add_field('unit_id', unit_id))
data.update(add_field('created_by', created_by))
            # If there is no data to update, skip; of course one could still change modified_by:
if len(data) > 0 or modified_by is not None:
                # A modified_by value is always required, and because data can be changed without
                # specifying a modifier, it is defaulted here. This is not checked before the
                # previous if, so that no modified_by is recorded when neither data nor a
                # modifier was supplied.
if modified_by is None:
modified_by = db_params['user']
data.update(modified_by=modified_by)
fields = data.keys()
sql = "UPDATE {table} SET {fields} WHERE {pkey} = {a_value}"
query = SQL(sql).format(
table=Identifier(self.table_name),
fields=SQL(', ').join(
Composed([Identifier(k), SQL(' = '), Placeholder(k)]) for k in fields
),
pkey=Identifier(where_key),
a_value=Placeholder('where_key')
)
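                # Illustrative rendering for table_name='equation', where_key='equation_id', and
                # data containing 'name' and 'modified_by':
                #   UPDATE "equation" SET "name" = %(name)s, "modified_by" = %(modified_by)s
                #   WHERE "equation_id" = %(where_key)s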
data.update(where_key=an_id)
cur = conn.cursor(cursor_factory=NamedTupleCursor)
if verbose:
print(query.as_string(conn))
print(cur.mogrify(query, data))
try:
cur.execute(query, data)
except OperationalError as error:
print(error)
conn.commit()
cur.close()
self.pull_grouped_data(verbose=verbose)
def selected_data_df(self, parent_id: int = None) -> DataFrame:
"""Retern selected data in DataFrame form"""
if parent_id is None:
parent_id = self.selected_parent_id
else:
self.selected_parent_id = int(parent_id)
if parent_id is None:
warn('No Parent ID set', NoRecordIDError)
try:
df = self.grouped_data.loc[parent_id, :]
except KeyError:
df: Optional[DataFrame] = None
return df
def set_records_for_parent(self, parent_id: int = None):
"""Sets records for parents after an update"""
if parent_id is None:
parent_id = self.selected_parent_id
else:
self.selected_parent_id = int(parent_id)
if parent_id is None:
warn('No Parent ID set', NoRecordIDError)
selected_data_df = self.selected_data_df(parent_id)
if selected_data_df is None:
self.selected_data_records = None
else:
self.selected_data_records = Records(list(selected_data_df.itertuples()))
self._set_records_not_in_parent(parent_id=parent_id)
def data_not_selected_full_df(self, parent_id: int = None):
"""Method to return data not in selected set as DataFrame"""
if self.selected_data_records is None:
rcd_nums_in = []
else:
rcd_nums_in = self.selected_data_df(parent_id=parent_id).index
rcds_not_in = self.grouped_data.query(self.id_name() + '!=' + str(tuple(rcd_nums_in)))
return rcds_not_in
def data_not_selected_unique_df(self, parent_id: int = None):
"""Method to return unique data not in selected set as DataFrame"""
rcds_not_in = self.data_not_selected_full_df(parent_id=parent_id)
uniq = rcds_not_in.drop_duplicates('name').droplevel(self.parent_table_id_name()).sort_index()
return uniq
def data_not_selected_unique_rcds(self, parent_id: int = None):
"""Method to return unique data not in selected set as Records"""
uniq = self.data_not_selected_unique_df(parent_id=parent_id)
return Records(list(uniq.itertuples()))
def _set_records_not_in_parent(self, parent_id: int = None):
"""Store Unique Records to file"""
self.records_not_selected_unique = self.data_not_selected_unique_rcds(parent_id=parent_id)
def other_parents(self, child_id: int = None, my_conn: Optional[dict] = None,
t_log: Optional[TimeLogger] = None, verbose: bool = None):
"""Pulls list of other parents"""
gd = self.grouped_data
        gd_inds = gd.index.dropna()
        if verbose is True and t_log is None:
            t_log = TimeLogger()
        if len(gd_inds) > 0:
            parent_df = gd.loc[(slice(None), child_id), :].droplevel(self.id_name())
            pids = tuple(parent_df.index.to_list())
if my_conn is None:
my_conn = self.my_conn
else:
self.my_conn = my_conn
my_conn = my_connect(my_conn=my_conn, t_log=t_log, verbose=verbose)
conn = my_conn['conn']
sql = "SELECT * FROM {parent_table} WHERE {parent_id} IN %s;"
query = SQL(sql).format(
parent_table=Identifier(self.parent_table_name),
parent_id=Identifier(self.parent_table_id_name())
)
if verbose is True:
t_log.new_event('Loading Database: ' + self.parent_table_name)
cur = conn.cursor(cursor_factory=NamedTupleCursor)
cur.execute(query, (pids, ))
records = cur.fetchall()
if verbose is True:
t_log.new_event('Database Loaded: ' + self.parent_table_name)
else:
if verbose is True:
t_log.new_event('No Other Parents')
records = []
return records
def latest_record(self):
"""Returns latest record"""
return list(self.grouped_data.sort_values('created_at').tail(1).itertuples())
def update_insertion_order_for_selected(self, order: dict, my_conn: Optional[dict] = None,
t_log: Optional[TimeLogger] = None, verbose: bool = False):
"""Populates insertion_order attribute"""
self.pull_grouped_data()
df = self.selected_data_df()
join_table: str = self.table_name + '_' + self.parent_table_name
if verbose is True and t_log is None:
t_log = TimeLogger()
if my_conn is None:
my_conn = self.my_conn
else:
self.my_conn = my_conn
my_conn = my_connect(my_conn=my_conn, t_log=t_log, verbose=verbose)
self.my_conn = my_conn
conn = my_conn['conn']
sql = 'UPDATE {table} SET insertion_order = %s WHERE ({c_table_id}, {p_table_id}) = (%s, %s)'
query = SQL(sql).format(table=Identifier(join_table),
c_table_id=Identifier(self.id_name()),
p_table_id=Identifier(self.parent_table_id_name())
)
cur = conn.cursor(cursor_factory=NamedTupleCursor)
if verbose is True:
t_log.new_event('Updating Insertion order for: ' + join_table)
print(query.as_string(conn))
for eq_name, i in order.items():
p_id = self.selected_parent_id
c_id = int(df.name[df.name == eq_name].index[0])
if verbose:
print(cur.mogrify(query, (i, c_id, p_id)))
try:
cur.execute(query, (i, c_id, p_id))
except OperationalError as error:
print(error)
conn.commit()
self.pull_grouped_data()
if verbose is True:
t_log.new_event('Finished Updating Insertion Order: ' + join_table)
# def __repr__(self):
# return self.grouped_data.__repr__()
#
# def __str__(self):
# return self.grouped_data.__str__()