From 432c84791c4b342e5ace6344faa9c0713b274683 Mon Sep 17 00:00:00 2001
From: Shawn Poulson
Date: Fri, 8 Mar 2024 12:11:21 -0500
Subject: [PATCH] Fix inconsistency in response status when calling a
 non-owner peer.

---
 functional_test.go | 215 ++++++++++++++++++++++++++++++++-------------
 global.go          |  36 +++++---
 gubernator.go      |  13 ++-
 3 files changed, 189 insertions(+), 75 deletions(-)

diff --git a/functional_test.go b/functional_test.go
index 4381bc82..03dcc051 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -34,6 +34,7 @@ import (
     guber "github.com/mailgun/gubernator/v2"
     "github.com/mailgun/gubernator/v2/cluster"
     "github.com/mailgun/holster/v4/clock"
+    "github.com/mailgun/holster/v4/syncutil"
     "github.com/mailgun/holster/v4/testutil"
     "github.com/prometheus/common/expfmt"
    "github.com/prometheus/common/model"
@@ -968,16 +969,12 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) {
     key := randomKey()

     // Determine owner and non-owner peers.
-    ownerPeerInfo, err := cluster.FindOwningPeer(name, key)
+    owner, err := cluster.FindOwningDaemon(name, key)
     require.NoError(t, err)
-    ownerDaemon, err := cluster.FindOwningDaemon(name, key)
+    // ownerAddr := owner.ownerPeerInfo.GRPCAddress
+    peers, err := cluster.ListNonOwningDaemons(name, key)
     require.NoError(t, err)
-    owner := ownerPeerInfo.GRPCAddress
-    nonOwner := cluster.PeerAt(0).GRPCAddress
-    if nonOwner == owner {
-        nonOwner = cluster.PeerAt(1).GRPCAddress
-    }
-    require.NotEqual(t, owner, nonOwner)
+    nonOwner := peers[0]

     // Connect to owner and non-owner peers in round robin.
     dialOpts := []grpc.DialOption{
@@ -985,22 +982,22 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) {
         grpc.WithTransportCredentials(insecure.NewCredentials()),
         grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
     }
-    address := fmt.Sprintf("static:///%s,%s", owner, nonOwner)
+    address := fmt.Sprintf("static:///%s,%s", owner.PeerInfo.GRPCAddress, nonOwner.PeerInfo.GRPCAddress)
     conn, err := grpc.DialContext(ctx, address, dialOpts...)
     require.NoError(t, err)
     client := guber.NewV1Client(conn)

-    sendHit := func(status guber.Status, i int) {
-        ctx, cancel := context.WithTimeout(ctx, 10*clock.Second)
+    sendHit := func(client guber.V1Client, status guber.Status, i int) {
+        ctx, cancel := context.WithTimeout(context.Background(), 10*clock.Second)
         defer cancel()
         resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
             Requests: []*guber.RateLimitReq{
                 {
                     Name:      name,
                     UniqueKey: key,
-                    Algorithm: guber.Algorithm_LEAKY_BUCKET,
+                    Algorithm: guber.Algorithm_TOKEN_BUCKET,
                     Behavior:  guber.Behavior_GLOBAL,
-                    Duration:  guber.Minute * 5,
+                    Duration:  5 * guber.Minute,
                     Hits:      1,
                     Limit:     2,
                 },
@@ -1008,29 +1005,34 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) {
         })
         require.NoError(t, err, i)
         item := resp.Responses[0]
-        assert.Equal(t, "", item.GetError(), fmt.Sprintf("mismatch error, iteration %d", i))
-        assert.Equal(t, status, item.GetStatus(), fmt.Sprintf("mismatch status, iteration %d", i))
+        assert.Equal(t, "", item.Error, fmt.Sprintf("unexpected error, iteration %d", i))
+        assert.Equal(t, status, item.Status, fmt.Sprintf("mismatch status, iteration %d", i))
     }

+    require.NoError(t, waitForBroadcast(1*clock.Minute, owner, 0))
+    require.NoError(t, waitForBroadcast(1*clock.Minute, nonOwner, 0))
+
     // Send two hits that should be processed by the owner and non-owner and
     // deplete the limit consistently.
-    sendHit(guber.Status_UNDER_LIMIT, 1)
-    sendHit(guber.Status_UNDER_LIMIT, 2)
-    require.NoError(t, waitForBroadcast(clock.Second*3, ownerDaemon, 1))
+    sendHit(client, guber.Status_UNDER_LIMIT, 1)
+    sendHit(client, guber.Status_UNDER_LIMIT, 2)
+    require.NoError(t, waitForBroadcast(3*clock.Second, owner, 1))

     // All successive hits should return OVER_LIMIT.
     for i := 2; i <= 10; i++ {
-        sendHit(guber.Status_OVER_LIMIT, i)
+        sendHit(client, guber.Status_OVER_LIMIT, i)
     }
 }

 func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
     name := t.Name()
     key := randomKey()
+    owner, err := cluster.FindOwningDaemon(name, key)
+    require.NoError(t, err)
     peers, err := cluster.ListNonOwningDaemons(name, key)
     require.NoError(t, err)

-    sendHit := func(expectedStatus guber.Status, hits int64) {
+    sendHit := func(expectedStatus guber.Status, hits, expectedRemaining int64) {
         ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
         defer cancel()
         resp, err := peers[0].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{
@@ -1047,24 +1049,31 @@ func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
             },
         })
         assert.NoError(t, err)
-        assert.Equal(t, "", resp.Responses[0].GetError())
-        assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
+        item := resp.Responses[0]
+        assert.Equal(t, "", item.Error, "unexpected error")
+        assert.Equal(t, expectedStatus, item.Status, "mismatch status")
+        assert.Equal(t, expectedRemaining, item.Remaining, "mismatch remaining")
     }
-    owner, err := cluster.FindOwningDaemon(name, key)
-    require.NoError(t, err)
+
+    require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...))

     // Send two hits that are processed by the owner, then broadcast to the peer, depleting the remaining count
-    sendHit(guber.Status_UNDER_LIMIT, 1)
-    sendHit(guber.Status_UNDER_LIMIT, 1)
+    sendHit(guber.Status_UNDER_LIMIT, 1, 1)
+    sendHit(guber.Status_UNDER_LIMIT, 1, 0)
+
     // Wait for the broadcast from the owner to the peer
-    require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1))
+    require.NoError(t, waitForBroadcast(3*clock.Second, owner, 1))
+
     // Since the remainder is 0, the peer should set OVER_LIMIT instead of waiting for the owner
     // to respond with OVER_LIMIT.
-    sendHit(guber.Status_OVER_LIMIT, 1)
+    sendHit(guber.Status_OVER_LIMIT, 1, 0)
+
     // Wait for the broadcast from the owner to the peer
-    require.NoError(t, waitForBroadcast(clock.Second*3, owner, 2))
+    require.NoError(t, waitForBroadcast(3*clock.Second, owner, 2))
+
     // A zero-hit request reports UNDER_LIMIT because it consumes nothing;
     // the status for any further hit should still be OVER_LIMIT
-    sendHit(guber.Status_OVER_LIMIT, 0)
+    sendHit(guber.Status_UNDER_LIMIT, 0, 0)
+    sendHit(guber.Status_OVER_LIMIT, 1, 0)
 }

 func TestGlobalRateLimitsPeerOverLimitLeaky(t *testing.T) {
@@ -1072,6 +1081,8 @@ func TestGlobalRateLimitsPeerOverLimitLeaky(t *testing.T) {
     key := randomKey()
     peers, err := cluster.ListNonOwningDaemons(name, key)
     require.NoError(t, err)
+    owner, err := cluster.FindOwningDaemon(name, key)
+    require.NoError(t, err)

     sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64) {
         ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
@@ -1093,27 +1104,20 @@ func TestGlobalRateLimitsPeerOverLimitLeaky(t *testing.T) {
         assert.Equal(t, "", resp.Responses[0].GetError())
         assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
     }
-    owner, err := cluster.FindOwningDaemon(name, key)
-    require.NoError(t, err)
+
+    require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...))

     // Send two hits that are processed by the owner, then broadcast to the peer, depleting the remaining count
     sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1)
     sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1)
+
     // Wait for the broadcast from the owner to the peers
     require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1))
+
     // Ask a different peer if the status is over the limit
     sendHit(peers[1].MustClient(), guber.Status_OVER_LIMIT, 1)
 }

-func getMetricRequest(url string, name string) (*model.Sample, error) {
-    resp, err := http.Get(url)
-    if err != nil {
-        return nil, err
-    }
-    defer resp.Body.Close()
-    return getMetric(resp.Body, name)
-}
-
 func TestChangeLimit(t *testing.T) {
     client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
     require.Nil(t, errs)
@@ -1530,6 +1534,8 @@ func TestGlobalBehavior(t *testing.T) {
     require.NoError(t, err)
     t.Logf("Owner peer: %s", owner.InstanceID)

+    require.NoError(t, waitForIdle(1*time.Minute, cluster.GetDaemons()...))
+
     broadcastCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_broadcast_duration_count")
     updateCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_global_send_duration_count")
     upgCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/UpdatePeerGlobals\"}")
@@ -1896,6 +1902,56 @@ func TestGlobalBehavior(t *testing.T) {
     })
 }

+// Request metrics and parse them into a map.
+// Optionally pass names to filter the metrics by name.
+func getMetrics(HTTPAddr string, names ...string) (map[string]*model.Sample, error) {
+    url := fmt.Sprintf("http://%s/metrics", HTTPAddr)
+    resp, err := http.Get(url)
+    if err != nil {
+        return nil, err
+    }
+    defer resp.Body.Close()
+    decoder := expfmt.SampleDecoder{
+        Dec: expfmt.NewDecoder(resp.Body, expfmt.FmtText),
+        Opts: &expfmt.DecodeOptions{
+            Timestamp: model.Now(),
+        },
+    }
+    nameSet := make(map[string]struct{})
+    for _, name := range names {
+        nameSet[name] = struct{}{}
+    }
+    metrics := make(map[string]*model.Sample)
+
+    for {
+        var smpls model.Vector
+        err := decoder.Decode(&smpls)
+        if err == io.EOF {
+            break
+        }
+        if err != nil {
+            return nil, err
+        }
+        for _, smpl := range smpls {
+            name := smpl.Metric.String()
+            if _, ok := nameSet[name]; ok || len(nameSet) == 0 {
+                metrics[name] = smpl
+            }
+        }
+    }
+
+    return metrics, nil
+}
+
+func getMetricRequest(url string, name string) (*model.Sample, error) {
+    resp, err := http.Get(url)
+    if err != nil {
+        return nil, err
+    }
+    defer resp.Body.Close()
+    return getMetric(resp.Body, name)
+}
+
 func getMetric(in io.Reader, name string) (*model.Sample, error) {
     dec := expfmt.SampleDecoder{
         Dec: expfmt.NewDecoder(in, expfmt.FmtText),
@@ -1926,31 +1982,29 @@ func getMetric(in io.Reader, name string) (*model.Sample, error) {
 }

 // waitForBroadcast waits until the broadcast count for the daemon changes to
-// the expected value. Returns an error if the expected value is not found
-// before the context is cancelled.
+// at least the expected value and the broadcast queue is empty.
+// Returns an error if the timeout elapses before the conditions are met.
 func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect int) error {
-    // fmt.Printf("waitForBroadcast() peer: %s\n", d.InstanceID)
     ctx, cancel := context.WithTimeout(context.Background(), timeout)
     defer cancel()

     for {
-        m, err := getMetricRequest(fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress),
-            "gubernator_broadcast_duration_count")
+        metrics, err := getMetrics(d.Config().HTTPListenAddress,
+            "gubernator_broadcast_duration_count", "gubernator_global_queue_length")
         if err != nil {
             return err
         }
-        // fmt.Printf("gubernator_broadcast_duration_count: %f\n", m.Value)
+        gbdc := metrics["gubernator_broadcast_duration_count"]
+        ggql := metrics["gubernator_global_queue_length"]

-        // It's possible a broadcast occurred twice if waiting for multiple peer to
-        // forward updates to the owner.
-        if int(m.Value) >= expect {
-            // Give the nodes some time to process the broadcasts
-            // clock.Sleep(clock.Millisecond * 500)
+        // It's possible a broadcast occurred twice if waiting for multiple
+        // peers to forward updates to non-owners.
+        if int(gbdc.Value) >= expect && ggql.Value == 0 {
             return nil
         }

         select {
-        case <-clock.After(time.Millisecond * 100):
+        case <-clock.After(100 * clock.Millisecond):
         case <-ctx.Done():
             return ctx.Err()
         }
@@ -1958,35 +2012,72 @@ func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect int) error
     }
 }

 // waitForUpdate waits until the global hits update count for the daemon
-// changes to the expected value. Returns an error if the expected value is not
-// found before the context is cancelled.
+// changes to at least the expected value and the global update queue is empty.
+// Returns an error if the timeout elapses before the conditions are met.
 func waitForUpdate(timeout clock.Duration, d *guber.Daemon, expect int) error {
-    // fmt.Printf("waitForUpdate() peer: %s\n", d.InstanceID)
     ctx, cancel := context.WithTimeout(context.Background(), timeout)
     defer cancel()

     for {
-        m, err := getMetricRequest(fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress),
-            "gubernator_global_send_duration_count")
+        metrics, err := getMetrics(d.Config().HTTPListenAddress,
+            "gubernator_global_send_duration_count", "gubernator_global_send_queue_length")
         if err != nil {
             return err
         }
-        // fmt.Printf("gubernator_global_send_duration_count: %f\n", m.Value)
+        gsdc := metrics["gubernator_global_send_duration_count"]
+        gsql := metrics["gubernator_global_send_queue_length"]

-        // It's possible a broadcast occurred twice if waiting for multiple peer to
+        // It's possible a hit occurred twice if waiting for multiple peers to
         // forward updates to the owner.
-        if int(m.Value) >= expect {
+        if int(gsdc.Value) >= expect && gsql.Value == 0 {
             return nil
         }

         select {
-        case <-clock.After(time.Millisecond * 100):
+        case <-clock.After(100 * clock.Millisecond):
         case <-ctx.Done():
             return ctx.Err()
         }
     }
 }

+// waitForIdle waits until both the global broadcast and global hits queues
+// are empty.
+func waitForIdle(timeout clock.Duration, daemons ...*guber.Daemon) error {
+    var wg syncutil.WaitGroup
+    ctx, cancel := context.WithTimeout(context.Background(), timeout)
+    defer cancel()
+    for _, d := range daemons {
+        wg.Run(func(raw any) error {
+            d := raw.(*guber.Daemon)
+            for {
+                metrics, err := getMetrics(d.Config().HTTPListenAddress,
+                    "gubernator_global_queue_length", "gubernator_global_send_queue_length")
+                if err != nil {
+                    return err
+                }
+                ggql := metrics["gubernator_global_queue_length"]
+                gsql := metrics["gubernator_global_send_queue_length"]
+
+                if ggql.Value == 0 && gsql.Value == 0 {
+                    return nil
+                }
+
+                select {
+                case <-clock.After(100 * clock.Millisecond):
+                case <-ctx.Done():
+                    return ctx.Err()
+                }
+            }
+        }, d)
+    }
+    errs := wg.Wait()
+    if len(errs) > 0 {
+        return errs[0]
+    }
+    return nil
+}
+
 func getMetricValue(t *testing.T, d *guber.Daemon, name string) float64 {
     m, err := getMetricRequest(fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress),
         name)
diff --git a/global.go b/global.go
index 81701adb..d8ba66f8 100644
--- a/global.go
+++ b/global.go
@@ -34,6 +34,7 @@ type globalManager struct {
     log                         FieldLogger
     instance                    *V1Instance // todo circular import? V1Instance also holds a reference to globalManager
     metricGlobalSendDuration    prometheus.Summary
+    metricGlobalSendQueueLength prometheus.Gauge
     metricBroadcastDuration     prometheus.Summary
     metricBroadcastCounter      *prometheus.CounterVec
     metricGlobalQueueLength     prometheus.Gauge
@@ -51,6 +52,10 @@ func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager
             Help:       "The duration of GLOBAL async sends in seconds.",
             Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
         }),
+        metricGlobalSendQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
+            Name: "gubernator_global_send_queue_length",
+            Help: "The count of requests queued up for global broadcast. This is only used for GetRateLimit requests using global behavior.",
+        }),
         metricBroadcastDuration: prometheus.NewSummary(prometheus.SummaryOpts{
             Name:       "gubernator_broadcast_duration",
             Help:       "The duration of GLOBAL broadcasts to peers in seconds.",
@@ -95,6 +100,7 @@ func (gm *globalManager) runAsyncHits() {

         select {
         case r := <-gm.hitsQueue:
+            gm.metricGlobalSendQueueLength.Set(float64(len(hits)))
             // Aggregate the hits into a single request
             key := r.HashKey()
             _, ok := hits[key]
@@ -108,6 +114,7 @@ func (gm *globalManager) runAsyncHits() {
             if len(hits) == gm.conf.GlobalBatchLimit {
                 gm.sendHits(hits)
                 hits = make(map[string]*RateLimitReq)
+                gm.metricGlobalSendQueueLength.Set(0)
                 return true
             }

@@ -121,6 +128,7 @@ func (gm *globalManager) runAsyncHits() {
             if len(hits) != 0 {
                 gm.sendHits(hits)
                 hits = make(map[string]*RateLimitReq)
+                gm.metricGlobalSendQueueLength.Set(0)
             }
         case <-done:
             return false
@@ -132,6 +140,7 @@ func (gm *globalManager) runAsyncHits() {
 // sendHits takes the hits collected by runAsyncHits and sends them to their
 // owning peers
 func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
+    // fmt.Printf("sendHits() %s, hits: %d\n", gm.instance.conf.InstanceID, len(hits))
     type pair struct {
         client *PeerClient
         req    GetPeerRateLimitsReq
@@ -189,29 +198,32 @@ func (gm *globalManager) runBroadcasts() {
         select {
         case r := <-gm.updatesQueue:
             updates[r.HashKey()] = r
+            gm.metricGlobalQueueLength.Set(float64(len(updates)))

-            // Send the hits if we reached our batch limit
+            // Send the broadcast if we reached our batch limit
             if len(updates) >= gm.conf.GlobalBatchLimit {
                 gm.metricBroadcastCounter.WithLabelValues("queue_full").Inc()
-                gm.broadcastPeers(context.Background(), updates)
+                gm.broadcastPeers(updates)
                 updates = make(map[string]*RateLimitReq)
+                gm.metricGlobalQueueLength.Set(0)
                 return true
             }

-            // If this is our first queued hit since last send
+            // If this is our first queued update since the last send,
             // queue the next interval
             if len(updates) == 1 {
                 interval.Next()
             }

         case <-interval.C:
-            if len(updates) != 0 {
-                gm.metricBroadcastCounter.WithLabelValues("timer").Inc()
-                gm.broadcastPeers(context.Background(), updates)
-                updates = make(map[string]*RateLimitReq)
-            } else {
-                gm.metricGlobalQueueLength.Set(0)
+            if len(updates) == 0 {
+                break
             }
+            gm.metricBroadcastCounter.WithLabelValues("timer").Inc()
+            gm.broadcastPeers(updates)
+            updates = make(map[string]*RateLimitReq)
+            gm.metricGlobalQueueLength.Set(0)
+
         case <-done:
             return false
         }
@@ -220,12 +232,12 @@ func (gm *globalManager) runBroadcasts() {
 }

 // broadcastPeers broadcasts global rate limit statuses to all other peers
-func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]*RateLimitReq) {
+func (gm *globalManager) broadcastPeers(updates map[string]*RateLimitReq) {
+    // fmt.Printf("broadcastPeers() %s, updates: %d\n", gm.instance.conf.InstanceID, len(updates))
     defer prometheus.NewTimer(gm.metricBroadcastDuration).ObserveDuration()
+    ctx := context.Background()
     var req UpdatePeerGlobalsReq

-    gm.metricGlobalQueueLength.Set(float64(len(updates)))
-
     for _, r := range updates {
         // Copy the original since we are removing the GLOBAL behavior
         rl := proto.Clone(r).(*RateLimitReq)
diff --git a/gubernator.go b/gubernator.go
index 42f137d0..dc4be285 100644
--- a/gubernator.go
+++ b/gubernator.go
@@ -405,7 +405,13 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
     // Global rate limits are always stored as RateLimitResp regardless of algorithm
     rl, ok := item.Value.(*RateLimitResp)
     if ok {
-        return rl, nil
+        rl2 := proto.Clone(rl).(*RateLimitResp)
+        if req.Hits > rl2.Remaining {
+            rl2.Status = Status_OVER_LIMIT
+        } else {
+            rl2.Status = Status_UNDER_LIMIT
+        }
+        return rl2, nil
     }
     // We get here if the owning node hasn't asynchronously forwarded its updates to us yet and
     // our cache still holds the rate limit we created on the first hit.
@@ -427,6 +433,7 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
 // UpdatePeerGlobals updates the local cache with a list of global rate limits. This method should only
 // be called by a peer who is the owner of a global rate limit.
 func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (*UpdatePeerGlobalsResp, error) {
+    // fmt.Printf("UpdatePeerGlobals() %s, req: %s", s.conf.InstanceID, spew.Sdump(r))
     defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.UpdatePeerGlobals")).ObserveDuration()
     for _, g := range r.Globals {
         item := &CacheItem{
@@ -494,6 +501,7 @@ func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits
                 rl = &RateLimitResp{Error: err.Error()}
                 // metricCheckErrorCounter is updated within getLocalRateLimit(), not in GetPeerRateLimits.
             }
+            // fmt.Printf("GetPeerRateLimits() %s, hits: %d, resp: %#v\n", s.conf.InstanceID, req.Hits, rl)
             respChan <- respOut{rin.idx, rl}

             return nil
@@ -570,6 +578,7 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_
         return nil, errors.Wrap(err, "during workerPool.GetRateLimit")
     }

+    // fmt.Printf("getLocalRateLimit() %s, resp: %#v\n", s.conf.InstanceID, resp)
     metricGetRateLimitCounter.WithLabelValues("local").Inc()

     // If global behavior and owning peer, broadcast update to all peers.
@@ -711,6 +720,7 @@ func (s *V1Instance) Describe(ch chan<- *prometheus.Desc) {
     s.global.metricBroadcastDuration.Describe(ch)
     s.global.metricGlobalQueueLength.Describe(ch)
     s.global.metricGlobalSendDuration.Describe(ch)
+    s.global.metricGlobalSendQueueLength.Describe(ch)
 }

 // Collect fetches metrics from the server for use by prometheus
@@ -729,6 +739,7 @@ func (s *V1Instance) Collect(ch chan<- prometheus.Metric) {
     s.global.metricBroadcastDuration.Collect(ch)
     s.global.metricGlobalQueueLength.Collect(ch)
     s.global.metricGlobalSendDuration.Collect(ch)
+    s.global.metricGlobalSendQueueLength.Collect(ch)
 }

 // HasBehavior returns true if the provided behavior is set
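Note on the core change: in getGlobalRateLimit, a non-owner peer no longer returns the cached response verbatim (whose status can go stale between owner broadcasts) but recomputes the status from the cached Remaining and the request's Hits. Below is a minimal, self-contained sketch of that decision rule; the type and function names are simplified stand-ins for illustration only, and the real code clones the cached *RateLimitResp via proto.Clone before mutating it.

package main

import "fmt"

// Status is a simplified stand-in for the gubernator Status enum.
type Status int

const (
    Status_UNDER_LIMIT Status = iota
    Status_OVER_LIMIT
)

// cachedResp stands in for the *RateLimitResp a non-owner peer caches
// between owner broadcasts; only the field this sketch needs is shown.
type cachedResp struct {
    Remaining int64
}

// statusFor recomputes the response status from the cached remaining count
// and the incoming hits, rather than echoing the cached status.
func statusFor(cached cachedResp, hits int64) Status {
    if hits > cached.Remaining {
        return Status_OVER_LIMIT
    }
    return Status_UNDER_LIMIT
}

func main() {
    depleted := cachedResp{Remaining: 0}
    // A zero-hit probe of a depleted limit reports UNDER_LIMIT...
    fmt.Println(statusFor(depleted, 0) == Status_UNDER_LIMIT) // true
    // ...while any real hit reports OVER_LIMIT, matching the new
    // TestGlobalRateLimitsPeerOverLimit assertions.
    fmt.Println(statusFor(depleted, 1) == Status_OVER_LIMIT) // true
}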