Skip to content

Commit

Permalink
Document nest_records argument
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverjwroberts committed Jan 22, 2025
1 parent 40423ce commit 5c21d1d
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 11 deletions.
12 changes: 8 additions & 4 deletions datahub/ingest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
from datahub.ingest.boto3 import S3ObjectProcessor


def compressed_json_faker(records: list[dict] = None, nested: bool = True) -> bytes:
"""Serializes and compresses records into a zipped JSON, encoded with UTF-8."""
def compressed_json_faker(records: list[dict] = None, nest_records: bool = True) -> bytes:
"""Serializes and compresses records into a zipped JSON, encoded with UTF-8.
The `nest_records` argument indicates wether records should be nested under
an 'object' key, which is common for those transported by Data Flow pipelines.
"""
json_lines = [
json.dumps({'object': record}, default=str)
if nested else json.dumps(record, default=str)
json.dumps({'object': record}, default=str) if nest_records
else json.dumps(record, default=str)
for record in records
]
compressed_content = gzip.compress('\n'.join(json_lines).encode('utf-8'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_ingestion_task_success(
marketing_object_key, s3_object_processor, caplog,
):
records = [eyb_lead_marketing_record_faker()]
object_definition = (marketing_object_key, compressed_json_faker(records, nested=False))
object_definition = (marketing_object_key, compressed_json_faker(records, nest_records=False))
upload_objects_to_s3(s3_object_processor, [object_definition])

with caplog.at_level(logging.INFO):
Expand Down Expand Up @@ -110,7 +110,7 @@ def test_ingestion_task_does_not_update_existing(
'utm_source': 'Advert',
}),
]
object_definition = (marketing_object_key, compressed_json_faker(records, nested=False))
object_definition = (marketing_object_key, compressed_json_faker(records, nest_records=False))
upload_objects_to_s3(s3_object_processor, [object_definition])

eyb_marketing_ingestion_task(marketing_object_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def test_ingestion_task_schedules_user_identification_task(
triage_object_key, s3_object_processor, caplog,
):
records = [eyb_lead_triage_record_faker()]
object_definition = (triage_object_key, compressed_json_faker(records, nested=True))
object_definition = (triage_object_key, compressed_json_faker(records, nest_records=True))
upload_objects_to_s3(s3_object_processor, [object_definition])

with (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def test_ingestion_task_triggers_company_linking(
user_object_key, s3_object_processor, caplog,
):
records = [eyb_lead_user_record_faker()]
object_definition = (user_object_key, compressed_json_faker(records, nested=True))
object_definition = (user_object_key, compressed_json_faker(records, nest_records=True))
upload_objects_to_s3(s3_object_processor, [object_definition])

with (
Expand All @@ -102,7 +102,7 @@ def test_ingestion_task_schedules_marketing_identification_task(
user_object_key, s3_object_processor, caplog,
):
records = [eyb_lead_user_record_faker()]
object_definition = (user_object_key, compressed_json_faker(records, nested=True))
object_definition = (user_object_key, compressed_json_faker(records, nest_records=True))
upload_objects_to_s3(s3_object_processor, [object_definition])

with (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_create_company_and_contact_success(
(user_object_key, user_records, user_object_processor),
]:
object_definition = (
object_key, compressed_json_faker(records, nested=True),
object_key, compressed_json_faker(records, nest_records=True),
)
upload_objects_to_s3(object_processor, [object_definition])

Expand Down Expand Up @@ -156,7 +156,7 @@ def test_linking_existing_company_contact_success(
(user_object_key, user_records, user_object_processor),
]:
object_definition = (
object_key, compressed_json_faker(records, nested=True),
object_key, compressed_json_faker(records, nest_records=True),
)
upload_objects_to_s3(object_processor, [object_definition])

Expand Down

0 comments on commit 5c21d1d

Please sign in to comment.