From 25e1b8378a49d4b9ef05c3c934554cd58e82d3e5 Mon Sep 17 00:00:00 2001 From: Olga Bulat Date: Mon, 13 Mar 2023 17:01:28 +0300 Subject: [PATCH 1/4] Use Python to group items by license to speed up the query Signed-off-by: Olga Bulat --- .../dags/maintenance/add_license_url.py | 110 +++++++++++------- 1 file changed, 69 insertions(+), 41 deletions(-) diff --git a/openverse_catalog/dags/maintenance/add_license_url.py b/openverse_catalog/dags/maintenance/add_license_url.py index c5d824426..e3760844b 100644 --- a/openverse_catalog/dags/maintenance/add_license_url.py +++ b/openverse_catalog/dags/maintenance/add_license_url.py @@ -9,6 +9,7 @@ the `meta_data` column are updated, the DAG will only run the first and the last step, logging the statistics. """ +import json import logging from datetime import timedelta from textwrap import dedent @@ -17,8 +18,9 @@ from airflow.models import DAG from airflow.models.abstractoperator import AbstractOperator from airflow.operators.python import BranchPythonOperator, PythonOperator +from airflow.utils.trigger_rule import TriggerRule from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID, XCOM_PULL_TEMPLATE -from common.licenses.constants import get_reverse_license_path_map +from common.licenses import get_license_info_from_license_pair from common.loader.sql import RETURN_ROW_COUNT from common.slack import send_message from common.sql import PostgresHook @@ -65,7 +67,7 @@ def get_statistics( return next_task -def update_license_url(postgres_conn_id: str, task: AbstractOperator) -> dict[str, int]: +def update_license_url(postgres_conn_id: str, task: AbstractOperator) -> str | None: """Add license_url to meta_data batching all records with the same license. :param task: automatically passed by Airflow, used to set the execution timeout :param postgres_conn_id: Postgres connection id @@ -76,60 +78,85 @@ def update_license_url(postgres_conn_id: str, task: AbstractOperator) -> dict[st postgres_conn_id=postgres_conn_id, default_statement_timeout=PostgresHook.get_execution_timeout(task), ) - license_map = get_reverse_license_path_map() - total_count = 0 - total_counts = {} - for license_items, path in license_map.items(): - license_name, license_version = license_items - logger.info(f"Processing {license_name} {license_version}, {license_items}.") - license_url = f"{base_url}{path}/" - - select_query = dedent( - f""" - SELECT identifier FROM image - WHERE ( - meta_data is NULL AND license = '{license_name}' - AND license_version = '{license_version}') - """ - ) - result = postgres.get_records(select_query) - - if not result: - logger.info(f"No records to update with {license_url}.") + select_query = dedent( + """ + SELECT identifier, license, license_version + FROM image WHERE meta_data IS NULL;""" + ) + records_with_null_in_metadata = postgres.get_records(select_query) + + # Dictionary with license pair as key and list of identifiers as value + records_to_update = {} + + for result in records_with_null_in_metadata: + identifier, license_, version = result + license_pair = f"{license_},{version}" + if license_pair not in records_to_update: + records_to_update[license_pair] = [identifier] + else: + records_to_update[license_pair].append(identifier) + + total_updated = 0 + updated_by_license = {} + + for license_pair, identifiers in records_to_update.items(): + license_, license_version = license_pair.split(",") + license_url = get_license_info_from_license_pair(license_, license_version)[-1] + if license_url is None: + logger.info( + f"No license pair {license_pair} in the license reverse path map." + ) continue - logger.info(f"{len(result)} records to update with {license_url}.") - license_url_col = {"license_url": license_url} - update_license_url_query = dedent( + logger.info(f"{len(identifiers):4} items will be updated with {license_url}") + license_url_dict = {"license_url": license_url} + update_query = dedent( f""" UPDATE image - SET meta_data = {Json(license_url_col)} - WHERE identifier IN ({','.join([f"'{r[0]}'" for r in result])}); + SET meta_data = {Json(license_url_dict)} + WHERE identifier IN ({','.join([f"'{r}'" for r in identifiers])}); """ ) - - updated_count = postgres.run( - update_license_url_query, autocommit=True, handler=RETURN_ROW_COUNT + updated_count: int = postgres.run( + update_query, autocommit=True, handler=RETURN_ROW_COUNT ) logger.info(f"{updated_count} records updated with {license_url}.") - total_counts[license_url] = updated_count - total_count += updated_count - - logger.info(f"{total_count} image records with missing license_url updated.") - for license_url, count in total_counts.items(): - logger.info(f"{count} records with {license_url}.") - return total_counts + if updated_count: + updated_by_license[license_url] = updated_count + total_updated += updated_count + logger.info(f"Updated {total_updated} rows") + return json.dumps(updated_by_license) def final_report( postgres_conn_id: str, - item_count: int, - task: AbstractOperator, + updated_by_license: str, + task: AbstractOperator = None, ): + """Check for null in `meta_data` and send a message to Slack + with the statistics of the DAG run. + + :param postgres_conn_id: Postgres connection id. + :param updated_by_license: stringified JSON with the number of records updated + for each license_url. If `update_license_url` was skipped, this will be "None". + :param task: automatically passed by Airflow, used to set the execution timeout. + """ null_meta_data_count = get_null_counts(postgres_conn_id, task) + if updated_by_license == "None": + updated_message = "No records were updated." + else: + updated_by_license = json.loads(updated_by_license) + formatted_item_count = "".join( + [ + f"{license_url}: {count} rows\n" + for license_url, count in updated_by_license.items() + ] + ) + updated_message = f"Update statistics:\n{formatted_item_count}" message = f""" -Added license_url to *{item_count}* items` +`add_license_url` DAG run completed. +{updated_message} Now, there are {null_meta_data_count} records with NULL meta_data left. """ send_message( @@ -169,9 +196,10 @@ def final_report( final_report = PythonOperator( task_id=FINAL_REPORT, python_callable=final_report, + trigger_rule=TriggerRule.ALL_DONE, op_kwargs={ "postgres_conn_id": POSTGRES_CONN_ID, - "item_count": XCOM_PULL_TEMPLATE.format( + "updated_by_license": XCOM_PULL_TEMPLATE.format( update_license_url.task_id, "return_value" ), }, From 7c90b5289ff111759be0dbcce774643c9666cbef Mon Sep 17 00:00:00 2001 From: Olga Bulat Date: Wed, 15 Mar 2023 08:21:11 +0300 Subject: [PATCH 2/4] Update openverse_catalog/dags/maintenance/add_license_url.py Co-authored-by: Madison Swain-Bowden --- openverse_catalog/dags/maintenance/add_license_url.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/openverse_catalog/dags/maintenance/add_license_url.py b/openverse_catalog/dags/maintenance/add_license_url.py index e3760844b..d1b3f45e2 100644 --- a/openverse_catalog/dags/maintenance/add_license_url.py +++ b/openverse_catalog/dags/maintenance/add_license_url.py @@ -87,15 +87,12 @@ def update_license_url(postgres_conn_id: str, task: AbstractOperator) -> str | N records_with_null_in_metadata = postgres.get_records(select_query) # Dictionary with license pair as key and list of identifiers as value - records_to_update = {} + records_to_update = defaultdict(list) for result in records_with_null_in_metadata: identifier, license_, version = result license_pair = f"{license_},{version}" - if license_pair not in records_to_update: - records_to_update[license_pair] = [identifier] - else: - records_to_update[license_pair].append(identifier) + records_to_update[license_pair].append(identifier) total_updated = 0 updated_by_license = {} From 197aececf4a5b839eb762481045caddaae426e57 Mon Sep 17 00:00:00 2001 From: Olga Bulat Date: Wed, 15 Mar 2023 08:21:42 +0300 Subject: [PATCH 3/4] Update openverse_catalog/dags/maintenance/add_license_url.py Co-authored-by: Madison Swain-Bowden --- openverse_catalog/dags/maintenance/add_license_url.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openverse_catalog/dags/maintenance/add_license_url.py b/openverse_catalog/dags/maintenance/add_license_url.py index d1b3f45e2..d68f269f6 100644 --- a/openverse_catalog/dags/maintenance/add_license_url.py +++ b/openverse_catalog/dags/maintenance/add_license_url.py @@ -99,7 +99,7 @@ def update_license_url(postgres_conn_id: str, task: AbstractOperator) -> str | N for license_pair, identifiers in records_to_update.items(): license_, license_version = license_pair.split(",") - license_url = get_license_info_from_license_pair(license_, license_version)[-1] + *_, license_url = get_license_info_from_license_pair(license_, license_version) if license_url is None: logger.info( f"No license pair {license_pair} in the license reverse path map." From 3bfc2fc8ffc4a4e9044379f040c8d25960b404af Mon Sep 17 00:00:00 2001 From: Olga Bulat Date: Wed, 15 Mar 2023 10:00:16 +0300 Subject: [PATCH 4/4] Remove unnecessary type conversions Signed-off-by: Olga Bulat --- .../dags/maintenance/add_license_url.py | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/openverse_catalog/dags/maintenance/add_license_url.py b/openverse_catalog/dags/maintenance/add_license_url.py index d68f269f6..1d4cbbb76 100644 --- a/openverse_catalog/dags/maintenance/add_license_url.py +++ b/openverse_catalog/dags/maintenance/add_license_url.py @@ -9,8 +9,8 @@ the `meta_data` column are updated, the DAG will only run the first and the last step, logging the statistics. """ -import json import logging +from collections import defaultdict from datetime import timedelta from textwrap import dedent from typing import Literal @@ -67,7 +67,7 @@ def get_statistics( return next_task -def update_license_url(postgres_conn_id: str, task: AbstractOperator) -> str | None: +def update_license_url(postgres_conn_id: str, task: AbstractOperator) -> dict[str, int]: """Add license_url to meta_data batching all records with the same license. :param task: automatically passed by Airflow, used to set the execution timeout :param postgres_conn_id: Postgres connection id @@ -91,19 +91,15 @@ def update_license_url(postgres_conn_id: str, task: AbstractOperator) -> str | N for result in records_with_null_in_metadata: identifier, license_, version = result - license_pair = f"{license_},{version}" - records_to_update[license_pair].append(identifier) + records_to_update[(license_, version)].append(identifier) total_updated = 0 updated_by_license = {} - for license_pair, identifiers in records_to_update.items(): - license_, license_version = license_pair.split(",") - *_, license_url = get_license_info_from_license_pair(license_, license_version) + for (license_, version), identifiers in records_to_update.items(): + *_, license_url = get_license_info_from_license_pair(license_, version) if license_url is None: - logger.info( - f"No license pair {license_pair} in the license reverse path map." - ) + logger.info(f"No license pair ({license_}, {version}) in the license map.") continue logger.info(f"{len(identifiers):4} items will be updated with {license_url}") license_url_dict = {"license_url": license_url} @@ -122,12 +118,12 @@ def update_license_url(postgres_conn_id: str, task: AbstractOperator) -> str | N updated_by_license[license_url] = updated_count total_updated += updated_count logger.info(f"Updated {total_updated} rows") - return json.dumps(updated_by_license) + return updated_by_license def final_report( postgres_conn_id: str, - updated_by_license: str, + updated_by_license: dict[str, int] | None, task: AbstractOperator = None, ): """Check for null in `meta_data` and send a message to Slack @@ -140,10 +136,9 @@ def final_report( """ null_meta_data_count = get_null_counts(postgres_conn_id, task) - if updated_by_license == "None": + if not updated_by_license: updated_message = "No records were updated." else: - updated_by_license = json.loads(updated_by_license) formatted_item_count = "".join( [ f"{license_url}: {count} rows\n" @@ -174,9 +169,9 @@ def final_report( }, schedule_interval=None, catchup=False, - # Use the docstring at the top of the file as md docs in the UI doc_md=__doc__, tags=["data_normalization"], + render_template_as_native_obj=True, ) with dag: