diff --git a/docs/configuration_transformations_docs_test.go b/docs/configuration_transformations_docs_test.go index 80349fc5..38557cde 100644 --- a/docs/configuration_transformations_docs_test.go +++ b/docs/configuration_transformations_docs_test.go @@ -141,6 +141,10 @@ func testTransformationConfig(t *testing.T, filepath string, fullExample bool) { configObject = &transform.SetPkConfig{} case "spEnrichedToJson": configObject = &transform.EnrichedToJSONConfig{} + case "spCollectorPayloadThriftToJSON": + configObject = &transform.CollectorPayloadThriftToJSONConfig{} + case "spJSONToCollectorPayloadThrift": + configObject = &transform.JSONToCollectorPayloadThriftConfig{} case "js": configObject = &engine.JSEngineConfig{} case "lua": diff --git a/pkg/transform/snowplow_collector_payload_thrift_to_json.go b/pkg/transform/snowplow_collector_payload_thrift_to_json.go new file mode 100644 index 00000000..b4bef34c --- /dev/null +++ b/pkg/transform/snowplow_collector_payload_thrift_to_json.go @@ -0,0 +1,83 @@ +// +// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Snowplow Community License Version 1.0, +// and you may not use this file except in compliance with the Snowplow Community License Version 1.0. +// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + +package transform + +import ( + "context" + "errors" + + "github.com/snowplow/snowbridge/config" + "github.com/snowplow/snowbridge/pkg/models" + + collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload" +) + +// CollectorPayloadThriftToJSONConfig is a configuration object for the spCollectorPayloadThriftToJSON transformation +type CollectorPayloadThriftToJSONConfig struct { +} + +type collectorPayloadThriftToJSONAdapter func(i interface{}) (interface{}, error) + +// Create implements the ComponentCreator interface. +func (f collectorPayloadThriftToJSONAdapter) Create(i interface{}) (interface{}, error) { + return f(i) +} + +// ProvideDefault implements the ComponentConfigurable interface +func (f collectorPayloadThriftToJSONAdapter) ProvideDefault() (interface{}, error) { + // Provide defaults + cfg := &CollectorPayloadThriftToJSONConfig{} + + return cfg, nil +} + +// adapterGenerator returns a spCollectorPayloadThriftToJSON transformation adapter. +func collectorPayloadThriftToJSONAdapterGenerator(f func(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error)) collectorPayloadThriftToJSONAdapter { + return func(i interface{}) (interface{}, error) { + cfg, ok := i.(*CollectorPayloadThriftToJSONConfig) + if !ok { + return nil, errors.New("invalid input, expected collectorPayloadThriftToJSONConfig") + } + + return f(cfg) + } +} + +// collectorPayloadThriftToJSONConfigFunction returns an spCollectorPayloadThriftToJSON transformation function, from an collectorPayloadThriftToJSONConfig. +func collectorPayloadThriftToJSONConfigFunction(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error) { + return SpCollectorPayloadThriftToJSON, nil +} + +// CollectorPayloadThriftToJSONConfigPair is a configuration pair for the spCollectorPayloadThriftToJSON transformation +var CollectorPayloadThriftToJSONConfigPair = config.ConfigurationPair{ + Name: "spCollectorPayloadThriftToJSON", + Handle: collectorPayloadThriftToJSONAdapterGenerator(collectorPayloadThriftToJSONConfigFunction), +} + +// SpCollectorPayloadThriftToJSON is a specific transformation implementation to transform a Thrift encoded Collector Payload +// to a JSON string representation. +func SpCollectorPayloadThriftToJSON(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + ctx := context.Background() + + // Deserialize the Collector Payload to a struct + res, deserializeErr := collectorpayload.BinaryDeserializer(ctx, message.Data) + if deserializeErr != nil { + message.SetError(deserializeErr) + return nil, nil, message, nil + } + + // Re-encode as a JSON string to be able to leverage it downstream + resJSON, jsonErr := collectorpayload.ToJSON(res) + if jsonErr != nil { + message.SetError(jsonErr) + return nil, nil, message, nil + } + + message.Data = resJSON + return message, nil, nil, intermediateState +} diff --git a/pkg/transform/snowplow_json_to_collector_payload_thrift.go b/pkg/transform/snowplow_json_to_collector_payload_thrift.go new file mode 100644 index 00000000..012758ac --- /dev/null +++ b/pkg/transform/snowplow_json_to_collector_payload_thrift.go @@ -0,0 +1,85 @@ +// +// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Snowplow Community License Version 1.0, +// and you may not use this file except in compliance with the Snowplow Community License Version 1.0. +// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + +package transform + +import ( + "context" + "encoding/json" + "errors" + + "github.com/snowplow/snowbridge/config" + "github.com/snowplow/snowbridge/pkg/models" + + collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload" + collectorpayloadmodel1 "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload/gen-go/model1" +) + +// JSONToCollectorPayloadThriftConfig is a configuration object for the spJSONToCollectorPayloadThrift transformation +type JSONToCollectorPayloadThriftConfig struct { +} + +// JSONToCollectorPayloadThriftAdapter is a configuration object for the spJSONToCollectorPayloadThrift transformation +type JSONToCollectorPayloadThriftAdapter func(i interface{}) (interface{}, error) + +// Create implements the ComponentCreator interface. +func (f JSONToCollectorPayloadThriftAdapter) Create(i interface{}) (interface{}, error) { + return f(i) +} + +// ProvideDefault implements the ComponentConfigurable interface +func (f JSONToCollectorPayloadThriftAdapter) ProvideDefault() (interface{}, error) { + // Provide defaults + cfg := &JSONToCollectorPayloadThriftConfig{} + + return cfg, nil +} + +// JSONToCollectorPayloadThriftAdapterGenerator returns a spJSONToCollectorPayloadThrift transformation adapter. +func JSONToCollectorPayloadThriftAdapterGenerator(f func(c *JSONToCollectorPayloadThriftConfig) (TransformationFunction, error)) JSONToCollectorPayloadThriftAdapter { + return func(i interface{}) (interface{}, error) { + cfg, ok := i.(*JSONToCollectorPayloadThriftConfig) + if !ok { + return nil, errors.New("invalid input, expected JSONToCollectorPayloadThriftConfig") + } + + return f(cfg) + } +} + +// JSONToCollectorPayloadThriftConfigFunction returns an spJSONToCollectorPayloadThrift transformation function, from an JSONToCollectorPayloadThriftConfig. +func JSONToCollectorPayloadThriftConfigFunction(c *JSONToCollectorPayloadThriftConfig) (TransformationFunction, error) { + return SpJSONToCollectorPayloadThrift, nil +} + +// JSONToCollectorPayloadThriftConfigPair is a configuration pair for the spJSONToCollectorPayloadThrift transformation +var JSONToCollectorPayloadThriftConfigPair = config.ConfigurationPair{ + Name: "spJSONToCollectorPayloadThrift", + Handle: JSONToCollectorPayloadThriftAdapterGenerator(JSONToCollectorPayloadThriftConfigFunction), +} + +// SpJSONToCollectorPayloadThrift is a specific transformation implementation to transform a raw message into a valid Thrift encoded Collector Payload +// so that it can be pushed directly into the egress stream of a Collector. +func SpJSONToCollectorPayloadThrift(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + var p *collectorpayloadmodel1.CollectorPayload + unmarshallErr := json.Unmarshal(message.Data, &p) + if unmarshallErr != nil { + message.SetError(unmarshallErr) + return nil, nil, message, nil + } + + ctx := context.Background() + + res, serializeErr := collectorpayload.BinarySerializer(ctx, p) + if serializeErr != nil { + message.SetError(serializeErr) + return nil, nil, message, nil + } + + message.Data = res + return message, nil, nil, intermediateState +} diff --git a/pkg/transform/transformconfig/transform_config.go b/pkg/transform/transformconfig/transform_config.go index 7de543bc..e7806c1f 100644 --- a/pkg/transform/transformconfig/transform_config.go +++ b/pkg/transform/transformconfig/transform_config.go @@ -23,6 +23,8 @@ var SupportedTransformations = []config.ConfigurationPair{ filter.ContextFilterConfigPair, transform.SetPkConfigPair, transform.EnrichedToJSONConfigPair, + transform.CollectorPayloadThriftToJSONConfigPair, + transform.JSONToCollectorPayloadThriftConfigPair, engine.LuaConfigPair, engine.JSConfigPair, }