Add logs to cleaning steps in the ingestion server and skip saving tags (#4358)
krysal authored May 20, 2024
1 parent 352b710 commit 3747f9a
Showing 1 changed file with 14 additions and 2 deletions.
ingestion_server/ingestion_server/cleanup.py (14 additions, 2 deletions)
@@ -255,6 +255,10 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
         for field, clean_value in cleaned_data.items():
             update_field_expressions.append(f"{field} = {clean_value}")
             # Save cleaned values for later
+            # (except for tags, which take up too much space)
+            if field == "tags":
+                log.debug("Skipping tags.")
+                continue
             cleaned_values[field].append((identifier, clean_value))
 
         if len(update_field_expressions) > 0:
@@ -291,6 +295,7 @@ class CleanDataUploader:
     }
 
     def __init__(self):
+        log.info("Initializing clean data uploader.")
         self.date = time.strftime("%Y-%m-%d")
         self.buffer_size = config("CLEANUP_BUFFER_SIZE", default=10_000_000, cast=int)
         bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
@@ -303,6 +308,7 @@ def __init__(self):
             log.error(f"Error connecting to S3 or creating bucket: {e}")
             self.s3 = None
             self.s3_bucket = None
+        log.info(f"Connected to S3 and `{bucket_name}` bucket loaded.")
 
     @staticmethod
     def _get_s3_resource():
@@ -326,17 +332,22 @@ def _upload_to_s3(self, field: str):
             return
 
         part_number = self.buffer[field].part
-        log.info(f"Uploading file part {part_number} of `{field}` to S3...")
         s3_file_name = f"{self.s3_path}/{self.date}_{field}_{part_number}.tsv"
         tsv_file = f"{field}.tsv"
         with open(tsv_file, "w") as f:
             csv_writer = csv.writer(f, delimiter="\t")
             csv_writer.writerows(self.buffer[field].rows)
+        file_size = os.path.getsize(tsv_file) / (1024 * 1024)
         try:
+            log.info(
+                f"Uploading file part {part_number} ({file_size:.4f} MB) of `{field}` "
+                f"with {len(self.buffer[field].rows)} rows to S3..."
+            )
             self.s3_bucket.upload_file(tsv_file, s3_file_name)
         except Exception as e:
             log.error(f"Error uploading {field} to S3: {e}")
         os.remove(tsv_file)
+        log.info(f"`{tsv_file}` removed locally. Clearing buffer.")
         self.buffer[field].part += 1
         self.buffer[field].rows = []
 
@@ -352,10 +363,11 @@ def save(self, result: dict) -> dict[str, int]:
         return {field: len(items) for field, items in result.items()}
 
     def flush(self):
-        log.info("Clearing buffer.")
+        log.info("Flushing remaining rows...")
        for field in self.buffer:
             if self.buffer[field].rows:
                 self._upload_to_s3(field)
+        log.info("Saved all the cleaned data.")
 
 
 def clean_image_data(table):
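For context, below is a minimal, self-contained sketch of the buffered-upload pattern this commit is instrumenting: cleaned rows are accumulated per field, written to a local TSV, uploaded to S3, and then the local file is removed and the part counter is bumped. This is an illustration only, assuming boto3 with credentials in the environment; FieldBuffer, BUCKET_NAME, and upload_part are hypothetical stand-ins, not names from the actual ingestion server. Note also that, per this commit, the tags field is never buffered at all, since its cleaned values take up too much space to be worth saving.

import csv
import logging
import os

import boto3

log = logging.getLogger(__name__)

BUCKET_NAME = "openverse-catalog"  # hypothetical default for this sketch


class FieldBuffer:
    """Per-field accumulator: a part counter plus the rows not yet uploaded."""

    def __init__(self):
        self.part = 1
        self.rows: list[tuple[str, str]] = []


def upload_part(bucket, field: str, buf: FieldBuffer, s3_path: str, date: str):
    """Write buffered rows to a TSV, upload it to S3, then clear the buffer."""
    if not buf.rows:
        return
    tsv_file = f"{field}.tsv"
    with open(tsv_file, "w") as f:
        csv.writer(f, delimiter="\t").writerows(buf.rows)
    # Log the payload size and row count before uploading, as the diff does.
    file_size = os.path.getsize(tsv_file) / (1024 * 1024)
    try:
        log.info(
            f"Uploading file part {buf.part} ({file_size:.4f} MB) of `{field}` "
            f"with {len(buf.rows)} rows to S3..."
        )
        bucket.upload_file(tsv_file, f"{s3_path}/{date}_{field}_{buf.part}.tsv")
    except Exception as e:
        log.error(f"Error uploading {field} to S3: {e}")
    # Clean up locally and reset the buffer regardless of upload success.
    os.remove(tsv_file)
    log.info(f"`{tsv_file}` removed locally. Clearing buffer.")
    buf.part += 1
    buf.rows = []


if __name__ == "__main__":
    # Hypothetical path and date values, purely for demonstration.
    bucket = boto3.resource("s3").Bucket(BUCKET_NAME)
    buf = FieldBuffer()
    buf.rows.append(("some-identifier", "'https://example.com/cleaned'"))
    upload_part(bucket, "url", buf, "cleaned-data", "2024-05-20")

One design point worth noting: the local TSV is deleted and the buffer cleared even when the upload fails, so a failed part is logged but not retried; the part counter keeps file names unique across uploads within a run.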
