Update data model
mvandenburgh committed Jan 24, 2025
1 parent 5b5b8e7 commit 92785b1
Showing 5 changed files with 106 additions and 47 deletions.
@@ -1,7 +1,8 @@
# Generated by Django 4.2.17 on 2024-12-11 17:12
# Generated by Django 4.2.17 on 2024-12-11 17:17
from __future__ import annotations

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
@@ -27,18 +28,29 @@ class Migration(migrations.Migration):
max_length=255,
),
),
],
),
migrations.CreateModel(
name='GarbageCollectionEventRecord',
fields=[
(
'id',
models.BigAutoField(
auto_created=True, primary_key=True, serialize=False, verbose_name='ID'
),
),
(
'records',
'record',
models.JSONField(
help_text='JSON serialization of the queryset of records that were garbage collected.'
help_text='JSON serialization of the record that was garbage collected.'
),
),
(
'garbage_collection_event_id',
models.UUIDField(
editable=False,
help_text='Unique identifier for the garbage collection event. Used to '
'associate multiple records that are part of the same GC event.',
'event',
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name='records',
to='api.garbagecollectionevent',
),
),
],
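The migration replaces the old JSONField-of-a-queryset plus shared-UUID design with one GarbageCollectionEventRecord row per garbage-collected object, linked to its parent event through a cascading ForeignKey. A minimal sketch of the resulting ORM usage, not part of the commit: field values are hypothetical, a Django shell with the api app is assumed, and it assumes type is the only field that must be supplied.

from dandiapi.api.models import GarbageCollectionEvent, GarbageCollectionEventRecord

event = GarbageCollectionEvent.objects.create(type='Upload')
GarbageCollectionEventRecord.objects.create(
    event=event,
    # Hypothetical payload; in practice this is the Django-serializer dict for one object.
    record={'model': 'api.upload', 'pk': 123, 'fields': {}},
)
assert event.records.count() == 1  # reverse accessor via related_name='records'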
3 changes: 2 additions & 1 deletion dandiapi/api/models/__init__.py
@@ -4,7 +4,7 @@
from .asset_paths import AssetPath, AssetPathRelation
from .audit import AuditRecord
from .dandiset import Dandiset
from .garbage_collection import GarbageCollectionEvent
from .garbage_collection import GarbageCollectionEvent, GarbageCollectionEventRecord
from .oauth import StagingApplication
from .upload import Upload
from .user import UserMetadata
@@ -18,6 +18,7 @@
'AuditRecord',
'Dandiset',
'GarbageCollectionEvent',
'GarbageCollectionEventRecord',
'StagingApplication',
'Upload',
'UserMetadata',
18 changes: 11 additions & 7 deletions dandiapi/api/models/garbage_collection.py
@@ -8,15 +8,19 @@ class GarbageCollectionEvent(models.Model):
type = models.CharField(
max_length=255, help_text='The model name of the records that were garbage collected.'
)
records = models.JSONField(
help_text='JSON serialization of the queryset of records that were garbage collected.'

def __str__(self) -> str:
return f'{self.type} ({self.created})'


class GarbageCollectionEventRecord(models.Model):
event = models.ForeignKey(
GarbageCollectionEvent, on_delete=models.CASCADE, related_name='records'
)

garbage_collection_event_id = models.UUIDField(
editable=False,
help_text='Unique identifier for the garbage collection event. '
'Used to associate multiple records that are part of the same GC event.',
record = models.JSONField(
help_text='JSON serialization of the record that was garbage collected.'
)

def __str__(self) -> str:
return f'{self.type} ({self.created})'
return f'{self.event.type} record'
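Because each record column stores the Django-serializer JSON for exactly one object, a garbage-collected row can in principle be rebuilt with the standard deserializer. A rough sketch, not from the commit; it assumes a Django shell, that at least one record exists, and that any objects the record references still exist.

import json

from django.core import serializers

from dandiapi.api.models import GarbageCollectionEventRecord

gc_record = GarbageCollectionEventRecord.objects.first()

# serializers.deserialize() expects a JSON list, so wrap the single stored dict.
deserialized = next(serializers.deserialize('json', json.dumps([gc_record.record])))
deserialized.save()  # re-inserts the original row, keeping its original primary key

Storing one object per row, rather than one JSON blob per chunk, keeps each record small and lets it be inspected or restored individually.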
71 changes: 47 additions & 24 deletions dandiapi/api/services/garbage_collection/__init__.py
@@ -1,16 +1,21 @@
from __future__ import annotations

from concurrent.futures import Future, ThreadPoolExecutor, wait
from datetime import timedelta
import json
from uuid import uuid4

from celery.utils.log import get_task_logger
from django.core import serializers
from django.db import transaction
from django.utils import timezone
from more_itertools import chunked

from dandiapi.api.models import AssetBlob, GarbageCollectionEvent, Upload
from dandiapi.api.models import (
AssetBlob,
GarbageCollectionEvent,
GarbageCollectionEventRecord,
Upload,
)
from dandiapi.api.storage import DandiMultipartMixin

logger = get_task_logger(__name__)
@@ -32,27 +37,36 @@ def _garbage_collect_uploads() -> int:
qs = Upload.objects.filter(
created__lt=timezone.now() - UPLOAD_EXPIRATION_TIME,
)
if not qs.exists():
return 0

# Chunk the queryset to avoid creating a single
# GarbageCollectionEvent with a huge JSONField
gc_event_uuid = uuid4()
deleted_records = 0
for uploads_chunk in chunked(qs.iterator(), GARBAGE_COLLECTION_EVENT_CHUNK_SIZE):
with transaction.atomic():
GarbageCollectionEvent.objects.create(
garbage_collection_event_id=gc_event_uuid,
records=json.loads(serializers.serialize('json', uploads_chunk)),
type=Upload.__name__,
futures: list[Future] = []

with transaction.atomic(), ThreadPoolExecutor() as executor:
event = GarbageCollectionEvent.objects.create(type=Upload.__name__)
for uploads_chunk in chunked(qs.iterator(), GARBAGE_COLLECTION_EVENT_CHUNK_SIZE):
GarbageCollectionEventRecord.objects.bulk_create(
GarbageCollectionEventRecord(
event=event, record=json.loads(serializers.serialize('json', [u]))[0]
)
for u in uploads_chunk
)

# Delete the blobs from S3
for chunk in uploads_chunk:
chunk.blob.delete()
futures.append(
executor.submit(
lambda chunk: [u.blob.delete(save=False) for u in chunk],
uploads_chunk,
)
)

deleted_records += Upload.objects.filter(
pk__in=[u.pk for u in uploads_chunk],
).delete()[0]

wait(futures)

return deleted_records


@@ -61,27 +75,36 @@ def _garbage_collect_asset_blobs() -> int:
assets__isnull=True,
created__lt=timezone.now() - ASSET_BLOB_EXPIRATION_TIME,
)
if not qs.exists():
return 0

# Chunk the queryset to avoid creating a single
# GarbageCollectionEvent with a huge JSONField
gc_event_uuid = uuid4()
deleted_records = 0
for asset_blobs_chunk in chunked(qs.iterator(), GARBAGE_COLLECTION_EVENT_CHUNK_SIZE):
with transaction.atomic():
GarbageCollectionEvent.objects.create(
garbage_collection_event_id=gc_event_uuid,
records=json.loads(serializers.serialize('json', asset_blobs_chunk)),
type=AssetBlob.__name__,
futures: list[Future] = []

with transaction.atomic(), ThreadPoolExecutor() as executor:
event = GarbageCollectionEvent.objects.create(type=AssetBlob.__name__)
for asset_blobs_chunk in chunked(qs.iterator(), GARBAGE_COLLECTION_EVENT_CHUNK_SIZE):
GarbageCollectionEventRecord.objects.bulk_create(
GarbageCollectionEventRecord(
event=event, record=json.loads(serializers.serialize('json', [a]))[0]
)
for a in asset_blobs_chunk
)

# Delete the blobs from S3
for chunk in asset_blobs_chunk:
chunk.blob.delete()
futures.append(
executor.submit(
lambda chunk: [a.blob.delete(save=False) for a in chunk],
asset_blobs_chunk,
)
)

deleted_records += AssetBlob.objects.filter(
pk__in=[a.pk for a in asset_blobs_chunk],
).delete()[0]

wait(futures)

return deleted_records


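In both collectors the database writes stay inside a single transaction.atomic() block while the S3 deletions are handed to a ThreadPoolExecutor and awaited once at the end. A self-contained sketch of that submit/wait pattern, with delete_blob as a hypothetical stand-in for blob.delete(save=False):

from concurrent.futures import Future, ThreadPoolExecutor, wait


def delete_blob(name: str) -> None:
    # Hypothetical stand-in for blob.delete(save=False), which removes the object
    # from storage without saving the model instance.
    print(f'deleted {name}')


def delete_chunks_concurrently(chunks: list[list[str]]) -> None:
    futures: list[Future] = []
    with ThreadPoolExecutor() as executor:
        for chunk in chunks:
            # Pass the chunk as an argument so each task is bound to the chunk from
            # its own iteration instead of closing over the loop variable.
            futures.append(
                executor.submit(lambda c: [delete_blob(n) for n in c], chunk)
            )
        # Block until every deletion task has finished.
        wait(futures)


delete_chunks_concurrently([['a', 'b'], ['c', 'd'], ['e']])

Waiting only once, after the loop, appears to let the storage deletions for earlier chunks overlap with serializing and deleting the database rows of later chunks.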
33 changes: 26 additions & 7 deletions dandiapi/api/tests/test_garbage_collection.py
@@ -4,7 +4,12 @@
from freezegun import freeze_time
import pytest

from dandiapi.api.models import AssetBlob, GarbageCollectionEvent, Upload
from dandiapi.api.models import (
AssetBlob,
GarbageCollectionEvent,
GarbageCollectionEventRecord,
Upload,
)
from dandiapi.api.services import garbage_collection


@@ -61,16 +66,18 @@ def test_garbage_collect_asset_blobs(asset_factory, asset_blob_factory):
@pytest.mark.django_db
def test_garbage_collection_event_records(asset_blob_factory, upload_factory):
# Create enough asset blobs to create 3 GarbageCollectionEvents
asset_blob_count = garbage_collection.GARBAGE_COLLECTION_EVENT_CHUNK_SIZE * 2 + 1
garbage_collected_asset_blobs: list[AssetBlob] = []
for _ in range(garbage_collection.GARBAGE_COLLECTION_EVENT_CHUNK_SIZE * 2 + 1):
for _ in range(asset_blob_count):
asset_blob: AssetBlob = asset_blob_factory()
asset_blob.created = timezone.now() - garbage_collection.ASSET_BLOB_EXPIRATION_TIME
asset_blob.save()
garbage_collected_asset_blobs.append(asset_blob)

# Create enough uploads to create 3 GarbageCollectionEvents
upload_count = garbage_collection.GARBAGE_COLLECTION_EVENT_CHUNK_SIZE * 2 + 1
garbage_collected_uploads: list[Upload] = []
for _ in range(garbage_collection.GARBAGE_COLLECTION_EVENT_CHUNK_SIZE * 2 + 1):
for _ in range(upload_count):
upload: Upload = upload_factory()
upload.created = timezone.now() - garbage_collection.UPLOAD_EXPIRATION_TIME
upload.save()
@@ -97,17 +104,29 @@ def test_garbage_collection_event_records(asset_blob_factory, upload_factory):
)

# Make sure the GarbageCollectionEvent records are created
assert GarbageCollectionEvent.objects.count() == 6
assert GarbageCollectionEvent.objects.filter(type=AssetBlob.__name__).count() == 3
assert GarbageCollectionEvent.objects.filter(type=Upload.__name__).count() == 3
assert GarbageCollectionEvent.objects.count() == 2
assert GarbageCollectionEvent.objects.filter(type=AssetBlob.__name__).count() == 1
assert GarbageCollectionEvent.objects.filter(type=Upload.__name__).count() == 1

assert GarbageCollectionEventRecord.objects.count() == asset_blob_count + upload_count
assert (
GarbageCollectionEventRecord.objects.filter(event__type=AssetBlob.__name__).count()
== asset_blob_count
)
assert (
GarbageCollectionEventRecord.objects.filter(event__type=Upload.__name__).count()
== upload_count
)

# Make sure running garbage_collect() again doesn't delete the GarbageCollectionEvent
# records yet
with freeze_time(time_to_freeze=timezone.now()):
garbage_collection.garbage_collect()
assert GarbageCollectionEvent.objects.count() == 6
assert GarbageCollectionEvent.objects.count() == 2
assert GarbageCollectionEventRecord.objects.count() == asset_blob_count + upload_count

# Make sure the GarbageCollectionEvent records are deleted after the RESTORATION_WINDOW
with freeze_time(time_to_freeze=timezone.now() + garbage_collection.RESTORATION_WINDOW):
garbage_collection.garbage_collect()
assert GarbageCollectionEvent.objects.count() == 0
assert GarbageCollectionEventRecord.objects.count() == 0
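The arithmetic behind the updated assertions: the new model creates one event per type regardless of how many chunks are processed, and one record per garbage-collected object. A tiny sketch of the expected counts, using a hypothetical value for GARBAGE_COLLECTION_EVENT_CHUNK_SIZE:

from math import ceil

CHUNK_SIZE = 1000  # hypothetical stand-in for GARBAGE_COLLECTION_EVENT_CHUNK_SIZE
n = CHUNK_SIZE * 2 + 1  # objects created per type in the test

old_event_count = ceil(n / CHUNK_SIZE) * 2  # previous model: one event per chunk per type, so 6
new_event_count = 2  # new model: one event per type
new_record_count = n * 2  # one record per asset blob and per upload

print(old_event_count, new_event_count, new_record_count)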
