Fix dataset deletion script
The S3 data deletion is now handled by the dataset repository module
when distributions are deleted. Adjust the dataset deletion script
accordingly.
simenheg committed Oct 24, 2024
1 parent a0cb42a commit 9138975
Showing 1 changed file with 3 additions and 85 deletions.
88 changes: 3 additions & 85 deletions scripts/delete_dataset.py
@@ -4,11 +4,6 @@
import os
from datetime import datetime, timezone

import boto3

from metadata.common import CONFIDENTIALITIES
from scripts.util import chunk, confirm_to_continue

# Must be done before repository import.
os.environ["AWS_XRAY_SDK_ENABLED"] = "false"

@@ -26,7 +21,6 @@ def print_output(
deleted_distributions,
deleted_editions,
deleted_versions,
deleted_s3_objects,
output_dir_path=None,
):
output = json.dumps(
@@ -35,7 +29,6 @@
"deleted_distributions": deleted_distributions,
"deleted_editions": deleted_editions,
"deleted_versions": deleted_versions,
"deleted_s3_objects": deleted_s3_objects,
}
),
indent=2,
@@ -55,56 +48,6 @@ def write_output(output_dir_path, dataset_id, output_json):
f.write(output_json)


def find_s3_objects(bucket, dataset_id):
s3 = boto3.client("s3", region_name="eu-west-1")

stages = [
cp["Prefix"]
for cp in s3.list_objects_v2(
Bucket=bucket,
Delimiter="/",
)["CommonPrefixes"]
]

objects = []

paginator = s3.get_paginator("list_objects_v2")

for stage in stages:
for confidentiality in CONFIDENTIALITIES:
prefix = f"{stage}{confidentiality}/{dataset_id}/"
logger.debug(f"Looking for data in {prefix}")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
objects.append(obj["Key"])
return objects


def _delete_s3_objects(s3_client, bucket, objects):
response = s3_client.delete_objects(
Bucket=bucket,
Delete={"Objects": objects},
)
return [obj["Key"] for obj in response.get("Deleted", [])]


def delete_s3_objects(bucket, objects):
objects = [{"Key": key} for key in objects]
n_objs = len(objects)
s3_client = boto3.client("s3", region_name="eu-west-1")
deleted_objs = []

if n_objs > 1000:
for i, c in enumerate(chunk(objects, 1000)):
logger.info("Deleting objects {}/{}...".format(len(c) + i * 1000, n_objs))
deleted_objs += _delete_s3_objects(s3_client, bucket, c)
else:
logger.info(f"Deleting {n_objs} objects...")
deleted_objs = _delete_s3_objects(s3_client, bucket, objects)

return deleted_objs


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--env", required=True, choices=["dev", "prod"])
@@ -119,12 +62,6 @@ def delete_s3_objects(bucket, objects):
required=False,
help="Optional. Path to directory where you want output to be written",
)
parser.add_argument(
"--delete-s3-data",
default=False,
help="Optional. Identify and delete any dataset data from S3",
action="store_true",
)
parser.add_argument(
"--log-level",
default="INFO",
@@ -137,10 +74,10 @@ def delete_s3_objects(bucket, objects):
args = parser.parse_args()

os.environ["AWS_PROFILE"] = f"okdata-{args.env}"
s3_bucket = f"ok-origo-dataplatform-{args.env}"
os.environ["AWS_REGION"] = "eu-west-1"
os.environ["DATA_BUCKET_NAME"] = f"ok-origo-dataplatform-{args.env}"

dataset_id = args.dataset_id
delete_s3_data = args.delete_s3_data
apply_changes = args.apply

dataset_repository = DatasetRepository()
@@ -152,8 +89,6 @@ def delete_s3_objects(bucket, objects):
deleted_editions = []
deleted_versions = []
deleted_datasets = []
deleted_s3_objects = []
s3_objects = []

try:
logger.info(f"Preparing to delete dataset {dataset_id}")
@@ -183,8 +118,7 @@

logger.info(f"Found {len(edition_ids)} editions")

# Get distributions that are to be deleted. Here we do not extract only the "Id" field,
# since we need information about "distribution_type" in order to clean up in s3 afterwards.
# Get distributions that are to be deleted
distributions = []
for edition_id in edition_ids:
_dataset_id, _version, _edition = edition_id.split("/")
@@ -196,15 +130,6 @@

logger.info(f"Found {len(distributions)} distributions")

if delete_s3_data:
s3_objects = find_s3_objects(s3_bucket, dataset_id)
n_objs = len(s3_objects)

logger.info(f"Found {n_objs} S3 objects in {s3_bucket}")

if n_objs > 1000 and apply_changes:
confirm_to_continue(f"That's a lot of objects: {n_objs:,}")

# Delete distributions and store in deleted_distributions
for distribution in distributions:
if apply_changes:
@@ -238,18 +163,11 @@

deleted_datasets.append(dataset_id)

# Delete S3 data
if delete_s3_data and n_objs > 0 and apply_changes:
deleted_s3_objects = delete_s3_objects(s3_bucket, s3_objects)
else:
deleted_s3_objects = s3_objects

finally:
print_output(
dataset_id,
deleted_distributions,
deleted_editions,
deleted_versions,
deleted_s3_objects,
args.output,
)
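
For context, the repository-side cleanup that the commit message refers to could look roughly like the sketch below: when a distribution of type "file" is deleted, the repository also removes the backing object from the data bucket, so this script no longer needs boto3 helpers of its own. The class name and the "filename" field are illustrative assumptions; only DATA_BUCKET_NAME, AWS_REGION, and distribution_type appear in the code above.

# Hypothetical sketch, not the actual okdata repository module.
import os

import boto3


class DistributionRepository:
    """Assumed repository owning both the metadata item and its S3 data."""

    def __init__(self):
        self.bucket = os.environ["DATA_BUCKET_NAME"]
        self.s3 = boto3.client(
            "s3", region_name=os.environ.get("AWS_REGION", "eu-west-1")
        )

    def delete_distribution(self, distribution):
        # Remove the backing S3 object first (if any), so callers like
        # delete_dataset.py no longer have to clean up data themselves...
        if distribution.get("distribution_type") == "file" and distribution.get(
            "filename"
        ):
            self.s3.delete_object(Bucket=self.bucket, Key=distribution["filename"])
        # ...then drop the distribution's metadata entry as before.

Under that assumption, the script's remaining job is pure metadata cleanup, which is why the local find_s3_objects/delete_s3_objects helpers and the --delete-s3-data flag could be dropped.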
