From 84ac386b8e9717a89a6a62b03c88392f14fdd844 Mon Sep 17 00:00:00 2001
From: Olga Bulat
Date: Sun, 5 Feb 2023 11:32:39 +0300
Subject: [PATCH] Create a list of fields to clean outside of worker

Signed-off-by: Olga Bulat
---
 ingestion_server/ingestion_server/cleanup.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index 375d08a78..3bd2d2374 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -197,7 +197,7 @@ def test_tls_supported(cls, url):
         return True
 
 
-def _clean_data_worker(rows, temp_table, sources_config, table):
+def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
     log.info("Starting data cleaning worker")
     global_field_to_func = sources_config["*"]["fields"]
     worker_conn = database_connect()
@@ -216,7 +216,7 @@ def _clean_data_worker(rows, temp_table, sources_config, table):
     }
 
     start_time = time.time()
-    cleaned_values = {field: [] for field in _get_cleanable_fields(table)}
+    cleaned_values = {field: [] for field in all_fields}
     for row in rows:
         # Map fields that need updating to their cleaning functions
         source = row["source"]
@@ -349,7 +349,7 @@ def clean_image_data(table):
             end = job_size * n
             last_end = end
             # Arguments for parallel _clean_data_worker calls
-            jobs.append((batch[start:end], temp_table, source_config, table))
+            jobs.append((batch[start:end], temp_table, source_config, _get_cleanable_fields("image")))
     pool = multiprocessing.Pool(processes=num_workers)
     log.info(f"Starting {len(jobs)} cleaning jobs")
     conn.commit()