Skip to content

Commit

Permalink
Add JQ filter
Browse files Browse the repository at this point in the history
Adding JQ filter, which allows us to...filter messages based on a configured JQ command. Similar to already existing JQ mapper, but JQ filter requires output of a command to have boolean type.

As there is some shared logic between the mapper and the new filter, I extracted common stuff to `jq_common.go` module.
  • Loading branch information
pondzix committed Oct 9, 2024
1 parent a57b6af commit ff7829b
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 153 deletions.
79 changes: 79 additions & 0 deletions pkg/transform/filter/jq_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package filter

import (
"errors"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/transform"
)

// JQFilterConfig represents the configuration for the JQ transformation
type JQFilterConfig struct {
JQCommand string `hcl:"jq_command"`
RunTimeoutMs int `hcl:"timeout_ms,optional"`
SpMode bool `hcl:"snowplow_mode,optional"`
}

// JQFilterConfigPair is a configuration pair for the jq mapper transformation
var JQFilterConfigPair = config.ConfigurationPair{
Name: "jq_filter",
Handle: jqFilterAdapterGenerator(jqFilterConfigFunction),
}

func jqFilterConfigFunction(cfg *JQFilterConfig) (transform.TransformationFunction, error) {
return transform.GojqTransformationFunction(cfg.JQCommand, cfg.RunTimeoutMs, cfg.SpMode, filterOutput)
}

func jqFilterAdapterGenerator(f func(*JQFilterConfig) (transform.TransformationFunction, error)) jqFilterAdapter {
return func(i interface{}) (interface{}, error) {
cfg, ok := i.(*JQFilterConfig)
if !ok {
return nil, errors.New("invalid input, expected JQFilterConfig")
}

return f(cfg)
}
}

// This is where actual filtering is implemented, based on a JQ command output.
func filterOutput(jqOutput transform.JqCommandOutput) transform.TransformationFunction {
return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
shouldKeepMessage, isBoolean := jqOutput.(bool)

// maybe crash instead?
if !isBoolean {
message.SetError(errors.New("jq filter doesn't evaluate to boolean value"))
return nil, nil, message, nil
}

if !shouldKeepMessage {
return nil, message, nil, nil
}

return message, nil, nil, interState
}
}

type jqFilterAdapter func(i interface{}) (interface{}, error)

func (f jqFilterAdapter) ProvideDefault() (interface{}, error) {
return &JQFilterConfig{
RunTimeoutMs: 100,
}, nil
}

func (f jqFilterAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}
136 changes: 136 additions & 0 deletions pkg/transform/filter/jq_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package filter

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/transform"
)

func TestJQFilter_SpMode_true_keep(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowTsv1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `has("app_id")`, RunTimeoutMs: 100, SpMode: true}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)
assert.Empty(dropped)
assert.Empty(invalid)
assert.Equal(string(transform.SnowplowTsv1), string(kept.Data))
}

func TestJQFilter_SpMode_true_drop(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowTsv1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `has("non_existent_key")`, RunTimeoutMs: 100, SpMode: true}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)
assert.Empty(kept)
assert.Empty(invalid)
assert.Equal(string(transform.SnowplowTsv1), string(dropped.Data))
}

func TestJQFilter_SpMode_false_keep(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowJSON1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `has("app_id")`, RunTimeoutMs: 100, SpMode: false}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)
assert.Empty(dropped)
assert.Empty(invalid)
assert.Equal(string(transform.SnowplowJSON1), string(kept.Data))
}

func TestJQFilter_SpMode_false_drop(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowJSON1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `has("non_existent_key")`, RunTimeoutMs: 100, SpMode: false}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)
assert.Empty(kept)
assert.Empty(invalid)
assert.Equal(string(transform.SnowplowJSON1), string(dropped.Data))
}

func TestJQFilter_epoch(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowTsv1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `.collector_tstamp | epoch | . < 10`, RunTimeoutMs: 100, SpMode: true}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)
assert.Empty(kept)
assert.Empty(invalid)
assert.Equal(string(transform.SnowplowTsv1), string(dropped.Data))
}

func TestJQFilter_non_boolean_output(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowTsv1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `.collector_tstamp | epoch`, RunTimeoutMs: 100, SpMode: true}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)

assert.Empty(kept)
assert.Empty(dropped)
assert.Equal("jq filter doesn't evaluate to boolean value", invalid.GetError().Error())
}

func TestJQFilter_invalid_jq_command(t *testing.T) {
assert := assert.New(t)

config := &JQFilterConfig{JQCommand: `blabla`, RunTimeoutMs: 100, SpMode: true}
filter, err := jqFilterConfigFunction(config)

assert.Nil(filter)
assert.Equal("error compiling jq query: function not defined: blabla/0", err.Error())
}

func createFilter(t *testing.T, config *JQFilterConfig) transform.TransformationFunction {
filter, err := jqFilterConfigFunction(config)
if err != nil {
t.Fatalf("failed to create transformation function with error: %q", err.Error())
}
return filter
}
Loading

0 comments on commit ff7829b

Please sign in to comment.