Adds support for 'Redshift' shredding format #32

Closed · wants to merge 4 commits
37 changes: 24 additions & 13 deletions snowplow_analytics_sdk/event_transformer.py
@@ -1,5 +1,5 @@
 """
-Copyright (c) 2016 Snowplow Analytics Ltd. All rights reserved.
+Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved.
 This program is licensed to you under the Apache License Version 2.0,
 and you may not use this file except in compliance with the Apache License
 Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
@@ -10,7 +10,7 @@
 express or implied. See the Apache License Version 2.0 for the specific
 language governing permissions and limitations there under.
 Authors: Fred Blundun
-Copyright: Copyright (c) 2016 Snowplow Analytics Ltd
+Copyright: Copyright (c) 2016-2017 Snowplow Analytics Ltd
 License: Apache License Version 2.0
 """

@@ -34,7 +34,8 @@ def convert_bool(key, value):
         return [(key, True)]
     elif value == '0':
         return [(key, False)]
-    raise SnowplowEventTransformationException(["Invalid value {} for field {}".format(value, key)])
+    raise SnowplowEventTransformationException(
+        ["Invalid value {} for field {}".format(value, key)])


 def convert_double(key, value):
@@ -45,12 +46,12 @@ def convert_tstamp(key, value):
     return [(key, value.replace(' ', 'T') + 'Z')]


-def convert_contexts(key, value):
-    return json_shredder.parse_contexts(value)
+def convert_contexts(key, value, shred_format='elasticsearch'):
+    return json_shredder.parse_contexts(value, shred_format)


-def convert_unstruct(key, value):
-    return json_shredder.parse_unstruct(value)
+def convert_unstruct(key, value, shred_format='elasticsearch'):
+    return json_shredder.parse_unstruct(value, shred_format)


 # Ordered list of names of enriched event fields together with the function required to convert them to JSON
@@ -189,31 +190,41 @@ def convert_unstruct(key, value):
 )


-def transform(line, known_fields=ENRICHED_EVENT_FIELD_TYPES, add_geolocation_data=True):
+def transform(line, known_fields=ENRICHED_EVENT_FIELD_TYPES, add_geolocation_data=True, shred_format='elasticsearch'):
     """
     Convert a Snowplow enriched event TSV into a JSON
     """
-    return jsonify_good_event(line.split('\t'), known_fields, add_geolocation_data)
+    return jsonify_good_event(line.split('\t'), known_fields, add_geolocation_data, shred_format)


-def jsonify_good_event(event, known_fields=ENRICHED_EVENT_FIELD_TYPES, add_geolocation_data=True):
+def jsonify_good_event(event, known_fields=ENRICHED_EVENT_FIELD_TYPES, add_geolocation_data=True,
+                       shred_format='elasticsearch'):
     """
     Convert a Snowplow enriched event in the form of an array of fields into a JSON
     """
     if len(event) != len(known_fields):
         raise SnowplowEventTransformationException(
-            ["Expected {} fields, received {} fields.".format(len(known_fields), len(event))]
+            ["Expected {} fields, received {} fields.".format(
+                len(known_fields), len(event))]
         )
     else:
         output = {}
         errors = []
         if add_geolocation_data and event[LATITUDE_INDEX] != '' and event[LONGITUDE_INDEX] != '':
-            output['geo_location'] = event[LATITUDE_INDEX] + ',' + event[LONGITUDE_INDEX]
+            output['geo_location'] = event[LATITUDE_INDEX] + \
+                ',' + event[LONGITUDE_INDEX]
         for i in range(len(event)):
             key = known_fields[i][0]
             if event[i] != '':
                 try:
-                    kvpairs = known_fields[i][1](key, event[i])
+                    field = known_fields[i][0]
+                    function = known_fields[i][1]
+                    if field == 'unstruct_event':
+                        kvpairs = function(key, event[i], shred_format)
+                    elif field == 'contexts' or field == 'derived_contexts':
+                        kvpairs = function(key, event[i], shred_format)
+                    else:
+                        kvpairs = function(key, event[i])
                     for kvpair in kvpairs:
                         output[kvpair[0]] = kvpair[1]
                 except SnowplowEventTransformationException as sete:
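For reviewers, a minimal usage sketch of the new argument; this assumes the module imports as `snowplow_analytics_sdk.event_transformer` (matching the file path above), and the iglu URI and payload are invented for illustration, not taken from this PR's tests:

```python
from snowplow_analytics_sdk import event_transformer

# A made-up self-describing event; real enriched events embed JSON like this
# in the unstruct_event field.
unstruct_json = (
    '{"schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",'
    ' "data": {"schema": "iglu:com.acme/my_event/jsonschema/1-0-0",'
    ' "data": {"value": 1}}}'
)

# Default keeps today's Elasticsearch behaviour:
# [('unstruct_event_com_acme_my_event_1', {'value': 1})]
print(event_transformer.convert_unstruct('unstruct_event', unstruct_json))

# Opting in to Redshift shredding drops the prefix and wraps the payload
# in 'data' plus a 'schema' descriptor:
# [('com_acme_my_event_1', {'data': {'value': 1},
#   'schema': {'vendor': 'com.acme', 'name': 'my_event',
#              'format': 'jsonschema', 'version': '1-0-0'}})]
print(event_transformer.convert_unstruct('unstruct_event', unstruct_json, 'redshift'))
```

The same `shred_format` value reaches contexts and derived contexts through the dispatch in `jsonify_good_event`, so `transform(line, shred_format='redshift')` switches the whole event in one call.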
73 changes: 63 additions & 10 deletions snowplow_analytics_sdk/json_shredder.py
@@ -60,14 +60,30 @@ def fix_schema(prefix, schema):
     """
     Create an Elasticsearch field name from a schema string
     """
+    vendor, name, format, version = parse_schema(schema, underscore_vendor=True)
+    model = version.split('-')[0]
+    if prefix != "":
+        return "{}_{}_{}_{}".format(prefix, vendor, name, model)
+    else:
+        return "{}_{}_{}".format(vendor, name, model)
+
+
+def parse_schema(schema, underscore_vendor=True):
+    """
+    Parse and clean an individual schema string
+    """
     schema_dict = extract_schema(schema)
-    snake_case_organization = schema_dict['vendor'].replace('.', '_').lower()
-    snake_case_name = re.sub('([^A-Z_])([A-Z])', '\g<1>_\g<2>', schema_dict['name']).lower()
-    model = schema_dict['version'].split('-')[0]
-    return "{}_{}_{}_{}".format(prefix, snake_case_organization, snake_case_name, model)
+    if underscore_vendor:
+        vendor = schema_dict['vendor'].replace('.', '_').lower()
+    else:
+        vendor = schema_dict['vendor'].lower()
+    name = re.sub('([^A-Z_])([A-Z])', '\g<1>_\g<2>', schema_dict['name']).lower()
+    format = schema_dict.get('format')
+    version = schema_dict['version']
+    return vendor, name, format, version


-def parse_contexts(contexts):
+def parse_contexts(contexts, shred_format='elasticsearch'):
     """
     Convert a contexts JSON to an Elasticsearch-compatible list of key-value pairs
     For example, the JSON
@@ -106,9 +122,24 @@ def parse_contexts(contexts):
     my_json = json.loads(contexts)
     data = my_json['data']
     distinct_contexts = {}
+
     for context in data:
-        schema = fix_schema("contexts", context['schema'])
-        inner_data = context['data']
+        vendor, name, format, version = parse_schema(context['schema'], underscore_vendor=False)
+        if shred_format == 'redshift':
+            schema = fix_schema("", context['schema'])
+            inner_data = {
+                'data': context['data'],
+                'schema': {
+                    'vendor': vendor,
+                    'name': name,
+                    'format': format,
+                    'version': version
+                }
+            }
+        else:
+            schema = fix_schema("contexts", context['schema'])
+            inner_data = context['data']
+
         if schema not in distinct_contexts:
             distinct_contexts[schema] = [inner_data]
         else:
return output


def parse_unstruct(unstruct):
def parse_unstruct(unstruct, shred_format='elasticsearch'):
"""
Convert an unstructured event JSON to a list containing one Elasticsearch-compatible key-value pair
For example, the JSON
@@ -139,6 +170,12 @@ def parse_unstruct(unstruct):
     [
         (
             "unstruct_com_snowplowanalytics_snowplow_link_click_1", {
+                "schema": {
+                    "vendor": "com.snowplowanalytics.snowplow",
+                    "name": "link_click",
+                    "format": "jsonschema",
+                    "version": "1-0-1"
+                },
                 "key": "value"
             }
         )
@@ -148,8 +185,24 @@
     my_json = json.loads(unstruct)
     data = my_json['data']
     schema = data['schema']
     if 'data' in data:
-        inner_data = data['data']
+        if shred_format == 'redshift':
+            inner_data = {}
+            inner_data['data'] = data['data']
+        else:
+            inner_data = data['data']
     else:
         raise SnowplowEventTransformationException(["Could not extract inner data field from unstructured event"])
-    fixed_schema = fix_schema("unstruct_event", schema)
+
+    vendor, name, format, version = parse_schema(data['schema'], underscore_vendor=False)
+    if shred_format == 'redshift':
+        fixed_schema = fix_schema("", schema)
+        inner_data['schema'] = {
+            'vendor': vendor,
+            'name': name,
+            'format': format,
+            'version': version
+        }
+    elif shred_format == 'elasticsearch':
+        fixed_schema = fix_schema("unstruct_event", schema)

     return [(fixed_schema, inner_data)]
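Since `parse_schema` is the new helper both formats share, a short sketch of its contract may help; the iglu URI is invented, and this assumes `extract_schema` splits a standard four-part URI into vendor, name, format and version, as its callers here imply:

```python
from snowplow_analytics_sdk import json_shredder

# underscore_vendor=True feeds fix_schema's snake_cased field names
print(json_shredder.parse_schema('iglu:com.acme/PagePing/jsonschema/1-0-2'))
# ('com_acme', 'page_ping', 'jsonschema', '1-0-2')

# underscore_vendor=False keeps the dotted vendor for the Redshift
# 'schema' descriptor
print(json_shredder.parse_schema('iglu:com.acme/PagePing/jsonschema/1-0-2',
                                 underscore_vendor=False))
# ('com.acme', 'page_ping', 'jsonschema', '1-0-2')
```

Note that `fix_schema` keeps its old output whenever the prefix is non-empty, so existing Elasticsearch field names are unchanged by this refactor.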
4 changes: 2 additions & 2 deletions tests/test_event_transformer.py
@@ -1,5 +1,5 @@
 """
-Copyright (c) 2016 Snowplow Analytics Ltd. All rights reserved.
+Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved.
 This program is licensed to you under the Apache License Version 2.0,
 and you may not use this file except in compliance with the Apache License
 Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
@@ -10,7 +10,7 @@
 express or implied. See the Apache License Version 2.0 for the specific
 language governing permissions and limitations there under.
 Authors: Fred Blundun
-Copyright: Copyright (c) 2016 Snowplow Analytics Ltd
+Copyright: Copyright (c) 2016-2017 Snowplow Analytics Ltd
 License: Apache License Version 2.0
 """
