Skip to content

Commit

Permalink
Merge pull request #652 from projectcaluma/fix-mime-type-handling
Browse files Browse the repository at this point in the history
fix: mime type handling
  • Loading branch information
czosel authored Sep 25, 2024
2 parents ec22c19 + 4b8ab5b commit 3c56ab6
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 66 deletions.
8 changes: 5 additions & 3 deletions alexandria/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,12 @@ def manabi(settings):

@pytest.fixture()
def document_post_data(category):
content = io.BytesIO(
b"%PDF-1.\ntrailer<</Root<</Pages<</Kids[<</MediaBox[0 0 3 3]>>]>>>>>>"
)
content.name = "foo.pdf"
return {
"content": io.BytesIO(
b"%PDF-1.\ntrailer<</Root<</Pages<</Kids[<</MediaBox[0 0 3 3]>>]>>>>>>"
),
"content": content,
"data": io.BytesIO(
json.dumps({"title": "winstonsmith", "category": category.pk}).encode(
"utf-8"
Expand Down
59 changes: 34 additions & 25 deletions alexandria/core/tests/__snapshots__/test_viewsets.ambr

Large diffs are not rendered by default.

72 changes: 52 additions & 20 deletions alexandria/core/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,37 +60,41 @@ def test_anonymous_writing(
)


@pytest.mark.parametrize("enable_checksum", (True, False))
@pytest.mark.parametrize(
"file_type,allowed_mime_types,thumbnail_count,status",
"file_type,extension,content_type,allowed_mime_types,thumbnail_count,status",
[
("png", None, 1, HTTP_201_CREATED),
("unsupported", None, 0, HTTP_201_CREATED),
("png", ["application/pdf"], 1, HTTP_400_BAD_REQUEST),
# happy case
("png", "png", "image/png", None, 1, HTTP_201_CREATED),
# inconsistent extension
("png", "jpeg", "image/png", None, 1, HTTP_400_BAD_REQUEST),
# missing extension
("png", "", "image/png", None, 1, HTTP_400_BAD_REQUEST),
# inconsistent content
("unsupported", "png", "image/png", None, 0, HTTP_400_BAD_REQUEST),
# mime type not allowed by category
("png", "png", "image/png", ["application/pdf"], 1, HTTP_400_BAD_REQUEST),
],
)
def test_file_upload(
admin_client,
document_factory,
tmp_path,
file_factory,
enable_checksum,
file_type,
extension,
settings,
thumbnail_count,
allowed_mime_types,
content_type,
status,
category_factory,
):
settings.ALEXANDRIA_ENABLE_THUMBNAIL_GENERATION = True
settings.ALEXANDRIA_ENABLE_CHECKSUM = enable_checksum
category = category_factory(allowed_mime_types=allowed_mime_types)
doc = document_factory(category=category)
data = {
"name": "file.png",
"document": str(doc.pk),
"content": io.BytesIO(getattr(FileData, file_type)),
}
filename = f"file.{extension}"
content = io.BytesIO(getattr(FileData, file_type))
content.name = filename
content.content_type = content_type
data = {"name": filename, "document": str(doc.pk), "content": content}
url = reverse("file-list")
resp = admin_client.post(url, data=data, format="multipart")

Expand All @@ -105,10 +109,23 @@ def test_file_upload(
File.objects.filter(variant=File.Variant.THUMBNAIL).count() == thumbnail_count
)

if enable_checksum:
assert doc.files.filter(
variant=File.Variant.ORIGINAL
).first().checksum == make_checksum(getattr(FileData, file_type))

def test_generate_checksum(admin_client, document_factory, settings):
    """Check that an uploaded original file gets the checksum of its content.

    With ``ALEXANDRIA_ENABLE_CHECKSUM`` switched on, a PNG is posted to the
    ``file-list`` endpoint; the stored original must carry the value that
    ``make_checksum`` returns for the same bytes.
    """
    settings.ALEXANDRIA_ENABLE_CHECKSUM = True
    document = document_factory()

    response = admin_client.post(
        reverse("file-list"),
        data={
            "name": "file.png",
            "document": str(document.pk),
            "content": io.BytesIO(FileData.png),
        },
        format="multipart",
    )

    assert response.status_code == HTTP_201_CREATED
    document.refresh_from_db()

    # Only the original variant is looked up; other variants are not checked.
    original = document.files.filter(
        name="file.png", variant=File.Variant.ORIGINAL
    ).first()
    assert original.checksum == make_checksum(FileData.png)


def test_at_rest_encryption(admin_client, settings, document, mocker):
Expand Down Expand Up @@ -193,8 +210,8 @@ def test_validate_created_by(
if viewset == FileViewSet:
del post_data["data"]
post_data["content"] = io.BytesIO(b"datadatatatat")
for key in ["variant", "name"]:
post_data[key] = serialized_model[key]
post_data["name"] = "foo.txt"
post_data["variant"] = serialized_model["variant"]
post_data["document"] = serialized_model["document"]["id"]
if viewset == DocumentViewSet and not update:
post_data = document_post_data
Expand Down Expand Up @@ -366,6 +383,21 @@ def test_download_file(admin_client, file, presigned, expected_status):
assert result.status_code == expected_status


@pytest.mark.parametrize(
    "mime_type,expected_content_disposition",
    [("application/pdf", "inline"), ("text/html", "attachment")],
)
def test_download_file_mime_type(
    admin_client, file_factory, mime_type, expected_content_disposition
):
    """Check the headers returned when downloading a file of a given MIME type.

    The download URL is read from the file's detail response. The download
    response must echo the stored MIME type as ``Content-Type`` and carry the
    expected disposition keyword (``inline`` for PDF, ``attachment`` for HTML).
    """
    stored_file = file_factory(mime_type=mime_type)

    detail_url = reverse("file-detail", args=(stored_file.pk,))
    attributes = admin_client.get(detail_url).json()["data"]["attributes"]

    download = admin_client.get(attributes["download-url"])
    headers = download.headers

    assert headers["Content-Type"] == mime_type
    assert expected_content_disposition in headers["Content-Disposition"]


@pytest.mark.freeze_time(TIMESTAMP, as_arg=True)
def test_presigned_url_expired(admin_client, client, file, freezer, settings):
response = admin_client.get(reverse("file-detail", args=(file.pk,)))
Expand Down
2 changes: 1 addition & 1 deletion alexandria/core/tests/test_viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def test_api_create(fixture, admin_client, viewset, snapshot, document_post_data
if viewset.get_view_name() == "File":
data = {
"content": io.BytesIO(b"FiLeCoNtEnt"),
"name": serializer.data["name"],
"name": "foo.txt",
"document": str(fixture.document.pk),
}
opts = {"format": "multipart"}
Expand Down
57 changes: 41 additions & 16 deletions alexandria/core/validations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import magic
from clamdpy import ClamdNetworkSocket
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from django.utils.translation import gettext_lazy
from generic_permissions.validation import validator_for
from rest_framework.exceptions import ValidationError

Expand All @@ -27,10 +27,12 @@ def validate_file_infection(file_content):
file_content.seek(0)

if result.status == "FOUND":
raise ValidationError(_("File is infected with malware."), code="infected")
raise ValidationError(
gettext_lazy("File is infected with malware."), code="infected"
)
elif result.status == "ERROR":
raise ValidationError(
(_("Malware scan had an error: ") + result.reason),
(gettext_lazy("Malware scan had an error: ") + result.reason),
code="incomplete",
)

Expand All @@ -47,7 +49,7 @@ def validate_mime_type(mime_type, category):
and mime_type not in category.allowed_mime_types
):
raise ValidationError(
_(
gettext_lazy(
"File type %(mime_type)s is not allowed in category %(category)s."
% {"mime_type": mime_type, "category": category.pk}
)
Expand All @@ -62,17 +64,40 @@ def validate_file(self, data, context):
validate_file_infection(data["content"])

# Validate that the mime type is allowed in the category
mime_type = data["content"].content_type
if mime_type == "application/octet-stream" or not mime_type:
guess, encoding = guess_type(data["name"])
if guess is not None:
mime_type = guess
else:
data["content"].seek(0)
mime_type = magic.from_buffer(data["content"].read(), mime=True)
data["content"].seek(0)

validate_mime_type(mime_type, data["document"].category)
data["mime_type"] = mime_type
content_type_header = data["content"].content_type
extension_type, _ = guess_type(data["name"])

if not content_type_header: # pragma: no cover
raise ValidationError(gettext_lazy("Missing Content-Type header"))
if not extension_type:
raise ValidationError(gettext_lazy("Unknown file extension"))

if content_type_header == "application/octet-stream":
content_type_header = extension_type
if content_type_header != extension_type:
raise ValidationError(
gettext_lazy(
"Content-Type %(content_type)s does not match file extension %(extension)s."
% {"content_type": content_type_header, "extension": extension_type}
)
)

data["content"].seek(0)
file_content_type = magic.from_buffer(data["content"].read(), mime=True)
data["content"].seek(0)

if file_content_type != content_type_header:
raise ValidationError(
gettext_lazy(
"Content-Type %(content_type)s does not match detected file content %(file_content_type)s."
% {
"content_type": content_type_header,
"file_content_type": file_content_type,
}
)
)

validate_mime_type(content_type_header, data["document"].category)
data["mime_type"] = content_type_header

return data
8 changes: 7 additions & 1 deletion alexandria/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class TagViewSet(PermissionViewMixin, VisibilityViewMixin, ModelViewSet):
filterset_class = TagFilterSet
search_fields = ("name", "description")
select_for_includes = {"tag_synonym_group": ["tag_synonym_group"]}
ordering_fields = "__all__"
ordering = ["name"]


class MarkViewSet(PermissionViewMixin, VisibilityViewMixin, ModelViewSet):
Expand Down Expand Up @@ -247,8 +249,12 @@ def download(self, request, pk=None):
)
obj = models.File.objects.get(pk=pk)

unsafe = obj.mime_type not in settings.SAFE_FOR_INLINE_DISPOSITION
return FileResponse(
obj.content.file.file, as_attachment=False, filename=obj.name
obj.content.file.file,
as_attachment=unsafe,
filename=obj.name,
content_type=obj.mime_type,
)
raise PermissionDenied(
_("For downloading a file use the presigned download URL.")
Expand Down
11 changes: 11 additions & 0 deletions alexandria/settings/alexandria.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,14 @@ def default(default_dev=env.NOTSET, default_prod=env.NOTSET):
ALEXANDRIA_CONTENT_SEARCH_TYPE = env.str(
"ALEXANDRIA_CONTENT_SEARCH_TYPE", default="phrase"
)

# MIME types that are considered safe to serve with
# `Content-Disposition: inline`.
# The value is read from the ALEXANDRIA_SAFE_FOR_INLINE_DISPOSITION
# environment variable; the list below is only the default used when that
# variable is not set.
SAFE_FOR_INLINE_DISPOSITION = env.list(
    "ALEXANDRIA_SAFE_FOR_INLINE_DISPOSITION",
    default=[
        "application/pdf",
        "image/jpeg",
        "image/png",
        "image/gif",
    ],
)

0 comments on commit 3c56ab6

Please sign in to comment.