From 4e84ea8f992941274bf533e6ae878d298d988192 Mon Sep 17 00:00:00 2001 From: Le H Ngo Date: Mon, 15 Jan 2024 16:50:20 +0000 Subject: [PATCH 1/5] Adds celery healthcheck --- api/conf/settings.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/conf/settings.py b/api/conf/settings.py index e8dd5a804f..4ad88673bd 100644 --- a/api/conf/settings.py +++ b/api/conf/settings.py @@ -114,6 +114,8 @@ "health_check.cache", "health_check.storage", "health_check.contrib.migrations", + "health_check.contrib.celery", + "health_check.contrib.celery_ping", "django_audit_log_middleware", "lite_routing", "api.appeals", From a9c303243d53fa4b9e4d88c2abcd8fd60f9bbb24 Mon Sep 17 00:00:00 2001 From: Arun Siluvery Date: Tue, 16 Jan 2024 17:37:05 +0000 Subject: [PATCH 2/5] Add log event about acquiring lock in send_licence_details_to_lite_hmrc task --- api/licences/celery_tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/api/licences/celery_tasks.py b/api/licences/celery_tasks.py index 5aa1484237..a139e21a5d 100644 --- a/api/licences/celery_tasks.py +++ b/api/licences/celery_tasks.py @@ -29,6 +29,7 @@ def send_licence_details_to_lite_hmrc(licence_id, action): try: with transaction.atomic(): # transaction.atomic + select_for_update + nowait=True will throw an error if row has already been locked + logger.info("Attempt to acquire lock (non-blocking) before updating licence %s", str(licence_id)) licence = Licence.objects.select_for_update(nowait=True).get(id=licence_id) send_licence(licence, action) except HMRCIntegrationException as e: From b2e2f54f7d34cbd620eb3c363987fd2269440876 Mon Sep 17 00:00:00 2001 From: Arun Siluvery Date: Tue, 16 Jan 2024 18:12:45 +0000 Subject: [PATCH 3/5] Delay scheduling of send_licence_details_to_lite_hmrc by 10 sec When a licence is issued many other things happen at that time which include, - updating of Case status - sending notification email to Exporter - issuing of licence When the task to send licence details to lite-hmrc runs it first 
attempts to acquire lock on the table as it needs to update the table. However, it is observed that the task is not able to acquire the lock. One possibility is that the updates are still pending, so to avoid any issues acquiring the lock, defer the task execution by 10 sec, which should be more than sufficient to flush all pending updates. --- api/licences/celery_tasks.py | 11 ++++++++--- .../libraries/hmrc_integration_operations.py | 10 +++++----- api/licences/tests/test_api_to_hmrc_integration.py | 12 +++++++----- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/api/licences/celery_tasks.py b/api/licences/celery_tasks.py index a139e21a5d..00340ae823 100644 --- a/api/licences/celery_tasks.py +++ b/api/licences/celery_tasks.py @@ -9,7 +9,7 @@ MAX_ATTEMPTS = 5 RETRY_BACKOFF = 1200 - +DEFER_EXECUTION_DELAY = 10 # secs logger = get_task_logger(__name__) @@ -29,8 +29,10 @@ def send_licence_details_to_lite_hmrc(licence_id, action): try: with transaction.atomic(): # transaction.atomic + select_for_update + nowait=True will throw an error if row has already been locked + # nowait=True makes it non-blocking logger.info("Attempt to acquire lock (non-blocking) before updating licence %s", str(licence_id)) licence = Licence.objects.select_for_update(nowait=True).get(id=licence_id) + send_licence(licence, action) except HMRCIntegrationException as e: logger.error("Error sending licence %s details to lite-hmrc: %s", str(licence_id), str(e)) @@ -50,5 +52,8 @@ def schedule_licence_details_to_lite_hmrc(licence_id, action): ) return - logger.info("Scheduling task to %s licence %s details to lite-hmrc", action, str(licence_id)) - send_licence_details_to_lite_hmrc.delay(licence_id, action) + logger.info("Scheduling task to %s licence %s details in lite-hmrc", action, licence.reference_code) + + # Defer task execution to allow for all Licence updates to complete + # so that the task be able to acquire lock when it executes + 
send_licence_details_to_lite_hmrc.apply_async(args=(licence_id, action), countdown=DEFER_EXECUTION_DELAY) diff --git a/api/licences/libraries/hmrc_integration_operations.py b/api/licences/libraries/hmrc_integration_operations.py index 27bb4d847a..3e54df36cb 100644 --- a/api/licences/libraries/hmrc_integration_operations.py +++ b/api/licences/libraries/hmrc_integration_operations.py @@ -33,9 +33,9 @@ class HMRCIntegrationException(APIException): def send_licence(licence: Licence, action: str): - """Sends licence information to HMRC Integration""" + """Sends licence information to lite-hmrc""" - logging.info(f"Sending licence '{licence.id}', action '{action}' to HMRC Integration") + logging.info(f"Sending licence '{licence.reference_code}' with action '{action}' to lite-hmrc") url = f"{settings.LITE_HMRC_INTEGRATION_URL}{SEND_LICENCE_ENDPOINT}" data = {"licence": HMRCIntegrationLicenceSerializer(licence).data} @@ -46,15 +46,15 @@ def send_licence(licence: Licence, action: str): if response.status_code not in [status.HTTP_200_OK, status.HTTP_201_CREATED]: raise HMRCIntegrationException( - f"An unexpected response was received when sending licence '{licence.id}', action '{action}' to HMRC " - f"Integration -> status={response.status_code}, message={response.text}" + f"An unexpected response was received when sending licence '{licence.reference_code}', action '{action}' to lite-hmrc " + f"-> status={response.status_code}, message={response.text}" ) if response.status_code == status.HTTP_201_CREATED: licence.hmrc_integration_sent_at = timezone.now() licence.save() - logging.info(f"Successfully sent licence '{licence.id}', action '{action}' to HMRC Integration") + logging.info(f"Successfully sent licence '{licence.reference_code}' with action '{action}' to lite-hmrc") def get_mail_status(licence: Licence): diff --git a/api/licences/tests/test_api_to_hmrc_integration.py b/api/licences/tests/test_api_to_hmrc_integration.py index c5d5fa7946..17e45f7028 100644 --- 
a/api/licences/tests/test_api_to_hmrc_integration.py +++ b/api/licences/tests/test_api_to_hmrc_integration.py @@ -380,8 +380,8 @@ def test_send_licence_failure(self, serializer, requests_post): requests_post.assert_called_once() self.assertEqual( str(error.exception), - f"An unexpected response was received when sending licence '{self.standard_licence.id}', " - f"action '{self.hmrc_integration_status}' to HMRC Integration -> status=400, message=Bad request", + f"An unexpected response was received when sending licence '{self.standard_licence.reference_code}', " + f"action '{self.hmrc_integration_status}' to lite-hmrc -> status=400, message=Bad request", ) self.assertIsNone(self.standard_licence.hmrc_integration_sent_at) @@ -477,14 +477,14 @@ def setUp(self): value=product.value, ) - @mock.patch("api.licences.celery_tasks.send_licence_details_to_lite_hmrc.delay") + @mock.patch("api.licences.celery_tasks.send_licence_details_to_lite_hmrc.apply_async") def test_schedule_licence_details_to_lite_hmrc(self, send_licence_details_to_lite_hmrc_task): send_licence_details_to_lite_hmrc_task.return_value = None schedule_licence_details_to_lite_hmrc(str(self.standard_licence.id), self.hmrc_integration_status) send_licence_details_to_lite_hmrc_task.assert_called_with( - str(self.standard_licence.id), self.hmrc_integration_status + args=(str(self.standard_licence.id), self.hmrc_integration_status), countdown=10 ) @mock.patch("api.licences.celery_tasks.send_licence") @@ -507,7 +507,9 @@ def test_send_licence_to_hmrc_integration_failure(self, send_licence): def test_send_licence_to_hmrc_integration_with_background_task_success(self, send_licence): send_licence.return_value = None - send_licence_details_to_lite_hmrc.delay(str(self.standard_licence.id), self.hmrc_integration_status) + send_licence_details_to_lite_hmrc.apply_async( + args=(str(self.standard_licence.id), self.hmrc_integration_status) + ) send_licence.assert_called_once() From b6cbbbf424760da2de43cc392097e8c56b792271 
Mon Sep 17 00:00:00 2001 From: Arun Siluvery Date: Wed, 17 Jan 2024 10:30:24 +0000 Subject: [PATCH 4/5] Wait till the lock is available before updating Licence table The task that sends licence details to lite-hmrc is scheduled when a licence is issued, which is already part of a transaction. The Licence row would've already been locked during that transaction, so if we continue without waiting it will result in OperationalError. --- api/licences/celery_tasks.py | 18 ++++++++++-------- .../tests/test_api_to_hmrc_integration.py | 8 +++----- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/api/licences/celery_tasks.py b/api/licences/celery_tasks.py index 00340ae823..81268bc535 100644 --- a/api/licences/celery_tasks.py +++ b/api/licences/celery_tasks.py @@ -9,7 +9,6 @@ MAX_ATTEMPTS = 5 RETRY_BACKOFF = 1200 -DEFER_EXECUTION_DELAY = 10 # secs logger = get_task_logger(__name__) @@ -28,10 +27,15 @@ def send_licence_details_to_lite_hmrc(licence_id, action): """ try: with transaction.atomic(): - # transaction.atomic + select_for_update + nowait=True will throw an error if row has already been locked - # nowait=True makes it non-blocking - logger.info("Attempt to acquire lock (non-blocking) before updating licence %s", str(licence_id)) - licence = Licence.objects.select_for_update(nowait=True).get(id=licence_id) + # It is essential to use select_for_update() without nowait=True because + # if lock is not available then we need to wait here till it is available. + # + # This task is scheduled when licence is issued which is already part of a transaction. + # Licence row would've already been locked during that transaction so if continue + # without waiting it will result in OperationalError. + # Wait here till it is released at which point this continues execution. 
+ logger.info("Attempt to acquire lock (blocking) before updating licence %s", str(licence_id)) + licence = Licence.objects.select_for_update().get(id=licence_id) send_licence(licence, action) except HMRCIntegrationException as e: @@ -54,6 +58,4 @@ def schedule_licence_details_to_lite_hmrc(licence_id, action): logger.info("Scheduling task to %s licence %s details in lite-hmrc", action, licence.reference_code) - # Defer task execution to allow for all Licence updates to complete - # so that the task be able to acquire lock when it executes - send_licence_details_to_lite_hmrc.apply_async(args=(licence_id, action), countdown=DEFER_EXECUTION_DELAY) + send_licence_details_to_lite_hmrc.delay(licence_id, action) diff --git a/api/licences/tests/test_api_to_hmrc_integration.py b/api/licences/tests/test_api_to_hmrc_integration.py index 17e45f7028..35020eacdb 100644 --- a/api/licences/tests/test_api_to_hmrc_integration.py +++ b/api/licences/tests/test_api_to_hmrc_integration.py @@ -477,14 +477,14 @@ def setUp(self): value=product.value, ) - @mock.patch("api.licences.celery_tasks.send_licence_details_to_lite_hmrc.apply_async") + @mock.patch("api.licences.celery_tasks.send_licence_details_to_lite_hmrc.delay") def test_schedule_licence_details_to_lite_hmrc(self, send_licence_details_to_lite_hmrc_task): send_licence_details_to_lite_hmrc_task.return_value = None schedule_licence_details_to_lite_hmrc(str(self.standard_licence.id), self.hmrc_integration_status) send_licence_details_to_lite_hmrc_task.assert_called_with( - args=(str(self.standard_licence.id), self.hmrc_integration_status), countdown=10 + str(self.standard_licence.id), self.hmrc_integration_status ) @mock.patch("api.licences.celery_tasks.send_licence") @@ -507,9 +507,7 @@ def test_send_licence_to_hmrc_integration_failure(self, send_licence): def test_send_licence_to_hmrc_integration_with_background_task_success(self, send_licence): send_licence.return_value = None - send_licence_details_to_lite_hmrc.apply_async( - 
args=(str(self.standard_licence.id), self.hmrc_integration_status) - ) + send_licence_details_to_lite_hmrc.delay(str(self.standard_licence.id), self.hmrc_integration_status) send_licence.assert_called_once() From d863af8458a09f06d470aa87224dd8ee6e4b2c31 Mon Sep 17 00:00:00 2001 From: Arun Siluvery Date: Wed, 17 Jan 2024 10:47:36 +0000 Subject: [PATCH 5/5] Add additional logs during case finalisation To better capture the sequence of events during case finalisation and the start of task that sends licence details to lite-hmrc --- api/cases/views/views.py | 7 +++++++ api/licences/libraries/hmrc_integration_operations.py | 5 +++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/api/cases/views/views.py b/api/cases/views/views.py index ea7bba332a..f96e8a12cc 100644 --- a/api/cases/views/views.py +++ b/api/cases/views/views.py @@ -1,3 +1,5 @@ +import logging + from django.core.exceptions import PermissionDenied from django.db import transaction from django.http.response import JsonResponse, HttpResponse @@ -954,6 +956,8 @@ def put(self, request, pk): ) licence.decisions.set([Decision.objects.get(name=decision) for decision in required_decisions]) + + logging.info("Initiate issue of licence %s (status: %s)", licence.reference_code, licence.status) licence.issue() return_payload["licence"] = licence.id @@ -976,6 +980,7 @@ def put(self, request, pk): old_status = case.status.status case.status = get_case_status_by_status(CaseStatusEnum.FINALISED) case.save() + logging.info("Case status is now finalised") decisions = required_decisions.copy() @@ -1017,6 +1022,8 @@ def put(self, request, pk): for document in documents: document.send_exporter_notifications() + logging.info("Licence documents visible to exporter, notification sent") + return JsonResponse(return_payload, status=status.HTTP_201_CREATED) diff --git a/api/licences/libraries/hmrc_integration_operations.py b/api/licences/libraries/hmrc_integration_operations.py index 3e54df36cb..e6bfdcbe69 100644 --- 
a/api/licences/libraries/hmrc_integration_operations.py +++ b/api/licences/libraries/hmrc_integration_operations.py @@ -35,7 +35,7 @@ class HMRCIntegrationException(APIException): def send_licence(licence: Licence, action: str): """Sends licence information to lite-hmrc""" - logging.info(f"Sending licence '{licence.reference_code}' with action '{action}' to lite-hmrc") + logging.info("Sending licence %s with action %s to lite-hmrc", licence.reference_code, action) url = f"{settings.LITE_HMRC_INTEGRATION_URL}{SEND_LICENCE_ENDPOINT}" data = {"licence": HMRCIntegrationLicenceSerializer(licence).data} @@ -51,10 +51,11 @@ def send_licence(licence: Licence, action: str): ) if response.status_code == status.HTTP_201_CREATED: + logging.info("Record that new licence %s with action %s is sent to lite-hmrc", licence.reference_code, action) licence.hmrc_integration_sent_at = timezone.now() licence.save() - logging.info(f"Successfully sent licence '{licence.reference_code}' with action '{action}' to lite-hmrc") + logging.info("Successfully sent licence %s with action %s to lite-hmrc", licence.reference_code, action) def get_mail_status(licence: Licence):