Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collector: improve logging of graceful stream shutdown #118

Merged
merged 8 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions collector/examples/bridge/edge-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,5 @@ service:
telemetry:
metrics:
address: 127.0.0.1:8888
logs:
level: info
7 changes: 6 additions & 1 deletion collector/examples/bridge/saas-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ receivers:
# to the next hop.
include_metadata: true

keepalive:
server_parameters:
max_connection_age: 10s
max_connection_age_grace: 10s

# Enable arrow for the bridge.
arrow:
enabled: true

exporters:
logging:
verbosity: detailed
verbosity: normal

otlphttp:
# You can use an HTTP listener on port 5001 to see the headers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"github.com/f5/otel-arrow-adapter/collector/gen/exporter/otlpexporter/internal/arrow/grpcmock"
"github.com/f5/otel-arrow-adapter/collector/gen/internal/testdata"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
)

var (
Expand Down Expand Up @@ -70,7 +70,7 @@ func newTestTelemetry(t *testing.T, noisy noisyTest) (component.TelemetrySetting
if noisy {
return telset, nil
}
core, obslogs := observer.New(zapcore.DebugLevel)
core, obslogs := observer.New(zapcore.InfoLevel)
telset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core()))
return telset, obslogs
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,38 @@ func newExporterNoisyTestCase(t *testing.T, numStreams int) *exporterTestCase {
return newExporterTestCaseCommon(t, Noisy, numStreams, false, nil)
}

func copyBatch[T any](real func(T) (*arrowpb.BatchArrowRecords, error)) func(T) (*arrowpb.BatchArrowRecords, error) {
// Because Arrow-IPC uses zero copy, we have to copy inside the test
// instead of sharing pointers to BatchArrowRecords.
return func(data T) (*arrowpb.BatchArrowRecords, error) {
in, err := real(data)
if err != nil {
return nil, err
}

hcpy := make([]byte, len(in.Headers))
copy(hcpy, in.Headers)

pays := make([]*arrowpb.OtlpArrowPayload, len(in.OtlpArrowPayloads))

for i, inp := range in.OtlpArrowPayloads {
rcpy := make([]byte, len(inp.Record))
copy(rcpy, inp.Record)
pays[i] = &arrowpb.OtlpArrowPayload{
SubStreamId: inp.SubStreamId,
Type: inp.Type,
Record: rcpy,
}
}

return &arrowpb.BatchArrowRecords{
BatchId: in.BatchId,
Headers: hcpy,
OtlpArrowPayloads: pays,
}, nil
}
}

