From 5c21d1d280e1df69737214f11ef5395c9150dd0c Mon Sep 17 00:00:00 2001
From: Oliver Roberts
Date: Wed, 22 Jan 2025 13:50:33 +0000
Subject: [PATCH] Document `nest_records` argument

---
 datahub/ingest/utils.py                                  | 12 ++++++++----
 .../test/test_tasks/test_ingest_eyb_marketing.py         |  4 ++--
 .../test/test_tasks/test_ingest_eyb_triage.py            |  2 +-
 .../test/test_tasks/test_ingest_eyb_user.py              |  4 ++--
 .../test_tasks/test_lead_linking_company_contact.py      |  4 ++--
 5 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/datahub/ingest/utils.py b/datahub/ingest/utils.py
index b0854e9bf..fa4e764b5 100644
--- a/datahub/ingest/utils.py
+++ b/datahub/ingest/utils.py
@@ -4,11 +4,15 @@
 from datahub.ingest.boto3 import S3ObjectProcessor
 
 
-def compressed_json_faker(records: list[dict] = None, nested: bool = True) -> bytes:
-    """Serializes and compresses records into a zipped JSON, encoded with UTF-8."""
+def compressed_json_faker(records: list[dict] = None, nest_records: bool = True) -> bytes:
+    """Serializes and compresses records into a zipped JSON, encoded with UTF-8.
+
+    The `nest_records` argument indicates whether records should be nested under
+    an 'object' key, which is common for those transported by Data Flow pipelines.
+    """
     json_lines = [
-        json.dumps({'object': record}, default=str)
-        if nested else json.dumps(record, default=str)
+        json.dumps({'object': record}, default=str) if nest_records
+        else json.dumps(record, default=str)
         for record in records
     ]
     compressed_content = gzip.compress('\n'.join(json_lines).encode('utf-8'))
diff --git a/datahub/investment_lead/test/test_tasks/test_ingest_eyb_marketing.py b/datahub/investment_lead/test/test_tasks/test_ingest_eyb_marketing.py
index 995992948..5ed0d8b51 100644
--- a/datahub/investment_lead/test/test_tasks/test_ingest_eyb_marketing.py
+++ b/datahub/investment_lead/test/test_tasks/test_ingest_eyb_marketing.py
@@ -81,7 +81,7 @@ def test_ingestion_task_success(
         marketing_object_key, s3_object_processor, caplog,
     ):
         records = [eyb_lead_marketing_record_faker()]
-        object_definition = (marketing_object_key, compressed_json_faker(records, nested=False))
+        object_definition = (marketing_object_key, compressed_json_faker(records, nest_records=False))
         upload_objects_to_s3(s3_object_processor, [object_definition])
 
         with caplog.at_level(logging.INFO):
@@ -110,7 +110,7 @@ def test_ingestion_task_does_not_update_existing(
                 'utm_source': 'Advert',
             }),
         ]
-        object_definition = (marketing_object_key, compressed_json_faker(records, nested=False))
+        object_definition = (marketing_object_key, compressed_json_faker(records, nest_records=False))
         upload_objects_to_s3(s3_object_processor, [object_definition])
 
         eyb_marketing_ingestion_task(marketing_object_key)
diff --git a/datahub/investment_lead/test/test_tasks/test_ingest_eyb_triage.py b/datahub/investment_lead/test/test_tasks/test_ingest_eyb_triage.py
index ed1f22a1c..ac74c32ca 100644
--- a/datahub/investment_lead/test/test_tasks/test_ingest_eyb_triage.py
+++ b/datahub/investment_lead/test/test_tasks/test_ingest_eyb_triage.py
@@ -83,7 +83,7 @@ def test_ingestion_task_schedules_user_identification_task(
         triage_object_key, s3_object_processor, caplog,
     ):
         records = [eyb_lead_triage_record_faker()]
-        object_definition = (triage_object_key, compressed_json_faker(records, nested=True))
+        object_definition = (triage_object_key, compressed_json_faker(records, nest_records=True))
         upload_objects_to_s3(s3_object_processor, [object_definition])
 
         with (
diff --git a/datahub/investment_lead/test/test_tasks/test_ingest_eyb_user.py b/datahub/investment_lead/test/test_tasks/test_ingest_eyb_user.py
index 5bf95382b..993bbdb31 100644
--- a/datahub/investment_lead/test/test_tasks/test_ingest_eyb_user.py
+++ b/datahub/investment_lead/test/test_tasks/test_ingest_eyb_user.py
@@ -83,7 +83,7 @@ def test_ingestion_task_triggers_company_linking(
         user_object_key, s3_object_processor, caplog,
     ):
         records = [eyb_lead_user_record_faker()]
-        object_definition = (user_object_key, compressed_json_faker(records, nested=True))
+        object_definition = (user_object_key, compressed_json_faker(records, nest_records=True))
         upload_objects_to_s3(s3_object_processor, [object_definition])
 
         with (
@@ -102,7 +102,7 @@ def test_ingestion_task_schedules_marketing_identification_task(
         user_object_key, s3_object_processor, caplog,
     ):
         records = [eyb_lead_user_record_faker()]
-        object_definition = (user_object_key, compressed_json_faker(records, nested=True))
+        object_definition = (user_object_key, compressed_json_faker(records, nest_records=True))
         upload_objects_to_s3(s3_object_processor, [object_definition])
 
         with (
diff --git a/datahub/investment_lead/test/test_tasks/test_lead_linking_company_contact.py b/datahub/investment_lead/test/test_tasks/test_lead_linking_company_contact.py
index b47b4a172..29618c1b4 100644
--- a/datahub/investment_lead/test/test_tasks/test_lead_linking_company_contact.py
+++ b/datahub/investment_lead/test/test_tasks/test_lead_linking_company_contact.py
@@ -95,7 +95,7 @@ def test_create_company_and_contact_success(
             (user_object_key, user_records, user_object_processor),
         ]:
             object_definition = (
-                object_key, compressed_json_faker(records, nested=True),
+                object_key, compressed_json_faker(records, nest_records=True),
             )
             upload_objects_to_s3(object_processor, [object_definition])
 
@@ -156,7 +156,7 @@ def test_linking_existing_company_contact_success(
             (user_object_key, user_records, user_object_processor),
         ]:
             object_definition = (
-                object_key, compressed_json_faker(records, nested=True),
+                object_key, compressed_json_faker(records, nest_records=True),
            )
             upload_objects_to_s3(object_processor, [object_definition])
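Note: below is a minimal usage sketch of the two `nest_records` modes, for illustration
only; it is not part of the patch. The import path matches the module changed above, but
the record contents ('id', 'Example Ltd') are made up, and real tests build records with
the eyb_lead_*_record_faker helpers instead.

import gzip
import json

from datahub.ingest.utils import compressed_json_faker

# Illustrative record content (hypothetical keys, not from the test suite).
records = [{'id': 1, 'name': 'Example Ltd'}]

# Nested (the default): each JSON line wraps the record under an 'object' key,
# mirroring records transported by Data Flow pipelines.
nested = gzip.decompress(compressed_json_faker(records, nest_records=True))
assert json.loads(nested.splitlines()[0]) == {'object': {'id': 1, 'name': 'Example Ltd'}}

# Flat: each JSON line is the record itself, as in the marketing ingest tests
# above, which pass nest_records=False.
flat = gzip.decompress(compressed_json_faker(records, nest_records=False))
assert json.loads(flat.splitlines()[0]) == {'id': 1, 'name': 'Example Ltd'}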