Skip to content

Commit

Permalink
Merge pull request #618 from projectcaluma/celery
Browse files Browse the repository at this point in the history
feat: add celery to allow indexing in background
  • Loading branch information
winged authored Sep 12, 2024
2 parents 629f812 + d6f995c commit 766f012
Show file tree
Hide file tree
Showing 20 changed files with 815 additions and 239 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ WORKDIR /app

COPY . ./

ENV POETRY_NO_INTERACTION 1
ENV POETRY_NO_INTERACTION=1

RUN pip install -U poetry

Expand Down Expand Up @@ -58,4 +58,4 @@ RUN pip install /tmp/*.whl && rm /tmp/*.whl

USER 1001

CMD ["sh", "-c", "wait-for-it $DATABASE_HOST:${DATABASE_PORT:-5432} -- manage.py migrate && gunicorn --workers 10 --access-logfile - --limit-request-line 16384 --bind :8000 alexandria.wsgi"]
CMD ["sh", "-c", "wait-for-it $DATABASE_HOST:${DATABASE_PORT:-5432} -- ./manage.py migrate && gunicorn --workers 10 --access-logfile - --limit-request-line 16384 --bind :8000 alexandria.wsgi"]
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ debug-alexandria: ## start an api container with service ports for debugging
@echo "Run './manage.py runserver 0:8000' to start the debugging server"
@docker compose run --rm --user root --use-aliases --service-ports alexandria bash

.PHONY: debug-celery
debug-celery: ## start a celery container with service ports for debugging
@docker compose stop celery
@echo "See https://docs.celeryq.dev/en/stable/userguide/debugging.html on how to debug celery"
@docker compose run --rm --user root --use-aliases --service-ports celery

.PHONY: makemigrations
makemigrations: ## Make django migrations
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ ALEXANDRIA_MANABI_DAV_URI_SCHEMES = {
- `ALEXANDRIA_ISO_639_TO_PSQL_SEARCH_CONFIG`: Mapping from language code from tika to django search vector language config. Also defines which languages are supported by the search. (default: `{"en":"english","de":"german","fr":"french","it":"italian"}`)
- `ALEXANDRIA_CONTENT_SEARCH_TYPE`: Django full text search type. (default: phrase)

#### Celery (optional)
For computing checksums and indexing file content for search, [celery](https://docs.celeryq.dev) with a Redis broker is used.

- `REDIS_HOST`: Redis host (default: redis)
- `REDIS_PORT`: Redis port (default: 6379)
- `REDIS_USER`: Redis user (default: default)
- `REDIS_PASSWORD`: Redis password (default: redis)
- `CELERY_BROKER_URL`: Broker url (default: `redis://{REDIS_USER}:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0`)
- `CELERY_TASK_ACKS_LATE`: Tasks are only acknowledged as complete after they finish, so they are re-queued if a worker dies mid-task (default: True)
- `CELERY_TASK_SOFT_TIME_LIMIT`: Soft time limit in seconds that prevents tasks from running forever (default: 60)

#### Development

For development, you can also set the following environment variables to help you:
Expand Down
5 changes: 5 additions & 0 deletions alexandria/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

# Export only the celery app from the package root.
__all__ = ("celery_app",)
18 changes: 18 additions & 0 deletions alexandria/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Celery application bootstrap for alexandria.

Creates the module-level ``Celery`` instance that the package root
re-exports, and wires it up to the Django settings and installed apps.
"""
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "alexandria.settings")

app = Celery("alexandria")

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")


# Load task modules from all registered Django apps.
app.autodiscover_tasks()
14 changes: 14 additions & 0 deletions alexandria/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pytest_factoryboy.fixture import Box
from rest_framework.test import APIClient

from alexandria.core import tasks
from alexandria.oidc_auth.models import OIDCUser


Expand Down Expand Up @@ -52,6 +53,19 @@ def mock_tika(mocker):
mocker.patch("tika.language.from_buffer", return_value="en")


@pytest.fixture(autouse=True)
def mock_celery(mocker):
    """Run celery tasks eagerly and in-process during tests.

    ``transaction.on_commit`` callbacks fire immediately (tests run inside a
    transaction that is never committed), and the async ``.delay`` entry
    points are replaced with direct synchronous calls to the task functions.
    """
    mocker.patch("django.db.transaction.on_commit", side_effect=lambda callback: callback())

    patched_tasks = (
        ("alexandria.core.tasks.set_checksum.delay", tasks.set_checksum),
        ("alexandria.core.tasks.set_content_vector.delay", tasks.set_content_vector),
    )
    for target, task_fn in patched_tasks:
        mocker.patch(target, side_effect=task_fn)


@pytest.fixture
def admin_groups():
return ["admin"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from tqdm import tqdm

from alexandria.core.models import File
from alexandria.core.tasks import set_content_vector


class Command(BaseCommand):
Expand All @@ -26,7 +27,7 @@ def handle(self, *args, **options):
# iterate over files in batches to prevent memory exhaustion
for file in tqdm(query.iterator(50), "Generating vectors", query.count()):
try:
file.save() # this will call set_content_vector
set_content_vector(file.pk)
except Exception as e: # noqa: B902
failed_files.append(str(file.id))
self.stdout.write(
Expand Down
72 changes: 18 additions & 54 deletions alexandria/core/models.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
import hashlib
import logging
import re
import uuid
from mimetypes import guess_extension
from pathlib import Path
from tempfile import NamedTemporaryFile

import tika.language
import tika.parser
from django.conf import settings
from django.contrib.postgres.fields import ArrayField
from django.contrib.postgres.indexes import GinIndex
from django.contrib.postgres.search import SearchVector, SearchVectorField
from django.contrib.postgres.search import SearchVectorField
from django.core.exceptions import ImproperlyConfigured
from django.core.files import File as DjangoFile
from django.core.validators import RegexValidator
from django.db import models, transaction
from django.db.models import Value
from django.db.models.fields.files import ImageFile
from django.dispatch import receiver
from django.utils.translation import gettext_lazy as _
Expand Down Expand Up @@ -232,55 +228,6 @@ class Variant(models.TextChoices):
mime_type = models.CharField(max_length=255)
size = models.IntegerField()

@staticmethod
def make_checksum(bytes_: bytes) -> str:
return f"sha256:{hashlib.sha256(bytes_).hexdigest()}"

def set_checksum(self):
if not settings.ALEXANDRIA_ENABLE_CHECKSUM:
return

self.content.file.file.seek(0)
self.checksum = self.make_checksum(self.content.file.file.read())

def set_content_vector(self):
if (
self.variant != File.Variant.ORIGINAL
or not settings.ALEXANDRIA_ENABLE_CONTENT_SEARCH
):
return

self.content.file.file.seek(0)
parsed_content = tika.parser.from_buffer(self.content.file.file)

name_vector = SearchVector(Value(Path(self.name).stem), weight="A")
if not parsed_content["content"]:
self.content_vector = name_vector
return

# use part of content for language detection, beacause metadata is not reliable
self.language = tika.language.from_buffer(parsed_content["content"][:1000])
config = settings.ALEXANDRIA_ISO_639_TO_PSQL_SEARCH_CONFIG.get(
self.language, "simple"
)
self.content_vector = name_vector + SearchVector(
Value(parsed_content["content"]),
config=config,
weight="B",
)

def save(
self, force_insert=False, force_update=False, using=None, update_fields=None
):
self.set_checksum()
self.set_content_vector()

file = super().save(force_insert, force_update, using, update_fields)

self.create_thumbnail()

return file

def get_webdav_url(self, username, group, host="http://localhost:8000"):
# The path doesn't need to match the actual file path, because we're accessing
# the file via the `File.pk`. So we can use just the name, that will then be
Expand Down Expand Up @@ -373,3 +320,20 @@ def auto_delete_file_on_delete(sender, instance, **kwargs):
"""Delete file from filesystem when `File` object is deleted."""

instance.content.delete(save=False)


@receiver(models.signals.post_save, sender=File)
def set_file_attributes(sender, instance, **kwargs):
    """Queue checksum / content-vector tasks and build thumbnails on save.

    Tasks are dispatched with ``delay_on_commit`` so they run only after the
    surrounding database transaction commits and the worker can actually see
    the saved ``File`` row.
    """
    # Imported locally to avoid a circular import: alexandria.core.tasks
    # imports ``File`` from this module.
    from alexandria.core import tasks

    # Only compute a checksum when the feature is enabled and none is set yet.
    if settings.ALEXANDRIA_ENABLE_CHECKSUM and not instance.checksum:
        tasks.set_checksum.delay_on_commit(instance.pk)

    # Content vectors are only built for original-variant files that don't
    # have a vector yet.
    if (
        settings.ALEXANDRIA_ENABLE_CONTENT_SEARCH
        and instance.variant == File.Variant.ORIGINAL
        and not instance.content_vector
    ):
        tasks.set_content_vector.delay_on_commit(instance.pk)

    instance.create_thumbnail()
58 changes: 58 additions & 0 deletions alexandria/core/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import hashlib
from pathlib import Path

import tika.language
import tika.parser
from django.conf import settings
from django.contrib.postgres.search import SearchVector
from django.db.models import Value

from alexandria.core.models import File
from celery import shared_task


@shared_task(soft_time_limit=301)
def set_content_vector(file_pk: str):
    """Extract text from a file via tika and store its search vector.

    The file name (without suffix) is always indexed with weight "A"; when
    tika extracts content, it is additionally indexed with weight "B" using
    the PostgreSQL search config for the detected language, falling back to
    "simple".

    :param file_pk: Primary key of the ``File`` to index.
    """
    file = File.objects.get(pk=file_pk)
    file.content.file.file.seek(0)

    # tika has an internal time limit of 300s, set the request limit to match that
    # different values should be set in tika as well
    # https://github.com/CogStack/tika-service/blob/master/README.md#tika-parsers-configuration
    parsed_content = tika.parser.from_buffer(
        file.content.file.file, requestOptions={"timeout": 300}
    )

    name_vector = SearchVector(Value(Path(file.name).stem), weight="A")
    if not parsed_content["content"]:
        # Update only content_vector, to avoid race conditions
        File.objects.filter(pk=file.pk).update(content_vector=name_vector)
        return

    # use part of content for language detection, because metadata is not reliable
    language = tika.language.from_buffer(parsed_content["content"][:1000])
    config = settings.ALEXANDRIA_ISO_639_TO_PSQL_SEARCH_CONFIG.get(language, "simple")
    content_vector = name_vector + SearchVector(
        Value(parsed_content["content"].strip()),
        config=config,
        weight="B",
    )

    # Update only the needed fields, to avoid race conditions
    File.objects.filter(pk=file.pk).update(
        content_vector=content_vector, language=language
    )


@shared_task
def set_checksum(file_pk: str):
    """Compute and persist the SHA-256 checksum of a file's content.

    Hashes the content incrementally in fixed-size chunks instead of reading
    the whole file into memory at once, so large uploads don't exhaust the
    worker's memory. The stored value has the same ``sha256:<hexdigest>``
    format as ``make_checksum``.

    :param file_pk: Primary key of the ``File`` to checksum.
    """
    file = File.objects.get(pk=file_pk)
    file.content.file.file.seek(0)

    # Incremental hashing: 1 MiB chunks keep memory usage flat for big files.
    digest = hashlib.sha256()
    while chunk := file.content.file.file.read(1024 * 1024):
        digest.update(chunk)
    checksum = f"sha256:{digest.hexdigest()}"

    # Update only checksum, to avoid race conditions
    File.objects.filter(pk=file.pk).update(checksum=checksum)


def make_checksum(bytes_: bytes) -> str:
    """Return the SHA-256 hex digest of ``bytes_`` prefixed with the algorithm name."""
    digest = hashlib.sha256(bytes_).hexdigest()
    return "sha256:" + digest
Loading

0 comments on commit 766f012

Please sign in to comment.