From 3747f9aa40ed03899becb98ecae2abf926c8875f Mon Sep 17 00:00:00 2001
From: Krystle Salazar
Date: Mon, 20 May 2024 15:47:08 -0400
Subject: [PATCH] Add logs to cleaning steps in the ingestion server and skip
 saving tags (#4358)

---
 ingestion_server/ingestion_server/cleanup.py | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index 7257431700f..d9ca160b4ab 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -255,6 +255,10 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
         for field, clean_value in cleaned_data.items():
             update_field_expressions.append(f"{field} = {clean_value}")
             # Save cleaned values for later
+            # (except for tags, which take up too much space)
+            if field == "tags":
+                log.debug("Skipping tags.")
+                continue
             cleaned_values[field].append((identifier, clean_value))
 
         if len(update_field_expressions) > 0:
@@ -291,6 +295,7 @@ class CleanDataUploader:
     }
 
     def __init__(self):
+        log.info("Initializing clean data uploader.")
         self.date = time.strftime("%Y-%m-%d")
         self.buffer_size = config("CLEANUP_BUFFER_SIZE", default=10_000_000, cast=int)
         bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
@@ -303,6 +308,7 @@ def __init__(self):
             log.error(f"Error connecting to S3 or creating bucket: {e}")
             self.s3 = None
             self.s3_bucket = None
+        log.info(f"Connected to S3 and `{bucket_name}` bucket loaded.")
 
     @staticmethod
     def _get_s3_resource():
@@ -326,17 +332,22 @@ def _upload_to_s3(self, field: str):
             return
 
         part_number = self.buffer[field].part
-        log.info(f"Uploading file part {part_number} of `{field}` to S3...")
         s3_file_name = f"{self.s3_path}/{self.date}_{field}_{part_number}.tsv"
         tsv_file = f"{field}.tsv"
         with open(tsv_file, "w") as f:
             csv_writer = csv.writer(f, delimiter="\t")
             csv_writer.writerows(self.buffer[field].rows)
+        file_size = os.path.getsize(tsv_file) / (1024 * 1024)
         try:
+            log.info(
+                f"Uploading file part {part_number} ({file_size:.4f} MB) of `{field}` "
+                f"with {len(self.buffer[field].rows)} rows to S3..."
+            )
             self.s3_bucket.upload_file(tsv_file, s3_file_name)
         except Exception as e:
             log.error(f"Error uploading {field} to S3: {e}")
         os.remove(tsv_file)
+        log.info(f"`{tsv_file}` removed locally. Clearing buffer.")
         self.buffer[field].part += 1
         self.buffer[field].rows = []
 
@@ -352,10 +363,11 @@ def save(self, result: dict) -> dict[str, int]:
         return {field: len(items) for field, items in result.items()}
 
     def flush(self):
-        log.info("Clearing buffer.")
+        log.info("Flushing remaining rows...")
         for field in self.buffer:
             if self.buffer[field].rows:
                 self._upload_to_s3(field)
+        log.info("Saved all the cleaned data.")
 
 
 def clean_image_data(table):
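
For context, a minimal runnable sketch of the buffered-upload lifecycle this
patch instruments. The names here (BufferedUploader, FieldBuffer, _upload_stub)
are illustrative, not part of the patch: the real uploader calls
self.s3_bucket.upload_file(...) on a boto3 bucket, which the stub stands in
for, and the tags skip happens upstream in _clean_data_worker before rows ever
reach the buffer.

    # Illustrative sketch only; mirrors the structure the patch logs around.
    import csv
    import logging
    import os
    import time
    from dataclasses import dataclass, field

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)


    @dataclass
    class FieldBuffer:
        # Each field gets numbered TSV "parts" as its buffer fills and rotates.
        part: int = 1
        rows: list = field(default_factory=list)


    class BufferedUploader:
        def __init__(self, fields):
            log.info("Initializing clean data uploader.")
            self.date = time.strftime("%Y-%m-%d")
            self.buffer = {f: FieldBuffer() for f in fields}

        def _upload_stub(self, tsv_file, s3_file_name):
            # Stand-in for `self.s3_bucket.upload_file(tsv_file, s3_file_name)`.
            log.info(f"(stub) would upload {tsv_file} as {s3_file_name}")

        def _upload_to_s3(self, field_name):
            buf = self.buffer[field_name]
            if not buf.rows:
                return
            tsv_file = f"{field_name}.tsv"
            s3_file_name = f"{self.date}_{field_name}_{buf.part}.tsv"
            with open(tsv_file, "w") as f:
                csv.writer(f, delimiter="\t").writerows(buf.rows)
            # Log size and row count before uploading, as the patch does.
            file_size = os.path.getsize(tsv_file) / (1024 * 1024)
            log.info(
                f"Uploading file part {buf.part} ({file_size:.4f} MB) of "
                f"`{field_name}` with {len(buf.rows)} rows..."
            )
            self._upload_stub(tsv_file, s3_file_name)
            os.remove(tsv_file)
            log.info(f"`{tsv_file}` removed locally. Clearing buffer.")
            buf.part += 1
            buf.rows = []

        def save(self, result):
            # In the patched worker, "tags" is skipped before this point, so
            # only the smaller cleaned fields accumulate here.
            for field_name, rows in result.items():
                self.buffer[field_name].rows.extend(rows)

        def flush(self):
            log.info("Flushing remaining rows...")
            for field_name in self.buffer:
                if self.buffer[field_name].rows:
                    self._upload_to_s3(field_name)
            log.info("Saved all the cleaned data.")


    uploader = BufferedUploader(["url", "creator_url"])
    uploader.save({"url": [("id1", "https://example.com/1")]})
    uploader.flush()

The part counter is what makes the new "Uploading file part {part_number}"
log line meaningful: each flush of a field writes a fresh, independently
named TSV, so the logs trace exactly how many parts of each field were
shipped to S3 during a cleanup run.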