From a37bff6392f2c63952143d74a3daa97e21de50f3 Mon Sep 17 00:00:00 2001 From: Nikolai Kondrashov Date: Fri, 5 Jul 2024 18:51:56 +0300 Subject: [PATCH] tests: Make no assumption on KCIDB_DATABASE value --- test_main.py | 69 ++++++++++++++++++++++++++-------------------------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/test_main.py b/test_main.py index 5e8f67ee..37c2c7f8 100644 --- a/test_main.py +++ b/test_main.py @@ -230,6 +230,7 @@ def test_url_caching(empty_deployment): def test_purge_db(empty_deployment): """Check kcidb_purge_db() works correctly""" + # It's OK, pylint: disable=too-many-locals # Make empty_deployment appear used to silence pylint warning assert empty_deployment is None @@ -309,40 +310,40 @@ def filter_test_data(data): # Merge the before and after data data = kcidb.io.SCHEMA.merge(data_before, [data_after]) - # Load the merged data into the database - client = kcidb.db.Client(os.environ["KCIDB_DATABASE"]) - client.load(data, with_metadata=True) - dump = filter_test_data(client.dump()) - for obj_list_name in kcidb.io.SCHEMA.graph: - if obj_list_name: - assert len(dump.get(obj_list_name, [])) == 2, \ - f"Invalid number of {obj_list_name}" - - # Trigger the purge at the boundary - kcidb.mq.JSONPublisher( + # For each type of database, purging expectation, and client + clients = dict( + op=(True, kcidb.db.Client(os.environ["KCIDB_OPERATIONAL_DATABASE"])), + sm=(True, kcidb.db.Client(os.environ["KCIDB_SAMPLE_DATABASE"])), + ar=(False, kcidb.db.Client(os.environ["KCIDB_ARCHIVE_DATABASE"])), + ) + publisher = kcidb.mq.JSONPublisher( os.environ["GCP_PROJECT"], os.environ["KCIDB_PURGE_DB_TRIGGER_TOPIC"] - ).publish(dict(database="op", timedelta=dict(stamp=str_after))) - - # Wait and check for the purge - deadline = datetime.now(timezone.utc) + timedelta(minutes=5) - while datetime.now(timezone.utc) < deadline: - time.sleep(5) + ) + for database, (purging, client) in clients.items(): + client.load(data, with_metadata=True) dump = filter_test_data(client.dump()) - # If everything was purged - # NOTE: For some reason we're hitting incomplete purges sometimes - if all( - len(dump.get(n, [])) == 1 - for n in kcidb.io.SCHEMA.graph - if n - ): - break - assert dump == data_after - - # Make sure we were getting the operational DB dump - op_client = kcidb.db.Client(os.environ["KCIDB_OPERATIONAL_DATABASE"]) - assert dump == filter_test_data(op_client.dump()) - - # Make sure the archive database is still intact - ar_client = kcidb.db.Client(os.environ["KCIDB_ARCHIVE_DATABASE"]) - assert filter_test_data(ar_client.dump()) == data + for obj_list_name in kcidb.io.SCHEMA.graph: + if obj_list_name: + assert len(dump.get(obj_list_name, [])) == 2, \ + f"Invalid number of {obj_list_name}" + + # Trigger the purge at the boundary + publisher.publish( + dict(database=database, timedelta=dict(stamp=str_after)) + ) + + # Wait and check for the purge + deadline = datetime.now(timezone.utc) + timedelta(minutes=5) + while datetime.now(timezone.utc) < deadline: + time.sleep(5) + dump = filter_test_data(client.dump()) + # If everything was purged + # NOTE: For some reason we're hitting incomplete purges sometimes + if all( + len(dump.get(n, [])) == 1 + for n in kcidb.io.SCHEMA.graph + if n + ): + break + assert dump == (data_after if purging else data)