diff --git a/cron-scheduler.py b/cron-scheduler.py index be627e7d2..4da03e6b7 100644 --- a/cron-scheduler.py +++ b/cron-scheduler.py @@ -17,9 +17,13 @@ schedule_automatic_contact_archive, ) from datahub.company.tasks.export_potential import update_company_export_potential_from_csv -from datahub.company_activity.tasks.ingest_company_activity import ingest_activity_data -from datahub.company_activity.tasks.ingest_stova_attendees import ingest_stova_attendee_data -from datahub.company_activity.tasks.ingest_stova_events import ingest_stova_event_data +from datahub.company_activity.tasks.ingest_company_activity import ( + company_activity_identification_task, +) +from datahub.company_activity.tasks.ingest_stova_attendees import ( + stova_attendee_identification_task, +) +from datahub.company_activity.tasks.ingest_stova_events import stova_event_identification_task from datahub.core.queues.constants import ( EVERY_EIGHT_AM, EVERY_EIGHT_THIRTY_AM_ON_FIRST_EACH_MONTH, @@ -132,9 +136,9 @@ def schedule_jobs(): description='Update companies from dnb service', ) job_scheduler( - function=ingest_activity_data, + function=company_activity_identification_task, cron=EVERY_HOUR, - description='Check S3 for new Company Activity data files and schedule ingestion', + description='Identify new company activity objects', ) job_scheduler( function=eyb_triage_identification_task, @@ -142,14 +146,14 @@ def schedule_jobs(): description='Identify new EYB triage objects', ) job_scheduler( - function=ingest_stova_event_data, + function=stova_event_identification_task, cron=EVERY_HOUR, - description='Check S3 for new Stova Event files and schedule ingestion', + description='Identify new Stova event objects', ) job_scheduler( - function=ingest_stova_attendee_data, + function=stova_attendee_identification_task, cron=EVERY_HOUR, - description='Check S3 for new Stova Attendee files and schedule ingestion', + description='Identify new Stova attendee objects', ) if settings.ENABLE_ESTIMATED_LAND_DATE_REMINDERS: diff --git a/datahub/company_activity/tasks/__init__.py b/datahub/company_activity/tasks/__init__.py index 255d19bf8..122277d46 100644 --- a/datahub/company_activity/tasks/__init__.py +++ b/datahub/company_activity/tasks/__init__.py @@ -1,13 +1,13 @@ from datahub.company_activity.tasks.ingest_great_data import ingest_great_data from datahub.company_activity.tasks.ingest_stova_attendees import stova_attendee_ingestion_task from datahub.company_activity.tasks.ingest_stova_events import ( - ingest_stova_event_data, - stova_ingestion_task, + stova_event_identification_task, + stova_event_ingestion_task, ) __all__ = ( ingest_great_data, - ingest_stova_event_data, + stova_event_identification_task, stova_attendee_ingestion_task, - stova_ingestion_task, + stova_event_ingestion_task, ) diff --git a/datahub/company_activity/tasks/ingest_company_activity.py b/datahub/company_activity/tasks/ingest_company_activity.py index de137c699..87eedb1df 100644 --- a/datahub/company_activity/tasks/ingest_company_activity.py +++ b/datahub/company_activity/tasks/ingest_company_activity.py @@ -16,7 +16,7 @@ TWO_HOURS_IN_SECONDS = 7200 -def ingest_activity_data(): +def company_activity_identification_task(): logger.info('Checking for new Company Activity data files') task = CompanyActivityIngestionTask() task.ingest() diff --git a/datahub/company_activity/tasks/ingest_stova_attendees.py b/datahub/company_activity/tasks/ingest_stova_attendees.py index 23298e6bd..ed0dba4ec 100644 --- a/datahub/company_activity/tasks/ingest_stova_attendees.py +++ b/datahub/company_activity/tasks/ingest_stova_attendees.py @@ -16,7 +16,7 @@ DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' -def ingest_stova_attendee_data() -> None: +def stova_attendee_identification_task() -> None: """Identifies the most recent file to be ingested and schedules a task to ingest it""" logger.info('Stova attendee identification task started.') identification_task = StovaAttendeeIndentificationTask(prefix=STOVA_ATTENDEE_PREFIX) diff --git a/datahub/company_activity/tasks/ingest_stova_events.py b/datahub/company_activity/tasks/ingest_stova_events.py index 1c0829774..b9145b636 100644 --- a/datahub/company_activity/tasks/ingest_stova_events.py +++ b/datahub/company_activity/tasks/ingest_stova_events.py @@ -13,15 +13,15 @@ DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' -def ingest_stova_event_data() -> None: +def stova_event_identification_task() -> None: """Identifies the most recent file to be ingested and schedules a task to ingest it""" logger.info('Stova event identification task started.') identification_task = StovaEventIndentificationTask(prefix=STOVA_EVENT_PREFIX) - identification_task.identify_new_objects(stova_ingestion_task) + identification_task.identify_new_objects(stova_event_ingestion_task) logger.info('Stova event identification task finished.') -def stova_ingestion_task(object_key: str) -> None: +def stova_event_ingestion_task(object_key: str) -> None: """Ingest the given key (file) from S3""" logger.info(f'Stova event ingestion task started for file {object_key}.') ingestion_task = StovaEventIngestionTask( diff --git a/datahub/company_activity/tests/test_tasks/test_ingestion_tasks.py b/datahub/company_activity/tests/test_tasks/test_ingestion_tasks.py index 486b1a530..850f96ad7 100644 --- a/datahub/company_activity/tests/test_tasks/test_ingestion_tasks.py +++ b/datahub/company_activity/tests/test_tasks/test_ingestion_tasks.py @@ -19,7 +19,7 @@ from datahub.company_activity.tasks import ingest_great_data from datahub.company_activity.tasks.constants import BUCKET, GREAT_PREFIX, REGION from datahub.company_activity.tasks.ingest_company_activity import ( - ingest_activity_data, TWO_HOURS_IN_SECONDS, + company_activity_identification_task, TWO_HOURS_IN_SECONDS, ) from datahub.core.queues.constants import EVERY_HOUR from datahub.core.queues.job_scheduler import job_scheduler @@ -62,7 +62,7 @@ def test_company_activity_ingestion_task_schedule(self, mock_system): scheduler = Scheduler(queue, connection=Redis.from_url(settings.REDIS_BASE_URL)) scheduled_jobs = scheduler.get_jobs() - func = 'datahub.company_activity.tasks.ingest_company_activity.ingest_activity_data' + func = 'datahub.company_activity.tasks.ingest_company_activity.company_activity_identification_task' # noqa scheduled_job = [job for job in scheduled_jobs if job.func_name == func][0] assert scheduled_job.meta['cron_string'] == EVERY_HOUR @@ -94,7 +94,7 @@ def test_ingestion_job_is_queued_for_new_files(self, mock, test_files, caplog): initial_job_count = rq_queue.count with caplog.at_level(logging.INFO): - ingest_activity_data() + company_activity_identification_task() assert 'Checking for new Company Activity data files' in caplog.text assert f'Scheduled ingestion of {new_file}' in caplog.text @@ -129,7 +129,7 @@ def test_ingestion_job_is_not_queued_for_already_ingested_file(self, mock, test_ initial_job_count = rq_queue.count with caplog.at_level(logging.INFO): - ingest_activity_data() + company_activity_identification_task() assert f'{new_file} has already been ingested' in caplog.text assert rq_queue.count == initial_job_count @@ -160,7 +160,7 @@ def test_job_not_queued_when_already_on_queue(self, mock, test_files, caplog): initial_job_count = rq_queue.count with caplog.at_level(logging.INFO): - ingest_activity_data() + company_activity_identification_task() assert f'{new_file} has already been queued for ingestion' in caplog.text assert rq_queue.count == initial_job_count @@ -189,7 +189,7 @@ def test_job_not_queued_when_already_running(self, mock_worker, mock_scheduler, mock_worker_instance.get_current_job = MagicMock(return_value=mock_job) mock_worker.all.return_value = [mock_worker_instance] - ingest_activity_data() + company_activity_identification_task() assert rq_queue.count == initial_job_count @pytest.mark.django_db @@ -213,7 +213,7 @@ def test_no_running_job(self, mock_worker, mock_scheduler, test_files): mock_worker_instance = Worker(['long-running'], connection=redis) mock_worker_instance.get_current_job = MagicMock(return_value=None) mock_worker.all.return_value = [mock_worker_instance] - ingest_activity_data() + company_activity_identification_task() assert rq_queue.count == initial_job_count + 1 @pytest.mark.django_db @@ -232,7 +232,7 @@ def test_child_job(self, caplog, test_files): IngestedFile.objects.create(filepath=file) with caplog.at_level(logging.INFO): - ingest_activity_data() + company_activity_identification_task() assert f'Ingesting file: {new_file} started' in caplog.text @mock_aws @@ -242,5 +242,5 @@ def test_empty_s3_bucket(self, caplog): """ setup_s3_bucket(BUCKET, []) with caplog.at_level(logging.INFO): - ingest_activity_data() + company_activity_identification_task() assert 'No files found' in caplog.text diff --git a/datahub/company_activity/tests/test_tasks/test_stova_attendee_ingestion_task.py b/datahub/company_activity/tests/test_tasks/test_stova_attendee_ingestion_task.py index 291e8c764..c53878e3a 100644 --- a/datahub/company_activity/tests/test_tasks/test_stova_attendee_ingestion_task.py +++ b/datahub/company_activity/tests/test_tasks/test_stova_attendee_ingestion_task.py @@ -18,7 +18,7 @@ from datahub.company_activity.models import StovaAttendee from datahub.company_activity.tasks.constants import BUCKET, REGION, STOVA_ATTENDEE_PREFIX from datahub.company_activity.tasks.ingest_stova_attendees import ( - ingest_stova_attendee_data, + stova_attendee_identification_task, stova_attendee_ingestion_task, StovaAttendeeIngestionTask, ) @@ -129,7 +129,7 @@ def test_stova_data_file_ingestion(self, caplog, test_file, test_file_path): create_stova_event_records() with caplog.at_level(logging.INFO): - ingest_stova_attendee_data() + stova_attendee_identification_task() assert 'Stova attendee identification task started.' in caplog.text assert 'Stova attendee identification task finished.' in caplog.text assert ( @@ -404,7 +404,7 @@ def test_stova_attendee_not_created_if_event_does_not_exist( setup_s3_files(BUCKET, test_file, test_file_path) with caplog.at_level(logging.INFO): - ingest_stova_attendee_data() + stova_attendee_identification_task() # These are from the fixture file. assert ( 'The event associated with this attendee does not exist, skipping attendee with ' diff --git a/datahub/company_activity/tests/test_tasks/test_stova_ingestion_task.py b/datahub/company_activity/tests/test_tasks/test_stova_ingestion_task.py index d589dd7fb..d3f7d2b47 100644 --- a/datahub/company_activity/tests/test_tasks/test_stova_ingestion_task.py +++ b/datahub/company_activity/tests/test_tasks/test_stova_ingestion_task.py @@ -16,8 +16,8 @@ from datahub.company_activity.models import StovaEvent from datahub.company_activity.tasks.constants import BUCKET, REGION, STOVA_EVENT_PREFIX from datahub.company_activity.tasks.ingest_stova_events import ( - ingest_stova_event_data, - stova_ingestion_task, + stova_event_identification_task, + stova_event_ingestion_task, StovaEventIngestionTask, ) from datahub.company_activity.tests.factories import ( @@ -124,7 +124,7 @@ def test_stova_data_file_ingestion(self, caplog, test_file, test_file_path): setup_s3_bucket(BUCKET) setup_s3_files(BUCKET, test_file, test_file_path) with caplog.at_level(logging.INFO): - ingest_stova_event_data() + stova_event_identification_task() assert 'Stova event identification task started.' in caplog.text assert 'Stova event identification task finished.' in caplog.text assert f'Stova event ingestion task started for file {test_file_path}' in caplog.text @@ -149,7 +149,7 @@ def test_skip_previously_ingested_records(self, test_file_path, test_base_stova_ test_file = gzip.compress(record.encode('utf-8')) setup_s3_bucket(BUCKET) setup_s3_files(BUCKET, test_file, test_file_path) - stova_ingestion_task(test_file_path) + stova_event_ingestion_task(test_file_path) assert StovaEvent.objects.filter(stova_event_id=123456789).count() == 1 @pytest.mark.django_db @@ -163,7 +163,7 @@ def test_invalid_file(self, test_file_path): init(transport=mock_transport) setup_s3_bucket(BUCKET) with pytest.raises(Exception) as e: - stova_ingestion_task(test_file_path) + stova_event_ingestion_task(test_file_path) exception = e.value.args[0] assert 'The specified key does not exist' in exception expected = "key: 'data-flow/exports/ExportAventriEvents/" 'stovaEventFake2.jsonl.gz'