From 7082aeb35536844dccd7b71fd54fec1c7fc64ed8 Mon Sep 17 00:00:00 2001
From: Joshua MacDonald
Date: Thu, 16 Mar 2023 17:31:41 -0700
Subject: [PATCH 1/6] Improve graceful shutdown logging on exporter side

---
 collector/examples/bridge/edge-collector.yaml |  2 +
 collector/examples/bridge/saas-collector.yaml |  7 ++-
 .../otlpexporter/internal/arrow/stream.go     | 59 ++++++++++++++-----
 3 files changed, 51 insertions(+), 17 deletions(-)

diff --git a/collector/examples/bridge/edge-collector.yaml b/collector/examples/bridge/edge-collector.yaml
index 99e706bc..da8d2eea 100644
--- a/collector/examples/bridge/edge-collector.yaml
+++ b/collector/examples/bridge/edge-collector.yaml
@@ -55,3 +55,5 @@ service:
   telemetry:
     metrics:
       address: 127.0.0.1:8888
+    logs:
+      level: debug
diff --git a/collector/examples/bridge/saas-collector.yaml b/collector/examples/bridge/saas-collector.yaml
index fe206b99..f9fa59ea 100644
--- a/collector/examples/bridge/saas-collector.yaml
+++ b/collector/examples/bridge/saas-collector.yaml
@@ -11,13 +11,18 @@ receivers:
       # to the next hop.
       include_metadata: true
 
+      keepalive:
+        server_parameters:
+          max_connection_age: 10s
+          max_connection_age_grace: 10s
+
       # Enable arrow for the bridge.
       arrow:
         enabled: true
 
 exporters:
   logging:
-    verbosity: detailed
+    verbosity: normal
 
   otlphttp:
     # You can use an HTTP listener on port 5001 to see the headers
diff --git a/collector/gen/exporter/otlpexporter/internal/arrow/stream.go b/collector/gen/exporter/otlpexporter/internal/arrow/stream.go
index c8cf6b3b..87299ad6 100644
--- a/collector/gen/exporter/otlpexporter/internal/arrow/stream.go
+++ b/collector/gen/exporter/otlpexporter/internal/arrow/stream.go
@@ -20,6 +20,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"strings"
 	"sync"
 
 	arrowpb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1"
@@ -158,22 +159,48 @@ func (s *Stream) run(bgctx context.Context, client arrowpb.ArrowStreamServiceCli
 	if err != nil {
 		// This branch is reached with an unimplemented status
 		// with or without the WaitForReady flag.
-		if status, ok := status.FromError(err); ok && status.Code() == codes.Unimplemented {
-			// This (client == nil) signals the controller
-			// to downgrade when all streams have returned
-			// in that status.
-			//
-			// TODO: Note there are partial failure modes
-			// that will continue to function in a
-			// degraded mode, such as when half of the
-			// streams are successful and half of streams
-			// take this return path. Design a graceful
-			// recovery mechanism?
-			s.client = nil
-			s.telemetry.Logger.Info("arrow is not supported", zap.Error(err))
-		} else if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
-			// TODO: Should we add debug-level logs for EOF and Canceled?
-			s.telemetry.Logger.Error("arrow recv", zap.Error(err))
+		status, ok := status.FromError(err)
+
+		if ok {
+			switch status.Code() {
+			case codes.Unimplemented:
+				// This (client == nil) signals the controller
+				// to downgrade when all streams have returned
+				// in that status.
+				//
+				// TODO: Note there are partial failure modes
+				// that will continue to function in a
+				// degraded mode, such as when half of the
+				// streams are successful and half of streams
+				// take this return path. Design a graceful
+				// recovery mechanism?
+				s.client = nil
+				s.telemetry.Logger.Info("arrow is not supported", zap.Error(err))
+
+			case codes.Unavailable:
+				// gRPC returns this when max connection age is reached.
+				// The message string will contain NO_ERROR if it's a
+				// graceful shutdown.
+				if strings.Contains(status.Message(), "NO_ERROR") {
+					s.telemetry.Logger.Debug("arrow stream graceful shutdown")
+				} else {
+					s.telemetry.Logger.Error("arrow stream unavailable", zap.Error(err))
+				}
+			default:
+				s.telemetry.Logger.Error("arrow stream unknown", zap.Error(err), zap.Reflect("details", status.Proto()))
+			}
+		} else {
+			isEOF := errors.Is(err, io.EOF)
+			isCanceled := errors.Is(err, context.Canceled)
+
+			if !isEOF && !isCanceled {
+				// TODO: Should we add debug-level logs for EOF and Canceled?
+				s.telemetry.Logger.Error("arrow recv", zap.Error(err))
+			} else if isEOF {
+				s.telemetry.Logger.Debug("arrow stream eof")
+			} else if isCanceled {
+				s.telemetry.Logger.Debug("arrow stream canceled")
+			}
 		}
 	}
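Patch 1 depends on the keepalive stanza above to force periodic, graceful stream shutdowns. As a standalone illustration (not part of the series), here is a minimal sketch of the equivalent server-side configuration in Go, assuming only the standard google.golang.org/grpc/keepalive package; the constructor name is invented for this sketch:

```go
package main

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// newAgeLimitedServer (hypothetical name) mirrors the saas-collector.yaml
// settings above: after roughly 10s the server sends GOAWAY carrying the
// HTTP/2 NO_ERROR code, then allows a further 10s grace period for
// in-flight streams to drain.
func newAgeLimitedServer() *grpc.Server {
	return grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
		MaxConnectionAge:      10 * time.Second,
		MaxConnectionAgeGrace: 10 * time.Second,
	}))
}

func main() {
	srv := newAgeLimitedServer()
	defer srv.Stop()
	// Register services and call srv.Serve(listener) as usual.
}
```

That NO_ERROR code is the message text the hunk above begins matching on to tell a graceful age-out apart from a real transport failure.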
From e385c94b8a3d7c1c8ebe7d5a09dabb28cc96506a Mon Sep 17 00:00:00 2001
From: Joshua MacDonald
Date: Thu, 16 Mar 2023 22:12:07 -0700
Subject: [PATCH 2/6] Improve graceful shutdown on receiver side

---
 collector/examples/bridge/edge-collector.yaml |  2 +-
 .../otlpexporter/internal/arrow/stream.go     | 66 +++++++++++--------
 .../otlpreceiver/internal/arrow/arrow.go      | 40 +++++++++--
 3 files changed, 74 insertions(+), 34 deletions(-)

diff --git a/collector/examples/bridge/edge-collector.yaml b/collector/examples/bridge/edge-collector.yaml
index da8d2eea..1ca44c0f 100644
--- a/collector/examples/bridge/edge-collector.yaml
+++ b/collector/examples/bridge/edge-collector.yaml
@@ -56,4 +56,4 @@ service:
     metrics:
       address: 127.0.0.1:8888
     logs:
-      level: debug
+      level: info
diff --git a/collector/gen/exporter/otlpexporter/internal/arrow/stream.go b/collector/gen/exporter/otlpexporter/internal/arrow/stream.go
index 87299ad6..cca318ef 100644
--- a/collector/gen/exporter/otlpexporter/internal/arrow/stream.go
+++ b/collector/gen/exporter/otlpexporter/internal/arrow/stream.go
@@ -107,6 +107,19 @@ func (s *Stream) setBatchChannel(batchID string, errCh chan error) {
 	s.waiters[batchID] = errCh
 }
 
+func (s *Stream) logStreamError(err error) {
+	isEOF := errors.Is(err, io.EOF)
+	isCanceled := errors.Is(err, context.Canceled)
+
+	if !isEOF && !isCanceled {
+		s.telemetry.Logger.Error("arrow stream error", zap.Error(err))
+	} else if isEOF {
+		s.telemetry.Logger.Debug("arrow stream end")
+	} else if isCanceled {
+		s.telemetry.Logger.Debug("arrow stream canceled")
+	}
+}
+
 // run blocks the calling goroutine while executing stream logic. run
 // will return when the reader and writer are finished. errors will be logged.
 func (s *Stream) run(bgctx context.Context, client arrowpb.ArrowStreamServiceClient, grpcOptions []grpc.CallOption) {
@@ -145,7 +158,10 @@ func (s *Stream) run(bgctx context.Context, client arrowpb.ArrowStreamServiceCli
 	ww.Add(1)
 	go func() {
 		defer ww.Done()
-		s.write(ctx)
+		err := s.write(ctx)
+		if err != nil {
+			s.logStreamError(err)
+		}
 	}()
 
 	// the result from read() is processed after cancel and wait,
@@ -175,32 +191,29 @@ func (s *Stream) run(bgctx context.Context, client arrowpb.ArrowStreamServiceCli
 				// take this return path. Design a graceful
 				// recovery mechanism?
 				s.client = nil
-				s.telemetry.Logger.Info("arrow is not supported", zap.Error(err))
+				s.telemetry.Logger.Info("arrow is not supported",
+					zap.String("message", status.Message()),
+				)
 
 			case codes.Unavailable:
 				// gRPC returns this when max connection age is reached.
 				// The message string will contain NO_ERROR if it's a
 				// graceful shutdown.
 				if strings.Contains(status.Message(), "NO_ERROR") {
-					s.telemetry.Logger.Debug("arrow stream graceful shutdown")
+					s.telemetry.Logger.Debug("arrow stream shutdown")
 				} else {
-					s.telemetry.Logger.Error("arrow stream unavailable", zap.Error(err))
+					s.telemetry.Logger.Error("arrow stream unavailable",
+						zap.String("message", status.Message()),
+					)
 				}
 			default:
-				s.telemetry.Logger.Error("arrow stream unknown", zap.Error(err), zap.Reflect("details", status.Proto()))
+				s.telemetry.Logger.Error("arrow stream unknown",
+					zap.Uint32("code", uint32(status.Code())),
+					zap.String("message", status.Message()),
+				)
 			}
 		} else {
-			isEOF := errors.Is(err, io.EOF)
-			isCanceled := errors.Is(err, context.Canceled)
-
-			if !isEOF && !isCanceled {
-				// TODO: Should we add debug-level logs for EOF and Canceled?
-				s.telemetry.Logger.Error("arrow recv", zap.Error(err))
-			} else if isEOF {
-				s.telemetry.Logger.Debug("arrow stream eof")
-			} else if isCanceled {
-				s.telemetry.Logger.Debug("arrow stream canceled")
-			}
+			s.logStreamError(err)
 		}
 	}
@@ -215,7 +228,7 @@ func (s *Stream) run(bgctx context.Context, client arrowpb.ArrowStreamServiceCli
 // write repeatedly places this stream into the next-available queue, then
 // performs a blocking send(). This returns when the data is in the write buffer,
 // the caller waiting on its error channel.
-func (s *Stream) write(ctx context.Context) {
+func (s *Stream) write(ctx context.Context) error {
 	// headers are encoded using hpack, reusing a buffer on each call.
 	var hdrsBuf bytes.Buffer
 	hdrsEnc := hpack.NewEncoder(&hdrsBuf)
@@ -235,7 +248,7 @@ func (s *Stream) write(ctx context.Context) {
 			// is a potential sender race since the stream
 			// is currently in the ready set.
 			s.prioritizer.removeReady(s)
-			return
+			return ctx.Err()
 		}
 		// Note: For the two return statements below there is no potential
 		// sender race because the stream is not available, as indicated by
@@ -245,9 +258,9 @@ func (s *Stream) write(ctx context.Context) {
 		if err != nil {
 			// This is some kind of internal error. We will restart the
 			// stream and mark this record as a permanent one.
+			err = fmt.Errorf("encode: %w", err)
 			wri.errCh <- consumererror.NewPermanent(err)
-			s.telemetry.Logger.Error("arrow encode", zap.Error(err))
-			return
+			return err
 		}
 
 		// Optionally include outgoing metadata, if present.
@@ -262,9 +275,9 @@ func (s *Stream) write(ctx context.Context) {
 				// This case is like the encode-failure case
 				// above, we will restart the stream but consider
 				// this a permanent error.
+				err = fmt.Errorf("hpack: %w", err)
 				wri.errCh <- consumererror.NewPermanent(err)
-				s.telemetry.Logger.Error("hpack encode", zap.Error(err))
-				return
+				return err
 			}
 		}
 		batch.Headers = hdrsBuf.Bytes()
@@ -275,10 +288,8 @@ func (s *Stream) write(ctx context.Context) {
 
 		if err := s.client.Send(batch); err != nil {
 			// The error will be sent to errCh during cleanup for this stream.
-			// Note there are common cases like EOF and Canceled that we may
-			// wish to suppress in the logs if they become a problem.
-			s.telemetry.Logger.Error("arrow send", zap.Error(err))
-			return
+			// Note: do not wrap this error, it may contain a Status.
+			return err
 		}
 	}
 }
@@ -292,11 +303,12 @@ func (s *Stream) read(_ context.Context) error {
 	for {
 		resp, err := s.client.Recv()
 		if err != nil {
+			// Note: do not wrap, contains a Status.
 			return err
 		}
 
 		if err = s.processBatchStatus(resp.Statuses); err != nil {
-			return err
+			return fmt.Errorf("process: %w", err)
 		}
 	}
 }
diff --git a/collector/gen/receiver/otlpreceiver/internal/arrow/arrow.go b/collector/gen/receiver/otlpreceiver/internal/arrow/arrow.go
index 43c1ec8b..6b041501 100644
--- a/collector/gen/receiver/otlpreceiver/internal/arrow/arrow.go
+++ b/collector/gen/receiver/otlpreceiver/internal/arrow/arrow.go
@@ -16,6 +16,7 @@ package arrow // import "github.com/f5/otel-arrow-adapter/collector/gen/receiver
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"strings"
@@ -24,7 +25,9 @@ import (
 	arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record"
 	"go.uber.org/zap"
 	"golang.org/x/net/http2/hpack"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
 
 	"go.opentelemetry.io/collector/client"
 	"go.opentelemetry.io/collector/component"
@@ -224,6 +227,35 @@ func (h *headerReceiver) newContext(ctx context.Context, hdrs map[string][]strin
 	})
 }
 
+// logStreamError decides whether to log an error to the console.
+func (r *Receiver) logStreamError(err error) {
+	status, ok := status.FromError(err)
+
+	if ok {
+		switch status.Code() {
+		case codes.Canceled:
+			r.telemetry.Logger.Debug("arrow stream canceled")
+		default:
+			r.telemetry.Logger.Error("arrow stream error",
+				zap.Uint32("code", uint32(status.Code())),
+				zap.String("message", status.Message()),
+			)
+		}
+		return
+	}
+
+	isEOF := errors.Is(err, io.EOF)
+	isCanceled := errors.Is(err, context.Canceled)
+
+	if !isEOF && !isCanceled {
+		r.telemetry.Logger.Error("arrow stream error", zap.Error(err))
+	} else if isEOF {
+		r.telemetry.Logger.Debug("arrow stream end")
+	} else {
+		r.telemetry.Logger.Debug("arrow stream canceled")
+	}
+}
+
 func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStreamServer) error {
 	streamCtx := serverStream.Context()
 	ac := r.newConsumer()
@@ -241,11 +273,7 @@ func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStre
 
 		req, err := serverStream.Recv()
 		if err != nil {
-			if err == io.EOF {
-				r.telemetry.Logger.Debug("arrow recv eof", zap.Error(err))
-			} else {
-				r.telemetry.Logger.Error("arrow recv error", zap.Error(err))
-			}
+			r.logStreamError(err)
 			return err
 		}
 
@@ -298,7 +326,7 @@ func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStre
 
 		err = serverStream.Send(resp)
 		if err != nil {
-			r.telemetry.Logger.Error("arrow send error", zap.Error(err))
+			r.logStreamError(err)
 			return err
 		}
 	}
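Patches 1 and 2 together demote expected shutdown noise to debug level on both ends of the bridge. Distilled out of the diffs above, the classification looks roughly like the sketch below. This is an illustration, not code from the patch: the helper name is invented, and the NO_ERROR substring match is a heuristic over gRPC's Unavailable message text rather than a stable API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isGracefulStreamEnd (hypothetical) reports whether a stream error is
// expected teardown noise -- EOF, local cancellation, or a GOAWAY-driven
// Unavailable carrying the HTTP/2 NO_ERROR code -- rather than a failure
// worth an Error-level log.
func isGracefulStreamEnd(err error) bool {
	if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
		return true
	}
	if st, ok := status.FromError(err); ok {
		switch st.Code() {
		case codes.Canceled:
			return true
		case codes.Unavailable:
			return strings.Contains(st.Message(), "NO_ERROR")
		}
	}
	return false
}

func main() {
	err := status.Error(codes.Unavailable, "closing transport: NO_ERROR")
	fmt.Println(isGracefulStreamEnd(err)) // true: debug-level, not error-level
}
```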
"go.opentelemetry.io/collector/component/componenttest" ) var ( @@ -70,7 +70,7 @@ func newTestTelemetry(t *testing.T, noisy noisyTest) (component.TelemetrySetting if noisy { return telset, nil } - core, obslogs := observer.New(zapcore.DebugLevel) + core, obslogs := observer.New(zapcore.InfoLevel) telset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core())) return telset, obslogs } From da96334f4fee4ec67e09f288396c71c51565c7d3 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 21 Mar 2023 10:14:24 -0700 Subject: [PATCH 4/6] fix receiver tests w/ copyBatch() --- .../otlpreceiver/internal/arrow/arrow_test.go | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go b/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go index 577361db..7ef7e2df 100644 --- a/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go +++ b/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go @@ -85,6 +85,7 @@ type commonTestCase struct { consume chan consumeResult streamErr chan error + // testProducer is for convenience -- not thread safe, see copyBatch(). testProducer *arrowRecord.Producer ctxCall *gomock.Call @@ -465,6 +466,8 @@ func TestReceiverConsumeError(t *testing.T) { } require.NoError(t, err) + batch = copyBatch(batch) + ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "consumer unhealthy")).Times(1).Return(nil) ctc.start(ctc.newRealConsumer) @@ -523,6 +526,8 @@ func TestReceiverInvalidData(t *testing.T) { } require.NoError(t, err) + batch = copyBatch(batch) + ctc.stream.EXPECT().Send(statusInvalidFor(batch.BatchId, "Permanent error: test invalid error")).Times(1).Return(nil) ctc.start(ctc.newErrorConsumer) @@ -534,6 +539,32 @@ func TestReceiverInvalidData(t *testing.T) { } } +func copyBatch(in *arrowpb.BatchArrowRecords) *arrowpb.BatchArrowRecords { + // Because Arrow-IPC uses zero copy, we have to copy inside the test + // instead of sharing pointers to BatchArrowRecords. 
From da96334f4fee4ec67e09f288396c71c51565c7d3 Mon Sep 17 00:00:00 2001
From: Joshua MacDonald
Date: Tue, 21 Mar 2023 10:14:24 -0700
Subject: [PATCH 4/6] fix receiver tests w/ copyBatch()

---
 .../otlpreceiver/internal/arrow/arrow_test.go | 39 +++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go b/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go
index 577361db..7ef7e2df 100644
--- a/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go
+++ b/collector/gen/receiver/otlpreceiver/internal/arrow/arrow_test.go
@@ -85,6 +85,7 @@ type commonTestCase struct {
 	consume   chan consumeResult
 	streamErr chan error
 
+	// testProducer is for convenience -- not thread safe, see copyBatch().
 	testProducer *arrowRecord.Producer
 
 	ctxCall *gomock.Call
@@ -465,6 +466,8 @@ func TestReceiverConsumeError(t *testing.T) {
 	}
 	require.NoError(t, err)
 
+	batch = copyBatch(batch)
+
 	ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "consumer unhealthy")).Times(1).Return(nil)
 
 	ctc.start(ctc.newRealConsumer)
@@ -523,6 +526,8 @@ func TestReceiverInvalidData(t *testing.T) {
 	}
 	require.NoError(t, err)
 
+	batch = copyBatch(batch)
+
 	ctc.stream.EXPECT().Send(statusInvalidFor(batch.BatchId, "Permanent error: test invalid error")).Times(1).Return(nil)
 
 	ctc.start(ctc.newErrorConsumer)
@@ -534,6 +539,32 @@ func TestReceiverInvalidData(t *testing.T) {
 	}
 }
 
+func copyBatch(in *arrowpb.BatchArrowRecords) *arrowpb.BatchArrowRecords {
+	// Because Arrow-IPC uses zero copy, we have to copy inside the test
+	// instead of sharing pointers to BatchArrowRecords.
+
+	hcpy := make([]byte, len(in.Headers))
+	copy(hcpy, in.Headers)
+
+	pays := make([]*arrowpb.OtlpArrowPayload, len(in.OtlpArrowPayloads))
+
+	for i, inp := range in.OtlpArrowPayloads {
+		rcpy := make([]byte, len(inp.Record))
+		copy(rcpy, inp.Record)
+		pays[i] = &arrowpb.OtlpArrowPayload{
+			SubStreamId: inp.SubStreamId,
+			Type:        inp.Type,
+			Record:      rcpy,
+		}
+	}
+
+	return &arrowpb.BatchArrowRecords{
+		BatchId:           in.BatchId,
+		Headers:           hcpy,
+		OtlpArrowPayloads: pays,
+	}
+}
+
 func TestReceiverEOF(t *testing.T) {
 	tc := healthyTestChannel{}
 	ctc := newCommonTestCase(t, tc)
@@ -556,6 +587,8 @@ func TestReceiverEOF(t *testing.T) {
 		batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td)
 		require.NoError(t, err)
 
+		batch = copyBatch(batch)
+
 		ctc.putBatch(batch, nil)
 	}
 	close(ctc.receive)
@@ -611,9 +644,12 @@ func testReceiverHeaders(t *testing.T, includeMeta bool) {
 
 	for _, md := range expectData {
 		td := testdata.GenerateTraces(2)
+
 		batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td)
 		require.NoError(t, err)
 
+		batch = copyBatch(batch)
+
 		if len(md) != 0 {
 			hpb.Reset()
 			for key, vals := range md {
@@ -1002,9 +1038,12 @@ func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) {
 
 	for _, md := range expectData {
 		td := testdata.GenerateTraces(2)
+
 		batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td)
 		require.NoError(t, err)
 
+		batch = copyBatch(batch)
+
 		if len(md) != 0 {
 			hpb.Reset()
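The copyBatch() helpers in patches 4 and 5 exist because the Arrow IPC path is zero-copy: the Record bytes in a BatchArrowRecords alias a buffer the producer will reuse on its next call. A toy demonstration of that aliasing hazard, with plain byte slices standing in for Arrow buffers:

```go
package main

import "fmt"

// Two views of one backing array: writes through either are visible to both.
func main() {
	buf := []byte("batch-1")
	view := buf // zero-copy: shares the backing array

	safe := make([]byte, len(buf))
	copy(safe, buf) // deep copy, as copyBatch() does per payload

	buf[len(buf)-1] = '2'         // the producer reuses its buffer
	fmt.Println(string(view))     // "batch-2" -- the aliased view changed too
	fmt.Println(string(safe))     // "batch-1" -- the copy is unaffected
}
```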
From 0eb0b0199167b43d2312605f9d70b6618740abcb Mon Sep 17 00:00:00 2001
From: Joshua MacDonald
Date: Tue, 21 Mar 2023 22:24:22 -0700
Subject: [PATCH 5/6] add copyBatch support in exporter tests for safety

---
 .../internal/arrow/exporter_test.go | 44 ++++++++++++++++---
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/collector/gen/exporter/otlpexporter/internal/arrow/exporter_test.go b/collector/gen/exporter/otlpexporter/internal/arrow/exporter_test.go
index a106d30e..1d9f173d 100644
--- a/collector/gen/exporter/otlpexporter/internal/arrow/exporter_test.go
+++ b/collector/gen/exporter/otlpexporter/internal/arrow/exporter_test.go
@@ -91,6 +91,38 @@ func newExporterNoisyTestCase(t *testing.T, numStreams int) *exporterTestCase {
 	return newExporterTestCaseCommon(t, Noisy, numStreams, false, nil)
 }
 
+func copyBatch[T any](real func(T) (*arrowpb.BatchArrowRecords, error)) func(T) (*arrowpb.BatchArrowRecords, error) {
+	// Because Arrow-IPC uses zero copy, we have to copy inside the test
+	// instead of sharing pointers to BatchArrowRecords.
+	return func(data T) (*arrowpb.BatchArrowRecords, error) {
+		in, err := real(data)
+		if err != nil {
+			return nil, err
+		}
+
+		hcpy := make([]byte, len(in.Headers))
+		copy(hcpy, in.Headers)
+
+		pays := make([]*arrowpb.OtlpArrowPayload, len(in.OtlpArrowPayloads))
+
+		for i, inp := range in.OtlpArrowPayloads {
+			rcpy := make([]byte, len(inp.Record))
+			copy(rcpy, inp.Record)
+			pays[i] = &arrowpb.OtlpArrowPayload{
+				SubStreamId: inp.SubStreamId,
+				Type:        inp.Type,
+				Record:      rcpy,
+			}
+		}
+
+		return &arrowpb.BatchArrowRecords{
+			BatchId:           in.BatchId,
+			Headers:           hcpy,
+			OtlpArrowPayloads: pays,
+		}, nil
+	}
+}
+
 func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, disableDowngrade bool, metadataFunc func(ctx context.Context) (map[string]string, error)) *exporterTestCase {
 	ctc := newCommonTestCase(t, noisy)
 
@@ -108,11 +140,11 @@ func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, di
 		real := arrowRecord.NewProducer()
 		prod.EXPECT().BatchArrowRecordsFromTraces(gomock.Any()).AnyTimes().DoAndReturn(
-			real.BatchArrowRecordsFromTraces)
+			copyBatch(real.BatchArrowRecordsFromTraces))
 		prod.EXPECT().BatchArrowRecordsFromLogs(gomock.Any()).AnyTimes().DoAndReturn(
-			real.BatchArrowRecordsFromLogs)
+			copyBatch(real.BatchArrowRecordsFromLogs))
 		prod.EXPECT().BatchArrowRecordsFromMetrics(gomock.Any()).AnyTimes().DoAndReturn(
-			real.BatchArrowRecordsFromMetrics)
+			copyBatch(real.BatchArrowRecordsFromMetrics))
 		prod.EXPECT().Close().Times(1).Return(nil)
 		return prod
 	}, ctc.serviceClient, ctc.perRPCCredentials)
@@ -413,12 +445,12 @@ func TestArrowExporterStreamRace(t *testing.T) {
 	var tries atomic.Int32
 
 	tc.streamCall.AnyTimes().DoAndReturn(tc.repeatedNewStream(func() testChannel {
-		tc := newUnresponsiveTestChannel()
+		noResponse := newUnresponsiveTestChannel()
 
 		// Immediately unblock to return the EOF to the stream
 		// receiver and shut down the stream.
-		go tc.unblock()
+		go noResponse.unblock()
 		tries.Add(1)
-		return tc
+		return noResponse
 	}))
 
 	var wg sync.WaitGroup

From ec0d956f84905bd6f031d675b9ee212a4a77aa0b Mon Sep 17 00:00:00 2001
From: Joshua MacDonald
Date: Tue, 21 Mar 2023 22:24:44 -0700
Subject: [PATCH 6/6] errors.As->errors.Is fixes data race

---
 pkg/otel/arrow_record/producer.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/otel/arrow_record/producer.go b/pkg/otel/arrow_record/producer.go
index 4a61ff72..dd623a75 100644
--- a/pkg/otel/arrow_record/producer.go
+++ b/pkg/otel/arrow_record/producer.go
@@ -334,7 +334,7 @@ func recordBuilder[T pmetric.Metrics | plog.Logs | ptrace.Traces](builder func()
 	}
 
 	switch {
-	case errors.As(err, &schema.ErrSchemaNotUpToDate):
+	case errors.Is(err, schema.ErrSchemaNotUpToDate):
 		schemaNotUpToDateCount++
 		if schemaNotUpToDateCount > 5 {
 			panic("Too many consecutive schema updates. This shouldn't happen.")
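The final commit is small but subtle: errors.As writes the matched error into its target, so pointing it at the shared package-level sentinel schema.ErrSchemaNotUpToDate made every successful match a write to shared state, which the race detector flags under concurrent producers; errors.Is only compares. A minimal sketch of the distinction, with a stand-in sentinel:

```go
package main

import (
	"errors"
	"fmt"
)

// errSchemaNotUpToDate stands in for schema.ErrSchemaNotUpToDate.
var errSchemaNotUpToDate = errors.New("schema not up to date")

func main() {
	err := fmt.Errorf("record builder: %w", errSchemaNotUpToDate)

	// errors.Is walks the wrap chain comparing against the sentinel value.
	// It never writes, so concurrent callers are safe.
	fmt.Println(errors.Is(err, errSchemaNotUpToDate)) // true

	// errors.As assigns the match into its target. Aiming it at a shared
	// package-level variable, as the old code did, makes every successful
	// match a write to shared state -- a data race under `go test -race`.
	var target error = errSchemaNotUpToDate
	fmt.Println(errors.As(err, &target)) // true, and target is overwritten
}
```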