Support time range dump #595

Merged 4 commits on Nov 1, 2024
74 changes: 65 additions & 9 deletions kcidb/db/__init__.py
@@ -11,6 +11,8 @@
from kcidb.db import abstract, schematic, mux, \
bigquery, postgresql, sqlite, json, null, misc # noqa: F401

# It's OK for now, pylint: disable=too-many-lines

# Module's logger
LOGGER = logging.getLogger(__name__)

@@ -151,9 +153,8 @@ def purge(self, before=None):

Args:
before: An "aware" datetime.datetime object specifying the
earliest (database server) time the data to be *preserved*
should've arrived. Any other data will be purged.
Can be None to have nothing removed. The latter can be
used to test if the database supports purging.

@@ -275,7 +276,8 @@ def upgrade(self, target_version=None):
"Target schema is older than the current schema"
self.driver.upgrade(target_version)

def dump_iter(self, objects_per_report=0, with_metadata=True,
after=None, until=None):
"""
Dump all data from the database in object number-limited chunks.

@@ -284,37 +286,76 @@ def dump_iter(self, objects_per_report=0, with_metadata=True):
report data, or zero for no limit.
with_metadata: True, if metadata fields should be dumped as
well. False, if not.
after: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be excluded from the dump should've arrived.
The data after this time will be dumped.
Can be None to have no limit on older data.
until: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be dumped should've arrived.
The data after this time will not be dumped.
Can be None to have no limit on newer data.

Returns:
An iterator returning report JSON data adhering to the current I/O
schema version, each containing at most the specified number of
objects.

Raises:
NoTimestamps - Either "after" or "until" is not None, and
the database doesn't have row timestamps.
"""
assert isinstance(objects_per_report, int)
assert objects_per_report >= 0
assert isinstance(with_metadata, bool)
assert after is None or \
isinstance(after, datetime.datetime) and after.tzinfo
assert until is None or \
isinstance(until, datetime.datetime) and until.tzinfo
assert self.is_initialized()
yield from self.driver.dump_iter(
objects_per_report=objects_per_report,
with_metadata=with_metadata,
after=after, until=until
)

def dump(self, with_metadata=True, after=None, until=None):
"""
Dump all data from the database.

Args:
with_metadata: True, if metadata fields should be dumped as
well. False, if not.
after: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be excluded from the dump should've arrived.
The data after this time will be dumped.
Can be None to have no limit on older data.
until: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be dumped should've arrived.
The data after this time will not be dumped.
Can be None to have no limit on newer data.

Returns:
The JSON data from the database adhering to the current I/O schema
version.

Raises:
NoTimestamps - Either "after" or "until" is not None, and
the database doesn't have row timestamps.
"""
assert isinstance(with_metadata, bool)
assert after is None or \
isinstance(after, datetime.datetime) and after.tzinfo
assert until is None or \
isinstance(until, datetime.datetime) and until.tzinfo
assert self.is_initialized()
try:
return next(self.dump_iter(objects_per_report=0,
with_metadata=with_metadata,
after=after, until=until))
except StopIteration:
return self.get_schema()[1].new()
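
For illustration, here is a minimal usage sketch of the new bounds (not part of the diff; the in-memory database spec and the init() call are assumptions, and both bounds must be timezone-aware or the asserts above fire):

import datetime
import kcidb

# Hypothetical throwaway database; any supported specification works
client = kcidb.db.Client("sqlite::memory:")
client.init()
# Dump only the data that arrived during the last 24 hours
until = datetime.datetime.now(datetime.timezone.utc)
after = until - datetime.timedelta(days=1)
# Raises kcidb.db.misc.NoTimestamps if the schema keeps no row timestamps
data = client.dump(with_metadata=True, after=after, until=until)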

@@ -777,13 +818,28 @@ def dump_main():
help='Do not dump metadata fields',
action='store_true'
)
parser.add_argument(
'--after',
metavar='AFTER',
type=kcidb.misc.iso_timestamp,
help="An ISO-8601 timestamp specifying the latest time the data to "
"be *excluded* from the dump should've arrived."
)
parser.add_argument(
'--until',
metavar='UNTIL',
type=kcidb.misc.iso_timestamp,
help="An ISO-8601 timestamp specifying the latest time the data to "
"be *included* into the dump should've arrived."
)
args = parser.parse_args()
client = Client(args.database)
if not client.is_initialized():
raise Exception(f"Database {args.database!r} is not initialized")
kcidb.misc.json_dump_stream(
client.dump_iter(objects_per_report=args.objects_per_report,
with_metadata=not args.without_metadata,
after=args.after, until=args.until),
sys.stdout, indent=args.indent, seq=args.seq_out
)
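
With these options in place, a time-bounded dump from the command line would look something like "kcidb-db-dump -d <DATABASE> --after 2024-10-01T00:00:00+00:00 --until 2024-11-01T00:00:00+00:00"; the executable name is assumed from kcidb's packaging of dump_main(), and any ISO-8601 timestamp accepted by kcidb.misc.iso_timestamp works for either bound.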

25 changes: 21 additions & 4 deletions kcidb/db/abstract.py
@@ -86,9 +86,8 @@ def purge(self, before):

Args:
before: An "aware" datetime.datetime object specifying the
earliest (database server) time the data to be *preserved*
should've arrived. Any other data will be purged.
Can be None to have nothing removed. The latter can be
used to test if the database supports purging.

@@ -171,7 +170,7 @@ def upgrade(self, target_version):
"Target schema is older than the current schema"

@abstractmethod
def dump_iter(self, objects_per_report, with_metadata, after, until):
"""
Dump all data from the database in object number-limited chunks.
The database must be initialized.
@@ -181,15 +180,33 @@
report data, or zero for no limit.
with_metadata: True, if metadata fields should be dumped as
well. False, if not.
after: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be excluded from the dump should've arrived.
The data after this time will be dumped.
Can be None to have no limit on older data.
until: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be dumped should've arrived.
The data after this time will not be dumped.
Can be None to have no limit on newer data.

Returns:
An iterator returning report JSON data adhering to the current
database schema's I/O schema version, each containing at most the
specified number of objects.

Raises:
NoTimestamps - Either "after" or "until" is not None, and
the database doesn't have row timestamps.
"""
assert isinstance(objects_per_report, int)
assert objects_per_report >= 0
assert isinstance(with_metadata, bool)
assert after is None or \
isinstance(after, datetime.datetime) and after.tzinfo
assert until is None or \
isinstance(until, datetime.datetime) and until.tzinfo
assert self.is_initialized()
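
To make the contract concrete, a hypothetical driver whose storage keeps no arrival timestamps would reject any time bound up front. A sketch only: the remaining abstract methods a real driver must implement, and the fetch_all() helper, are omitted inventions.

from kcidb.db.misc import NoTimestamps

class TimestamplessDriver(Driver):
    """A made-up driver whose storage keeps no row timestamps"""
    def dump_iter(self, objects_per_report, with_metadata, after, until):
        if after is not None or until is not None:
            # Nothing to filter on, as the contract above requires
            raise NoTimestamps("driver stores no row timestamps")
        yield from self.fetch_all(objects_per_report, with_metadata)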

# No, it's not, pylint: disable=too-many-return-statements
48 changes: 41 additions & 7 deletions kcidb/db/bigquery/v04_00.py
@@ -17,7 +17,7 @@
from kcidb.db.schematic import \
Schema as AbstractSchema, \
Connection as AbstractConnection
from kcidb.db.misc import NotFound, NoTimestamps
from kcidb.db.bigquery.schema import validate_json_obj_list

# We'll manage for now, pylint: disable=too-many-lines
@@ -749,7 +749,7 @@ def _unpack_node(cls, node, drop_null=True):
node[key] = cls._unpack_node(value)
return node

def dump_iter(self, objects_per_report, with_metadata, after, until):
"""
Dump all data from the database in object number-limited chunks.

@@ -758,11 +758,25 @@
report data, or zero for no limit.
with_metadata: True, if metadata fields should be dumped as
well. False, if not.
after: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be excluded from the dump should've arrived.
The data after this time will be dumped.
Can be None to have no limit on older data.
until: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be dumped should've arrived.
The data after this time will not be dumped.
Can be None to have no limit on newer data.

Returns:
An iterator returning report JSON data adhering to the I/O version
of the database schema, each containing at most the specified
number of objects.

Raises:
NoTimestamps - Either "after" or "until" is not None, and
the database doesn't have row timestamps.
"""
assert isinstance(objects_per_report, int)
assert objects_per_report >= 0
Expand All @@ -771,14 +785,34 @@ def dump_iter(self, objects_per_report, with_metadata):
obj_num = 0
data = self.io.new()
for obj_list_name, table_schema in self.TABLE_MAP.items():
ts_field = next(
(f for f in table_schema if f.name == "_timestamp"),
None
)
if (after or until) and not ts_field:
raise NoTimestamps(
f"Table {obj_list_name!r} has no {ts_field.name!r} column"
)

query_string = (
"SELECT " +
", ".join(
f"`{f.name}`" for f in table_schema
if with_metadata or f.name[0] != '_'
) +
f" FROM `{obj_list_name}`" +
((
" WHERE " + " AND ".join(
f"{ts_field.name} {op} ?"
for op, v in ((">", after), ("<=", until)) if v
)
) if (after or until) else "")
)
query_parameters = [
bigquery.ScalarQueryParameter(None, ts_field.field_type, v)
for v in (after, until) if v
]
query_job = self.conn.query_create(query_string, query_parameters)
obj_list = None
for row in query_job:
if obj_list is None:
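For reference, the positional-parameter pattern used above looks like this outside kcidb (a standalone sketch; the project, dataset, and table names are invented):

import datetime
from google.cloud import bigquery

client = bigquery.Client()
until = datetime.datetime.now(datetime.timezone.utc)
after = until - datetime.timedelta(days=1)
# Positional "?" placeholders pair with ScalarQueryParameter(None, ...),
# exactly like the query_parameters list built above
job_config = bigquery.QueryJobConfig(query_parameters=[
    bigquery.ScalarQueryParameter(None, "TIMESTAMP", after),
    bigquery.ScalarQueryParameter(None, "TIMESTAMP", until),
])
rows = client.query(
    "SELECT * FROM `my-project.my_dataset.checkouts`"
    " WHERE _timestamp > ? AND _timestamp <= ?",
    job_config=job_config,
)
print(list(rows))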
5 changes: 2 additions & 3 deletions kcidb/db/bigquery/v04_02.py
@@ -87,9 +87,8 @@ def purge(self, before):

Args:
before: An "aware" datetime.datetime object specifying the
earliest (database server) time the data to be *preserved*
should've arrived. Any other data will be purged.
Can be None to have nothing removed. The latter can be
used to test if the database supports purging.

4 changes: 4 additions & 0 deletions kcidb/db/misc.py
@@ -33,6 +33,10 @@ class UnsupportedSchema(Error):
"""Database schema version is not supported"""


class NoTimestamps(Error):
"""Row timestamps required for the operation don't exist"""


def format_spec_list(specs):
"""
Format a database specification list string out of a list of specification
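Downstream code can also use the new exception as a capability probe; a sketch reusing the client and bounds from the earlier example:

from kcidb.db.misc import NoTimestamps

try:
    data = client.dump(after=after, until=until)
except NoTimestamps:
    # The schema predates row timestamps; fall back to an unbounded dump
    data = client.dump()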
45 changes: 43 additions & 2 deletions kcidb/db/mux.py
@@ -1,6 +1,7 @@
"""Kernel CI reporting database - multiplexing"""

import textwrap
import datetime
from abc import abstractmethod
import kcidb.io as io
import kcidb.db.misc
@@ -252,6 +253,32 @@ def empty(self):
for driver in self.drivers:
driver.empty()

def purge(self, before):
"""
Remove all the data from the database that arrived before the
specified time, if the database supports that.
The database must be initialized.

Args:
before: An "aware" datetime.datetime object specifying the
earliest (database server) time the data to be *preserved*
should've arrived. Any other data will be purged.
Can be None to have nothing removed. The latter can be
used to test if the database supports purging.

Returns:
True if the database supports purging, and the requested data was
purged. False if the database doesn't support purging.
"""
assert self.is_initialized()
assert before is None or \
isinstance(before, datetime.datetime) and before.tzinfo
purging_supported = all(d.purge(None) for d in self.drivers)
if before is not None and purging_supported:
for driver in self.drivers:
driver.purge(before)
return purging_supported
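
Note the two-phase design: purge(None) is specified to remove nothing, so it doubles as a side-effect-free capability probe, and a multiplexed purge therefore runs either on every member database or on none of them.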

def get_current_time(self):
"""
Get the current time from the database server.
@@ -328,7 +355,7 @@ def upgrade(self, target_version):
driver.upgrade(driver_version)
self.version = version

def dump_iter(self, objects_per_report, with_metadata, after, until):
"""
Dump all data from the first database in object number-limited chunks.

@@ -337,14 +364,28 @@
report data, or zero for no limit.
with_metadata: True, if metadata fields should be dumped as
well. False, if not.
after: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be excluded from the dump should've arrived.
The data after this time will be dumped.
Can be None to have no limit on older data.
until: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be dumped should've arrived.
The data after this time will not be dumped.
Can be None to have no limit on newer data.

Returns:
An iterator returning report JSON data adhering to the current I/O
schema version, each containing at most the specified number of
objects.

Raises:
NoTimestamps - Either "after" or "until" is not None, and
the database doesn't have row timestamps.
"""
yield from self.drivers[0].dump_iter(objects_per_report,
with_metadata, after, until)

# We can live with this for now, pylint: disable=too-many-arguments
# Or if you prefer, pylint: disable=too-many-positional-arguments