Feat: query service: logs pipelines timestamp parsing processor (#4105)
* chore: relocate tests for trace and grok parsing processor

* chore: add test for timestamp parsing processor

* feat: update PipelineOperator model for time parser fields and get tests passing

* chore: add test cases for validating time parser fails silently on mismatched logs

* chore: add helper for generating regex for strptime layouts

* feat: time_parser ignore logs whose parseFrom value doesn't match strptime layout

* chore: escape regex special chars if any in the layout string before preparing regex

* chore: add operator.If on time_parser when using layout type epoch

* chore: finish up with operator.If on time_parser for layout type

* chore: postable pipeline validation for time parser

* chore: some cleanup

* chore: some more cleanup

* chore: add validation for strptime layouts in postable pipelines

---------

Co-authored-by: Srikanth Chekuri <[email protected]>
raj-k-singh and srikanthccv authored Nov 29, 2023
1 parent 1f0fdfd commit 1b6b3c2
Showing 8 changed files with 463 additions and 13 deletions.
13 changes: 8 additions & 5 deletions pkg/query-service/app/logparsingpipeline/model.go
@@ -49,11 +49,10 @@ type PipelineOperator struct {
Name string `json:"name,omitempty" yaml:"-"`

// optional keys depending on the type
ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"`
Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"`
Regex string `json:"regex,omitempty" yaml:"regex,omitempty"`
ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"`
Timestamp *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"`
ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"`
Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"`
Regex string `json:"regex,omitempty" yaml:"regex,omitempty"`
ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"`
*TraceParser `yaml:",inline,omitempty"`
Field string `json:"field,omitempty" yaml:"field,omitempty"`
Value string `json:"value,omitempty" yaml:"value,omitempty"`
@@ -63,6 +62,10 @@ type PipelineOperator struct {
Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"`
Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"`
Default string `json:"default,omitempty" yaml:"default,omitempty"`

// time_parser fields.
Layout string `json:"layout,omitempty" yaml:"layout,omitempty"`
LayoutType string `json:"layout_type,omitempty" yaml:"layout_type,omitempty"`
}

type TimestampParser struct {
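In the hunks above, the nested Timestamp *TimestampParser field is dropped from PipelineOperator in favour of the flat Layout and LayoutType fields. As a minimal sketch (an editor's illustration, not part of this commit; the attribute name is hypothetical and mirrors the test fixtures further down), a strptime time_parser operator built on these fields looks like this, and the yaml tags mean they surface as layout and layout_type in the generated collector config:

package logparsingpipeline

// exampleTimeParserOperator is illustrative only: it shows the new Layout and
// LayoutType fields in use on a time_parser operator.
func exampleTimeParserOperator() PipelineOperator {
	return PipelineOperator{
		ID:         "time",
		Type:       "time_parser",
		Enabled:    true,
		Name:       "time parser",
		ParseFrom:  "attributes.test_timestamp", // hypothetical source field
		LayoutType: "strptime",                  // "epoch" is the other supported layout type
		Layout:     "%Y-%m-%dT%H:%M:%S.%f%z",
	}
}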
38 changes: 35 additions & 3 deletions pkg/query-service/app/logparsingpipeline/pipelineBuilder.go
@@ -25,7 +25,11 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
continue
}

operators := getOperators(v.Config)
operators, err := getOperators(v.Config)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to prepare operators")
}

if len(operators) == 0 {
continue
}
@@ -68,7 +72,7 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
return processors, names, nil
}

func getOperators(ops []PipelineOperator) []PipelineOperator {
func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) {
filteredOp := []PipelineOperator{}
for i, operator := range ops {
if operator.Enabled {
@@ -106,14 +110,42 @@ func getOperators(ops []PipelineOperator) []PipelineOperator {

} else if operator.Type == "trace_parser" {
cleanTraceParser(&operator)

} else if operator.Type == "time_parser" {
parseFromParts := strings.Split(operator.ParseFrom, ".")
parseFromPath := strings.Join(parseFromParts, "?.")

operator.If = fmt.Sprintf(`%s != nil`, parseFromPath)

if operator.LayoutType == "strptime" {
regex, err := RegexForStrptimeLayout(operator.Layout)
if err != nil {
return nil, fmt.Errorf("could not generate time_parser processor: %w", err)
}

operator.If = fmt.Sprintf(
`%s && %s matches "%s"`, operator.If, parseFromPath, regex,
)
} else if operator.LayoutType == "epoch" {
valueRegex := `^\\s*[0-9]+\\s*$`
if strings.Contains(operator.Layout, ".") {
valueRegex = `^\\s*[0-9]+\\.[0-9]+\\s*$`
}

operator.If = fmt.Sprintf(
`%s && string(%s) matches "%s"`, operator.If, parseFromPath, valueRegex,
)

}
// TODO(Raj): Maybe add support for gotime too eventually
}

filteredOp = append(filteredOp, operator)
} else if i == len(ops)-1 && len(filteredOp) != 0 {
filteredOp[len(filteredOp)-1].Output = ""
}
}
return filteredOp
return filteredOp, nil
}

func cleanTraceParser(operator *PipelineOperator) {
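A note on the time_parser branch added above: for a ParseFrom of attributes.test_timestamp, the generated guard starts as attributes?.test_timestamp != nil; for strptime layouts a matches clause is appended using the regex returned by RegexForStrptimeLayout, whose definition sits in one of the changed files not shown in this excerpt. The doubled backslashes in the epoch patterns are deliberate: the regex is embedded in a double-quoted expr string, so \\s evaluates to \s at runtime. Purely as an illustration of the idea (an editor's sketch, not the commit's implementation), a strptime layout can be turned into a regex by escaping its literal parts and substituting each supported directive:

package logparsingpipeline

import (
	"fmt"
	"regexp"
	"strings"
)

// Editor's sketch, not the commit's code: a handful of strptime directives
// mapped to regex fragments that match their values.
var strptimeFragmentsSketch = map[string]string{
	"%Y": `[0-9]{4}`, "%m": `[0-9]{2}`, "%d": `[0-9]{2}`,
	"%H": `[0-9]{2}`, "%M": `[0-9]{2}`, "%S": `[0-9]{2}`,
	"%f": `[0-9]+`, "%z": `[+-][0-9]{4}`,
}

// regexForStrptimeLayoutSketch stands in for the RegexForStrptimeLayout
// helper referenced above: escape regex metacharacters in the literal parts
// of the layout, then swap each known directive for its fragment.
func regexForStrptimeLayoutSketch(layout string) (string, error) {
	pattern := regexp.QuoteMeta(layout) // escapes e.g. the "." before %f
	for directive, fragment := range strptimeFragmentsSketch {
		pattern = strings.ReplaceAll(pattern, directive, fragment)
	}
	if strings.Contains(pattern, "%") {
		// A leftover "%" means the layout used a directive this sketch
		// does not know about.
		return "", fmt.Errorf("unsupported strptime directive in layout %q", layout)
	}
	return pattern, nil
}

With this sketch, the layout %Y-%m-%dT%H:%M:%S.%f%z used in the tests below maps to a pattern that the deliberately mismatched value 2023-11-27T12:03:28A239907+0530 cannot satisfy, which is exactly the situation the operator.If guard is meant to catch.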
80 changes: 77 additions & 3 deletions pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go
@@ -2,6 +2,7 @@ package logparsingpipeline

import (
"context"
"fmt"
"strings"
"testing"
"time"
@@ -198,7 +199,8 @@ var prepareProcessorTestData = []struct {
func TestPreparePipelineProcessor(t *testing.T) {
for _, test := range prepareProcessorTestData {
Convey(test.Name, t, func() {
res := getOperators(test.Operators)
res, err := getOperators(test.Operators)
So(err, ShouldBeNil)
So(res, ShouldResemble, test.Output)
})
}
@@ -256,11 +258,13 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) {
}
}

testCases := []struct {
type pipelineTestCase struct {
Name string
Operator PipelineOperator
NonMatchingLog model.SignozLog
}{
}

testCases := []pipelineTestCase{
{
"regex processor should ignore log with missing field",
PipelineOperator{
@@ -342,12 +346,82 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) {
Field: "attributes.test",
},
makeTestLog("mismatching log", map[string]string{}),
}, {
"time parser should ignore logs with missing field.",
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "strptime",
Layout: "%Y-%m-%dT%H:%M:%S.%f%z",
},
makeTestLog("mismatching log", map[string]string{}),
}, {
"time parser should ignore logs timestamp values that don't contain expected strptime layout.",
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "strptime",
Layout: "%Y-%m-%dT%H:%M:%S.%f%z",
},
makeTestLog("mismatching log", map[string]string{
"test_timestamp": "2023-11-27T12:03:28A239907+0530",
}),
}, {
"time parser should ignore logs timestamp values that don't contain an epoch",
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: "s",
},
makeTestLog("mismatching log", map[string]string{
"test_timestamp": "not-an-epoch",
}),
},
// TODO(Raj): see if there is an error scenario for grok parser.
// TODO(Raj): see if there is an error scenario for trace parser.
// TODO(Raj): see if there is an error scenario for Add operator.
}

// Some more timeparser test cases
epochLayouts := []string{"s", "ms", "us", "ns", "s.ms", "s.us", "s.ns"}
epochTestValues := []string{
"1136214245", "1136214245123", "1136214245123456",
"1136214245123456789", "1136214245.123",
"1136214245.123456", "1136214245.123456789",
}
for _, epochLayout := range epochLayouts {
for _, testValue := range epochTestValues {
testCases = append(testCases, pipelineTestCase{
fmt.Sprintf(
"time parser should ignore log with timestamp value %s that doesn't match layout type %s",
testValue, epochLayout,
),
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: epochLayout,
},
makeTestLog("mismatching log", map[string]string{
"test_timestamp": testValue,
}),
})
}
}

for _, testCase := range testCases {
testPipelines := []Pipeline{makeTestPipeline([]PipelineOperator{testCase.Operator})}

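The epoch matrix above runs every layout/value combination through the pipeline to confirm that mismatches are ignored silently instead of producing collector errors. Which values count as a mismatch is decided by the valueRegex pair built in getOperators; the standalone check below (an editor's illustration using Go's regexp directly, not the collector's expr evaluation) shows what each pattern accepts:

package logparsingpipeline

import (
	"fmt"
	"regexp"
)

// Illustrative check of the two epoch guards generated in getOperators:
// integer layouts ("s", "ms", "us", "ns") use ^\s*[0-9]+\s*$, while
// fractional layouts ("s.ms", "s.us", "s.ns") use ^\s*[0-9]+\.[0-9]+\s*$.
func exampleEpochGuards() {
	integerOnly := regexp.MustCompile(`^\s*[0-9]+\s*$`)
	fractional := regexp.MustCompile(`^\s*[0-9]+\.[0-9]+\s*$`)

	fmt.Println(integerOnly.MatchString("1136214245"))     // true: guard passes for layout "s"
	fmt.Println(integerOnly.MatchString("1136214245.123")) // false: log ignored for layout "s"
	fmt.Println(fractional.MatchString("1136214245.123"))  // true: guard passes for layout "s.ms"
	fmt.Println(fractional.MatchString("not-an-epoch"))    // false: log ignored, as in the test above
}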
34 changes: 34 additions & 0 deletions pkg/query-service/app/logparsingpipeline/postablePipeline.go
@@ -8,6 +8,7 @@ import (

v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
"golang.org/x/exp/slices"
)

// PostablePipelines are a list of user defined pielines
@@ -164,6 +165,39 @@ func isValidOperator(op PipelineOperator) error {
if len(op.Fields) == 0 {
return fmt.Errorf(fmt.Sprintf("fields of %s retain operator cannot be empty", op.ID))
}

case "time_parser":
if op.ParseFrom == "" {
return fmt.Errorf("parse from of time parsing processor %s cannot be empty", op.ID)
}
if op.LayoutType != "epoch" && op.LayoutType != "strptime" {
// TODO(Raj): Maybe add support for gotime format
return fmt.Errorf(
"invalid format type '%s' of time parsing processor %s", op.LayoutType, op.ID,
)
}
if op.Layout == "" {
return fmt.Errorf(fmt.Sprintf("format can not be empty for time parsing processor %s", op.ID))
}

validEpochLayouts := []string{"s", "ms", "us", "ns", "s.ms", "s.us", "s.ns"}
if op.LayoutType == "epoch" && !slices.Contains(validEpochLayouts, op.Layout) {
return fmt.Errorf(
"invalid epoch format '%s' of time parsing processor %s", op.LayoutType, op.ID,
)
}

// TODO(Raj): Add validation for strptime layouts via
// collector simulator maybe.
if op.LayoutType == "strptime" {
_, err := RegexForStrptimeLayout(op.Layout)
if err != nil {
return fmt.Errorf(
"invalid strptime format '%s' of time parsing processor %s: %w", op.LayoutType, op.ID, err,
)
}
}

default:
return fmt.Errorf(fmt.Sprintf("operator type %s not supported for %s, use one of (grok_parser, regex_parser, copy, move, add, remove, trace_parser, retain)", op.Type, op.ID))
}
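In short, the validation added above accepts a time_parser operator only when parse_from is set, layout_type is epoch or strptime, and the layout fits that type: epoch layouts must come from a fixed whitelist, and strptime layouts must be convertible to a regex. The helper below is an editor's condensation of those checks, not part of the commit:

package logparsingpipeline

import "golang.org/x/exp/slices"

// isSupportedTimeParserLayout condenses the time_parser checks made in
// isValidOperator above; illustrative only.
func isSupportedTimeParserLayout(layoutType, layout string) bool {
	switch layoutType {
	case "epoch":
		validEpochLayouts := []string{"s", "ms", "us", "ns", "s.ms", "s.us", "s.ns"}
		return slices.Contains(validEpochLayouts, layout)
	case "strptime":
		// The commit validates strptime layouts by attempting to generate
		// a regex for them.
		_, err := RegexForStrptimeLayout(layout)
		return err == nil
	default:
		// gotime layouts are not supported yet (see the TODO above).
		return false
	}
}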
51 changes: 51 additions & 0 deletions pkg/query-service/app/logparsingpipeline/postablePipeline_test.go
@@ -275,6 +275,57 @@ var operatorTest = []struct {
},
},
IsValid: false,
}, {
Name: "Timestamp Parser - valid",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: "s",
},
IsValid: true,
}, {
Name: "Timestamp Parser - invalid - bad parsefrom attribute",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "timestamp",
LayoutType: "epoch",
Layout: "s",
},
IsValid: false,
}, {
Name: "Timestamp Parser - unsupported layout_type",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
// TODO(Raj): Maybe add support for gotime format
LayoutType: "gotime",
Layout: "Mon Jan 2 15:04:05 -0700 MST 2006",
},
IsValid: false,
}, {
Name: "Timestamp Parser - invalid epoch layout",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: "%Y-%m-%d",
},
IsValid: false,
}, {
Name: "Timestamp Parser - invalid strptime layout",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "strptime",
Layout: "%U",
},
IsValid: false,
},
}

4 changes: 2 additions & 2 deletions pkg/query-service/app/logparsingpipeline/preview_test.go
@@ -145,7 +145,7 @@ func TestPipelinePreview(t *testing.T) {

}

func TestGrokParsingPreview(t *testing.T) {
func TestGrokParsingProcessor(t *testing.T) {
require := require.New(t)

testPipelines := []Pipeline{
@@ -207,7 +207,7 @@ func TestGrokParsingPreview(t *testing.T) {
require.Equal("route/server.go:71", processed.Attributes_string["location"])
}

func TestTraceParsingPreview(t *testing.T) {
func TestTraceParsingProcessor(t *testing.T) {
require := require.New(t)

testPipelines := []Pipeline{