From a51e6223ad89dbfd6c05fba2746861b00bf8f664 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 3 Jan 2025 08:40:45 +0100 Subject: [PATCH] Close gzip readers after use (#9770) Signed-off-by: Arve Knudsen --- pkg/continuoustest/client_test.go | 3 +++ pkg/distributor/otel.go | 14 ++++++++------ .../indexheader/stream_binary_reader.go | 3 ++- pkg/util/http.go | 8 ++++++-- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pkg/continuoustest/client_test.go b/pkg/continuoustest/client_test.go index 20071ef5835..322aecdf84d 100644 --- a/pkg/continuoustest/client_test.go +++ b/pkg/continuoustest/client_test.go @@ -37,6 +37,9 @@ func TestOTLPHttpClient_WriteSeries(t *testing.T) { // Handle compression reader, err := gzip.NewReader(request.Body) require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, reader.Close()) + }) // Then Unmarshal body, err := io.ReadAll(reader) diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index 6c110043a9e..1f4621b8446 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/httpgrpc/server" "github.com/grafana/dskit/middleware" + "github.com/grafana/dskit/runutil" "github.com/grafana/dskit/tenant" "github.com/pierrec/lz4/v4" "github.com/pkg/errors" @@ -84,10 +85,10 @@ func OTLPHandler( return httpgrpc.Errorf(http.StatusUnsupportedMediaType, "unsupported compression: %s. Only \"gzip\", \"lz4\", or no compression supported", contentEncoding) } - var decoderFunc func(io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) + var decoderFunc func(io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) switch contentType { case pbContentType: - decoderFunc = func(reader io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) { + decoderFunc = func(reader io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) { exportReq := pmetricotlp.NewExportRequest() unmarshaler := otlpProtoUnmarshaler{ request: &exportReq, @@ -104,7 +105,7 @@ func OTLPHandler( } case jsonContentType: - decoderFunc = func(reader io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) { + decoderFunc = func(reader io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) { exportReq := pmetricotlp.NewExportRequest() sz := int(r.ContentLength) if sz > 0 { @@ -114,16 +115,17 @@ func OTLPHandler( buf := buffers.Get(sz) switch compression { case util.Gzip: - var err error - reader, err = gzip.NewReader(reader) + gzReader, err := gzip.NewReader(reader) if err != nil { return exportReq, 0, errors.Wrap(err, "create gzip reader") } + defer runutil.CloseWithLogOnErr(logger, gzReader, "close gzip reader") + reader = gzReader case util.Lz4: reader = io.NopCloser(lz4.NewReader(reader)) } - reader = http.MaxBytesReader(nil, reader, int64(maxRecvMsgSize)) + reader = http.MaxBytesReader(nil, io.NopCloser(reader), int64(maxRecvMsgSize)) if _, err := buf.ReadFrom(reader); err != nil { if util.IsRequestBodyTooLarge(err) { return exportReq, 0, httpgrpc.Error(http.StatusRequestEntityTooLarge, distributorMaxOTLPRequestSizeErr{ diff --git a/pkg/storegateway/indexheader/stream_binary_reader.go b/pkg/storegateway/indexheader/stream_binary_reader.go index 8a605b736ab..88d9ab1657b 100644 --- a/pkg/storegateway/indexheader/stream_binary_reader.go +++ b/pkg/storegateway/indexheader/stream_binary_reader.go @@ -197,8 +197,9 @@ func (r *StreamBinaryReader) loadFromSparseIndexHeader(logger *spanlogger.SpanLo gzipped := bytes.NewReader(sparseData) gzipReader, err := gzip.NewReader(gzipped) if err != nil { - return fmt.Errorf("failed to create sparse index-header reader: %w", err) + return fmt.Errorf("failed to create sparse index-header gzip reader: %w", err) } + defer runutil.CloseWithLogOnErr(logger, gzipReader, "close sparse index-header gzip reader") sparseData, err = io.ReadAll(gzipReader) if err != nil { diff --git a/pkg/util/http.go b/pkg/util/http.go index 52bfb713282..7a50d3c430b 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -220,11 +220,15 @@ func decompressRequest(buffers *RequestBuffers, reader io.Reader, expectedSize, return decompressSnappyFromBuffer(buffers, buf, maxSize, sp) } case Gzip: - var err error - reader, err = gzip.NewReader(reader) + gzReader, err := gzip.NewReader(reader) if err != nil { return nil, errors.Wrap(err, "create gzip reader") } + + defer func() { + _ = gzReader.Close() + }() + reader = gzReader case Lz4: reader = lz4.NewReader(reader) default: