Skip to content
This repository has been archived by the owner on Nov 7, 2023. It is now read-only.

Handle invalid argument, unavailable ErrorCodes (and refactor OTLP+Arrow receiver tests) #15

Merged
merged 8 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ require (
require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/apache/arrow/go/v10 v10.0.0 // indirect
github.com/apache/arrow/go/v11 v11.0.0-20221108144011-89a65b06a199 // indirect
github.com/apache/arrow/go/v11 v11.0.0-20221116153901-e38141421d43 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/f5/otel-arrow-adapter v0.0.0-20221111033209-0140de5d0a01 // indirect
github.com/f5/otel-arrow-adapter v0.0.0-20221116234146-d8ecadf9751b // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
Expand Down
60 changes: 56 additions & 4 deletions cmd/otelcorecol/go.sum

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions exporter/otlpexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.18

require (
github.com/apache/arrow/go/v10 v10.0.0
github.com/f5/otel-arrow-adapter v0.0.0-20221111033209-0140de5d0a01
github.com/f5/otel-arrow-adapter v0.0.0-20221116234146-d8ecadf9751b
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.8.1
go.opentelemetry.io/collector v0.63.1
Expand All @@ -19,7 +19,8 @@ require (

require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/v11 v11.0.0-20221108144011-89a65b06a199 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/apache/arrow/go/v11 v11.0.0-20221116153901-e38141421d43 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
Expand Down
71 changes: 67 additions & 4 deletions exporter/otlpexporter/go.sum

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions exporter/otlphttpexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ require (

require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/v11 v11.0.0-20221108144011-89a65b06a199 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/apache/arrow/go/v11 v11.0.0-20221116153901-e38141421d43 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/f5/otel-arrow-adapter v0.0.0-20221111033209-0140de5d0a01 // indirect
github.com/f5/otel-arrow-adapter v0.0.0-20221116234146-d8ecadf9751b // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
71 changes: 67 additions & 4 deletions exporter/otlphttpexporter/go.sum

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ go 1.18
require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/cenkalti/backoff/v4 v4.1.3
github.com/f5/otel-arrow-adapter v0.0.0-20221111033209-0140de5d0a01
github.com/f5/otel-arrow-adapter v0.0.0-20221116234146-d8ecadf9751b
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/golang/snappy v0.0.4
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/klauspost/compress v1.15.11
github.com/knadh/koanf v1.4.4
Expand Down Expand Up @@ -49,7 +48,8 @@ require (

require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/v11 v11.0.0-20221108144011-89a65b06a199 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/apache/arrow/go/v11 v11.0.0-20221116153901-e38141421d43 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
57 changes: 53 additions & 4 deletions go.sum

Large diffs are not rendered by default.

79 changes: 43 additions & 36 deletions receiver/otlpreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"fmt"

arrowpb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1"
batchEvent "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record"
arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -49,14 +49,15 @@ type Receiver struct {
arrowpb.UnimplementedArrowStreamServiceServer

obsrecv *obsreport.Receiver
arrowConsumer *batchEvent.Consumer
arrowConsumer arrowRecord.ConsumerAPI
jmacd marked this conversation as resolved.
Show resolved Hide resolved
}

// New creates a new Receiver reference.
func New(
id config.ComponentID,
cs Consumers,
set component.ReceiverCreateSettings,
ac arrowRecord.ConsumerAPI,
) *Receiver {
obs := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: id,
Expand All @@ -66,7 +67,7 @@ func New(
return &Receiver{
Consumers: cs,
obsrecv: obs,
arrowConsumer: batchEvent.NewConsumer(),
arrowConsumer: ac,
}
}

Expand All @@ -87,23 +88,26 @@ func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStre
return err
}

// Process records:
err = r.processRecords(ctx, req)
if err != nil {
return err
}
// Process records: an error in this code path does
// not necessarily break the stream.
invalid, err := r.processRecords(ctx, req)

// TODO: We are not required to return a Status per
// request, what should the logic be? For now sending
// one status per request received:
// Note: Statuses can be batched: TODO: should we?
resp := &arrowpb.BatchStatus{}
status := &arrowpb.StatusMessage{
BatchId: req.GetBatchId(),
StatusCode: arrowpb.StatusCode_OK,
// TODO: `StatusMessage` has some provisions
// for returning information other than OK w/o
// breaking the stream, and I am not sure what
// those conditions are (e.g., retry suggestions).
BatchId: req.GetBatchId(),
}
if err == nil {
status.StatusCode = arrowpb.StatusCode_OK
} else {
status.StatusCode = arrowpb.StatusCode_ERROR
status.ErrorMessage = err.Error()

if invalid {
status.ErrorCode = arrowpb.ErrorCode_INVALID_ARGUMENT
} else {
status.ErrorCode = arrowpb.ErrorCode_UNAVAILABLE
}
}
resp.Statuses = append(resp.Statuses, status)

Expand All @@ -114,54 +118,57 @@ func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStre
}
}

func (r *Receiver) processRecords(ctx context.Context, records *arrowpb.BatchArrowRecords) error {
// processRecords returns an error and a boolean indicating whether
// the error (true) was from processing the data (i.e., invalid
// argument) or (false) from the consuming pipeline. The boolean is
// not used when success (nil error) is returned.
func (r *Receiver) processRecords(ctx context.Context, records *arrowpb.BatchArrowRecords) (invalid bool, _ error) {
jmacd marked this conversation as resolved.
Show resolved Hide resolved
payloads := records.GetOtlpArrowPayloads()
if len(payloads) == 0 {
return nil
return false, nil
}
// TODO: Use the obsreport object to instrument (somehow)
switch payloads[0].Type {
case arrowpb.OtlpArrowPayloadType_METRICS:
// otlp, err := r.arrowConsumer.MetricsFrom(records)
// if err != nil {
// return err
// }
// for _, logs := range otlp {
// err = r.Metrics().ConsumeMetrics(ctx, logs)
// if err != nil {
// return err
// }
// }
return ErrNoMetricsConsumer
otlp, err := r.arrowConsumer.MetricsFrom(records)
if err != nil {
return true, err
}
for _, logs := range otlp {
jmacd marked this conversation as resolved.
Show resolved Hide resolved
err = r.Metrics().ConsumeMetrics(ctx, logs)
jmacd marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, err
}
}

case arrowpb.OtlpArrowPayloadType_LOGS:
otlp, err := r.arrowConsumer.LogsFrom(records)
if err != nil {
return err
return true, err
}

for _, logs := range otlp {
err = r.Logs().ConsumeLogs(ctx, logs)
if err != nil {
return err
return false, err
}
}

case arrowpb.OtlpArrowPayloadType_SPANS:
otlp, err := r.arrowConsumer.TracesFrom(records)
if err != nil {
return err
return true, err
}

for _, traces := range otlp {
err = r.Traces().ConsumeTraces(ctx, traces)
if err != nil {
return err
return false, err
}
}

default:
return ErrUnrecognizedPayload
return true, ErrUnrecognizedPayload
}
return nil
return false, nil
}
Loading