From 921f852e943159ba8e75c4802dac90e66ea8b146 Mon Sep 17 00:00:00 2001
From: Nikolai Kondrashov
Date: Thu, 31 Oct 2024 21:36:29 +0200
Subject: [PATCH 1/4] db.mux: Support purging

---
 kcidb/db/mux.py  | 27 +++++++++++++++++++++++++++
 kcidb/test_db.py | 16 ++++++++++++----
 2 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/kcidb/db/mux.py b/kcidb/db/mux.py
index 85f2fd7b..97e3bc25 100644
--- a/kcidb/db/mux.py
+++ b/kcidb/db/mux.py
@@ -1,6 +1,7 @@
 """Kernel CI reporting database - multiplexing"""

 import textwrap
+import datetime
 from abc import abstractmethod
 import kcidb.io as io
 import kcidb.db.misc
@@ -252,6 +253,32 @@ def empty(self):
         for driver in self.drivers:
             driver.empty()

+    def purge(self, before):
+        """
+        Remove all the data from the database that arrived before the
+        specified time, if the database supports that.
+        The database must be initialized.
+
+        Args:
+            before: An "aware" datetime.datetime object specifying the
+                    earliest (database server) time the data to be *preserved*
+                    should've arrived. Any other data will be purged.
+                    Can be None to have nothing removed. The latter can be
+                    used to test if the database supports purging.
+
+        Returns:
+            True if the database supports purging, and the requested data was
+            purged. False if the database doesn't support purging.
+        """
+        assert self.is_initialized()
+        assert before is None or \
+            isinstance(before, datetime.datetime) and before.tzinfo
+        purging_supported = all(d.purge(None) for d in self.drivers)
+        if before is not None and purging_supported:
+            for driver in self.drivers:
+                driver.purge(before)
+        return purging_supported
+
     def get_current_time(self):
         """
         Get the current time from the database server.
diff --git a/kcidb/test_db.py b/kcidb/test_db.py
index 28e2a435..cd8db7f4 100644
--- a/kcidb/test_db.py
+++ b/kcidb/test_db.py
@@ -1949,11 +1949,19 @@ def test_purge(empty_database):
     """Test the purge() method behaves as documented"""
     client = empty_database

+    drivers = [*client.driver.drivers] \
+        if isinstance(client.driver, kcidb.db.mux.Driver) \
+        else [client.driver]
+
     # If this is a database and schema which *should* support purging
-    if isinstance(client.driver, (kcidb.db.bigquery.Driver,
-                                  kcidb.db.postgresql.Driver,
-                                  kcidb.db.sqlite.Driver)) and \
-            client.driver.get_schema()[0] >= (4, 2):
+    if all(
+        isinstance(driver,
+                   (kcidb.db.bigquery.Driver,
+                    kcidb.db.postgresql.Driver,
+                    kcidb.db.sqlite.Driver)) and
+        driver.get_schema()[0] >= (4, 2)
+        for driver in drivers
+    ):
         io_data_1 = deepcopy(COMPREHENSIVE_IO_DATA)
         io_data_2 = deepcopy(COMPREHENSIVE_IO_DATA)
         for obj_list_name in kcidb.io.SCHEMA.graph:

From f80d402e6157df18da7b49ff27dd974b2c8dfd19 Mon Sep 17 00:00:00 2001
From: Nikolai Kondrashov
Date: Thu, 31 Oct 2024 21:42:09 +0200
Subject: [PATCH 2/4] db: Fix purge() docstrings

---
 kcidb/db/__init__.py          |  5 ++---
 kcidb/db/abstract.py          |  5 ++---
 kcidb/db/bigquery/v04_02.py   |  5 ++---
 kcidb/db/postgresql/v04_02.py |  5 ++---
 kcidb/db/schematic.py         | 10 ++++------
 kcidb/db/sqlite/v04_02.py     |  5 ++---
 6 files changed, 14 insertions(+), 21 deletions(-)

diff --git a/kcidb/db/__init__.py b/kcidb/db/__init__.py
index 30edc44b..00dd1ca6 100644
--- a/kcidb/db/__init__.py
+++ b/kcidb/db/__init__.py
@@ -151,9 +151,8 @@ def purge(self, before=None):

         Args:
             before: An "aware" datetime.datetime object specifying the
-                    the earliest (database server) time the data to be
-                    *preserved* should've arrived. Any other data will be
-                    purged.
+                    earliest (database server) time the data to be *preserved*
+                    should've arrived. Any other data will be purged.
                     Can be None to have nothing removed. The latter can be
                     used to test if the database supports purging.

diff --git a/kcidb/db/abstract.py b/kcidb/db/abstract.py
index 64e5c5ec..aded832d 100644
--- a/kcidb/db/abstract.py
+++ b/kcidb/db/abstract.py
@@ -86,9 +86,8 @@ def purge(self, before):

         Args:
             before: An "aware" datetime.datetime object specifying the
-                    the earliest (database server) time the data to be
-                    *preserved* should've arrived. Any other data will be
-                    purged.
+                    earliest (database server) time the data to be *preserved*
+                    should've arrived. Any other data will be purged.
                     Can be None to have nothing removed. The latter can be
                     used to test if the database supports purging.

diff --git a/kcidb/db/bigquery/v04_02.py b/kcidb/db/bigquery/v04_02.py
index f39c58e3..45b5e7f2 100644
--- a/kcidb/db/bigquery/v04_02.py
+++ b/kcidb/db/bigquery/v04_02.py
@@ -87,9 +87,8 @@ def purge(self, before):

         Args:
             before: An "aware" datetime.datetime object specifying the
-                    the earliest (database server) time the data to be
-                    *preserved* should've arrived. Any other data will be
-                    purged.
+                    earliest (database server) time the data to be *preserved*
+                    should've arrived. Any other data will be purged.
                     Can be None to have nothing removed. The latter can be
                     used to test if the database supports purging.

diff --git a/kcidb/db/postgresql/v04_02.py b/kcidb/db/postgresql/v04_02.py
index a6a33132..7d74931d 100644
--- a/kcidb/db/postgresql/v04_02.py
+++ b/kcidb/db/postgresql/v04_02.py
@@ -77,9 +77,8 @@ def purge(self, before):

         Args:
             before: An "aware" datetime.datetime object specifying the
-                    the earliest (database server) time the data to be
-                    *preserved* should've arrived. Any other data will be
-                    purged.
+                    earliest (database server) time the data to be *preserved*
+                    should've arrived. Any other data will be purged.
                     Can be None to have nothing removed. The latter can be
                     used to test if the database supports purging.

diff --git a/kcidb/db/schematic.py b/kcidb/db/schematic.py
index bae3b344..5d1eb208 100644
--- a/kcidb/db/schematic.py
+++ b/kcidb/db/schematic.py
@@ -260,9 +260,8 @@ def purge(self, before):

         Args:
             before: An "aware" datetime.datetime object specifying the
-                    the earliest (database server) time the data to be
-                    *preserved* should've arrived. Any other data will be
-                    purged.
+                    earliest (database server) time the data to be *preserved*
+                    should've arrived. Any other data will be purged.
                     Can be None to have nothing removed. The latter can be
                     used to test if the database supports purging.

@@ -503,9 +502,8 @@ def purge(self, before):

         Args:
             before: An "aware" datetime.datetime object specifying the
-                    the earliest (database server) time the data to be
-                    *preserved* should've arrived. Any other data will be
-                    purged.
+                    earliest (database server) time the data to be *preserved*
+                    should've arrived. Any other data will be purged.
                     Can be None to have nothing removed. The latter can be
                     used to test if the database supports purging.

diff --git a/kcidb/db/sqlite/v04_02.py b/kcidb/db/sqlite/v04_02.py
index a1f4f26e..407520d4 100644
--- a/kcidb/db/sqlite/v04_02.py
+++ b/kcidb/db/sqlite/v04_02.py
@@ -81,9 +81,8 @@ def purge(self, before):

         Args:
             before: An "aware" datetime.datetime object specifying the
-                    the earliest (database server) time the data to be
-                    *preserved* should've arrived. Any other data will be
-                    purged.
+                    earliest (database server) time the data to be *preserved*
+                    should've arrived. Any other data will be purged.
                     Can be None to have nothing removed. The latter can be
                     used to test if the database supports purging.

From ca1278f5d4e523bfb72af10fcdf5542248eeeb Mon Sep 17 00:00:00 2001
From: Nikolai Kondrashov
Date: Thu, 31 Oct 2024 21:43:13 +0200
Subject: [PATCH 3/4] monitor.spool: Fix formatting of a docstring

---
 kcidb/monitor/spool/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kcidb/monitor/spool/__init__.py b/kcidb/monitor/spool/__init__.py
index 1eeececf..e8d84ab0 100644
--- a/kcidb/monitor/spool/__init__.py
+++ b/kcidb/monitor/spool/__init__.py
@@ -252,7 +252,7 @@ def unpicked(self, timestamp=None):
         Args:
             timestamp:  An "aware" datetime.datetime object specifying the
                         intended pickup time, or None to use
-                       datetime.datetime.now(datetime.timezone.utc).
+                        datetime.datetime.now(datetime.timezone.utc).

         Yields:
             The ID of the next notification free for picking.

From c9e469cacc26a50c74a3ebe6d9c62885bde30c2e Mon Sep 17 00:00:00 2001
From: Nikolai Kondrashov
Date: Thu, 31 Oct 2024 21:44:10 +0200
Subject: [PATCH 4/4] db: Support dumping a time range

---
 kcidb/db/__init__.py          | 69 ++++++++++++++++++++++++---
 kcidb/db/abstract.py          | 20 +++++++-
 kcidb/db/bigquery/v04_00.py   | 48 ++++++++++++++++---
 kcidb/db/misc.py              |  4 ++
 kcidb/db/mux.py               | 18 ++++++-
 kcidb/db/null.py              | 19 +++++++-
 kcidb/db/postgresql/schema.py |  7 ++-
 kcidb/db/postgresql/v04_00.py | 21 +++++++--
 kcidb/db/postgresql/v04_02.py |  3 +-
 kcidb/db/postgresql/v04_05.py |  5 +-
 kcidb/db/schematic.py         | 43 +++++++++++++++--
 kcidb/db/sql/schema.py        | 62 +++++++++++++++++++++---
 kcidb/db/sqlite/schema.py     |  7 ++-
 kcidb/db/sqlite/v04_00.py     | 19 +++++++-
 kcidb/db/sqlite/v04_02.py     |  3 +-
 kcidb/test_db.py              | 89 +++++++++++++++++++++++++++++++++++
 16 files changed, 397 insertions(+), 40 deletions(-)

diff --git a/kcidb/db/__init__.py b/kcidb/db/__init__.py
index 00dd1ca6..1971719c 100644
--- a/kcidb/db/__init__.py
+++ b/kcidb/db/__init__.py
@@ -11,6 +11,8 @@
 from kcidb.db import abstract, schematic, mux, \
     bigquery, postgresql, sqlite, json, null, misc  # noqa: F401

+# It's OK for now, pylint: disable=too-many-lines
+
 # Module's logger
 LOGGER = logging.getLogger(__name__)

@@ -274,7 +276,8 @@ def upgrade(self, target_version=None):
             "Target schema is older than the current schema"
         self.driver.upgrade(target_version)

-    def dump_iter(self, objects_per_report=0, with_metadata=True):
+    def dump_iter(self, objects_per_report=0, with_metadata=True,
+                  after=None, until=None):
         """
         Dump all data from the database in object number-limited chunks.

         Args:
             objects_per_report: An upper limit on the number of objects
                                 returned in each report data, or zero for no
                                 limit.
             with_metadata: True, if metadata fields should be dumped as
                            well. False, if not.
+            after: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be excluded from the dump should've arrived.
+                   The data after this time will be dumped.
+                   Can be None to have no limit on older data.
+            until: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be dumped should've arrived.
+                   The data after this time will not be dumped.
+                   Can be None to have no limit on newer data.

         Returns:
             An iterator returning report JSON data adhering to the current
             I/O schema version, each containing at most the specified number
             of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
""" - assert self.is_initialized() assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) + assert after is None or \ + isinstance(after, datetime.datetime) and after.tzinfo + assert until is None or \ + isinstance(until, datetime.datetime) and until.tzinfo + assert self.is_initialized() yield from self.driver.dump_iter( objects_per_report=objects_per_report, - with_metadata=with_metadata + with_metadata=with_metadata, + after=after, until=until ) - def dump(self, with_metadata=True): + def dump(self, with_metadata=True, after=None, until=None): """ Dump all data from the database. Args: with_metadata: True, if metadata fields should be dumped as well. False, if not. + after: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be excluded from the dump should've arrived. + The data after this time will be dumped. + Can be None to have no limit on older data. + until: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be dumped should've arrived. + The data after this time will not be dumped. + Can be None to have no limit on newer data. Returns: The JSON data from the database adhering to the current I/O schema version. + + Raises: + NoTimestamps - Either "after" or "until" are not None, and + the database doesn't have row timestamps. """ + assert isinstance(with_metadata, bool) + assert after is None or \ + isinstance(after, datetime.datetime) and after.tzinfo + assert until is None or \ + isinstance(until, datetime.datetime) and until.tzinfo assert self.is_initialized() try: return next(self.dump_iter(objects_per_report=0, - with_metadata=with_metadata)) + with_metadata=with_metadata, + after=after, until=until)) except StopIteration: return self.get_schema()[1].new() @@ -776,13 +818,28 @@ def dump_main(): help='Do not dump metadata fields', action='store_true' ) + parser.add_argument( + '--after', + metavar='AFTER', + type=kcidb.misc.iso_timestamp, + help="An ISO-8601 timestamp specifying the latest time the data to " + "be *excluded* from the dump should've arrived." + ) + parser.add_argument( + '--until', + metavar='UNTIL', + type=kcidb.misc.iso_timestamp, + help="An ISO-8601 timestamp specifying the latest time the data to " + "be *included* into the dump should've arrived." + ) args = parser.parse_args() client = Client(args.database) if not client.is_initialized(): raise Exception(f"Database {args.database!r} is not initialized") kcidb.misc.json_dump_stream( client.dump_iter(objects_per_report=args.objects_per_report, - with_metadata=not args.without_metadata), + with_metadata=not args.without_metadata, + after=args.after, until=args.until), sys.stdout, indent=args.indent, seq=args.seq_out ) diff --git a/kcidb/db/abstract.py b/kcidb/db/abstract.py index aded832d..b4915653 100644 --- a/kcidb/db/abstract.py +++ b/kcidb/db/abstract.py @@ -170,7 +170,7 @@ def upgrade(self, target_version): "Target schema is older than the current schema" @abstractmethod - def dump_iter(self, objects_per_report, with_metadata): + def dump_iter(self, objects_per_report, with_metadata, after, until): """ Dump all data from the database in object number-limited chunks. The database must be initialized. @@ -180,15 +180,33 @@ def dump_iter(self, objects_per_report, with_metadata): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. 
+            after: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be excluded from the dump should've arrived.
+                   The data after this time will be dumped.
+                   Can be None to have no limit on older data.
+            until: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be dumped should've arrived.
+                   The data after this time will not be dumped.
+                   Can be None to have no limit on newer data.

         Returns:
             An iterator returning report JSON data adhering to the current
             database schema's I/O schema version, each containing at most
             the specified number of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
         assert isinstance(objects_per_report, int)
         assert objects_per_report >= 0
         assert isinstance(with_metadata, bool)
+        assert after is None or \
+            isinstance(after, datetime.datetime) and after.tzinfo
+        assert until is None or \
+            isinstance(until, datetime.datetime) and until.tzinfo
         assert self.is_initialized()

 # No, it's not, pylint: disable=too-many-return-statements
diff --git a/kcidb/db/bigquery/v04_00.py b/kcidb/db/bigquery/v04_00.py
index f3188a9c..a80506a4 100644
--- a/kcidb/db/bigquery/v04_00.py
+++ b/kcidb/db/bigquery/v04_00.py
@@ -17,7 +17,7 @@
 from kcidb.db.schematic import \
     Schema as AbstractSchema, \
     Connection as AbstractConnection
-from kcidb.db.misc import NotFound
+from kcidb.db.misc import NotFound, NoTimestamps
 from kcidb.db.bigquery.schema import validate_json_obj_list

 # We'll manage for now, pylint: disable=too-many-lines
@@ -749,7 +749,7 @@ def _unpack_node(cls, node, drop_null=True):
                 node[key] = cls._unpack_node(value)
         return node

-    def dump_iter(self, objects_per_report, with_metadata):
+    def dump_iter(self, objects_per_report, with_metadata, after, until):
         """
         Dump all data from the database in object number-limited chunks.

         Args:
             objects_per_report: An upper limit on the number of objects
                                 returned in each report data, or zero for no
                                 limit.
             with_metadata: True, if metadata fields should be dumped as
                            well. False, if not.
+            after: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be excluded from the dump should've arrived.
+                   The data after this time will be dumped.
+                   Can be None to have no limit on older data.
+            until: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be dumped should've arrived.
+                   The data after this time will not be dumped.
+                   Can be None to have no limit on newer data.

         Returns:
             An iterator returning report JSON data adhering to the I/O
             version of the database schema, each containing at most the
             specified number of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
""" assert isinstance(objects_per_report, int) assert objects_per_report >= 0 @@ -771,14 +785,34 @@ def dump_iter(self, objects_per_report, with_metadata): obj_num = 0 data = self.io.new() for obj_list_name, table_schema in self.TABLE_MAP.items(): - query_string = \ - "SELECT " + \ + ts_field = next( + (f for f in table_schema if f.name == "_timestamp"), + None + ) + if (after or until) and not ts_field: + raise NoTimestamps( + f"Table {obj_list_name!r} has no {ts_field.name!r} column" + ) + + query_string = ( + "SELECT " + ", ".join( f"`{f.name}`" for f in table_schema if with_metadata or f.name[0] != '_' - ) + \ - f" FROM `{obj_list_name}`" - query_job = self.conn.query_create(query_string) + ) + + f" FROM `{obj_list_name}`" + + (( + " WHERE " + " AND ".join( + f"{ts_field.name} {op} ?" + for op, v in ((">", after), ("<=", until)) if v + ) + ) if (after or until) else "") + ) + query_parameters = [ + bigquery.ScalarQueryParameter(None, ts_field.field_type, v) + for v in (after, until) if v + ] + query_job = self.conn.query_create(query_string, query_parameters) obj_list = None for row in query_job: if obj_list is None: diff --git a/kcidb/db/misc.py b/kcidb/db/misc.py index 0e0d6ce2..a7738bdb 100644 --- a/kcidb/db/misc.py +++ b/kcidb/db/misc.py @@ -33,6 +33,10 @@ class UnsupportedSchema(Error): """Database schema version is not supported""" +class NoTimestamps(Error): + """Row timestamps required for the operation don't exist""" + + def format_spec_list(specs): """ Format a database specification list string out of a list of specification diff --git a/kcidb/db/mux.py b/kcidb/db/mux.py index 97e3bc25..8c1332fa 100644 --- a/kcidb/db/mux.py +++ b/kcidb/db/mux.py @@ -355,7 +355,7 @@ def upgrade(self, target_version): driver.upgrade(driver_version) self.version = version - def dump_iter(self, objects_per_report, with_metadata): + def dump_iter(self, objects_per_report, with_metadata, after, until): """ Dump all data from the first database in object number-limited chunks. @@ -364,14 +364,28 @@ def dump_iter(self, objects_per_report, with_metadata): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. + after: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be excluded from the dump should've arrived. + The data after this time will be dumped. + Can be None to have no limit on older data. + until: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be dumped should've arrived. + The data after this time will not be dumped. + Can be None to have no limit on newer data. Returns: An iterator returning report JSON data adhering to the current I/O schema version, each containing at most the specified number of objects. + + Raises: + NoTimestamps - Either "after" or "until" are not None, and + the database doesn't have row timestamps. 
""" yield from self.drivers[0].dump_iter(objects_per_report, - with_metadata) + with_metadata, after, until) # We can live with this for now, pylint: disable=too-many-arguments # Or if you prefer, pylint: disable=too-many-positional-arguments diff --git a/kcidb/db/null.py b/kcidb/db/null.py index fa1d71db..8e9c2fe9 100644 --- a/kcidb/db/null.py +++ b/kcidb/db/null.py @@ -129,7 +129,7 @@ def get_last_modified(self): """ return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) - def dump_iter(self, objects_per_report, with_metadata): + def dump_iter(self, objects_per_report, with_metadata, after, until): """ Dump all data from the database in object number-limited chunks. @@ -138,13 +138,30 @@ def dump_iter(self, objects_per_report, with_metadata): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. + after: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be excluded from the dump should've arrived. + The data after this time will be dumped. + Can be None to have no limit on older data. + until: An "aware" datetime.datetime object specifying + the latest (database server) time the data to + be dumped should've arrived. + The data after this time will not be dumped. + Can be None to have no limit on newer data. Returns: An iterator returning report JSON data adhering to the current I/O schema version, each containing at most the specified number of objects. + + Raises: + NoTimestamps - Either "after" or "until" are not None, and + the database doesn't have row timestamps. """ del objects_per_report + del with_metadata + del after + del until yield io.SCHEMA.new() # We can live with this for now, pylint: disable=too-many-arguments diff --git a/kcidb/db/postgresql/schema.py b/kcidb/db/postgresql/schema.py index 414819a8..ee9cfcd0 100644 --- a/kcidb/db/postgresql/schema.py +++ b/kcidb/db/postgresql/schema.py @@ -237,7 +237,7 @@ def __init__(self, constraint=None, class Table(_SQLTable): """A table schema""" - def __init__(self, columns, primary_key=None): + def __init__(self, columns, primary_key=None, timestamp=None): """ Initialize the table schema. @@ -249,9 +249,12 @@ def __init__(self, columns, primary_key=None): primary_key: A list of names of columns constituting the primary key. None or an empty list to use the column with the PRIMARY_KEY constraint instead. + timestamp The name of the column containing last row change + timestamp. Must exist in "columns". """ # TODO: Switch to hardcoding "_" key_sep in base class - super().__init__("%s", columns, primary_key, key_sep="_") + super().__init__("%s", columns, primary_key, key_sep="_", + timestamp=timestamp) class Index(_SQLIndex): diff --git a/kcidb/db/postgresql/v04_00.py b/kcidb/db/postgresql/v04_00.py index 97ffc842..aecad66a 100644 --- a/kcidb/db/postgresql/v04_00.py +++ b/kcidb/db/postgresql/v04_00.py @@ -569,7 +569,7 @@ def empty(self): for name, schema in self.TABLES.items(): cursor.execute(schema.format_delete(name)) - def dump_iter(self, objects_per_report, with_metadata): + def dump_iter(self, objects_per_report, with_metadata, after, until): """ Dump all data from the database in object number-limited chunks. @@ -578,11 +578,25 @@ def dump_iter(self, objects_per_report, with_metadata): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. 
+            after: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be excluded from the dump should've arrived.
+                   The data after this time will be dumped.
+                   Can be None to have no limit on older data.
+            until: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be dumped should've arrived.
+                   The data after this time will not be dumped.
+                   Can be None to have no limit on newer data.

         Returns:
             An iterator returning report JSON data adhering to the I/O
             version of the database schema, each containing at most the
             specified number of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
         assert isinstance(objects_per_report, int)
         assert objects_per_report >= 0

         with self.conn, self.conn.cursor() as cursor:
             for table_name, table_schema in self.TABLES.items():
                 obj_list = None
-                cursor.execute(table_schema.format_dump(table_name,
-                                                        with_metadata))
+                cursor.execute(*table_schema.format_dump(table_name,
+                                                         with_metadata,
+                                                         after, until))
                 for obj in table_schema.unpack_iter(cursor, with_metadata):
                     if obj_list is None:
                         obj_list = []
diff --git a/kcidb/db/postgresql/v04_02.py b/kcidb/db/postgresql/v04_02.py
index 7d74931d..a07ad741 100644
--- a/kcidb/db/postgresql/v04_02.py
+++ b/kcidb/db/postgresql/v04_02.py
@@ -31,7 +31,8 @@ class Schema(PreviousSchema):
     TABLES_ARGS = {
         name: merge_dicts(
             args,
-            columns=dict(_timestamp=TIMESTAMP_COLUMN, **args["columns"])
+            columns=dict(_timestamp=TIMESTAMP_COLUMN, **args["columns"]),
+            timestamp="_timestamp",
         )
         for name, args in PreviousSchema.TABLES_ARGS.items()
     }
diff --git a/kcidb/db/postgresql/v04_05.py b/kcidb/db/postgresql/v04_05.py
index 4f234f2a..e3949d3a 100644
--- a/kcidb/db/postgresql/v04_05.py
+++ b/kcidb/db/postgresql/v04_05.py
@@ -52,11 +52,12 @@ class Schema(PreviousSchema):
     # For use by descendants
     TABLES_ARGS = merge_dicts(
         PreviousSchema.TABLES_ARGS,
-        tests=dict(
+        tests=merge_dicts(
+            PreviousSchema.TABLES_ARGS["tests"],
             columns=merge_dicts(
                 PreviousSchema.TABLES_ARGS["tests"]["columns"],
                 status=Column("STATUS"),
-            )
+            ),
         ),
     )
diff --git a/kcidb/db/schematic.py b/kcidb/db/schematic.py
index 5d1eb208..870d3def 100644
--- a/kcidb/db/schematic.py
+++ b/kcidb/db/schematic.py
@@ -274,7 +274,7 @@ def purge(self, before):
         return False

     @abstractmethod
-    def dump_iter(self, objects_per_report, with_metadata):
+    def dump_iter(self, objects_per_report, with_metadata, after, until):
         """
         Dump all data from the database in object number-limited chunks.
         The database must be initialized.

         Args:
             objects_per_report: An upper limit on the number of objects
                                 returned in each report data, or zero for no
                                 limit.
             with_metadata: True, if metadata fields should be dumped as
                            well. False, if not.
+            after: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be excluded from the dump should've arrived.
+                   The data after this time will be dumped.
+                   Can be None to have no limit on older data.
+            until: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be dumped should've arrived.
+                   The data after this time will not be dumped.
+                   Can be None to have no limit on newer data.

         Returns:
             An iterator returning report JSON data adhering to the current
             database schema's I/O schema version, each containing at most
             the specified number of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
         assert isinstance(objects_per_report, int)
         assert objects_per_report >= 0
         assert isinstance(with_metadata, bool)
+        assert after is None or \
+            isinstance(after, datetime.datetime) and after.tzinfo
+        assert until is None or \
+            isinstance(until, datetime.datetime) and until.tzinfo

     # We can live with this for now, pylint: disable=too-many-arguments
     # Or if you prefer, pylint: disable=too-many-positional-arguments
@@ -605,7 +623,7 @@ def upgrade(self, target_version):
             self.conn.set_schema_version(schema.version)
         self.schema = schema(self.conn)

-    def dump_iter(self, objects_per_report, with_metadata):
+    def dump_iter(self, objects_per_report, with_metadata, after, until):
         """
         Dump all data from the database in object number-limited chunks.
         The database must be initialized.

         Args:
             objects_per_report: An upper limit on the number of objects
                                 returned in each report data, or zero for no
                                 limit.
             with_metadata: True, if metadata fields should be dumped as
                            well. False, if not.
+            after: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be excluded from the dump should've arrived.
+                   The data after this time will be dumped.
+                   Can be None to have no limit on older data.
+            until: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be dumped should've arrived.
+                   The data after this time will not be dumped.
+                   Can be None to have no limit on newer data.

         Returns:
             An iterator returning report JSON data adhering to the schema's
             I/O schema version, each containing at most the specified number
             of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
         assert isinstance(objects_per_report, int)
         assert objects_per_report >= 0
         assert isinstance(with_metadata, bool)
+        assert after is None or \
+            isinstance(after, datetime.datetime) and after.tzinfo
+        assert until is None or \
+            isinstance(until, datetime.datetime) and until.tzinfo
         assert self.is_initialized()
-        return self.schema.dump_iter(objects_per_report, with_metadata)
+        return self.schema.dump_iter(objects_per_report, with_metadata,
+                                     after, until)

     # We can live with this for now, pylint: disable=too-many-arguments
     # Or if you prefer, pylint: disable=too-many-positional-arguments
diff --git a/kcidb/db/sql/schema.py b/kcidb/db/sql/schema.py
index 249996a8..7cc2d1d4 100644
--- a/kcidb/db/sql/schema.py
+++ b/kcidb/db/sql/schema.py
@@ -2,8 +2,10 @@
 Kernel CI report database - generic SQL schema definitions
 """

+import datetime
 import re
 from enum import Enum
+from kcidb.db.misc import NoTimestamps


 class Constraint(Enum):
@@ -149,7 +151,10 @@ def format_def(self):
 class Table:
     """A table schema"""

-    def __init__(self, placeholder, columns, primary_key=None, key_sep="_"):
+    # It's OK, pylint: disable=too-many-arguments
+    # Or, if you wish, pylint: disable=too-many-positional-arguments
+    def __init__(self, placeholder, columns, primary_key=None, key_sep="_",
+                 timestamp=None):
         """
         Initialize the table schema.

         Args:
             placeholder: The query parameter placeholder string to use in
                          column assignments.
             columns: A dictionary of column names consisting of
                      dot-separated parts (keys), and the column schemas.
             primary_key: A list of names of columns constituting the
                          primary key. None or an empty list to use the
                          column with the PRIMARY_KEY constraint instead.
             key_sep: String used to replace dots in column names ("key"
-                     separator)
+                     separator).
+            timestamp:  The name of the column containing last row change
+                        timestamp. Must exist in "columns". None, if the
+                        table has no such column.
         """
         assert isinstance(placeholder, str) and str
         assert isinstance(columns, dict)
         assert all(
             isinstance(column, Column)
             for name, column in columns.items()
         )
+        assert timestamp is None or timestamp in columns
+
         # The number of columns with PRIMARY_KEY constraint set
         primary_key_constraints = sum(
             c.constraint == Constraint.PRIMARY_KEY for c in columns.values()
         )

         }
         # A list of columns in the explicitly-specified primary key
         self.primary_key = [self.columns[name] for name in primary_key]
+        # The timestamp table column
+        self.timestamp = timestamp and self.columns[timestamp]

     def format_create(self, name):
         """
@@ -273,7 +285,7 @@ def format_insert(self, name, prio_db, with_metadata):
             c not in self.primary_key
         )

-    def format_dump(self, name, with_metadata):
+    def format_dump(self, name, with_metadata, after, until):
         """
         Format the "SELECT" command for dumping the table contents,
         returning data suitable for unpacking with unpack*() methods.

         Args:
             name: The name of the target table of the command.
             with_metadata: True, if metadata fields should be dumped too.
                            False, if not.
+            after: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to be
+                   excluded from the dump should've arrived. The data
+                   after this time will be dumped. Can be None to
+                   have no limit on older data.
+            until: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to be
+                   dumped should've arrived. The data after this time
+                   will not be dumped. Can be None to have no limit
+                   on newer data.

         Returns:
-            The formatted "SELECT" command.
+            The formatted "SELECT" command, and its parameter container.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
         assert isinstance(name, str)
         assert isinstance(with_metadata, bool)
-        return "SELECT " + \
+        assert after is None or \
+            isinstance(after, datetime.datetime) and after.tzinfo
+        assert until is None or \
+            isinstance(until, datetime.datetime) and until.tzinfo
+
+        if (after or until) and not self.timestamp:
+            raise NoTimestamps("Table has no timestamp column")
+
+        return (
+            "SELECT " +
             ", ".join(
                 c.name for c in self.columns.values()
                 if with_metadata or not c.schema.metadata_expr
-            ) + \
-            f" FROM {name}"
+            ) +
+            f" FROM {name}" +
+            ((
+                " WHERE " + " AND ".join(
+                    f"{self.timestamp.name} {op} {self.placeholder}"
+                    for op, v in ((">", after), ("<=", until)) if v
+                )
+            ) if (after or until) else ""),
+            [
+                self.timestamp.schema.pack(
+                    v.isoformat(timespec='microseconds')
+                )
+                for v in (after, until) if v
+            ]
+        )

     def format_delete(self, name):
         """
diff --git a/kcidb/db/sqlite/schema.py b/kcidb/db/sqlite/schema.py
index 4f345e95..78a6cb4d 100644
--- a/kcidb/db/sqlite/schema.py
+++ b/kcidb/db/sqlite/schema.py
@@ -174,7 +174,7 @@ def __init__(self, constraint=None,
 class Table(_SQLTable):
     """A table schema"""

-    def __init__(self, columns, primary_key=None):
+    def __init__(self, columns, primary_key=None, timestamp=None):
         """
         Initialize the table schema.

         Args:
             columns: A dictionary of column names consisting of
                      dot-separated parts (keys), and the column schemas.
             primary_key: A list of names of columns constituting the
                          primary key. None or an empty list to use the
                          column with the PRIMARY_KEY constraint instead.
+            timestamp:  The name of the column containing last row change
+                        timestamp. Must exist in "columns". None, if the
+                        table has no such column.
         """
         # TODO: Switch to using "_" key_sep, and hardcoding it in base class
-        super().__init__("?", columns, primary_key, key_sep=".")
+        super().__init__("?", columns, primary_key, key_sep=".",
+                         timestamp=timestamp)

     def format_create(self, name):
         """
diff --git a/kcidb/db/sqlite/v04_00.py b/kcidb/db/sqlite/v04_00.py
index 689551bd..398e429c 100644
--- a/kcidb/db/sqlite/v04_00.py
+++ b/kcidb/db/sqlite/v04_00.py
@@ -497,7 +497,7 @@ def empty(self):
         finally:
             cursor.close()

-    def dump_iter(self, objects_per_report, with_metadata):
+    def dump_iter(self, objects_per_report, with_metadata, after, until):
         """
         Dump all data from the database in object number-limited chunks.

         Args:
             objects_per_report: An upper limit on the number of objects
                                 returned in each report data, or zero for no
                                 limit.
             with_metadata: True, if metadata fields should be dumped as
                            well. False, if not.
+            after: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be excluded from the dump should've arrived.
+                   The data after this time will be dumped.
+                   Can be None to have no limit on older data.
+            until: An "aware" datetime.datetime object specifying
+                   the latest (database server) time the data to
+                   be dumped should've arrived.
+                   The data after this time will not be dumped.
+                   Can be None to have no limit on newer data.

         Returns:
             An iterator returning report JSON data adhering to the I/O
             version of the database schema, each containing at most the
             specified number of objects.
+
+        Raises:
+            NoTimestamps - Either "after" or "until" is not None, and
+                           the database doesn't have row timestamps.
         """
         assert isinstance(objects_per_report, int)
         assert objects_per_report >= 0

         try:
             for table_name, table_schema in self.TABLES.items():
                 result = cursor.execute(
-                    table_schema.format_dump(table_name, with_metadata)
+                    *table_schema.format_dump(table_name, with_metadata,
+                                              after, until)
                 )
                 obj_list = None
                 for obj in table_schema.unpack_iter(result,
diff --git a/kcidb/db/sqlite/v04_02.py b/kcidb/db/sqlite/v04_02.py
index 407520d4..45d699c7 100644
--- a/kcidb/db/sqlite/v04_02.py
+++ b/kcidb/db/sqlite/v04_02.py
@@ -31,7 +31,8 @@ class Schema(PreviousSchema):
     TABLES_ARGS = {
         name: merge_dicts(
             args,
-            columns=dict(_timestamp=TIMESTAMP_COLUMN, **args["columns"])
+            columns=dict(_timestamp=TIMESTAMP_COLUMN, **args["columns"]),
+            timestamp="_timestamp",
         )
         for name, args in PreviousSchema.TABLES_ARGS.items()
     }
diff --git a/kcidb/test_db.py b/kcidb/test_db.py
index cd8db7f4..dbe5264e 100644
--- a/kcidb/test_db.py
+++ b/kcidb/test_db.py
@@ -1994,3 +1994,92 @@ def test_purge(empty_database):
         assert client.dump() == kcidb.io.SCHEMA.new()
     else:
         assert not client.purge(None)
+
+
+def test_dump_limits(empty_database):
+    """Test the dump() method observes time limits"""
+    # It's OK, pylint: disable=too-many-locals
+    client = empty_database
+
+    # Check we can do a basic dump
+    io_schema = client.get_schema()[1]
+    assert client.dump(with_metadata=False) == io_schema.new()
+
+    drivers = [*client.driver.drivers] \
+        if isinstance(client.driver, kcidb.db.mux.Driver) \
+        else [client.driver]
+
+    # If this is a database and schema which *should* support purging
+    if all(
+        isinstance(driver,
+                   (kcidb.db.bigquery.Driver,
+                    kcidb.db.postgresql.Driver,
+                    kcidb.db.sqlite.Driver)) and
+        driver.get_schema()[0] >= (4, 2)
+        for driver in drivers
+    ):
+        io_data_1 = deepcopy(COMPREHENSIVE_IO_DATA)
+        io_data_2 = deepcopy(COMPREHENSIVE_IO_DATA)
+        for obj_list_name in kcidb.io.SCHEMA.graph:
+            if obj_list_name:
+                for obj in io_data_2[obj_list_name]:
+                    obj["id"] = "origin:2"
+
+        client.load(io_data_1)
+        assert client.dump(with_metadata=False) == io_data_1
+        time.sleep(1)
+        dump = client.dump(with_metadata=True)
+        first_load_timestamps = [
+            dateutil.parser.isoparse(obj["_timestamp"])
+            for obj_list_name in io_schema.id_fields
+            for obj in dump[obj_list_name]
+        ]
+        before_first_load = min(first_load_timestamps) - \
+            datetime.timedelta(microseconds=1)
+        latest_first_load = max(first_load_timestamps)
+        time.sleep(1)
+        client.load(io_data_2)
+        dump = client.dump(with_metadata=True)
+        second_load_timestamps = [
+            dateutil.parser.isoparse(obj["_timestamp"])
+            for obj_list_name in io_schema.id_fields
+            for obj in dump[obj_list_name]
+        ]
+        latest_second_load = max(second_load_timestamps)
+
+        # Check both datasets are in the database
+        # regardless of the object order
+        for dump in [
+                client.dump(with_metadata=False),
+                client.dump(with_metadata=False,
+                            after=None,
+                            until=None),
+                client.dump(with_metadata=False,
+                            after=before_first_load,
+                            until=latest_second_load),
+        ]:
+            for io_data in (io_data_1, io_data_2):
+                for obj_list_name in io_schema.id_fields:
+                    assert obj_list_name in dump
+                    for obj in io_data[obj_list_name]:
+                        assert obj in dump[obj_list_name]
+
+        assert client.dump(with_metadata=False,
+                           until=latest_first_load) == io_data_1
+        assert client.dump(with_metadata=False,
+                           after=latest_first_load) == io_data_2
+        assert client.dump(with_metadata=False,
+                           until=before_first_load) == io_schema.new()
+        assert client.dump(with_metadata=False,
+                           after=latest_second_load) == io_schema.new()
+        time.sleep(1)
+        assert client.purge(client.get_current_time())
+        assert client.dump() == io_schema.new()
+    else:
+        now = datetime.datetime.now(datetime.timezone.utc)
+        with pytest.raises(AssertionError):
+            client.dump(after=now)
+        with pytest.raises(AssertionError):
+            client.dump(until=now)
+        with pytest.raises(AssertionError):
+            client.dump(after=now, until=now)
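
Usage sketch: with the four patches above applied, a time-windowed dump can be
combined with purging to archive recent data and then trim the database. This
is a minimal sketch, not part of the series; the "sqlite:kcidb.sqlite3"
database spec is hypothetical, and dumping with "after"/"until" requires a
v4.2+ schema with row timestamps (kcidb.db.misc.NoTimestamps is raised
otherwise):

    import datetime
    import kcidb

    # Hypothetical database spec; any spec supported by kcidb.db works here
    client = kcidb.db.Client("sqlite:kcidb.sqlite3")

    # Use the database server's clock, since both dump and purge compare
    # against (database server) arrival timestamps
    now = client.get_current_time()
    cutoff = now - datetime.timedelta(days=1)

    # Dump only the data that arrived within the last 24 hours:
    # "after" is an exclusive lower bound (> after), and
    # "until" is an inclusive upper bound (<= until)
    recent = client.dump(after=cutoff, until=now)

    # purge(None) removes nothing and only reports whether the
    # database supports purging at all
    if client.purge(None):
        # Remove everything that arrived before the dumped window
        client.purge(before=cutoff)

The exclusive/inclusive bound pairing mirrors the (">", after) and
("<=", until) operators in Table.format_dump(), so consecutive dumps with
until=X followed by after=X neither miss nor duplicate rows.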