diff --git a/kcidb/db/__init__.py b/kcidb/db/__init__.py index 5cd62791..87de6ca5 100644 --- a/kcidb/db/__init__.py +++ b/kcidb/db/__init__.py @@ -186,18 +186,23 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. """ assert self.is_initialized() + io_schema = self.get_schema()[1] first_modified = self.driver.get_first_modified() - assert first_modified is None or \ - isinstance(first_modified, datetime.datetime) and \ - first_modified.tzinfo + assert isinstance(first_modified, dict) + assert all( + obj_list_name in io_schema.id_fields and + isinstance(timestamp, datetime.datetime) and timestamp.tzinfo + for obj_list_name, timestamp in first_modified.items() + ) return first_modified def get_last_modified(self): @@ -206,18 +211,23 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. 
""" assert self.is_initialized() + io_schema = self.get_schema()[1] last_modified = self.driver.get_last_modified() - assert last_modified is None or \ - isinstance(last_modified, datetime.datetime) and \ - last_modified.tzinfo + assert isinstance(last_modified, dict) + assert all( + obj_list_name in io_schema.id_fields and + isinstance(timestamp, datetime.datetime) and timestamp.tzinfo + for obj_list_name, timestamp in last_modified.items() + ) return last_modified def get_schemas(self): @@ -310,16 +320,29 @@ def dump_iter(self, objects_per_report=0, with_metadata=True, report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. Can be a single datetime + object, one for all object types, or None to + not limit the dump by this parameter + (equivalent to empty dictionary). + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. 
+ Object types missing from this dictionary will + not be limited. Can be a single datetime + object, one for all object types, or None to + not limit the dump by this parameter + (equivalent to empty dictionary). Returns: An iterator returning report JSON data adhering to the current I/O @@ -327,17 +350,36 @@ def dump_iter(self, objects_per_report=0, with_metadata=True, objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and - the database doesn't have row timestamps. + NoTimestamps - Either "after" or "until" are not None/empty, + and the database doesn't have row timestamps. """ + assert self.is_initialized() + id_fields = self.get_schema()[1].id_fields assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) assert after is None or \ - isinstance(after, datetime.datetime) and after.tzinfo + isinstance(after, datetime.datetime) and after.tzinfo or \ + isinstance(after, dict) and all( + obj_list_name in id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in after.items() + ) assert until is None or \ - isinstance(until, datetime.datetime) and until.tzinfo - assert self.is_initialized() + isinstance(until, datetime.datetime) and until.tzinfo or \ + isinstance(until, dict) and all( + obj_list_name in id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in until.items() + ) + if after is None: + after = {} + elif not isinstance(after, dict): + after = {n: after for n in id_fields} + if until is None: + until = {} + elif not isinstance(until, dict): + until = {n: until for n in id_fields} yield from self.driver.dump_iter( objects_per_report=objects_per_report, with_metadata=with_metadata, @@ -351,31 +393,55 @@ def dump(self, with_metadata=True, after=None, until=None): Args: with_metadata: True, if metadata fields should be dumped as well. False, if not. 
- after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. Can be a single datetime + object, one for all object types, or None to + not limit the dump by this parameter + (equivalent to empty dictionary). + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. + Object types missing from this dictionary will + not be limited. Can be a single datetime + object, one for all object types, or None to + not limit the dump by this parameter + (equivalent to empty dictionary). Returns: The JSON data from the database adhering to the current I/O schema version. Raises: - NoTimestamps - Either "after" or "until" are not None, and - the database doesn't have row timestamps. + NoTimestamps - Either "after" or "until" are not None/empty, + and the database doesn't have row timestamps. 
""" + assert self.is_initialized() + io_schema = self.get_schema()[1] assert isinstance(with_metadata, bool) assert after is None or \ - isinstance(after, datetime.datetime) and after.tzinfo + isinstance(after, datetime.datetime) and after.tzinfo or \ + isinstance(after, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in after.items() + ) assert until is None or \ - isinstance(until, datetime.datetime) and until.tzinfo - assert self.is_initialized() + isinstance(until, datetime.datetime) and until.tzinfo or \ + isinstance(until, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in until.items() + ) try: return next(self.dump_iter(objects_per_report=0, with_metadata=with_metadata, @@ -1077,6 +1143,11 @@ def time_main(): sys.excepthook = kcidb.misc.log_and_print_excepthook description = 'kcidb-db-time - Fetch various timestamps from a KCIDB DB' parser = ArgumentParser(description=description) + parser.add_argument( + '-v', '--verbose', + help='Output timestamp breakdown, when available', + action='store_true' + ) parser.add_argument( 'type', choices=['first_modified', 'last_modified', 'current'], @@ -1088,9 +1159,19 @@ def time_main(): raise Exception(f"Database {args.database!r} is not initialized") ts = None if args.type == 'first_modified': - ts = client.get_first_modified() + ts = None + for obj_list_name, obj_list_ts in client.get_first_modified().items(): + if args.verbose: + print(obj_list_name + ": " + + obj_list_ts.isoformat(timespec='microseconds')) + ts = min(ts or obj_list_ts, obj_list_ts) elif args.type == 'last_modified': - ts = client.get_last_modified() + ts = None + for obj_list_name, obj_list_ts in client.get_last_modified().items(): + if args.verbose: + print(obj_list_name + ": " + + obj_list_ts.isoformat(timespec='microseconds')) + ts = min(ts or obj_list_ts, obj_list_ts) elif 
args.type == 'current': ts = client.get_current_time() if ts is None: diff --git a/kcidb/db/abstract.py b/kcidb/db/abstract.py index 31ecfbac..e885c837 100644 --- a/kcidb/db/abstract.py +++ b/kcidb/db/abstract.py @@ -117,8 +117,9 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -132,8 +133,9 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -197,16 +199,22 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. 
+ after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: An iterator returning report JSON data adhering to the current @@ -217,14 +225,21 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): NoTimestamps - Either "after" or "until" are not None, and the database doesn't have row timestamps. """ + assert self.is_initialized() + io_schema = self.get_schema()[1] assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) - assert after is None or \ - isinstance(after, datetime.datetime) and after.tzinfo - assert until is None or \ - isinstance(until, datetime.datetime) and until.tzinfo - assert self.is_initialized() + assert isinstance(after, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in after.items() + ) + assert isinstance(until, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in until.items() + ) # No, it's not, pylint: disable=too-many-return-statements def query_ids_are_valid(self, ids): diff --git a/kcidb/db/bigquery/v04_00.py b/kcidb/db/bigquery/v04_00.py index 72a4c4b7..cf25d694 100644 --- a/kcidb/db/bigquery/v04_00.py +++ b/kcidb/db/bigquery/v04_00.py @@ -741,26 +741,33 @@ def 
dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the I/O version - of the database schema, each containing at most the specified - number of objects. + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the + specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. 
""" + # We'll try to manage, pylint: disable=too-many-locals assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) @@ -772,7 +779,9 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): (f for f in table_schema if f.name == "_timestamp"), None ) - if (after or until) and not ts_field: + table_after = after.get(obj_list_name) + table_until = until.get(obj_list_name) + if (table_after or table_until) and not ts_field: raise NoTimestamps( f"Table {obj_list_name!r} has no {ts_field.name!r} column" ) @@ -787,13 +796,15 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): (( " WHERE " + " AND ".join( f"{ts_field.name} {op} ?" - for op, v in ((">", after), ("<=", until)) if v + for op, v in ( + (">", table_after), ("<=", table_until) + ) if v ) - ) if (after or until) else "") + ) if (table_after or table_until) else "") ) query_parameters = [ bigquery.ScalarQueryParameter(None, ts_field.field_type, v) - for v in (after, until) if v + for v in (table_after, table_until) if v ] query_job = self.conn.query_create(query_string, query_parameters) obj_list = None @@ -1205,8 +1216,9 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. 
Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -1218,14 +1230,22 @@ def get_first_modified(self): ): raise NoTimestamps("Database is missing timestamps in its schema") - return next(iter(self.conn.query_create( - "SELECT MIN(first_modified) AS first_modified FROM(\n" + - "UNION ALL\n".join( - f"SELECT MIN(_timestamp) AS first_modified FROM {table_name}\n" - for table_name in self.TABLE_MAP - ) + - ")\n" - ).result()))[0] + return { + table_name: first_modified + for table_name, first_modified in self.conn.query_create( + "\nUNION ALL\n".join( + f"SELECT ? AS table_name, " + f"MIN(_timestamp) AS first_modified " + f"FROM _{table_name}" + for table_name in self.TABLE_MAP + ), + [ + bigquery.ScalarQueryParameter(None, "STRING", table_name) + for table_name in self.TABLE_MAP + ] + ).result() + if first_modified + } def get_last_modified(self): """ @@ -1233,8 +1253,9 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -1246,11 +1267,19 @@ def get_last_modified(self): ): raise NoTimestamps("Database is missing timestamps in its schema") - return next(iter(self.conn.query_create( - "SELECT MAX(last_modified) AS last_modified FROM(\n" + - "UNION ALL\n".join( - f"SELECT MAX(_timestamp) AS last_modified FROM {table_name}\n" - for table_name in self.TABLE_MAP - ) + - ")\n" - ).result()))[0] + return { + table_name: last_modified + for table_name, last_modified in self.conn.query_create( + "\nUNION ALL\n".join( + f"SELECT ? 
AS table_name, " + f"MAX(_timestamp) AS last_modified " + f"FROM _{table_name}" + for table_name in self.TABLE_MAP + ), + [ + bigquery.ScalarQueryParameter(None, "STRING", table_name) + for table_name in self.TABLE_MAP + ] + ).result() + if last_modified + } diff --git a/kcidb/db/mux.py b/kcidb/db/mux.py index b6bc8789..9b4e61ea 100644 --- a/kcidb/db/mux.py +++ b/kcidb/db/mux.py @@ -291,13 +291,13 @@ def get_current_time(self): def get_first_modified(self): """ - Get the time data has arrived first into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived first into the driven database. The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -305,20 +305,25 @@ def get_first_modified(self): """ assert self.is_initialized() max_ts = datetime.datetime.max.replace(tzinfo=datetime.timezone.utc) - return min( - (driver.get_first_modified() for driver in self.drivers), - key=lambda ts: ts or max_ts - ) + merged_first_modified = {} + for driver in self.drivers: + first_modified = driver.get_first_modified() + for obj_list_name, timestamp in first_modified.items(): + merged_first_modified[obj_list_name] = min( + merged_first_modified.get(obj_list_name, max_ts), + timestamp + ) + return merged_first_modified def get_last_modified(self): """ - Get the time data has arrived last into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived last into the driven database. The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time. 
+ A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -326,10 +331,15 @@ def get_last_modified(self): """ assert self.is_initialized() min_ts = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) - return max( - (driver.get_last_modified() for driver in self.drivers), - key=lambda ts: ts or min_ts - ) + merged_last_modified = {} + for driver in self.drivers: + last_modified = driver.get_last_modified() + for obj_list_name, timestamp in last_modified.items(): + merged_last_modified[obj_list_name] = max( + merged_last_modified.get(obj_list_name, min_ts), + timestamp + ) + return merged_last_modified def get_schemas(self): """ @@ -393,24 +403,30 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. 
+ until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the current I/O - schema version, each containing at most the specified number of - objects. + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the + specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. """ yield from self.drivers[0].dump_iter(objects_per_report, diff --git a/kcidb/db/null.py b/kcidb/db/null.py index bcfc0dff..1427821e 100644 --- a/kcidb/db/null.py +++ b/kcidb/db/null.py @@ -122,14 +122,15 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. """ - return None + return {} def get_last_modified(self): """ @@ -137,14 +138,15 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. 
Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. """ - return None + return {} def dump_iter(self, objects_per_report, with_metadata, after, until): """ @@ -155,24 +157,30 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the current I/O - schema version, each containing at most the specified number of - objects. + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the + specified number of objects. 
Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. """ del objects_per_report diff --git a/kcidb/db/postgresql/v04_00.py b/kcidb/db/postgresql/v04_00.py index 981f07aa..029239ee 100644 --- a/kcidb/db/postgresql/v04_00.py +++ b/kcidb/db/postgresql/v04_00.py @@ -5,6 +5,7 @@ import textwrap from collections import namedtuple from itertools import chain +from functools import reduce import psycopg2 import psycopg2.extras import psycopg2.errors @@ -18,6 +19,8 @@ Constraint, BoolColumn, FloatColumn, IntegerColumn, TimestampColumn, \ VarcharColumn, TextColumn, TextArrayColumn, JSONColumn, Table +# It's OK for now, pylint: disable=too-many-lines + # Module's logger LOGGER = logging.getLogger(__name__) @@ -564,16 +567,22 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. 
+ until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: An iterator returning report JSON data adhering to the I/O @@ -581,7 +590,7 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. """ assert isinstance(objects_per_report, int) @@ -593,9 +602,10 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): with self.conn, self.conn.cursor() as cursor: for table_name, table_schema in self.TABLES.items(): obj_list = None - cursor.execute(*table_schema.format_dump(table_name, - with_metadata, - after, until)) + cursor.execute(*table_schema.format_dump( + table_name, with_metadata, + after.get(table_name), until.get(table_name) + )) for obj in table_schema.unpack_iter(cursor, with_metadata): if obj_list is None: obj_list = [] @@ -943,27 +953,28 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. 
""" - statement = ( - "SELECT MIN(first_modified) AS first_modified\n" + - "FROM (\n" + - textwrap.indent( - "\nUNION ALL\n".join( - table_schema.format_get_first_modified(table_name) - for table_name, table_schema in self.TABLES.items() - ), - " " * 4 - ) + "\n) AS tables\n" + query = reduce( + lambda a, b: (a[0] + "\nUNION ALL\n" + b[0], a[1] + b[1]), + ( + table_schema.format_get_first_modified(table_name) + for table_name, table_schema in self.TABLES.items() + ) ) with self.conn, self.conn.cursor() as cursor: - cursor.execute(statement) - return cursor.fetchone()[0] + cursor.execute(*query) + return { + table_name: first_modified + for (table_name, first_modified) in cursor + if first_modified + } def get_last_modified(self): """ @@ -971,24 +982,25 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. 
""" - statement = ( - "SELECT MAX(last_modified) AS last_modified\n" + - "FROM (\n" + - textwrap.indent( - "\nUNION ALL\n".join( - table_schema.format_get_last_modified(table_name) - for table_name, table_schema in self.TABLES.items() - ), - " " * 4 - ) + "\n) AS tables\n" + query = reduce( + lambda a, b: (a[0] + "\nUNION ALL\n" + b[0], a[1] + b[1]), + ( + table_schema.format_get_last_modified(table_name) + for table_name, table_schema in self.TABLES.items() + ) ) with self.conn, self.conn.cursor() as cursor: - cursor.execute(statement) - return cursor.fetchone()[0] + cursor.execute(*query) + return { + table_name: last_modified + for (table_name, last_modified) in cursor + if last_modified + } diff --git a/kcidb/db/schematic.py b/kcidb/db/schematic.py index 8ac3daaf..9e8f99b2 100644 --- a/kcidb/db/schematic.py +++ b/kcidb/db/schematic.py @@ -271,33 +271,32 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. 
+ until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the current - database schema's I/O schema version, each containing at most the + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. """ - assert isinstance(objects_per_report, int) - assert objects_per_report >= 0 - assert isinstance(with_metadata, bool) - assert after is None or \ - isinstance(after, datetime.datetime) and after.tzinfo - assert until is None or \ - isinstance(until, datetime.datetime) and until.tzinfo # We can live with this for now, pylint: disable=too-many-arguments # Or if you prefer, pylint: disable=too-many-positional-arguments @@ -375,8 +374,9 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -390,8 +390,9 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. 
+ A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -567,8 +568,9 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -583,8 +585,9 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -669,34 +672,47 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. 
+ after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the schema's - I/O schema version, each containing at most the specified number - of objects. + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the + specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. 
""" + assert self.is_initialized() + io_schema = self.get_schema()[1] assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) - assert after is None or \ - isinstance(after, datetime.datetime) and after.tzinfo - assert until is None or \ - isinstance(until, datetime.datetime) and until.tzinfo - assert self.is_initialized() + assert isinstance(after, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in after.items() + ) + assert isinstance(until, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in until.items() + ) return self.schema.dump_iter(objects_per_report, with_metadata, after, until) diff --git a/kcidb/db/sql/schema.py b/kcidb/db/sql/schema.py index 1a96c4e2..1fdeae36 100644 --- a/kcidb/db/sql/schema.py +++ b/kcidb/db/sql/schema.py @@ -352,8 +352,9 @@ def format_get_first_modified(self, name): name: The name of the target table of the command. Returns: - The formatted "SELECT" command, returning the timestamp in - "first_modified" column. + The formatted "SELECT" command and its parameters container. + The "SELECT" command returns the table name in "table_name", + and the timestamp in "first_modified" columns. Raises: NoTimestamps - The table doesn't have row timestamps. @@ -362,7 +363,10 @@ def format_get_first_modified(self, name): if not self.timestamp: raise NoTimestamps("Table has no timestamp column") return ( - f"SELECT MIN({self.timestamp.name}) AS first_modified FROM {name}" + f"SELECT {self.placeholder} AS table_name, " + f"MIN({self.timestamp.name}) AS first_modified " + f"FROM {name}", + (name,) ) def format_get_last_modified(self, name): @@ -374,8 +378,9 @@ def format_get_last_modified(self, name): name: The name of the target table of the command. 
Returns: - The formatted "SELECT" command, returning the timestamp in - "last_modified" column. + The formatted "SELECT" command and its parameters container. + The "SELECT" command returns the table name in "table_name", + and the timestamp in "last_modified" columns. Raises: NoTimestamps - The table doesn't have row timestamps. @@ -384,7 +389,10 @@ def format_get_last_modified(self, name): if not self.timestamp: raise NoTimestamps("Table has no timestamp column") return ( - f"SELECT MAX({self.timestamp.name}) AS last_modified FROM {name}" + f"SELECT {self.placeholder} AS table_name, " + f"MAX({self.timestamp.name}) AS last_modified " + f"FROM {name}", + (name,) ) def format_delete(self, name): diff --git a/kcidb/db/sqlite/v04_00.py b/kcidb/db/sqlite/v04_00.py index f195358f..1ca73d2b 100644 --- a/kcidb/db/sqlite/v04_00.py +++ b/kcidb/db/sqlite/v04_00.py @@ -492,24 +492,30 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. 
+ until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the I/O version - of the database schema, each containing at most the specified - number of objects. + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the + specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. """ assert isinstance(objects_per_report, int) @@ -522,10 +528,10 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): cursor = self.conn.cursor() try: for table_name, table_schema in self.TABLES.items(): - result = cursor.execute( - *table_schema.format_dump(table_name, with_metadata, - after, until) - ) + result = cursor.execute(*table_schema.format_dump( + table_name, with_metadata, + after.get(table_name), until.get(table_name) + )) obj_list = None for obj in table_schema.unpack_iter(result, with_metadata): @@ -898,32 +904,32 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. 
""" - statement = ( - "SELECT MIN(first_modified) AS first_modified\n" + - "FROM (\n" + - textwrap.indent( - "\nUNION ALL\n".join( - table_schema.format_get_first_modified(table_name) - for table_name, table_schema in self.TABLES.items() - ), - " " * 4 - ) + "\n) AS tables\n" + query = reduce( + lambda a, b: (a[0] + "\nUNION ALL\n" + b[0], a[1] + b[1]), + ( + table_schema.format_get_first_modified(table_name) + for table_name, table_schema in self.TABLES.items() + ) ) with self.conn: cursor = self.conn.cursor() try: - cursor.execute(statement) - ts_str = cursor.fetchone()[0] + return { + table_name: dateutil.parser.isoparse(first_modified) + for (table_name, first_modified) + in cursor.execute(*query) + if first_modified + } finally: cursor.close() - return ts_str and dateutil.parser.isoparse(ts_str) def get_last_modified(self): """ @@ -931,29 +937,29 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. 
""" - statement = ( - "SELECT MAX(last_modified) AS last_modified\n" + - "FROM (\n" + - textwrap.indent( - "\nUNION ALL\n".join( - table_schema.format_get_last_modified(table_name) - for table_name, table_schema in self.TABLES.items() - ), - " " * 4 - ) + "\n) AS tables\n" + query = reduce( + lambda a, b: (a[0] + "\nUNION ALL\n" + b[0], a[1] + b[1]), + ( + table_schema.format_get_last_modified(table_name) + for table_name, table_schema in self.TABLES.items() + ) ) with self.conn: cursor = self.conn.cursor() try: - cursor.execute(statement) - ts_str = cursor.fetchone()[0] + return { + table_name: dateutil.parser.isoparse(last_modified) + for (table_name, last_modified) + in cursor.execute(*query) + if last_modified + } finally: cursor.close() - return ts_str and dateutil.parser.isoparse(ts_str) diff --git a/kcidb/test_db.py b/kcidb/test_db.py index 15029f6b..801dbd87 100644 --- a/kcidb/test_db.py +++ b/kcidb/test_db.py @@ -451,27 +451,37 @@ def test_get_modified(clean_database): # Check a post-timestamp schema version time.sleep(1) client.init() - timestamp = client.get_first_modified() - assert timestamp is None - timestamp = client.get_last_modified() - assert timestamp is None + io_schema = client.get_schema()[1] + timestamps = client.get_first_modified() + assert timestamps == {} + timestamps = client.get_last_modified() + assert timestamps == {} before_load = client.get_current_time() client.load(COMPREHENSIVE_IO_DATA) first_modified = client.get_first_modified() last_modified = client.get_last_modified() - assert first_modified is not None - assert isinstance(first_modified, datetime.datetime) - assert first_modified.tzinfo is not None - assert first_modified >= before_load + assert isinstance(first_modified, dict) + assert set(io_schema.id_fields) == set(first_modified) + assert all( + isinstance(t, datetime.datetime) and + t.tzinfo is not None and + t >= before_load + for t in first_modified.values() + ) + + assert isinstance(last_modified, dict) + assert 
set(io_schema.id_fields) == set(last_modified) +    assert all( +        isinstance(t, datetime.datetime) and +        t.tzinfo is not None and +        t >= before_load +        for t in last_modified.values() +    ) - assert last_modified is not None - assert isinstance(last_modified, datetime.datetime) - assert last_modified.tzinfo is not None - assert last_modified >= before_load + assert all(t >= first_modified[n] for n, t in last_modified.items()) - assert last_modified >= first_modified client.cleanup() diff --git a/main.py b/main.py index d4a10fe5..391b9bbd 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,6 @@ """Google Cloud Functions for Kernel CI reporting""" +import gc import os import time import atexit @@ -423,45 +424,84 @@ def kcidb_archive(event, context): that is out of the editing window (to be enforced), and hasn't been transferred yet. """ + # It's OK, pylint: disable=too-many-locals + # + # Editing window + edit_window = datetime.timedelta(days=14) + # Maximum duration of the dump transferred in a single execution + max_duration = datetime.timedelta(days=7) + # Duration of each dump piece + piece_duration = datetime.timedelta(hours=6) + op_client = get_db_client(OPERATIONAL_DATABASE) + op_io_schema = op_client.get_schema()[1] + op_obj_list_names = set(op_io_schema.id_fields) op_now = op_client.get_current_time() op_first_modified = op_client.get_first_modified() if not op_first_modified: LOGGER.info("Operational database is empty, nothing to archive") return + # Maximum timestamp of data to archive + max_until = op_now - edit_window + ar_client = get_db_client(ARCHIVE_DATABASE) ar_last_modified = ar_client.get_last_modified() - after = ar_last_modified or \ - (op_first_modified - datetime.timedelta(seconds=1)) - until = min( - # Add a timespan we can fit in time limits - after + datetime.timedelta(days=7), - # Subtract editing window (to be enforced) - op_now - datetime.timedelta(days=14) - ) - if after >= until: LOGGER.info("No data old enough to archive") return + # Find the timestamps right before the data we need to fetch +
after = { + n: ( + ar_last_modified.get(n) or + op_first_modified.get(n) and + op_first_modified[n] - datetime.timedelta(seconds=1) + ) for n in op_obj_list_names + } + min_after = min(after.values()) + if min_after >= max_until: LOGGER.info("No data old enough to archive") return + # Find the maximum timestamp of the data we need to fetch + # We try to align all tables on a single time boundary + until = min(min_after + max_duration, max_until) + # Transfer data in pieces which can hopefully fit in memory - after_str = after.isoformat(timespec='microseconds') - while after < until: - next_after = min(after + datetime.timedelta(hours=12), until) - next_after_str = next_after.isoformat(timespec='microseconds') + # Split by time, down to microseconds, as it's our transfer atom + min_after_str = min_after.isoformat(timespec='microseconds') + while all(t < until for t in after.values()): + next_after = { + n: min(max(t, min_after + piece_duration), until) + for n, t in after.items() + } + next_min_after = min(next_after.values()) + next_min_after_str = next_min_after.isoformat(timespec='microseconds') # Transfer the data, preserving the timestamps LOGGER.info("FETCHING operational database dump for (%s, %s] range", - after_str, next_after_str) + min_after_str, next_min_after_str) + for obj_list_name in after: + LOGGER.debug( + "FETCHING %s for (%s, %s] range", + obj_list_name, + after[obj_list_name].isoformat(timespec='microseconds'), + next_after[obj_list_name].isoformat(timespec='microseconds') + ) dump = op_client.dump(with_metadata=True, after=after, until=next_after) LOGGER.info("LOADING a dump of %u objects into archive database", kcidb.io.SCHEMA.count(dump)) ar_client.load(dump, with_metadata=True) LOGGER.info("ARCHIVED %u objects in (%s, %s] range", - kcidb.io.SCHEMA.count(dump), after_str, next_after_str) + kcidb.io.SCHEMA.count(dump), + min_after_str, next_min_after_str) + for obj_list_name in after: + LOGGER.debug("ARCHIVED %u %s", + 
len(dump.get(obj_list_name, [])), obj_list_name) after = next_after - after_str = next_after_str + min_after = next_min_after + min_after_str = next_min_after_str + # Make sure we have enough memory for the next piece + dump = None + gc.collect() def kcidb_purge_db(event, context):