
Commit

Upload Ingestion Server's TSV files to AWS S3 (skip tags) (#4529)
* Save files of cleaned data to temporary directory and remove extra single quotes in values

* Recreate temporary directory before cleaning

* Add stocksnap to TLS_CACHE

* Fix test

* Upload files to AWS S3 (without tags)

* Add default values to AWS variables in env.template

* Add explanatory comment for skipped non-existing files
krysal authored Jun 27, 2024
1 parent bda77f6 commit 97ff97d
Showing 2 changed files with 47 additions and 2 deletions.
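
To make the first two commit-message bullets concrete, here is a minimal, hypothetical sketch of writing cleaned values to per-field TSV files in a temporary directory while stripping stray wrapping single quotes. The helper names and the TMP_DIR path are illustrative only; the commit's actual logic lives in `save_cleaned_data` and `_clean_data_worker` in cleanup.py and may differ.

```python
# Hypothetical sketch only: one TSV file per cleaned field, written to a
# temporary directory, with wrapping single quotes stripped from values.
import csv
from pathlib import Path

TMP_DIR = Path("/tmp/cleaned_data")  # illustrative path; the real module defines its own TMP_DIR


def strip_extra_quotes(value: str) -> str:
    # Remove single quotes that wrap an entire value, e.g. "'https://example.com'".
    return value.strip("'")


def save_cleaned_field(field: str, rows: list[tuple[str, str]]) -> Path:
    # `rows` holds (identifier, cleaned_value) pairs for a single field.
    TMP_DIR.mkdir(parents=True, exist_ok=True)
    out_path = TMP_DIR / f"{field}.tsv"
    with out_path.open("w", newline="") as tsv_file:
        writer = csv.writer(tsv_file, delimiter="\t")
        for identifier, value in rows:
            writer.writerow([identifier, strip_extra_quotes(value)])
    return out_path
```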
6 changes: 4 additions & 2 deletions ingestion_server/env.template
@@ -4,8 +4,10 @@ PYTHONUNBUFFERED="0"

#ENVIRONMENT="local"

-#AWS_ACCESS_KEY_ID=""
-#AWS_SECRET_ACCESS_KEY=""
 #AWS_REGION="us-east-1"
+#AWS_ACCESS_KEY_ID="test_key"
+#AWS_SECRET_ACCESS_KEY="test_secret"
+#AWS_S3_ENDPOINT="http://s3:5000"

#ELASTICSEARCH_URL="es"
#ELASTICSEARCH_PORT="9200"
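
For context (not part of the diff): the cleanup code reads these settings through python-decouple's `config` with `default=None`, so leaving the entries commented out means no explicit endpoint or credentials are passed to boto3 and the default AWS resolution (e.g. IAM roles) applies. A minimal sketch of that lookup:

```python
# Sketch of how the new env.template defaults are consumed via python-decouple.
from decouple import config

endpoint_url = config("AWS_S3_ENDPOINT", default=None)     # "http://s3:5000" when targeting local MinIO
access_key = config("AWS_ACCESS_KEY_ID", default=None)      # "test_key" locally; unset on live environments
secret_key = config("AWS_SECRET_ACCESS_KEY", default=None)  # "test_secret" locally
bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
```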
43 changes: 43 additions & 0 deletions ingestion_server/ingestion_server/cleanup.py
@@ -13,8 +13,10 @@
import uuid
from urllib.parse import urlparse

import boto3
import requests as re
import tldextract
from decouple import config
from psycopg2.extras import DictCursor, Json

from ingestion_server.db_helpers import database_connect
@@ -302,6 +304,46 @@ def save_cleaned_data(result: dict) -> dict[str, int]:
    return cleanup_counts


def _upload_to_s3(fields):
    """
    Upload cleaned data to S3. It assumes that the bucket already exists.
    Locally, it connects to a MinIO instance through its endpoint and test credentials.
    On live environments, the connection is allowed via IAM roles.
    """
    bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
    s3_path = "shared/data-refresh-cleaned-data"
    try:
        s3 = boto3.resource(
            "s3",
            endpoint_url=config("AWS_S3_ENDPOINT", default=None),
            aws_access_key_id=config("AWS_ACCESS_KEY_ID", default=None),
            aws_secret_access_key=config("AWS_SECRET_ACCESS_KEY", default=None),
            region_name=config("AWS_REGION", default=None),
        )
        s3.meta.client.head_bucket(Bucket=bucket_name)
        bucket = s3.Bucket(bucket_name)
        log.info(f"Connected to S3 and '{bucket_name}' bucket loaded.")
    except Exception as e:
        log.error(f"Files upload failed. Error connecting to S3.\n{e}")
        return

    for field in fields:
        file_path = TMP_DIR.joinpath(f"{field}.tsv")
        if not file_path.exists():
            # Once the data has been cleaned in `upstream`, the cleaning process will
            # not generate these files. Also, tags never generate any (refer to the
            # `_clean_data_worker` function).
            continue

        try:
            bucket.upload_file(file_path, f"{s3_path}/{field}.tsv")
            log.info(f"Uploaded '{field}.tsv' to S3.")
            file_path.unlink()
        except Exception as e:
            log.error(f"Error uploading '{field}.tsv' to S3: {e}")


def clean_image_data(table):
    """
    Clean up data loaded from upstream that is unsuitable for prod before going live.
@@ -397,6 +439,7 @@ def clean_image_data(table):
    conn.commit()
    iter_cur.close()
    conn.close()
    _upload_to_s3(cleanable_fields_for_table)
    end_time = time.perf_counter()
    cleanup_time = end_time - start_time
    log.info(

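As a quick local check (a sketch, not part of the commit): with MinIO running at the AWS_S3_ENDPOINT and test credentials from env.template, the uploaded TSVs should appear under the shared/data-refresh-cleaned-data prefix.

```python
# Sketch: list the objects _upload_to_s3 writes, against a local MinIO instance.
# Endpoint, credentials, and bucket name match the env.template/cleanup.py defaults above.
import boto3

s3 = boto3.resource(
    "s3",
    endpoint_url="http://s3:5000",
    aws_access_key_id="test_key",
    aws_secret_access_key="test_secret",
)
bucket = s3.Bucket("openverse-catalog")
for obj in bucket.objects.filter(Prefix="shared/data-refresh-cleaned-data/"):
    print(obj.key, obj.size)
```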