From 2a18e85ae2ac6ad6a86b82df23cebae17d397784 Mon Sep 17 00:00:00 2001 From: Alexandre Thenorio Date: Tue, 6 Aug 2024 14:57:34 +0200 Subject: [PATCH] fix!: shutdown otel resource before exiting relates to [PE-963](https://einride.atlassian.net/browse/PE-963) BREAKING CHANGE: removed SHUTDOWNDELAY feature --- README.md | 1 - cloudotel/metricexporter.go | 28 +++++++++++++++------------- cloudotel/traceexporter.go | 21 ++++++++------------- cloudtrace/exporter.go | 2 +- go.mod | 2 ++ go.sum | 4 ++-- run.go | 36 +++++++++++++++++------------------- run_test.go | 22 ---------------------- 8 files changed, 45 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index 009f643f..baf281bf 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,6 @@ cloudrunner CLIENT_RETRY_RETRYABLESTATUSCODES []codes.Code cloudrunner REQUESTLOGGER_MESSAGESIZELIMIT int 1024 cloudrunner REQUESTLOGGER_CODETOLEVEL map[codes.Code]zapcore.Level cloudrunner REQUESTLOGGER_STATUSTOLEVEL map[int]zapcore.Level -cloudrunner SHUTDOWNDELAY time.Duration Build-time configuration of grpc-server: diff --git a/cloudotel/metricexporter.go b/cloudotel/metricexporter.go index 73c54f69..361b223b 100644 --- a/cloudotel/metricexporter.go +++ b/cloudotel/metricexporter.go @@ -2,13 +2,13 @@ package cloudotel import ( "context" + "errors" "fmt" "strings" "time" metricexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric" "go.einride.tech/cloudrunner/cloudruntime" - "go.einride.tech/cloudrunner/cloudzap" hostinstrumentation "go.opentelemetry.io/contrib/instrumentation/host" runtimeinstrumentation "go.opentelemetry.io/contrib/instrumentation/runtime" "go.opentelemetry.io/otel" @@ -18,7 +18,6 @@ import ( sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" - "go.uber.org/zap" ) // MetricExporterConfig configures the metrics exporter. @@ -35,9 +34,9 @@ func StartMetricExporter( ctx context.Context, exporterConfig MetricExporterConfig, resource *resource.Resource, -) (func(), error) { +) (func(context.Context) error, error) { if !exporterConfig.Enabled { - return func() {}, nil + return func(context.Context) error { return nil }, nil } projectID, ok := cloudruntime.ResolveProjectID(ctx) if !ok { @@ -81,23 +80,26 @@ func StartMetricExporter( ), ) otel.SetMeterProvider(provider) - shutdown := func() { - if err := provider.Shutdown(context.Background()); err != nil { - if logger, ok := cloudzap.GetLogger(ctx); ok { - logger.Warn("error stopping metric provider, final metric export might have failed", zap.Error(err)) - } + shutdown := func(ctx context.Context) error { + if err := provider.Shutdown(ctx); err != nil { + return fmt.Errorf("error stopping metric provider, final metric export might have failed: %v", err) } + return nil } if exporterConfig.RuntimeInstrumentation { if err := runtimeinstrumentation.Start(); err != nil { - shutdown() - return nil, fmt.Errorf("start metric exporter: start runtime instrumentation: %w", err) + return nil, errors.Join( + shutdown(ctx), + fmt.Errorf("start metric exporter: start runtime instrumentation: %w", err), + ) } } if exporterConfig.HostInstrumentation { if err := hostinstrumentation.Start(); err != nil { - shutdown() - return nil, fmt.Errorf("start metric exporter: start host instrumentation: %w", err) + return nil, errors.Join( + shutdown(ctx), + fmt.Errorf("start metric exporter: start host instrumentation: %w", err), + ) } } return shutdown, nil diff --git a/cloudotel/traceexporter.go b/cloudotel/traceexporter.go index 5e09e6d3..c2802267 100644 --- a/cloudotel/traceexporter.go +++ b/cloudotel/traceexporter.go @@ -8,13 +8,11 @@ import ( traceexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" gcppropagator "github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator" "go.einride.tech/cloudrunner/cloudruntime" - "go.einride.tech/cloudrunner/cloudzap" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/bridge/opencensus" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.uber.org/zap" ) // TraceExporterConfig configures the trace exporter. @@ -29,18 +27,14 @@ func StartTraceExporter( ctx context.Context, exporterConfig TraceExporterConfig, resource *resource.Resource, -) (func(), error) { +) (func(context.Context) error, error) { if !exporterConfig.Enabled { - return func() {}, nil + return func(context.Context) error { return nil }, nil } projectID, ok := cloudruntime.ResolveProjectID(ctx) if !ok { return nil, fmt.Errorf("start trace exporter: unknown project ID") } - logger, ok := cloudzap.GetLogger(ctx) - if !ok { - return nil, fmt.Errorf("start trace exporter: no logger in context") - } exporter, err := traceexporter.New( traceexporter.WithProjectID(projectID), traceexporter.WithTimeout(exporterConfig.Timeout), @@ -62,13 +56,14 @@ func StartTraceExporter( )) opencensus.InstallTraceBridge() - cleanup := func() { - if err := tracerProvider.ForceFlush(context.Background()); err != nil { - logger.Error("error shutting down trace exporter", zap.Error(err)) + cleanup := func(ctx context.Context) error { + if err := tracerProvider.ForceFlush(ctx); err != nil { + return fmt.Errorf("error shutting down trace exporter: %v", err) } - if err := tracerProvider.Shutdown(context.Background()); err != nil { - logger.Error("error shutting down trace exporter", zap.Error(err)) + if err := tracerProvider.Shutdown(ctx); err != nil { + return fmt.Errorf("error shutting down trace exporter: %v", err) } + return nil } return cleanup, nil } diff --git a/cloudtrace/exporter.go b/cloudtrace/exporter.go index 70472161..91dea6a5 100644 --- a/cloudtrace/exporter.go +++ b/cloudtrace/exporter.go @@ -16,6 +16,6 @@ func StartExporter( ctx context.Context, exporterConfig ExporterConfig, resource *resource.Resource, -) (func(), error) { +) (func(context.Context) error, error) { return cloudotel.StartTraceExporter(ctx, exporterConfig, resource) } diff --git a/go.mod b/go.mod index c273b4ac..2fe4c374 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,8 @@ require ( gotest.tools/v3 v3.5.1 ) +require github.com/rogpeppe/go-internal v1.12.0 // indirect + require ( cloud.google.com/go v0.115.0 // indirect cloud.google.com/go/auth v0.7.3 // indirect diff --git a/go.sum b/go.sum index 35c527b1..ed65e6db 100644 --- a/go.sum +++ b/go.sum @@ -120,8 +120,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU= github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/shirou/gopsutil/v4 v4.24.6 h1:9qqCSYF2pgOU+t+NgJtp7Co5+5mHF/HyKBUckySQL64= github.com/shirou/gopsutil/v4 v4.24.6/go.mod h1:aoebb2vxetJ/yIDZISmduFvVNPHqXQ9SEJwRXxkf0RA= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= diff --git a/run.go b/run.go index 97a03ee7..07c752a5 100644 --- a/run.go +++ b/run.go @@ -2,6 +2,7 @@ package cloudrunner import ( "context" + "errors" "flag" "fmt" "log/slog" @@ -45,11 +46,6 @@ type runConfig struct { Client cloudclient.Config // RequestLogger contains request logging config. RequestLogger cloudrequestlog.Config - // Artificial shutdown delay, allows for the service to - // process all incoming requests properly, before cancelling - // the root context. - // Note: Values higher than 10s will not be respected by cloudrun itself. - ShutdownDelay time.Duration } // Run a service. @@ -110,18 +106,6 @@ func Run(fn func(context.Context) error, options ...Option) (err error) { slog.SetDefault(newSlogger(logger)) run.loggerMiddleware.Logger = logger ctx = cloudzap.WithLogger(ctx, logger) - // Set up shutdown delay - if run.config.ShutdownDelay.Seconds() != 0 { - sigCtx := ctx - ctx, cancel = context.WithCancel(context.WithoutCancel(ctx)) - go func() { - <-sigCtx.Done() - logger.Info("delaying shutdown", zap.Duration("duration", run.config.ShutdownDelay)) - time.Sleep(run.config.ShutdownDelay) - cancel() - }() - defer cancel() - } if err := cloudprofiler.Start(run.config.Profiler); err != nil { return fmt.Errorf("cloudrunner.Run: %w", err) } @@ -133,14 +117,28 @@ func Run(fn func(context.Context) error, options ...Option) (err error) { if err != nil { return fmt.Errorf("cloudrunner.Run: %w", err) } - defer stopTraceExporter() stopMetricExporter, err := cloudotel.StartMetricExporter(ctx, run.config.MetricExporter, resource) if err != nil { return fmt.Errorf("cloudrunner.Run: %w", err) } - defer stopMetricExporter() cloudotel.RegisterErrorHandler(ctx) buildInfo, _ := debug.ReadBuildInfo() + go func() { + <-ctx.Done() + // Cloud Run sends a SIGTERM and allows for 10 seconds before it completely shuts down + // the instance. + // See https://cloud.google.com/run/docs/container-contract#instance-shutdown for more details. + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + logger.Info("shutting down") + err := errors.Join( + stopTraceExporter(shutdownCtx), + stopMetricExporter(shutdownCtx), + ) + if err != nil { + logger.Warn("unable to call shutdown routines:\n", zap.Error(err)) + } + }() logger.Info( "up and running", zap.Object("config", config), diff --git a/run_test.go b/run_test.go index bf3f2a19..7c34076d 100644 --- a/run_test.go +++ b/run_test.go @@ -3,14 +3,10 @@ package cloudrunner_test import ( "context" "log" - "syscall" - "testing" - "time" "go.einride.tech/cloudrunner" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" - "gotest.tools/v3/assert" ) func ExampleRun_helloWorld() { @@ -22,24 +18,6 @@ func ExampleRun_helloWorld() { } } -func Test_helloWorldShutdownTimeout(t *testing.T) { - t.Setenv("SHUTDOWNDELAY", "1s") - if err := cloudrunner.Run(func(ctx context.Context) error { - cloudrunner.Logger(ctx).Info("hello world") - beforeKill := time.Now() - go func() { - // Simulating seeding a SIGTERM call. - _ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM) - }() - <-ctx.Done() - afterKill := time.Now() - assert.Assert(t, afterKill.Sub(beforeKill).Seconds() > 1.0) - return nil - }); err != nil { - log.Fatal(err) - } -} - func ExampleRun_gRPCServer() { if err := cloudrunner.Run(func(ctx context.Context) error { grpcServer := cloudrunner.NewGRPCServer(ctx)