Skip to content

Commit

Permalink
adding unit tests for the address changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
epsniff committed Mar 5, 2021
1 parent ee927d4 commit 6cda925
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 20 deletions.
59 changes: 40 additions & 19 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/lytics/retry"
etcdv3 "go.etcd.io/etcd/clientv3"
)

Expand All @@ -34,6 +35,7 @@ var (
ErrWatchClosedUnexpectedly = errors.New("registry: watch closed unexpectedly")
ErrUnspecifiedNetAddressIP = errors.New("registry: unspecified net address ip")
ErrKeepAliveClosedUnexpectedly = errors.New("registry: keep alive closed unexpectedly")
ErrFailedAcquireAddressLock = errors.New("registry: address lock exists after timeout")
)

var (
Expand Down Expand Up @@ -122,32 +124,52 @@ func New(client *etcdv3.Client) (*Registry, error) {
}, nil
}

// registryLockKey builds the key under which the uniqueness lock for the
// given address is stored: the fixed "registry.uniq-lock" prefix, a dot,
// and then the address itself.
func registryLockKey(address string) string {
	const lockPrefix = "registry.uniq-lock"
	return fmt.Sprintf("%s.%s", lockPrefix, address)
}

func (rr *Registry) waitForAddress(ctx context.Context, address string) error {
key := fmt.Sprintf("%v.%v", "registry.uniq-lock", address)
key := registryLockKey(address)

res, err := rr.kv.Get(ctx, key, etcdv3.WithLimit(1))
if err != nil {
return fmt.Errorf("waitForAddress failed to get address lock: error: %w", err)
}
if res.Count != 0 {
const infiniteRetries = 10000
var locked bool = true
var rErr error
retry.X(infiniteRetries, time.Second, func() bool {
select {
case <-time.After(rr.LeaseDuration):
case <-ctx.Done():
return nil
// we timed out waiting for it to expire
return false
default:
}
}

res, err = rr.kv.Get(ctx, key, etcdv3.WithLimit(1))
if err != nil {
return fmt.Errorf("waitForAddress failed to get address lock: error: %w", err)
res, err := rr.kv.Get(ctx, key, etcdv3.WithLimit(1))
if err != nil {
// don't retry on an error to etcd, just expect the process/pod to restart
rErr = err
return false
}
if res.Count != 0 {
// the lock is being held by someone
return true
}
// the lock is cleared
locked = false
return false
})

if rErr != nil && rErr != context.Canceled {
return fmt.Errorf("registry: failed get address lock: error: %w", rErr)
}
if res.Count != 0 {
return fmt.Errorf("waitForAddress lease exists after timeout")

if locked {
return ErrFailedAcquireAddressLock
}

_, err = rr.kv.Put(ctx, key, "", etcdv3.WithLease(rr.leaseID))
tctx, cancel := context.WithTimeout(ctx, rr.Timeout)
_, err := rr.kv.Put(tctx, key, "", etcdv3.WithLease(rr.leaseID))
cancel()
if err != nil {
return fmt.Errorf("waitForAddress failed to write address lock: error: %w", err)
return fmt.Errorf("registry: failed to write address lock: error: %w", err)
}

return nil
Expand Down Expand Up @@ -179,9 +201,8 @@ func (rr *Registry) Start(addr net.Addr) error {
rr.leaseID = res.ID

// Ensure that we're the owner of the address by taking an etcd lock
timeout, cancel = context.WithTimeout(context.Background(), rr.Timeout)
err = rr.waitForAddress(timeout, address)
cancel()
tctx, cancel := context.WithTimeout(context.TODO(), rr.LeaseDuration*2) // retry until Lease is up...
err = rr.waitForAddress(tctx, address)
if err != nil {
return err
}
Expand Down
138 changes: 138 additions & 0 deletions registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,144 @@ func TestStartStop(t *testing.T) {
}
}

// TestStartStopWaitForLeaseToExpireBetween stops a started registry and then
// starts a brand new one on the same address. The second Start is expected
// to return in under a second, i.e. without having to wait for the previous
// address lock to expire.
func TestStartStopWaitForLeaseToExpireBetween(t *testing.T) {
	client, r, addr := bootstrap(t, start)
	defer client.Close()

	// requireStopped fails the test unless a receive from the registry's
	// done channel succeeds immediately.
	requireStopped := func(reg *Registry) {
		select {
		case <-reg.done:
		default:
			t.Fatal("registry failed to stop")
		}
	}

	// Stopping should remove the lease, which should in turn clean up the
	// registry lock on the address, so the next Start need not wait.
	r.Stop()
	requireStopped(r)

	began := time.Now()
	r, err := New(client)
	if err != nil {
		t.Fatal(err)
	}
	r.LeaseDuration = 10 * time.Second
	if err = r.Start(addr); err != nil {
		t.Fatal(err)
	}
	if elapsed := time.Since(began); elapsed > time.Second {
		t.Fatalf("runtime was too long, maybe because of waiting for leases to expire? ")
	}

	r.Stop()
	requireStopped(r)
}

// TestWaitForLeaseThatNeverExpires writes the address lock key directly into
// etcd without attaching a lease, so nothing removes it while the test runs.
// It then verifies that Start fails with ErrFailedAcquireAddressLock and
// that it kept trying for at least one LeaseDuration before giving up.
func TestWaitForLeaseThatNeverExpires(t *testing.T) {
	t.Parallel()

	const leaseDuration = 10 * time.Second

	start := false
	client, _, addr := bootstrap(t, start)
	defer client.Close()

	kv := etcdv3.NewKV(client)

	// Synthetically create the lock.
	address, err := formatAddress(addr)
	if err != nil {
		t.Fatal(err)
	}
	lockKey := registryLockKey(address)
	if _, err = kv.Put(context.Background(), lockKey, ""); err != nil {
		t.Fatal(err)
	}
	defer func() {
		// Cleanup for the test. A closure-local error and t.Errorf are used
		// so the cleanup neither overwrites the test's own err nor calls
		// FailNow while the test function is already unwinding.
		if _, err := kv.Delete(context.Background(), lockKey); err != nil {
			t.Errorf("failed to delete address lock %q: %v", lockKey, err)
		}
	}()

	r1, err := New(client)
	if err != nil {
		t.Fatal(err)
	}
	r1.LeaseDuration = leaseDuration

	st := time.Now()
	err = r1.Start(addr)
	if err == nil {
		t.Fatal("expected an error but got none")
	}
	if err != ErrFailedAcquireAddressLock {
		t.Fatalf("expected error %q, got %v", ErrFailedAcquireAddressLock, err)
	}

	// Start must not have given up before one full LeaseDuration elapsed.
	if rt := time.Since(st); rt < leaseDuration {
		t.Fatalf("runtime was too short: waited %v, expected at least %v", rt, leaseDuration)
	}
}

// TestWaitForLeaseThatDoesExpires writes the address lock key directly into
// etcd and deletes it again five seconds later from a timer. It verifies
// that Start succeeds once the lock is gone, and that it returned neither
// before the lock was released (under 4s) nor long after (over 7s).
func TestWaitForLeaseThatDoesExpires(t *testing.T) {
	t.Parallel()

	start := false
	client, _, addr := bootstrap(t, start)
	defer client.Close()

	kv := etcdv3.NewKV(client)

	// Synthetically create the lock.
	address, err := formatAddress(addr)
	if err != nil {
		t.Fatal(err)
	}
	lockKey := registryLockKey(address)

	r1, err := New(client)
	if err != nil {
		t.Fatal(err)
	}
	r1.LeaseDuration = 10 * time.Second

	if _, err = kv.Put(context.Background(), lockKey, ""); err != nil {
		t.Fatal(err)
	}

	// releaseLock removes the synthetic lock. It reports failures with
	// t.Errorf and a local error because it runs on the timer's goroutine:
	// FailNow (and therefore t.Fatal) may only be called from the goroutine
	// running the test, and writing the outer err from here would race with
	// the assignment from r1.Start below.
	releaseLock := func() {
		if _, err := kv.Delete(context.Background(), lockKey); err != nil {
			t.Errorf("failed to delete address lock %q: %v", lockKey, err)
		}
	}
	// Clean up the lock after 5 seconds so that the registry can start up.
	timer := time.AfterFunc(5*time.Second, releaseLock)
	defer func() {
		// If the test ends before the timer fired, stop it so the callback
		// cannot touch t after the test has completed, and remove the lock
		// here instead so it is not left behind.
		if timer.Stop() {
			releaseLock()
		}
	}()

	st := time.Now()
	err = r1.Start(addr)
	if err != nil {
		t.Fatalf("unexpected error: err: %v", err)
	}
	// Ensure Start returned roughly when the lock was freed (after ~5s).
	rt := time.Since(st)
	if rt < (4 * time.Second) {
		t.Fatalf("runtime was too short (%v), we didn't free the lock until 5 seconds", rt)
	}
	if rt > (7 * time.Second) {
		t.Fatalf("runtime was too long (%v), we freed the lock after 5 seconds", rt)
	}

	r1.Stop()
	select {
	case <-r1.done:
	default:
		t.Fatal("registry failed to stop")
	}
}

func TestRegister(t *testing.T) {
client, r, _ := bootstrap(t, start)
defer client.Close()
Expand Down
2 changes: 1 addition & 1 deletion testetcd/etcdserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewEmbedded() *Embedded {

select {
case <-e.Server.ReadyNotify():
fmt.Printf("Started embedded etcd on url: %v", clientURL)
fmt.Printf("Started embedded etcd on url: %v\n", clientURL)

case <-time.After(10 * time.Second):
e.Server.Stop() // trigger a shutdown
Expand Down

0 comments on commit 6cda925

Please sign in to comment.