From c982fd34084a06d81be0303231c75dd6a0ac8480 Mon Sep 17 00:00:00 2001 From: Gurdeep Atwal Date: Tue, 21 Jan 2025 14:25:16 +0000 Subject: [PATCH] file validation --- core/file_handler.py | 28 +++++++++++++++++--------- exporter/applications/forms/parties.py | 8 ++++++-- exporter/goods/forms/common.py | 1 + exporter/goods/forms/firearms.py | 2 ++ exporter/goods/forms/goods.py | 1 + exporter/organisation/forms.py | 3 +++ unit_tests/core/test_file_handler.py | 12 ++--------- 7 files changed, 33 insertions(+), 22 deletions(-) diff --git a/core/file_handler.py b/core/file_handler.py index 3bc7bb019b..c4e209fe92 100644 --- a/core/file_handler.py +++ b/core/file_handler.py @@ -1,8 +1,8 @@ import boto3 import logging +import magic from django.conf import settings -from django.core.files.uploadhandler import UploadFileException from django.http import StreamingHttpResponse from django_chunk_upload_handlers.s3 import S3FileUploadHandler @@ -46,9 +46,25 @@ def s3_client(): class SafeS3FileUploadHandler(S3FileUploadHandler): """ - S3FileUploadHandler. + S3FileUploadHandler with mime-type validation. """ + ACCEPTED_FILE_UPLOAD_MIME_TYPES = settings.ACCEPTED_FILE_UPLOAD_MIME_TYPES + + def receive_data_chunk(self, raw_data, start): + """ + Receive a single file chunk from the browser, validate the + file type for the first chunk and leave the rest to super. + """ + # For the first chunk + if start == 0: + mime = magic.from_buffer(raw_data, mime=True) + if mime not in self.ACCEPTED_FILE_UPLOAD_MIME_TYPES: + # Return non this halts the upload and lets the Filefield handle the + # validation for user messages. + return + super().receive_data_chunk(raw_data, start) + def file_complete(self, *args, **kwargs): """Override `file_complete` to ensure that all necessary attributes are set on the file object. @@ -62,14 +78,6 @@ def file_complete(self, *args, **kwargs): return file -class UploadFailed(UploadFileException): - pass - - -class UnacceptableMimeTypeError(UploadFailed): - pass - - def generate_file(result): for chunk in iter(lambda: result["Body"].read(settings.STREAMING_CHUNK_SIZE), b""): yield chunk diff --git a/exporter/applications/forms/parties.py b/exporter/applications/forms/parties.py index de3f631e38..c2aa55e71b 100644 --- a/exporter/applications/forms/parties.py +++ b/exporter/applications/forms/parties.py @@ -430,12 +430,12 @@ def get_title(self): class PartyDocumentUploadForm(forms.Form): title = "Upload an end-user document" - # from django.core.validators import FileExtensionValidator party_document = forms.FileField( label=FileUploadFileTypes.UPLOAD_GUIDANCE_TEXT, error_messages={ "required": "Select an end-user document", }, + allow_empty_file=True, validators=[FileExtensionValidator()], ) @@ -497,6 +497,7 @@ class PartyEnglishTranslationDocumentUploadForm(forms.Form): error_messages={ "required": "Select an English translation", }, + allow_empty_file=True, validators=[FileExtensionValidator()], ) @@ -526,6 +527,7 @@ class PartyCompanyLetterheadDocumentUploadForm(forms.Form): error_messages={ "required": "Select a document on company letterhead", }, + allow_empty_file=True, validators=[FileExtensionValidator()], ) @@ -550,7 +552,9 @@ def get_title(self): class PartyEC3DocumentUploadForm(forms.Form): title = "Upload an EC3 form (optional)" - party_ec3_document = forms.FileField(label="", required=False, validators=[FileExtensionValidator()]) + party_ec3_document = forms.FileField( + label="", required=False, allow_empty_file=True, validators=[FileExtensionValidator()] + ) ec3_missing_reason = forms.CharField( widget=forms.Textarea(attrs={"rows": "5"}), label="", diff --git a/exporter/goods/forms/common.py b/exporter/goods/forms/common.py index cb89613417..0a95546b9a 100644 --- a/exporter/goods/forms/common.py +++ b/exporter/goods/forms/common.py @@ -363,6 +363,7 @@ class Layout: error_messages={ "required": "Select a document that shows what your product is designed to do", }, + allow_empty_file=True, validators=[FileExtensionValidator()], ) description = forms.CharField( diff --git a/exporter/goods/forms/firearms.py b/exporter/goods/forms/firearms.py index b8f07ed1e7..5e313413b1 100644 --- a/exporter/goods/forms/firearms.py +++ b/exporter/goods/forms/firearms.py @@ -223,6 +223,7 @@ class Layout: error_messages={ "required": "Select a registered firearms dealer certificate", }, + allow_empty_file=True, widget=PotentiallyUnsafeClearableFileInput, validators=[FileExtensionValidator()], ) @@ -335,6 +336,7 @@ class Layout: widget=PotentiallyUnsafeClearableFileInput( force_required=True, ), + allow_empty_file=True, validators=[FileExtensionValidator()], ) diff --git a/exporter/goods/forms/goods.py b/exporter/goods/forms/goods.py index 7f92669e4d..ccbd7d0e12 100644 --- a/exporter/goods/forms/goods.py +++ b/exporter/goods/forms/goods.py @@ -1113,6 +1113,7 @@ class AttachFirearmsDealerCertificateForm(forms.Form): error_messages={ "required": "Select certificate file to upload", }, + allow_empty_file=True, validators=[FileExtensionValidator()], ) diff --git a/exporter/organisation/forms.py b/exporter/organisation/forms.py index ba45046a82..695da4e237 100644 --- a/exporter/organisation/forms.py +++ b/exporter/organisation/forms.py @@ -29,6 +29,8 @@ class Layout: label="", help_text="The file must be smaller than 50MB", error_messages={"required": "Select certificate file to upload"}, + allow_empty_file=True, + validators=[FileExtensionValidator()], ) reference_code = forms.CharField( label="Certificate number", @@ -64,6 +66,7 @@ class Layout: label=FileUploadFileTypes.UPLOAD_GUIDANCE_TEXT, help_text="The file must be smaller than 50MB", error_messages={"required": "Select certificate file to upload"}, + allow_empty_file=True, validators=[FileExtensionValidator()], ) reference_code = forms.CharField( diff --git a/unit_tests/core/test_file_handler.py b/unit_tests/core/test_file_handler.py index 063aed034e..a6e409da95 100644 --- a/unit_tests/core/test_file_handler.py +++ b/unit_tests/core/test_file_handler.py @@ -3,13 +3,11 @@ from unittest import mock from os import path -from django.core.files.uploadhandler import UploadFileException from core.file_handler import ( s3_client, SafeS3FileUploadHandler, S3Wrapper, - UnacceptableMimeTypeError, ) @@ -45,20 +43,14 @@ def test_invalid_file_type_upload(mock_handler): with open(f"{TEST_FILES_PATH}/invalid_type.zip", "rb") as f: content = f.read() mock_handler.abort = mock.Mock() - with pytest.raises(UnacceptableMimeTypeError, match="Unsupported file type: application/zip") as e: - mock_handler.receive_data_chunk(content, 0) - assert isinstance(e.value, UploadFileException) - mock_handler.abort.assert_called_once() + assert mock_handler.receive_data_chunk(content, 0) is None def test_invalid_file_mime_type_upload(mock_handler): with open(f"{TEST_FILES_PATH}/invalid_mime.txt", "rb") as f: content = f.read() mock_handler.abort = mock.Mock() - with pytest.raises(UnacceptableMimeTypeError, match="Unsupported file type: application/zip") as e: - mock_handler.receive_data_chunk(content, 0) - assert isinstance(e.value, UploadFileException) - mock_handler.abort.assert_called_once() + assert mock_handler.receive_data_chunk(content, 0) is None def test_s3_wrapper_get_client(settings, mocker):