func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, disableDowngrade bool, metadataFunc func(ctx context.Context) (map[string]string, error)) *exporterTestCase {
ctc := newCommonTestCase(t, noisy)

Expand All @@ -108,11 +140,11 @@ func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, di
real := arrowRecord.NewProducer()

prod.EXPECT().BatchArrowRecordsFromTraces(gomock.Any()).AnyTimes().DoAndReturn(
real.BatchArrowRecordsFromTraces)
copyBatch(real.BatchArrowRecordsFromTraces))
prod.EXPECT().BatchArrowRecordsFromLogs(gomock.Any()).AnyTimes().DoAndReturn(
real.BatchArrowRecordsFromLogs)
copyBatch(real.BatchArrowRecordsFromLogs))
prod.EXPECT().BatchArrowRecordsFromMetrics(gomock.Any()).AnyTimes().DoAndReturn(
real.BatchArrowRecordsFromMetrics)
copyBatch(real.BatchArrowRecordsFromMetrics))
prod.EXPECT().Close().Times(1).Return(nil)
return prod
}, ctc.serviceClient, ctc.perRPCCredentials)
Expand Down Expand Up @@ -413,12 +445,12 @@ func TestArrowExporterStreamRace(t *testing.T) {
var tries atomic.Int32

tc.streamCall.AnyTimes().DoAndReturn(tc.repeatedNewStream(func() testChannel {
tc := newUnresponsiveTestChannel()
noResponse := newUnresponsiveTestChannel()
// Immediately unblock to return the EOF to the stream
// receiver and shut down the stream.
go tc.unblock()
go noResponse.unblock()
tries.Add(1)
return tc
return noResponse
}))

var wg sync.WaitGroup
Expand Down
95 changes: 67 additions & 28 deletions collector/gen/exporter/otlpexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"strings"
"sync"

arrowpb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1"
Expand Down Expand Up @@ -106,6 +107,19 @@ func (s *Stream) setBatchChannel(batchID string, errCh chan error) {
s.waiters[batchID] = errCh
}

func (s *Stream) logStreamError(err error) {
isEOF := errors.Is(err, io.EOF)
isCanceled := errors.Is(err, context.Canceled)

if !isEOF && !isCanceled {
s.telemetry.Logger.Error("arrow stream error", zap.Error(err))
} else if isEOF {
s.telemetry.Logger.Debug("arrow stream end")
} else if isCanceled {
s.telemetry.Logger.Debug("arrow stream canceled")
}
}

// run blocks the calling goroutine while executing stream logic. run
// will return when the reader and writer are finished. errors will be logged.
func (s *Stream) run(bgctx context.Context, client arrowpb.ArrowStreamServiceClient, grpcOptions []grpc.CallOption) {
Expand Down Expand Up @@ -144,7 +158,10 @@ func (s *Stream) run(bgctx context.Context, client arrowpb.ArrowStreamServiceCli
ww.Add(1)
go func() {
defer ww.Done()
s.write(ctx)
err := s.write(ctx)
if err != nil {
s.logStreamError(err)
}
}()

// the result from read() is processed after cancel and wait,
Expand All @@ -158,22 +175,45 @@ func (s *Stream) run(bgctx context.Context, client arrowpb.ArrowStreamServiceCli
if err != nil {
// This branch is reached with an unimplemented status
// with or without the WaitForReady flag.
if status, ok := status.FromError(err); ok && status.Code() == codes.Unimplemented {
// This (client == nil) signals the controller
// to downgrade when all streams have returned
// in that status.
//
// TODO: Note there are partial failure modes
// that will continue to function in a
// degraded mode, such as when half of the
// streams are successful and half of streams
// take this return path. Design a graceful
// recovery mechanism?
s.client = nil
s.telemetry.Logger.Info("arrow is not supported", zap.Error(err))
} else if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
// TODO: Should we add debug-level logs for EOF and Canceled?
s.telemetry.Logger.Error("arrow recv", zap.Error(err))
status, ok := status.FromError(err)

if ok {
switch status.Code() {
case codes.Unimplemented:
// This (client == nil) signals the controller
// to downgrade when all streams have returned
// in that status.
//
// TODO: Note there are partial failure modes
// that will continue to function in a
// degraded mode, such as when half of the
// streams are successful and half of streams
// take this return path. Design a graceful
// recovery mechanism?
s.client = nil
s.telemetry.Logger.Info("arrow is not supported",
zap.String("message", status.Message()),
)

case codes.Unavailable:
// gRPC returns this when max connection age is reached.
// The message string will contain NO_ERROR if it's a
// graceful shutdown.
if strings.Contains(status.Message(), "NO_ERROR") {
jmacd marked this conversation as resolved.
Show resolved Hide resolved
s.telemetry.Logger.Debug("arrow stream shutdown")
} else {
s.telemetry.Logger.Error("arrow stream unavailable",
zap.String("message", status.Message()),
)
}
default:
s.telemetry.Logger.Error("arrow stream unknown",
zap.Uint32("code", uint32(status.Code())),
zap.String("message", status.Message()),
)
}
} else {
s.logStreamError(err)
}
}

Expand All @@ -188,7 +228,7 @@ func (s *Stream) run(bgctx context.Context, client arrowpb.ArrowStreamServiceCli
// write repeatedly places this stream into the next-available queue, then
// performs a blocking send(). This returns when the data is in the write buffer,
// the caller waiting on its error channel.
func (s *Stream) write(ctx context.Context) {
func (s *Stream) write(ctx context.Context) error {
// headers are encoding using hpack, reusing a buffer on each call.
var hdrsBuf bytes.Buffer
hdrsEnc := hpack.NewEncoder(&hdrsBuf)
Expand All @@ -208,7 +248,7 @@ func (s *Stream) write(ctx context.Context) {
// is a potential sender race since the stream
// is currently in the ready set.
s.prioritizer.removeReady(s)
return
return ctx.Err()
}
// Note: For the two return statements below there is no potential
// sender race because the stream is not available, as indicated by
Expand All @@ -218,9 +258,9 @@ func (s *Stream) write(ctx context.Context) {
if err != nil {
// This is some kind of internal error. We will restart the
// stream and mark this record as a permanent one.
err = fmt.Errorf("encode: %w", err)
wri.errCh <- consumererror.NewPermanent(err)
s.telemetry.Logger.Error("arrow encode", zap.Error(err))
return
return err
}

// Optionally include outgoing metadata, if present.
Expand All @@ -235,9 +275,9 @@ func (s *Stream) write(ctx context.Context) {
// This case is like the encode-failure case
// above, we will restart the stream but consider
// this a permenent error.
err = fmt.Errorf("hpack: %w", err)
wri.errCh <- consumererror.NewPermanent(err)
s.telemetry.Logger.Error("hpack encode", zap.Error(err))
return
return err
}
}
batch.Headers = hdrsBuf.Bytes()
Expand All @@ -248,10 +288,8 @@ func (s *Stream) write(ctx context.Context) {

if err := s.client.Send(batch); err != nil {
// The error will be sent to errCh during cleanup for this stream.
// Note there are common cases like EOF and Canceled that we may
// wish to suppress in the logs if they become a problem.
s.telemetry.Logger.Error("arrow send", zap.Error(err))
return
// Note: do not wrap this error, it may contain a Status.
return err
}
}
}
Expand All @@ -265,11 +303,12 @@ func (s *Stream) read(_ context.Context) error {
for {
resp, err := s.client.Recv()
if err != nil {
// Note: do not wrap, contains a Status.
return err
}

if err = s.processBatchStatus(resp.Statuses); err != nil {
return err
return fmt.Errorf("process: %w", err)
}
}
}
Expand Down
40 changes: 34 additions & 6 deletions collector/gen/receiver/otlpreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package arrow // import "github.com/f5/otel-arrow-adapter/collector/gen/receiver

import (
"context"
"errors"
"fmt"
"io"
"strings"
Expand All @@ -24,7 +25,9 @@ import (
arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record"
"go.uber.org/zap"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -224,6 +227,35 @@ func (h *headerReceiver) newContext(ctx context.Context, hdrs map[string][]strin
})
}

// logStreamError decides whether to log an error to the console.
func (r *Receiver) logStreamError(err error) {
status, ok := status.FromError(err)

if ok {
switch status.Code() {
case codes.Canceled:
r.telemetry.Logger.Debug("arrow stream canceled")
default:
r.telemetry.Logger.Error("arrow stream error",
zap.Uint32("code", uint32(status.Code())),
zap.String("message", status.Message()),
)
}
return
}

isEOF := errors.Is(err, io.EOF)
isCanceled := errors.Is(err, context.Canceled)

if !isEOF && !isCanceled {
r.telemetry.Logger.Error("arrow stream error", zap.Error(err))
} else if isEOF {
r.telemetry.Logger.Debug("arrow stream end")
} else {
r.telemetry.Logger.Debug("arrow stream canceled")
}
}

func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStreamServer) error {
streamCtx := serverStream.Context()
ac := r.newConsumer()
Expand All @@ -241,11 +273,7 @@ func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStre
req, err := serverStream.Recv()

if err != nil {
if err == io.EOF {
r.telemetry.Logger.Debug("arrow recv eof", zap.Error(err))
} else {
r.telemetry.Logger.Error("arrow recv error", zap.Error(err))
}
r.logStreamError(err)
return err
}

Expand Down Expand Up @@ -298,7 +326,7 @@ func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStre

err = serverStream.Send(resp)
if err != nil {
r.telemetry.Logger.Error("arrow send error", zap.Error(err))
r.logStreamError(err)
return err
}
}
Expand Down
Loading