From 8efd9336c754a7e0144ab3bb9b20bfb75df1860d Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 18 Feb 2024 19:17:16 -0300 Subject: [PATCH 1/8] fix: goroutine leaks --- cluster/cluster_test.go | 6 +++++- cmd/gubernator/main.go | 2 +- daemon.go | 7 ++++++- global.go | 2 ++ go.mod | 6 +++--- go.sum | 7 +++++-- gubernator.go | 16 +++++++--------- 7 files changed, 29 insertions(+), 17 deletions(-) diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 900d0ced..1f5a7ba8 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -23,12 +23,16 @@ import ( "github.com/mailgun/gubernator/v2/cluster" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/goleak" ) func TestStartMultipleInstances(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) err := cluster.Start(2) require.NoError(t, err) - defer cluster.Stop() + t.Cleanup(cluster.Stop) assert.Equal(t, 2, len(cluster.GetPeers())) assert.Equal(t, 2, len(cluster.GetDaemons())) diff --git a/cmd/gubernator/main.go b/cmd/gubernator/main.go index 2d3d6fe8..f0308691 100644 --- a/cmd/gubernator/main.go +++ b/cmd/gubernator/main.go @@ -31,7 +31,7 @@ import ( "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" - "k8s.io/klog" + "k8s.io/klog/v2" ) var log = logrus.WithField("category", "gubernator") diff --git a/daemon.go b/daemon.go index a220136b..aa0cd208 100644 --- a/daemon.go +++ b/daemon.go @@ -19,6 +19,7 @@ package gubernator import ( "context" "crypto/tls" + "io" "log" "net" "net/http" @@ -49,6 +50,7 @@ type Daemon struct { V1Server *V1Instance log FieldLogger + logWriter *io.PipeWriter pool PoolInterface conf DaemonConfig httpSrv *http.Server @@ -282,7 +284,8 @@ func (s *Daemon) Start(ctx context.Context) error { s.promRegister, promhttp.HandlerFor(s.promRegister, promhttp.HandlerOpts{}), )) mux.Handle("/", gateway) - log := log.New(newLogWriter(s.log), "", 0) + s.logWriter = newLogWriter(s.log) + log := log.New(s.logWriter, "", 0) s.httpSrv = &http.Server{Addr: s.conf.HTTPListenAddress, Handler: mux, ErrorLog: log} s.HTTPListener, err = net.Listen("tcp", s.conf.HTTPListenAddress) @@ -376,6 +379,8 @@ func (s *Daemon) Close() { s.log.Infof("GRPC close for %s ...", s.GRPCListeners[i].Addr()) srv.GracefulStop() } + s.logWriter.Close() + _ = s.V1Server.Close() s.wg.Stop() s.statsHandler.Close() s.gwCancel() diff --git a/global.go b/global.go index 78431960..c83a0d9f 100644 --- a/global.go +++ b/global.go @@ -116,6 +116,7 @@ func (gm *globalManager) runAsyncHits() { hits = make(map[string]*RateLimitReq) } case <-done: + interval.Stop() return false } return true @@ -203,6 +204,7 @@ func (gm *globalManager) runBroadcasts() { gm.metricGlobalQueueLength.Set(0) } case <-done: + interval.Stop() return false } return true diff --git a/go.mod b/go.mod index e32b4cd3..af975b8c 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( go.opentelemetry.io/otel v1.21.0 go.opentelemetry.io/otel/sdk v1.21.0 go.opentelemetry.io/otel/trace v1.21.0 + go.uber.org/goleak v1.3.0 golang.org/x/net v0.18.0 golang.org/x/time v0.3.0 google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b @@ -30,7 +31,7 @@ require ( k8s.io/api v0.23.3 k8s.io/apimachinery v0.23.3 k8s.io/client-go v0.23.3 - k8s.io/klog v0.3.1 + k8s.io/klog/v2 v2.120.1 ) require ( @@ -41,7 +42,7 @@ require ( github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect @@ -91,7 +92,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/klog/v2 v2.30.0 // indirect k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect diff --git a/go.sum b/go.sum index 9b2e3287..e7b7b7e2 100644 --- a/go.sum +++ b/go.sum @@ -141,6 +141,8 @@ github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= @@ -452,6 +454,7 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= @@ -881,12 +884,12 @@ k8s.io/apimachinery v0.23.3/go.mod h1:BEuFMMBaIbcOqVIJqNZJXGFTP4W6AycEpb5+m/97hr k8s.io/client-go v0.23.3 h1:23QYUmCQ/W6hW78xIwm3XqZrrKZM+LWDqW2zfo+szJs= k8s.io/client-go v0.23.3/go.mod h1:47oMd+YvAOqZM7pcQ6neJtBiFH7alOyfunYN48VsmwE= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= -k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68= -k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.30.0 h1:bUO6drIvCIsvZ/XFgfxoGFQU/a4Qkh0iAlvUR7vlHJw= k8s.io/klog/v2 v2.30.0/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= +k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 h1:E3J9oCLlaobFUqsjG9DfKbP2BmgwBL2p7pn0A3dG9W4= k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65/go.mod h1:sX9MT8g7NVZM5lVL/j8QyCCJe8YSMW30QvGZWaCIDIk= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/gubernator.go b/gubernator.go index 59c26eca..9f8e608a 100644 --- a/gubernator.go +++ b/gubernator.go @@ -149,17 +149,15 @@ func (s *V1Instance) Close() (err error) { return nil } - if s.conf.Loader == nil { - return nil - } - s.global.Close() - err = s.workerPool.Store(ctx) - if err != nil { - s.log.WithError(err). - Error("Error in workerPool.Store") - return errors.Wrap(err, "Error in workerPool.Store") + if s.conf.Loader != nil { + err = s.workerPool.Store(ctx) + if err != nil { + s.log.WithError(err). + Error("Error in workerPool.Store") + return errors.Wrap(err, "Error in workerPool.Store") + } } err = s.workerPool.Close() From 377e3381b38946ffe44f102d8bdee88b534cb33e Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 18 Feb 2024 19:29:08 -0300 Subject: [PATCH 2/8] go mod tidy --- go.sum | 3 --- 1 file changed, 3 deletions(-) diff --git a/go.sum b/go.sum index e7b7b7e2..6ada5946 100644 --- a/go.sum +++ b/go.sum @@ -139,8 +139,6 @@ github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7 github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -886,7 +884,6 @@ k8s.io/client-go v0.23.3/go.mod h1:47oMd+YvAOqZM7pcQ6neJtBiFH7alOyfunYN48VsmwE= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= -k8s.io/klog/v2 v2.30.0 h1:bUO6drIvCIsvZ/XFgfxoGFQU/a4Qkh0iAlvUR7vlHJw= k8s.io/klog/v2 v2.30.0/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= From d14b32e52ec4e2e4bb0e950b5e54467b4601b8b3 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 18 Feb 2024 23:09:58 -0300 Subject: [PATCH 3/8] os.Exit() does not run deferred fns --- functional_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/functional_test.go b/functional_test.go index 4abd2e25..d0954fb6 100644 --- a/functional_test.go +++ b/functional_test.go @@ -64,8 +64,11 @@ func TestMain(m *testing.M) { fmt.Println(err) os.Exit(1) } - defer cluster.Stop() - os.Exit(m.Run()) + code := m.Run() + cluster.Stop() + + // os.Exit doesn't run deferred functions + os.Exit(code) } func TestOverTheLimit(t *testing.T) { From bbb7e7a66e059f68c03a56ee5e3da44bb42e1fbd Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 18 Feb 2024 23:35:05 -0300 Subject: [PATCH 4/8] prefer defer --- global.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/global.go b/global.go index c83a0d9f..c7b7ef77 100644 --- a/global.go +++ b/global.go @@ -82,6 +82,7 @@ func (gm *globalManager) QueueUpdate(r *RateLimitReq) { // be sent to their owning peers. func (gm *globalManager) runAsyncHits() { var interval = NewInterval(gm.conf.GlobalSyncWait) + defer interval.Stop() hits := make(map[string]*RateLimitReq) gm.wg.Until(func(done chan struct{}) bool { @@ -116,7 +117,6 @@ func (gm *globalManager) runAsyncHits() { hits = make(map[string]*RateLimitReq) } case <-done: - interval.Stop() return false } return true @@ -174,6 +174,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { // runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster. func (gm *globalManager) runBroadcasts() { var interval = NewInterval(gm.conf.GlobalSyncWait) + defer interval.Stop() updates := make(map[string]*RateLimitReq) gm.wg.Until(func(done chan struct{}) bool { @@ -204,7 +205,6 @@ func (gm *globalManager) runBroadcasts() { gm.metricGlobalQueueLength.Set(0) } case <-done: - interval.Stop() return false } return true From a44560e600c42d8ef7f8a10f34264ce1fbcd7df1 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 18 Feb 2024 23:58:09 -0300 Subject: [PATCH 5/8] more leaks identified --- functional_test.go | 15 +++++++++++++++ gubernator.go | 4 ++++ 2 files changed, 19 insertions(+) diff --git a/functional_test.go b/functional_test.go index d0954fb6..c3dfc1d2 100644 --- a/functional_test.go +++ b/functional_test.go @@ -45,6 +45,13 @@ var algos = []struct { {Name: "Leaky bucket", Algorithm: guber.Algorithm_LEAKY_BUCKET}, } +// runFunc implements goleak.TestingM around a function reference. +//type runFunc func() int +// +//func (f runFunc) Run() int { +// return f() +//} + // Setup and shutdown the mock gubernator cluster for the entire test suite func TestMain(m *testing.M) { if err := cluster.StartWith([]guber.PeerInfo{ @@ -69,6 +76,14 @@ func TestMain(m *testing.M) { // os.Exit doesn't run deferred functions os.Exit(code) + + //goleak.VerifyTestMain( + // runFunc(func() int { + // exitCode := m.Run() + // cluster.Stop() + // return exitCode + // }), + //) } func TestOverTheLimit(t *testing.T) { diff --git a/gubernator.go b/gubernator.go index 9f8e608a..257f9b2c 100644 --- a/gubernator.go +++ b/gubernator.go @@ -167,6 +167,10 @@ func (s *V1Instance) Close() (err error) { return errors.Wrap(err, "Error in workerPool.Close") } + //for _, client := range s.conf.LocalPicker.Peers() { + // client.Shutdown(ctx) + //} + s.isClosed = true return nil } From 058cb63499f1597f5741c78991f56c157c5ef41f Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Mon, 19 Feb 2024 18:23:26 -0300 Subject: [PATCH 6/8] remove code that is commented --- functional_test.go | 8 -------- gubernator.go | 4 ---- 2 files changed, 12 deletions(-) diff --git a/functional_test.go b/functional_test.go index c3dfc1d2..e050b952 100644 --- a/functional_test.go +++ b/functional_test.go @@ -76,14 +76,6 @@ func TestMain(m *testing.M) { // os.Exit doesn't run deferred functions os.Exit(code) - - //goleak.VerifyTestMain( - // runFunc(func() int { - // exitCode := m.Run() - // cluster.Stop() - // return exitCode - // }), - //) } func TestOverTheLimit(t *testing.T) { diff --git a/gubernator.go b/gubernator.go index 257f9b2c..9f8e608a 100644 --- a/gubernator.go +++ b/gubernator.go @@ -167,10 +167,6 @@ func (s *V1Instance) Close() (err error) { return errors.Wrap(err, "Error in workerPool.Close") } - //for _, client := range s.conf.LocalPicker.Peers() { - // client.Shutdown(ctx) - //} - s.isClosed = true return nil } From 3ad29206bff96abb661fae929bdcdb214286b9d0 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Mon, 19 Feb 2024 18:25:09 -0300 Subject: [PATCH 7/8] remove commented code --- functional_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/functional_test.go b/functional_test.go index e050b952..d0954fb6 100644 --- a/functional_test.go +++ b/functional_test.go @@ -45,13 +45,6 @@ var algos = []struct { {Name: "Leaky bucket", Algorithm: guber.Algorithm_LEAKY_BUCKET}, } -// runFunc implements goleak.TestingM around a function reference. -//type runFunc func() int -// -//func (f runFunc) Run() int { -// return f() -//} - // Setup and shutdown the mock gubernator cluster for the entire test suite func TestMain(m *testing.M) { if err := cluster.StartWith([]guber.PeerInfo{ From 43e70eb5fce211efb94306bac8b93800d1bd933f Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Mon, 19 Feb 2024 18:34:26 -0300 Subject: [PATCH 8/8] fix test --- global.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/global.go b/global.go index c7b7ef77..c83a0d9f 100644 --- a/global.go +++ b/global.go @@ -82,7 +82,6 @@ func (gm *globalManager) QueueUpdate(r *RateLimitReq) { // be sent to their owning peers. func (gm *globalManager) runAsyncHits() { var interval = NewInterval(gm.conf.GlobalSyncWait) - defer interval.Stop() hits := make(map[string]*RateLimitReq) gm.wg.Until(func(done chan struct{}) bool { @@ -117,6 +116,7 @@ func (gm *globalManager) runAsyncHits() { hits = make(map[string]*RateLimitReq) } case <-done: + interval.Stop() return false } return true @@ -174,7 +174,6 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { // runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster. func (gm *globalManager) runBroadcasts() { var interval = NewInterval(gm.conf.GlobalSyncWait) - defer interval.Stop() updates := make(map[string]*RateLimitReq) gm.wg.Until(func(done chan struct{}) bool { @@ -205,6 +204,7 @@ func (gm *globalManager) runBroadcasts() { gm.metricGlobalQueueLength.Set(0) } case <-done: + interval.Stop() return false } return true