Rename identification tasks to be consistent
oliverjwroberts committed Jan 22, 2025
1 parent 1eb9b33 commit 40423ce
Showing 8 changed files with 39 additions and 35 deletions.
22 changes: 13 additions & 9 deletions cron-scheduler.py
@@ -17,9 +17,13 @@
     schedule_automatic_contact_archive,
 )
 from datahub.company.tasks.export_potential import update_company_export_potential_from_csv
-from datahub.company_activity.tasks.ingest_company_activity import ingest_activity_data
-from datahub.company_activity.tasks.ingest_stova_attendees import ingest_stova_attendee_data
-from datahub.company_activity.tasks.ingest_stova_events import ingest_stova_event_data
+from datahub.company_activity.tasks.ingest_company_activity import (
+    company_activity_identification_task,
+)
+from datahub.company_activity.tasks.ingest_stova_attendees import (
+    stova_attendee_identification_task,
+)
+from datahub.company_activity.tasks.ingest_stova_events import stova_event_identification_task
 from datahub.core.queues.constants import (
     EVERY_EIGHT_AM,
     EVERY_EIGHT_THIRTY_AM_ON_FIRST_EACH_MONTH,
@@ -132,24 +136,24 @@ def schedule_jobs():
         description='Update companies from dnb service',
     )
     job_scheduler(
-        function=ingest_activity_data,
+        function=company_activity_identification_task,
         cron=EVERY_HOUR,
-        description='Check S3 for new Company Activity data files and schedule ingestion',
+        description='Identify new company activity objects',
     )
     job_scheduler(
         function=eyb_triage_identification_task,
         cron=EVERY_HOUR,
         description='Identify new EYB triage objects',
     )
     job_scheduler(
-        function=ingest_stova_event_data,
+        function=stova_event_identification_task,
         cron=EVERY_HOUR,
-        description='Check S3 for new Stova Event files and schedule ingestion',
+        description='Identify new Stova event objects',
     )
     job_scheduler(
-        function=ingest_stova_attendee_data,
+        function=stova_attendee_identification_task,
         cron=EVERY_HOUR,
-        description='Check S3 for new Stova Attendee files and schedule ingestion',
+        description='Identify new Stova attendee objects',
     )
 
     if settings.ENABLE_ESTIMATED_LAND_DATE_REMINDERS:
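The tests further down in this commit inspect scheduled jobs via rq-scheduler (`Scheduler`, `job.meta['cron_string']`), so `job_scheduler` is presumably a thin wrapper around it. Below is a minimal sketch of what an hourly registration like the ones above could boil down to; the queue name, Redis URL, and `EVERY_HOUR` value are illustrative assumptions, not the repository's actual implementation:

```python
# Illustrative sketch only: datahub's real job_scheduler wrapper is not shown
# in this diff; this assumes a plain rq-scheduler backend.
from redis import Redis
from rq_scheduler import Scheduler

from datahub.company_activity.tasks.ingest_stova_events import (
    stova_event_identification_task,
)

EVERY_HOUR = '0 * * * *'  # assumed value; the real constant lives in datahub.core.queues.constants

scheduler = Scheduler('long-running', connection=Redis.from_url('redis://localhost:6379'))
scheduler.cron(
    EVERY_HOUR,  # stored by rq-scheduler in job.meta['cron_string']
    func=stova_event_identification_task,  # one of the tasks renamed in this commit
    description='Identify new Stova event objects',
)
```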
8 changes: 4 additions & 4 deletions datahub/company_activity/tasks/__init__.py
@@ -1,13 +1,13 @@
 from datahub.company_activity.tasks.ingest_great_data import ingest_great_data
 from datahub.company_activity.tasks.ingest_stova_attendees import stova_attendee_ingestion_task
 from datahub.company_activity.tasks.ingest_stova_events import (
-    ingest_stova_event_data,
-    stova_ingestion_task,
+    stova_event_identification_task,
+    stova_event_ingestion_task,
 )
 
 __all__ = (
     ingest_great_data,
-    ingest_stova_event_data,
+    stova_event_identification_task,
     stova_attendee_ingestion_task,
-    stova_ingestion_task,
+    stova_event_ingestion_task,
 )
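With the package exports updated, downstream code imports both renamed tasks from the package root; a trivial illustration, assuming nothing beyond the names exported above:

```python
from datahub.company_activity.tasks import (
    stova_event_identification_task,
    stova_event_ingestion_task,
)

stova_event_identification_task()  # finds the newest file and schedules its ingestion
```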
2 changes: 1 addition & 1 deletion datahub/company_activity/tasks/ingest_company_activity.py
@@ -16,7 +16,7 @@
 TWO_HOURS_IN_SECONDS = 7200
 
 
-def ingest_activity_data():
+def company_activity_identification_task():
     logger.info('Checking for new Company Activity data files')
     task = CompanyActivityIngestionTask()
     task.ingest()
2 changes: 1 addition & 1 deletion datahub/company_activity/tasks/ingest_stova_attendees.py
@@ -16,7 +16,7 @@
 DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
 
 
-def ingest_stova_attendee_data() -> None:
+def stova_attendee_identification_task() -> None:
     """Identifies the most recent file to be ingested and schedules a task to ingest it"""
     logger.info('Stova attendee identification task started.')
     identification_task = StovaAttendeeIndentificationTask(prefix=STOVA_ATTENDEE_PREFIX)
6 changes: 3 additions & 3 deletions datahub/company_activity/tasks/ingest_stova_events.py
@@ -13,15 +13,15 @@
 DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
 
 
-def ingest_stova_event_data() -> None:
+def stova_event_identification_task() -> None:
     """Identifies the most recent file to be ingested and schedules a task to ingest it"""
     logger.info('Stova event identification task started.')
     identification_task = StovaEventIndentificationTask(prefix=STOVA_EVENT_PREFIX)
-    identification_task.identify_new_objects(stova_ingestion_task)
+    identification_task.identify_new_objects(stova_event_ingestion_task)
     logger.info('Stova event identification task finished.')
 
 
-def stova_ingestion_task(object_key: str) -> None:
+def stova_event_ingestion_task(object_key: str) -> None:
     """Ingest the given key (file) from S3"""
     logger.info(f'Stova event ingestion task started for file {object_key}.')
     ingestion_task = StovaEventIngestionTask(
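The identification/ingestion split above follows a straightforward S3 polling pattern: the identification task finds the newest object under a prefix and hands its key to the ingestion task. A rough sketch of what `identify_new_objects` might do; the `BaseIdentificationTask` class, bucket constant, and direct callback invocation are assumptions for illustration, since the real `StovaEventIndentificationTask` is not shown in this diff:

```python
# Hypothetical sketch of an identification task. The repository's real class
# presumably also skips already-ingested or already-queued files and enqueues
# the ingestion task on a queue rather than calling it inline.
import boto3

BUCKET = 'example-bucket'  # illustrative; the real value comes from tasks.constants


class BaseIdentificationTask:
    def __init__(self, prefix: str) -> None:
        self.prefix = prefix

    def identify_new_objects(self, ingestion_task) -> None:
        """Find the most recently modified object under the prefix and pass on its key."""
        s3 = boto3.client('s3')
        response = s3.list_objects_v2(Bucket=BUCKET, Prefix=self.prefix)
        contents = response.get('Contents', [])
        if not contents:
            return  # mirrors the 'No files found' log asserted in the tests below
        latest = max(contents, key=lambda obj: obj['LastModified'])
        ingestion_task(latest['Key'])
```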
(changed file: name not shown)
@@ -19,7 +19,7 @@
 from datahub.company_activity.tasks import ingest_great_data
 from datahub.company_activity.tasks.constants import BUCKET, GREAT_PREFIX, REGION
 from datahub.company_activity.tasks.ingest_company_activity import (
-    ingest_activity_data, TWO_HOURS_IN_SECONDS,
+    company_activity_identification_task, TWO_HOURS_IN_SECONDS,
 )
 from datahub.core.queues.constants import EVERY_HOUR
 from datahub.core.queues.job_scheduler import job_scheduler
@@ -62,7 +62,7 @@ def test_company_activity_ingestion_task_schedule(self, mock_system):
 
         scheduler = Scheduler(queue, connection=Redis.from_url(settings.REDIS_BASE_URL))
         scheduled_jobs = scheduler.get_jobs()
-        func = 'datahub.company_activity.tasks.ingest_company_activity.ingest_activity_data'
+        func = 'datahub.company_activity.tasks.ingest_company_activity.company_activity_identification_task'  # noqa
         scheduled_job = [job for job in scheduled_jobs if job.func_name == func][0]
         assert scheduled_job.meta['cron_string'] == EVERY_HOUR
 
@@ -94,7 +94,7 @@ def test_ingestion_job_is_queued_for_new_files(self, mock, test_files, caplog):
         initial_job_count = rq_queue.count
 
         with caplog.at_level(logging.INFO):
-            ingest_activity_data()
+            company_activity_identification_task()
         assert 'Checking for new Company Activity data files' in caplog.text
         assert f'Scheduled ingestion of {new_file}' in caplog.text
 
@@ -129,7 +129,7 @@ def test_ingestion_job_is_not_queued_for_already_ingested_file(self, mock, test_
         initial_job_count = rq_queue.count
 
         with caplog.at_level(logging.INFO):
-            ingest_activity_data()
+            company_activity_identification_task()
         assert f'{new_file} has already been ingested' in caplog.text
         assert rq_queue.count == initial_job_count
 
@@ -160,7 +160,7 @@ def test_job_not_queued_when_already_on_queue(self, mock, test_files, caplog):
         initial_job_count = rq_queue.count
 
         with caplog.at_level(logging.INFO):
-            ingest_activity_data()
+            company_activity_identification_task()
         assert f'{new_file} has already been queued for ingestion' in caplog.text
         assert rq_queue.count == initial_job_count
 
@@ -189,7 +189,7 @@ def test_job_not_queued_when_already_running(self, mock_worker, mock_scheduler,
         mock_worker_instance.get_current_job = MagicMock(return_value=mock_job)
         mock_worker.all.return_value = [mock_worker_instance]
 
-        ingest_activity_data()
+        company_activity_identification_task()
         assert rq_queue.count == initial_job_count
 
     @pytest.mark.django_db
@@ -213,7 +213,7 @@ def test_no_running_job(self, mock_worker, mock_scheduler, test_files):
         mock_worker_instance = Worker(['long-running'], connection=redis)
         mock_worker_instance.get_current_job = MagicMock(return_value=None)
         mock_worker.all.return_value = [mock_worker_instance]
-        ingest_activity_data()
+        company_activity_identification_task()
         assert rq_queue.count == initial_job_count + 1
 
     @pytest.mark.django_db
@@ -232,7 +232,7 @@ def test_child_job(self, caplog, test_files):
             IngestedFile.objects.create(filepath=file)
 
         with caplog.at_level(logging.INFO):
-            ingest_activity_data()
+            company_activity_identification_task()
         assert f'Ingesting file: {new_file} started' in caplog.text
 
     @mock_aws
@@ -242,5 +242,5 @@ def test_empty_s3_bucket(self, caplog):
         """
         setup_s3_bucket(BUCKET, [])
         with caplog.at_level(logging.INFO):
-            ingest_activity_data()
+            company_activity_identification_task()
         assert 'No files found' in caplog.text
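These tests combine moto's `@mock_aws` with pytest's `caplog` fixture to assert on log output without touching real S3. A self-contained miniature of the pattern; the bucket name and the stand-in `identify` function are illustrative, not the repository's fixtures:

```python
import logging

import boto3
from moto import mock_aws

logger = logging.getLogger(__name__)


def identify() -> None:
    """Stand-in for an identification task: logs when the bucket is empty."""
    s3 = boto3.client('s3', region_name='us-east-1')
    if not s3.list_objects_v2(Bucket='example-bucket').get('Contents'):
        logger.info('No files found')


@mock_aws
def test_empty_bucket_logs_no_files(caplog):
    # moto intercepts all boto3 calls inside the decorated test
    boto3.client('s3', region_name='us-east-1').create_bucket(Bucket='example-bucket')
    with caplog.at_level(logging.INFO):
        identify()
    assert 'No files found' in caplog.text
```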
(changed file: name not shown)
@@ -18,7 +18,7 @@
 from datahub.company_activity.models import StovaAttendee
 from datahub.company_activity.tasks.constants import BUCKET, REGION, STOVA_ATTENDEE_PREFIX
 from datahub.company_activity.tasks.ingest_stova_attendees import (
-    ingest_stova_attendee_data,
+    stova_attendee_identification_task,
     stova_attendee_ingestion_task,
     StovaAttendeeIngestionTask,
 )
@@ -129,7 +129,7 @@ def test_stova_data_file_ingestion(self, caplog, test_file, test_file_path):
         create_stova_event_records()
 
         with caplog.at_level(logging.INFO):
-            ingest_stova_attendee_data()
+            stova_attendee_identification_task()
         assert 'Stova attendee identification task started.' in caplog.text
         assert 'Stova attendee identification task finished.' in caplog.text
         assert (
@@ -404,7 +404,7 @@ def test_stova_attendee_not_created_if_event_does_not_exist(
         setup_s3_files(BUCKET, test_file, test_file_path)
 
         with caplog.at_level(logging.INFO):
-            ingest_stova_attendee_data()
+            stova_attendee_identification_task()
         # These are from the fixture file.
         assert (
             'The event associated with this attendee does not exist, skipping attendee with '
(changed file: name not shown)
@@ -16,8 +16,8 @@
 from datahub.company_activity.models import StovaEvent
 from datahub.company_activity.tasks.constants import BUCKET, REGION, STOVA_EVENT_PREFIX
 from datahub.company_activity.tasks.ingest_stova_events import (
-    ingest_stova_event_data,
-    stova_ingestion_task,
+    stova_event_identification_task,
+    stova_event_ingestion_task,
     StovaEventIngestionTask,
 )
 from datahub.company_activity.tests.factories import (
@@ -124,7 +124,7 @@ def test_stova_data_file_ingestion(self, caplog, test_file, test_file_path):
         setup_s3_bucket(BUCKET)
         setup_s3_files(BUCKET, test_file, test_file_path)
         with caplog.at_level(logging.INFO):
-            ingest_stova_event_data()
+            stova_event_identification_task()
         assert 'Stova event identification task started.' in caplog.text
         assert 'Stova event identification task finished.' in caplog.text
         assert f'Stova event ingestion task started for file {test_file_path}' in caplog.text
@@ -149,7 +149,7 @@ def test_skip_previously_ingested_records(self, test_file_path, test_base_stova_
         test_file = gzip.compress(record.encode('utf-8'))
         setup_s3_bucket(BUCKET)
         setup_s3_files(BUCKET, test_file, test_file_path)
-        stova_ingestion_task(test_file_path)
+        stova_event_ingestion_task(test_file_path)
         assert StovaEvent.objects.filter(stova_event_id=123456789).count() == 1
 
     @pytest.mark.django_db
@@ -163,7 +163,7 @@ def test_invalid_file(self, test_file_path):
         init(transport=mock_transport)
         setup_s3_bucket(BUCKET)
         with pytest.raises(Exception) as e:
-            stova_ingestion_task(test_file_path)
+            stova_event_ingestion_task(test_file_path)
         exception = e.value.args[0]
         assert 'The specified key does not exist' in exception
         expected = "key: 'data-flow/exports/ExportAventriEvents/" 'stovaEventFake2.jsonl.gz'
