diff --git a/cloud b/cloud
index ba1a4f2b..b4231e14 100755
--- a/cloud
+++ b/cloud
@@ -112,6 +112,7 @@ function execute_command() {
     declare -r load_queue_trigger_topic="${prefix}load_queue_trigger"
     declare -r cache_bucket_name="${project}_${prefix}cache"
     declare -r pick_notifications_trigger_topic="${prefix}pick_notifications_trigger"
+    declare -r purge_op_db_trigger_topic="${prefix}purge_op_db_trigger"
     declare -r cache_redirect_function_name="cache_redirect"
     declare cache_redirector_url="https://${CLOUD_FUNCTION_REGION}"
     declare cache_redirector_url+="-${project}.cloudfunctions.net/"
@@ -255,6 +256,7 @@ function execute_command() {
             --new-load-subscription="$new_load_subscription"
             --updated-publish="$updated_publish"
             --updated-topic="$updated_topic"
+            --purge-op-db-trigger-topic="$purge_op_db_trigger_topic"
             --updated-urls-topic="$updated_urls_topic"
             --spool-collection-path="$spool_collection_path"
             --extra-cc="$extra_cc"
@@ -310,6 +312,7 @@ function execute_command() {
             --updated-debug-subscription="$updated_debug_subscription" \
             --pick-notifications-trigger-topic \
                 "$pick_notifications_trigger_topic" \
+            --purge-op-db-trigger-topic "$purge_op_db_trigger_topic" \
             --smtp-topic="$smtp_topic" \
             --smtp-subscription="$smtp_subscription"
         sections_run "$sections" firestore_deploy "$project"
@@ -322,6 +325,7 @@ function execute_command() {
             --load-queue-trigger-topic="$load_queue_trigger_topic" \
             --pick-notifications-trigger-topic \
                 "$pick_notifications_trigger_topic" \
+            --purge-op-db-trigger-topic "$purge_op_db_trigger_topic" \
             --updated-urls-topic="$updated_urls_topic" \
             --updated-topic="$updated_topic" \
             --cache-redirect-function-name="$cache_redirect_function_name" \
@@ -331,7 +335,8 @@ function execute_command() {
             --project="$project" \
             --prefix="$prefix" \
             --load-queue-trigger-topic="$load_queue_trigger_topic" \
-            --pick-notifications-trigger-topic="$pick_notifications_trigger_topic"
+            --pick-notifications-trigger-topic="$pick_notifications_trigger_topic" \
+            --purge-op-db-trigger-topic="$purge_op_db_trigger_topic"
         sections_run "$sections" submitters_deploy \
             "$project" "$new_topic" "${submitters[@]}"
     elif [ "$command" == "shell" ]; then
@@ -361,6 +366,7 @@ function execute_command() {
             --load-queue-trigger-topic="$load_queue_trigger_topic" \
             --pick-notifications-trigger-topic \
                 "$pick_notifications_trigger_topic" \
+            --purge-op-db-trigger-topic "$purge_op_db_trigger_topic" \
             --new-topic="$new_topic" \
             --new-load-subscription="$new_load_subscription" \
             --new-debug-subscription="$new_debug_subscription" \
diff --git a/kcidb/cloud/cloud_functions.sh b/kcidb/cloud/cloud_functions.sh
index a2dfe3dc..397a21ba 100644
--- a/kcidb/cloud/cloud_functions.sh
+++ b/kcidb/cloud/cloud_functions.sh
@@ -16,6 +16,7 @@ declare _CLOUD_FUNCTIONS_SH=
 #       --new-topic=NAME --new-load-subscription=NAME
 #       --updated-publish=true|false
 #       --updated-topic=NAME
+#       --purge-op-db-trigger-topic=NAME
 #       --updated-urls-topic=NAME
 #       --cache-bucket-name=NAME
 #       --cache-redirector-url=URL
@@ -38,6 +39,7 @@ function cloud_functions_env() {
                           heavy_asserts \
                           new_topic new_load_subscription \
                           updated_publish updated_topic \
+                          purge_op_db_trigger_topic \
                           updated_urls_topic \
                           spool_collection_path \
                           extra_cc \
@@ -70,6 +72,7 @@ function cloud_functions_env() {
         [KCIDB_CLEAN_TEST_DATABASES]="$clean_test_databases"
         [KCIDB_EMPTY_TEST_DATABASES]="$empty_test_databases"
         [KCIDB_UPDATED_QUEUE_TOPIC]="$updated_topic"
+        [KCIDB_PURGE_OP_DB_TRIGGER_TOPIC]="$purge_op_db_trigger_topic"
         [KCIDB_UPDATED_URLS_TOPIC]="$updated_urls_topic"
         [KCIDB_SELECTED_SUBSCRIPTIONS]=""
         [KCIDB_SPOOL_COLLECTION_PATH]="$spool_collection_path"
@@ -123,6 +126,7 @@ function cloud_functions_env() {
 #       --project=NAME --prefix=PREFIX --source=PATH
 #       --load-queue-trigger-topic=NAME
 #       --pick-notifications-trigger-topic=NAME
+#       --purge-op-db-trigger-topic=NAME
 #       --updated-urls-topic=NAME
 #       --updated-topic=NAME
 #       --spool-collection-path=PATH
@@ -133,6 +137,7 @@ function cloud_functions_deploy() {
     params="$(getopt_vars sections project prefix source \
                           load_queue_trigger_topic \
                           pick_notifications_trigger_topic \
+                          purge_op_db_trigger_topic \
                           updated_urls_topic \
                           updated_topic \
                           spool_collection_path \
@@ -152,6 +157,14 @@ function cloud_functions_deploy() {
     trigger_event+="document.create"
     declare trigger_resource="projects/$project/databases/(default)/documents/"
     trigger_resource+="${spool_collection_path}/{notification_id}"
+    cloud_function_deploy "$sections" "$source" "$project" "$prefix" \
+        purge_op_db \
+        --env-vars-file "$env_yaml_file" \
+        --trigger-topic "${purge_op_db_trigger_topic}" \
+        --memory 256MB \
+        --max-instances=1 \
+        --timeout 540
+
     cloud_function_deploy "$sections" "$source" "$project" "$prefix" \
         pick_notifications \
         --env-vars-file "$env_yaml_file" \
@@ -215,6 +228,8 @@ function cloud_functions_withdraw() {
                           cache_redirect_function_name \
                           -- "$@")"
     eval "$params"
+    cloud_function_withdraw "$sections" "$project" "$prefix" \
+        purge_op_db
     cloud_function_withdraw "$sections" "$project" "$prefix" \
         pick_notifications
     cloud_function_withdraw "$sections" "$project" "$prefix" \
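
Because the function is deployed with --trigger-topic, publishing any JSON message on the purge trigger topic invokes it. A minimal sketch using the google-cloud-pubsub client directly; the project name "example-project" is hypothetical, and an empty deployment prefix is assumed, making the topic name "purge_op_db_trigger":

    import json
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic = publisher.topic_path("example-project", "purge_op_db_trigger")
    # The payload is the JSON document accepted by parse_timedelta_json(),
    # which is added in kcidb/misc.py below
    publisher.publish(
        topic, json.dumps({"delta": {"months": 6}}).encode()
    ).result()
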
diff --git a/kcidb/cloud/pubsub.sh b/kcidb/cloud/pubsub.sh
index 3560c635..5cfda7f4 100644
--- a/kcidb/cloud/pubsub.sh
+++ b/kcidb/cloud/pubsub.sh
@@ -134,6 +134,7 @@ function pubsub_subscription_withdraw() {
 #       --updated-topic=NAME
 #       --updated-debug-subscription=NAME
 #       --pick-notifications-trigger-topic=NAME
+#       --purge-op-db-trigger-topic=NAME
 #       --updated-urls-topic=NAME
 #       --smtp-topic=NAME
 #       --smtp-subscription=NAME
@@ -147,6 +148,7 @@ function pubsub_deploy() {
                           updated_topic \
                           updated_debug_subscription \
                           pick_notifications_trigger_topic \
+                          purge_op_db_trigger_topic \
                           updated_urls_topic \
                           smtp_topic smtp_subscription \
                           -- "$@")"
@@ -167,6 +169,7 @@ function pubsub_deploy() {
     pubsub_subscription_deploy "$project" "${updated_topic}" \
                                "${updated_debug_subscription}"
     pubsub_topic_deploy "$project" "${pick_notifications_trigger_topic}"
+    pubsub_topic_deploy "$project" "${purge_op_db_trigger_topic}"
     pubsub_topic_deploy "$project" "${updated_urls_topic}"
     if [ -n "$smtp_topic" ]; then
         pubsub_topic_deploy "$project" "$smtp_topic"
@@ -179,6 +182,7 @@
 # Args: --project=NAME
 #       --load-queue-trigger-topic=NAME
 #       --pick-notifications-trigger-topic=NAME
+#       --purge-op-db-trigger-topic=NAME
 #       --updated-urls-topic=NAME
 #       --new-topic=NAME
 #       --new-load-subscription=NAME
@@ -192,6 +196,7 @@ function pubsub_withdraw() {
     params="$(getopt_vars project \
                           load_queue_trigger_topic \
                           pick_notifications_trigger_topic \
+                          purge_op_db_trigger_topic \
                           updated_urls_topic \
                           new_topic \
                           new_load_subscription \
@@ -213,6 +218,7 @@ function pubsub_withdraw() {
     pubsub_topic_withdraw "$project" "$load_queue_trigger_topic"
     pubsub_topic_withdraw "$project" "$pick_notifications_trigger_topic"
     pubsub_topic_withdraw "$project" "$updated_urls_topic"
+    pubsub_topic_withdraw "$project" "$purge_op_db_trigger_topic"
 }
 
 fi # _PUBSUB_SH
diff --git a/kcidb/cloud/scheduler.sh b/kcidb/cloud/scheduler.sh
index 01b33629..40e217a7 100644
--- a/kcidb/cloud/scheduler.sh
+++ b/kcidb/cloud/scheduler.sh
@@ -66,12 +66,14 @@ function scheduler_job_withdraw() {
 #       --prefix=STRING
 #       --load-queue-trigger-topic=NAME
 #       --pick-notifications-trigger-topic=NAME
+#       --purge-op-db-trigger-topic=NAME
 function scheduler_deploy() {
     declare params
     params="$(getopt_vars project \
                           prefix \
                           load_queue_trigger_topic \
                           pick_notifications_trigger_topic \
+                          purge_op_db_trigger_topic \
                           -- "$@")"
     eval "$params"
     # Deploy the jobs
@@ -80,6 +82,9 @@ function scheduler_deploy() {
     scheduler_job_pubsub_deploy "$project" "${prefix}pick_notifications_trigger" \
                                 "$pick_notifications_trigger_topic" \
                                 '*/10 * * * *' '{}'
+    scheduler_job_pubsub_deploy "$project" "${prefix}purge_op_db_trigger" \
+                                "$purge_op_db_trigger_topic" \
+                                '0 6 * * MON' '{"delta": {"months": 6}}'
 }
 
 # Withdraw from the scheduler
@@ -89,6 +94,7 @@ function scheduler_withdraw() {
     declare -r prefix="$1"; shift
     scheduler_job_withdraw "$project" "${prefix}load_queue_trigger"
     scheduler_job_withdraw "$project" "${prefix}pick_notifications_trigger"
+    scheduler_job_withdraw "$project" "${prefix}purge_op_db_trigger"
 }
 
 fi # _SCHEDULER_SH
diff --git a/kcidb/cloud/sections.sh b/kcidb/cloud/sections.sh
index 39f0d951..a6e4f160 100644
--- a/kcidb/cloud/sections.sh
+++ b/kcidb/cloud/sections.sh
@@ -14,6 +14,7 @@ declare -A -r SECTIONS=(
     ["secrets"]="Secrets"
     ["firestore"]="Firestore database"
     ["storage"]="Google cloud storage"
+    ["cloud_functions.purge_op_db"]="Cloud Functions: kcidb_purge_op_db()"
     ["cloud_functions.pick_notifications"]="Cloud Functions: kcidb_pick_notifications()"
     ["cloud_functions.send_notification"]="Cloud Functions: kcidb_send_notification()"
     ["cloud_functions.spool_notifications"]="Cloud Functions: kcidb_spool_notifications()"
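
The new scheduler job publishes '{"delta": {"months": 6}}' at 06:00 every Monday. Under the semantics of parse_timedelta_json(), added in kcidb/misc.py below, "months" is the smallest delta component present, so the cut-off is the run time rounded down to month precision, minus six months. A worked sketch with a hypothetical run time:

    import datetime
    import kcidb.misc

    run = datetime.datetime(2023, 12, 18, 6, 0, tzinfo=datetime.timezone.utc)
    # 2023-12-18 06:00 rounds down to 2023-12-01 00:00, minus six months:
    assert kcidb.misc.parse_timedelta_json(
        {"delta": {"months": 6}}, run
    ) == datetime.datetime(2023, 6, 1, tzinfo=datetime.timezone.utc)
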
diff --git a/kcidb/misc.py b/kcidb/misc.py
index 25ee5c34..204f571f 100644
--- a/kcidb/misc.py
+++ b/kcidb/misc.py
@@ -9,10 +9,14 @@
 import argparse
 import logging
 import json
+import datetime
 from textwrap import indent
 from importlib import metadata
+import dateutil
+import dateutil.relativedelta
 import dateutil.parser
 from google.cloud import secretmanager
+import jsonschema
 import jq
 import kcidb.io as io
 
@@ -482,3 +486,72 @@ def isliced(iterable, size):
         if not iterator_slice:
             break
         yield iterator_slice
+
+
+def parse_timedelta_json(data, stamp):
+    """
+    Parse JSON data for a time delta: an optional timestamp and an optional
+    delta; at least one of them must be present. If the timestamp is not
+    present, the "stamp" argument is used instead. The delta can only be
+    positive and is subtracted from the timestamp. The timestamp is rounded
+    down to the precision of the delta, based on the presence of its
+    components in the JSON.
+
+    Args:
+        data:   The JSON data to parse.
+        stamp:  The (aware) timestamp to use, if not present in the data.
+    """
+    # Recognized time components, smaller to larger, and their minimums
+    components_min = dict(
+        microseconds=0,
+        seconds=0,
+        minutes=0,
+        hours=0,
+        days=1,
+        months=1,
+        years=1,
+    )
+    # The timedelta schema
+    schema = dict(
+        type="object",
+        properties=dict(
+            delta=dict(
+                type="object",
+                properties={
+                    # Delta is always positive and is subtracted, to reduce
+                    # the chance of forgetting the sign, going into the
+                    # future, and wiping everything
+                    c: dict(type="integer", minimum=0)
+                    for c in components_min
+                },
+                anyOf=[dict(required=[c]) for c in components_min],
+                additionalProperties=False,
+            ),
+            stamp=dict(type="string", format="date-time"),
+        ),
+        anyOf=[{"required": ["delta"]}, {"required": ["stamp"]}],
+        additionalProperties=False,
+    )
+
+    assert isinstance(stamp, datetime.datetime) and stamp.tzinfo
+    jsonschema.validate(
+        instance=data, schema=schema,
+        format_checker=jsonschema.Draft7Validator.FORMAT_CHECKER
+    )
+
+    # Use the base timestamp from the data, if present
+    if "stamp" in data:
+        stamp = dateutil.parser.isoparse(data["stamp"])
+
+    # Retrieve and apply the timedelta from the input, if any
+    if "delta" in data:
+        delta = data["delta"].copy()
+        # Round the timestamp down to delta precision and subtract the delta
+        for component, minimum in components_min.items():
+            if component in delta:
+                break
+            # The singular (absolute) form replaces the component's value
+            delta[component[:-1]] = minimum
+        stamp = stamp - dateutil.relativedelta.relativedelta(**delta)
+
+    return stamp
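
A short usage sketch of the new helper; the values are borrowed from the tests in kcidb/test_misc.py below:

    import datetime
    from kcidb.misc import parse_timedelta_json

    utc = datetime.timezone.utc
    stamp = datetime.datetime(1, 2, 3, 4, 5, 6, 7, tzinfo=utc)

    # A zero delta is valid and only rounds: "seconds" is the smallest
    # component present, so the microseconds are dropped
    assert parse_timedelta_json(dict(delta=dict(seconds=0)), stamp) == \
        datetime.datetime(1, 2, 3, 4, 5, 6, tzinfo=utc)

    # An explicit "stamp" in the data overrides the fallback argument
    assert parse_timedelta_json(
        dict(stamp=stamp.isoformat(timespec="microseconds")),
        datetime.datetime(1, 1, 1, tzinfo=utc)
    ) == stamp
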
+ """ + # Recognized time components, smaller to larger, and their minimums + components_min = dict( + microseconds=0, + seconds=0, + minutes=0, + hours=0, + days=1, + months=1, + years=1, + ) + # The timedelta schema + schema = dict( + type="object", + properties=dict( + delta=dict( + type="object", + properties={ + # Delta is always positive and is subtracted to reduce + # chance of forgetting the sign and going to the future, + # wiping everything + c: dict(type="integer", minimum=0) + for c in components_min + }, + anyOf=[dict(required=[c]) for c in components_min], + additionalProperties=False, + ), + stamp=dict(type="string", format="date-time",), + ), + anyOf=[{"required": ["delta"]}, {"required": ["stamp"]},], + additionalProperties=False, + ) + + assert isinstance(stamp, datetime.datetime) and stamp.tzinfo + jsonschema.validate( + instance=data, schema=schema, + format_checker=jsonschema.Draft7Validator.FORMAT_CHECKER + ) + + # Use the base timestamp from the data, if present + if "stamp" in data: + stamp = dateutil.parser.isoparse(data["stamp"]) + + # Retrieve and apply the timedelta from input, if any + if "delta" in data: + delta = data["delta"].copy() + # Round the timestamp down to delta precision and subtract delta + for component, minimum in components_min.items(): + if component in delta: + break + # Singular means replace with value + delta[component[:-1]] = minimum + stamp = stamp - dateutil.relativedelta.relativedelta(**delta) + + return stamp diff --git a/kcidb/test_misc.py b/kcidb/test_misc.py new file mode 100644 index 00000000..e6d172dd --- /dev/null +++ b/kcidb/test_misc.py @@ -0,0 +1,88 @@ +"""kcdib.misc module tests""" + +import datetime +from pytest import raises +from jsonschema.exceptions import ValidationError +import kcidb.misc + + +def test_parse_timedelta_json(): + """Check kcidb-tests-validate works""" + f = kcidb.misc.parse_timedelta_json + min_stamp = datetime.datetime(1, 1, 1, 0, 0, 0, 0, + tzinfo=datetime.timezone.utc) + stamp = datetime.datetime(1, 2, 3, 4, 5, 6, 7, + tzinfo=datetime.timezone.utc) + stamp_str = stamp.isoformat(timespec="microseconds") + with raises(ValidationError): + f([], stamp) + with raises(ValidationError): + f({}, stamp) + with raises(ValidationError): + f(dict(stamp=10), stamp) + with raises(ValidationError): + f(dict(stamp=""), stamp) + with raises(ValidationError): + f(dict(delta=10), stamp) + with raises(ValidationError): + f(dict(delta=""), stamp) + with raises(ValidationError): + f(dict(delta={}), stamp) + with raises(ValidationError): + f(dict(delta=dict(seconds=-1)), stamp) + + assert f(dict(stamp=stamp_str), min_stamp) == stamp + assert f(dict(delta=dict(microseconds=0), stamp=stamp_str), + min_stamp) == stamp + + assert f(dict(delta=dict(microseconds=0)), stamp) == stamp + assert f(dict(delta=dict(seconds=0, microseconds=0)), stamp) == stamp + assert f(dict(delta=dict(minutes=0, seconds=0, microseconds=0)), + stamp) == stamp + assert f(dict(delta=dict(hours=0, minutes=0, seconds=0, microseconds=0)), + stamp) == stamp + assert f(dict(delta=dict(days=0, + hours=0, minutes=0, seconds=0, microseconds=0)), + stamp) == stamp + assert f(dict(delta=dict(months=0, days=0, + hours=0, minutes=0, seconds=0, microseconds=0)), + stamp) == stamp + assert f(dict(delta=dict(years=0, months=0, days=0, + hours=0, minutes=0, seconds=0, microseconds=0)), + stamp) == stamp + assert f(dict(delta=dict(seconds=0)), stamp) == \ + datetime.datetime(1, 2, 3, 4, 5, 6, tzinfo=datetime.timezone.utc) + assert f(dict(delta=dict(minutes=0)), stamp) == 
diff --git a/main.py b/main.py
index 6ce83f73..fb7c667f 100644
--- a/main.py
+++ b/main.py
@@ -4,6 +4,7 @@
 import atexit
 import tempfile
 import base64
+import json
 import datetime
 import logging
 import smtplib
@@ -421,6 +422,27 @@ def kcidb_pick_notifications(data, context):
         spool_client.ack(notification_id)
 
+
+def kcidb_purge_op_db(event, context):
+    """
+    Purge data from the operational database older than the optional delta
+    subtracted from the current (or specified) database timestamp, rounded
+    down to the smallest delta component. Require that either the delta or
+    the timestamp is present.
+    """
+    # Parse the input JSON
+    string = base64.b64decode(event["data"]).decode()
+    data = json.loads(string)
+
+    # Get the operational database client
+    client = get_db_client(OPERATIONAL_DATABASE)
+
+    # Parse/calculate the cut-off timestamp
+    stamp = kcidb.misc.parse_timedelta_json(data, client.get_current_time())
+
+    # Purge the data
+    client.purge(stamp)
+
 
 def send_message(message):
     """
     Send message via email.
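
For local experimentation the handler can be fed a hand-built event of the shape Cloud Functions produces for Pub/Sub triggers. A sketch, assuming the KCIDB_* database environment of a deployment is available:

    import base64
    import json
    import main

    # Encode the payload the way Pub/Sub delivers it to the function
    event = dict(data=base64.b64encode(
        json.dumps({"delta": {"months": 6}}).encode()
    ).decode())
    main.kcidb_purge_op_db(event, context=None)  # context is unused
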
diff --git a/test_main.py b/test_main.py
index d005b055..9371c5df 100644
--- a/test_main.py
+++ b/test_main.py
@@ -3,6 +3,8 @@
 import os
 import subprocess
 import unittest
+from copy import deepcopy
+from datetime import datetime, timezone, timedelta
 from importlib import import_module
 from urllib.parse import quote
 import time
@@ -222,3 +224,124 @@ def test_url_caching(empty_deployment):
         if check_url_in_cache(url_not_expected):
             raise AssertionError(f"Unexpected URL '{url_not_expected}' \
 found in the cache")
+
+
+def test_purge_op_db(empty_deployment):
+    """Check kcidb_purge_op_db() works correctly"""
+
+    # Make empty_deployment appear used to silence pylint warning
+    assert empty_deployment is None
+
+    # Use the current time to avoid the deployment's purge trigger
+    timestamp_before = datetime.now(timezone.utc)
+    str_before = timestamp_before.isoformat(timespec="microseconds")
+
+    data_before = dict(
+        version=dict(
+            major=kcidb.io.SCHEMA.major, minor=kcidb.io.SCHEMA.minor
+        ),
+        checkouts=[dict(
+            id="origin:1", origin="origin",
+            _timestamp=str_before
+        )],
+        builds=[dict(
+            id="origin:1", origin="origin", checkout_id="origin:1",
+            _timestamp=str_before
+        )],
+        tests=[dict(
+            id="origin:1", origin="origin", build_id="origin:1",
+            _timestamp=str_before
+        )],
+        issues=[dict(
+            id="origin:1", origin="origin", version=1,
+            _timestamp=str_before
+        )],
+        incidents=[dict(
+            id="origin:1", origin="origin",
+            issue_id="origin:1", issue_version=1,
+            _timestamp=str_before
+        )],
+    )
+
+    timestamp_after = timestamp_before + timedelta(microseconds=1)
+    str_after = timestamp_after.isoformat(timespec="microseconds")
+
+    data_after = dict(
+        version=dict(
+            major=kcidb.io.SCHEMA.major, minor=kcidb.io.SCHEMA.minor
+        ),
+        checkouts=[dict(
+            id="origin:2", origin="origin",
+            _timestamp=str_after
+        )],
+        builds=[dict(
+            id="origin:2", origin="origin", checkout_id="origin:2",
+            _timestamp=str_after
+        )],
+        tests=[dict(
+            id="origin:2", origin="origin", build_id="origin:2",
+            _timestamp=str_after
+        )],
+        issues=[dict(
+            id="origin:2", origin="origin", version=1,
+            _timestamp=str_after
+        )],
+        incidents=[dict(
+            id="origin:2", origin="origin",
+            issue_id="origin:2", issue_version=1,
+            _timestamp=str_after
+        )],
+    )
+
+    def filter_test_data(data):
+        """Filter objects created by this test from I/O data"""
+        return {
+            key: [
+                deepcopy(obj) for obj in value
+                if obj.get("_timestamp") in (str_before, str_after)
+            ] if key and key in kcidb.io.SCHEMA.graph
+            else deepcopy(value)
+            for key, value in data.items()
+        }
+
+    # Merge the before and after data
+    data = kcidb.io.SCHEMA.merge(data_before, [data_after])
+
+    # Load the merged data into the database
+    client = kcidb.db.Client(os.environ["KCIDB_DATABASE"])
+    client.load(data, with_metadata=True)
+    dump = filter_test_data(client.dump())
+    for obj_list_name in kcidb.io.SCHEMA.graph:
+        if obj_list_name:
+            assert len(dump.get(obj_list_name, [])) == 2, \
+                f"Invalid number of {obj_list_name}"
+
+    # Trigger the purge at the boundary
+    kcidb.mq.JSONPublisher(
+        os.environ["GCP_PROJECT"],
+        os.environ["KCIDB_PURGE_OP_DB_TRIGGER_TOPIC"]
+    ).publish(dict(stamp=str_after))
+
+    # Wait and check for the purge
+    deadline = datetime.now(timezone.utc) + timedelta(minutes=5)
+    while datetime.now(timezone.utc) < deadline:
+        dump = filter_test_data(client.dump())
+        # If the data has changed
+        if not all(
+            len(dump.get(n, [])) == 2
+            for n in kcidb.io.SCHEMA.graph
+            if n
+        ):
+            assert dump == data_after
+            break
+        time.sleep(5)
+    else:
+        assert False, "Operational database purge timed out"
+
+    # Make sure we were getting the operational DB dump
+    op_client = kcidb.db.Client(os.environ["KCIDB_OPERATIONAL_DATABASE"])
+    assert dump == filter_test_data(op_client.dump())
+
+    # Make sure the archive database is still intact
+    ar_client = kcidb.db.Client(os.environ["KCIDB_ARCHIVE_DATABASE"])
+    assert filter_test_data(ar_client.dump()) == data
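
The same trigger can be used for a manual purge, mirroring the test above; this sketch assumes the deployment's environment variables are set:

    import os
    import kcidb

    # Purge operational-database data older than one year
    kcidb.mq.JSONPublisher(
        os.environ["GCP_PROJECT"],
        os.environ["KCIDB_PURGE_OP_DB_TRIGGER_TOPIC"]
    ).publish(dict(delta=dict(years=1)))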