This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Fix flaky test TestHealthCheck.
Baliedge committed Mar 11, 2024
1 parent d5c74d2 commit cd7bbab
Showing 1 changed file with 81 additions and 123 deletions.
204 changes: 81 additions & 123 deletions functional_test.go
@@ -31,6 +31,7 @@ import (
"testing"
"time"

"github.com/mailgun/errors"
guber "github.com/mailgun/gubernator/v2"
"github.com/mailgun/gubernator/v2/cluster"
"github.com/mailgun/holster/v4/clock"
@@ -48,30 +49,12 @@ import (

// Setup and shutdown the mock gubernator cluster for the entire test suite
func TestMain(m *testing.M) {
if err := cluster.StartWith([]guber.PeerInfo{
{GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985", DataCenter: cluster.DataCenterNone},

// DataCenterOne
{GRPCAddress: "127.0.0.1:9890", HTTPAddress: "127.0.0.1:9880", DataCenter: cluster.DataCenterOne},
{GRPCAddress: "127.0.0.1:9891", HTTPAddress: "127.0.0.1:9881", DataCenter: cluster.DataCenterOne},
{GRPCAddress: "127.0.0.1:9892", HTTPAddress: "127.0.0.1:9882", DataCenter: cluster.DataCenterOne},
{GRPCAddress: "127.0.0.1:9893", HTTPAddress: "127.0.0.1:9883", DataCenter: cluster.DataCenterOne},
}); err != nil {
err := startGubernator()
if err != nil {
fmt.Println(err)
os.Exit(1)
}

// Populate peer clients. Avoids data races when goroutines conflict trying
// to instantiate client singletons.
for _, peer := range cluster.GetDaemons() {
_ = peer.MustClient()
}

code := m.Run()
cluster.Stop()

@@ -80,8 +63,8 @@ func TestMain(m *testing.M) {
}

func TestOverTheLimit(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.NoError(t, err)

tests := []struct {
Remaining int64
@@ -115,7 +98,7 @@ func TestOverTheLimit(t *testing.T) {
},
},
})
require.Nil(t, err)
require.NoError(t, err)

rl := resp.Responses[0]

@@ -135,7 +118,7 @@ func TestMultipleAsync(t *testing.T) {

t.Logf("Asking Peer: %s", cluster.GetPeers()[0].GRPCAddress)
client, errs := guber.DialV1Server(cluster.GetPeers()[0].GRPCAddress, nil)
require.Nil(t, errs)
require.NoError(t, errs)

resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
@@ -159,7 +142,7 @@ func TestMultipleAsync(t *testing.T) {
},
},
})
require.Nil(t, err)
require.NoError(t, err)

require.Len(t, resp.Responses, 2)

@@ -178,8 +161,8 @@ func TestTokenBucket(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

addr := cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress
client, errs := guber.DialV1Server(addr, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(addr, nil)
require.NoError(t, err)

tests := []struct {
name string
@@ -221,7 +204,7 @@ func TestTokenBucket(t *testing.T) {
},
},
})
require.Nil(t, err)
require.NoError(t, err)

rl := resp.Responses[0]

@@ -238,8 +221,8 @@ func TestTokenBucket(t *testing.T) {
func TestTokenBucketGregorian(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.NoError(t, err)

tests := []struct {
Name string
@@ -296,7 +279,7 @@ func TestTokenBucketGregorian(t *testing.T) {
},
},
})
require.Nil(t, err)
require.NoError(t, err)

rl := resp.Responses[0]

@@ -314,8 +297,8 @@ func TestTokenBucketNegativeHits(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

addr := cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress
client, errs := guber.DialV1Server(addr, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(addr, nil)
require.NoError(t, err)

tests := []struct {
name string
@@ -368,7 +351,7 @@ func TestTokenBucketNegativeHits(t *testing.T) {
},
},
})
require.Nil(t, err)
require.NoError(t, err)

rl := resp.Responses[0]

@@ -384,8 +367,8 @@ func TestTokenBucketNegativeHits(t *testing.T) {

func TestDrainOverLimit(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()
client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.NoError(t, err)

tests := []struct {
Name string
@@ -494,8 +477,8 @@ func TestTokenBucketRequestMoreThanAvailable(t *testing.T) {
func TestLeakyBucket(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.NoError(t, err)

tests := []struct {
Name string
@@ -621,8 +604,8 @@ func TestLeakyBucket(t *testing.T) {
func TestLeakyBucketWithBurst(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.NoError(t, err)

tests := []struct {
Name string
@@ -728,8 +711,8 @@ func TestLeakyBucketWithBurst(t *testing.T) {
func TestLeakyBucketGregorian(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.NoError(t, err)

tests := []struct {
Name string
@@ -798,8 +781,8 @@ func TestLeakyBucketGregorian(t *testing.T) {
func TestLeakyBucketNegativeHits(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress, nil)
require.NoError(t, err)

tests := []struct {
Name string
@@ -911,8 +894,8 @@ func TestLeakyBucketRequestMoreThanAvailable(t *testing.T) {
}

func TestMissingFields(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.NoError(t, err)

tests := []struct {
Req *guber.RateLimitReq
@@ -967,7 +950,7 @@ func TestMissingFields(t *testing.T) {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{test.Req},
})
require.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, test.Error, resp.Responses[0].Error, i)
assert.Equal(t, test.Status, resp.Responses[0].Status, i)
}
@@ -1358,8 +1341,8 @@ func TestGlobalResetRemaining(t *testing.T) {
}

func TestChangeLimit(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.NoError(t, err)

tests := []struct {
Remaining int64
@@ -1440,7 +1423,7 @@ func TestChangeLimit(t *testing.T) {
},
},
})
require.Nil(t, err)
require.NoError(t, err)

rl := resp.Responses[0]

@@ -1453,8 +1436,8 @@ func TestChangeLimit(t *testing.T) {
}

func TestResetRemaining(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.NoError(t, err)

tests := []struct {
Remaining int64
@@ -1513,7 +1496,7 @@ func TestResetRemaining(t *testing.T) {
},
},
})
require.Nil(t, err)
require.NoError(t, err)

rl := resp.Responses[0]

@@ -1525,83 +1508,28 @@ func TestResetRemaining(t *testing.T) {
}

func TestHealthCheck(t *testing.T) {
name := t.Name()
key := guber.RandomString(10)
client, err := guber.DialV1Server(cluster.DaemonAt(0).GRPCListeners[0].Addr().String(), nil)
require.NoError(t, err)

// Check that the cluster is healthy to start with
healthResp, err := client.HealthCheck(context.Background(), &guber.HealthCheckReq{})
require.NoError(t, err)

require.Equal(t, "healthy", healthResp.GetStatus())

// Create a global rate limit that will need to be sent to all peers in the cluster
_, err = client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: name,
UniqueKey: key,
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Behavior: guber.Behavior_BATCHING,
Duration: guber.Second * 3,
Hits: 1,
Limit: 5,
},
},
})
require.Nil(t, err)

// Stop the rest of the cluster to ensure errors occur on our instance
for i := 1; i < cluster.NumOfDaemons(); i++ {
d := cluster.DaemonAt(i)
require.NotNil(t, d)
d.Close()
// Check that the cluster is healthy to start with.
for _, peer := range cluster.GetDaemons() {
healthResp, err := peer.MustClient().HealthCheck(context.Background(), &guber.HealthCheckReq{})
require.NoError(t, err)
assert.Equal(t, "healthy", healthResp.Status)
}

// Hit the global rate limit again this time causing a connection error
_, err = client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: name,
UniqueKey: key,
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Behavior: guber.Behavior_GLOBAL,
Duration: guber.Second * 3,
Hits: 1,
Limit: 5,
},
},
})
require.Nil(t, err)
// Stop the cluster to ensure errors occur on our instance.
cluster.Stop()

// Check the health again to get back the connection error.
testutil.UntilPass(t, 20, 300*clock.Millisecond, func(t testutil.TestingT) {
// Check the health again to get back the connection error
healthResp, err = client.HealthCheck(context.Background(), &guber.HealthCheckReq{})
if assert.Nil(t, err) {
return
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, peer := range cluster.GetDaemons() {
_, err := peer.MustClient().HealthCheck(ctx, &guber.HealthCheckReq{})
assert.Error(t, err, "connect: connection refused")
}

assert.Equal(t, "unhealthy", healthResp.GetStatus())
assert.Contains(t, healthResp.GetMessage(), "connect: connection refused")
})

// Restart stopped instances
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*15)
defer cancel()
require.NoError(t, cluster.Restart(ctx))

// wait for every peer instance to come back online
numPeers := int32(len(cluster.GetPeers()))
for _, peer := range cluster.GetPeers() {
peerClient, err := guber.DialV1Server(peer.GRPCAddress, nil)
require.NoError(t, err)
testutil.UntilPass(t, 10, 300*clock.Millisecond, func(t testutil.TestingT) {
healthResp, err = peerClient.HealthCheck(context.Background(), &guber.HealthCheckReq{})
assert.Equal(t, "healthy", healthResp.GetStatus())
assert.Equal(t, numPeers, healthResp.PeerCount)
})
}
// Restart so cluster is ready for next test.
require.NoError(t, startGubernator())
}

func TestLeakyBucketDivBug(t *testing.T) {
Expand Down Expand Up @@ -2389,3 +2317,33 @@ func sendHit(t *testing.T, d *guber.Daemon, req *guber.RateLimitReq, expectStatu
func epochMillis(t time.Time) int64 {
return t.UnixNano() / 1_000_000
}

func startGubernator() error {
err := cluster.StartWith([]guber.PeerInfo{
{GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985", DataCenter: cluster.DataCenterNone},

// DataCenterOne
{GRPCAddress: "127.0.0.1:9890", HTTPAddress: "127.0.0.1:9880", DataCenter: cluster.DataCenterOne},
{GRPCAddress: "127.0.0.1:9891", HTTPAddress: "127.0.0.1:9881", DataCenter: cluster.DataCenterOne},
{GRPCAddress: "127.0.0.1:9892", HTTPAddress: "127.0.0.1:9882", DataCenter: cluster.DataCenterOne},
{GRPCAddress: "127.0.0.1:9893", HTTPAddress: "127.0.0.1:9883", DataCenter: cluster.DataCenterOne},
})
if err != nil {
return errors.Wrap(err, "while starting cluster")
}

// Populate peer clients. Avoids data races when goroutines conflict trying
// to instantiate client singletons.
for _, peer := range cluster.GetDaemons() {
_, err = peer.Client()
if err != nil {
return errors.Wrap(err, "while connecting client")
}
}
return nil
}
