Skip to content

Commit

Permalink
Merge pull request #155 from lytics/waiting-for-previous-leases-to-ex…
Browse files Browse the repository at this point in the history
…pire

adding a wait for previous leases to expire if they don't exist
  • Loading branch information
epsniff authored Mar 5, 2021
2 parents 0cb48fc + 6cda925 commit adba082
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.12.x, 1.13.x]
go-version: [1.x.x]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}
steps:
Expand Down
60 changes: 60 additions & 0 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/lytics/retry"
etcdv3 "go.etcd.io/etcd/clientv3"
)

Expand All @@ -34,6 +35,7 @@ var (
ErrWatchClosedUnexpectedly = errors.New("registry: watch closed unexpectedly")
ErrUnspecifiedNetAddressIP = errors.New("registry: unspecified net address ip")
ErrKeepAliveClosedUnexpectedly = errors.New("registry: keep alive closed unexpectedly")
ErrFailedAcquireAddressLock = errors.New("registry: address lock exists after timeout")
)

var (
Expand Down Expand Up @@ -122,6 +124,57 @@ func New(client *etcdv3.Client) (*Registry, error) {
}, nil
}

func registryLockKey(address string) string {
return fmt.Sprintf("%v.%v", "registry.uniq-lock", address)
}

func (rr *Registry) waitForAddress(ctx context.Context, address string) error {
key := registryLockKey(address)

const infiniteRetries = 10000
var locked bool = true
var rErr error
retry.X(infiniteRetries, time.Second, func() bool {
select {
case <-ctx.Done():
// we timed out waiting for it to expire
return false
default:
}

res, err := rr.kv.Get(ctx, key, etcdv3.WithLimit(1))
if err != nil {
// don't retry on an error to etcd, just expect the process/pod to restart
rErr = err
return false
}
if res.Count != 0 {
// the lock is being held by someone
return true
}
// the lock is cleared
locked = false
return false
})

if rErr != nil && rErr != context.Canceled {
return fmt.Errorf("registry: failed get address lock: error: %w", rErr)
}

if locked {
return ErrFailedAcquireAddressLock
}

tctx, cancel := context.WithTimeout(ctx, rr.Timeout)
_, err := rr.kv.Put(tctx, key, "", etcdv3.WithLease(rr.leaseID))
cancel()
if err != nil {
return fmt.Errorf("registry: failed to write address lock: error: %w", err)
}

return nil
}

// Start Registry.
func (rr *Registry) Start(addr net.Addr) error {
rr.mu.Lock()
Expand All @@ -147,6 +200,13 @@ func (rr *Registry) Start(addr net.Addr) error {
}
rr.leaseID = res.ID

// Ensure that we're the owner of the address by taking an etcd lock
tctx, cancel := context.WithTimeout(context.TODO(), rr.LeaseDuration*2) // retry until Lease is up...
err = rr.waitForAddress(tctx, address)
if err != nil {
return err
}

// Start the keep alive for the lease.
keepAliveCtx, keepAliveCancel := context.WithCancel(context.Background())
keepAlive, err := rr.lease.KeepAlive(keepAliveCtx, rr.leaseID)
Expand Down
138 changes: 138 additions & 0 deletions registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,144 @@ func TestStartStop(t *testing.T) {
}
}

func TestStartStopWaitForLeaseToExpireBetween(t *testing.T) {
client, r, addr := bootstrap(t, start)
defer client.Close()

// this should remove the lease which should clean up the registry lock on the address
// which allows the next call to Start to begin without waiting.
r.Stop()
select {
case <-r.done:
default:
t.Fatal("registry failed to stop")
}

st := time.Now()
r, err := New(client)
if err != nil {
t.Fatal(err)
}
r.LeaseDuration = 10 * time.Second
err = r.Start(addr)
if err != nil {
t.Fatal(err)
}
rt := time.Since(st)
if rt > (time.Second) {
t.Fatalf("runtime was too long, maybe because of waiting for leases to expire? ")
}

r.Stop()
select {
case <-r.done:
default:
t.Fatal("registry failed to stop")
}
}

func TestWaitForLeaseThatNeverExpires(t *testing.T) {
t.Parallel()

start := false
client, _, addr := bootstrap(t, start)
defer client.Close()

kv := etcdv3.NewKV(client)

// synthetically create the lock
address, err := formatAddress(addr)
if err != nil {
t.Fatal(err)
}
_, err = kv.Put(context.Background(), registryLockKey(address), "")
if err != nil {
t.Fatal(err)
}
defer func() {
// cleanup for test
_, err = kv.Delete(context.Background(), registryLockKey(address))
if err != nil {
t.Fatal(err)
}
}()

r1, err := New(client)
if err != nil {
t.Fatal(err)
}
r1.LeaseDuration = 10 * time.Second

st := time.Now()
err = r1.Start(addr)
if err == nil {
t.Fatalf("expected an error but got none")
}
if err != ErrFailedAcquireAddressLock {
t.Fatalf("expected an error `lease exists after timeout`, got %v", err)
}

// ensure that we waited 10 seconds...
rt := time.Since(st)
if rt < (10 * time.Second) {
t.Fatalf("runtime was too short, it should have tried for at least 2*LeaseDuration? ")
}
}

func TestWaitForLeaseThatDoesExpires(t *testing.T) {
t.Parallel()

start := false
client, _, addr := bootstrap(t, start)
defer client.Close()

kv := etcdv3.NewKV(client)

// synthetically create the lock
address, err := formatAddress(addr)
if err != nil {
t.Fatal(err)
}
r1, err := New(client)
if err != nil {
t.Fatal(err)
}
r1.LeaseDuration = 10 * time.Second

_, err = kv.Put(context.Background(), registryLockKey(address), "")
if err != nil {
t.Fatal(err)
}
time.AfterFunc(5*time.Second, func() {
// cleanup lock so that the registry can startup.
_, err = kv.Delete(context.Background(), registryLockKey(address))
if err != nil {
t.Fatal(err)
}
})

st := time.Now()
err = r1.Start(addr)
if err != nil {
t.Fatalf("unexpected error: err: %v", err)
}
// ensure that we waited 10 seconds...
rt := time.Since(st)
if rt < (4 * time.Second) {
t.Fatalf("runtime was too short, we didn't free the lock until 5 seconds")
}
if rt > (7 * time.Second) {
t.Fatalf("runtime was too long, we freed the lock after 5 seconds")
}

r1.Stop()
select {
case <-r1.done:
default:
t.Fatal("registry failed to stop")
}
}

func TestRegister(t *testing.T) {
client, r, _ := bootstrap(t, start)
defer client.Close()
Expand Down
2 changes: 1 addition & 1 deletion testetcd/etcdserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewEmbedded() *Embedded {

select {
case <-e.Server.ReadyNotify():
fmt.Printf("Started embedded etcd on url: %v", clientURL)
fmt.Printf("Started embedded etcd on url: %v\n", clientURL)

case <-time.After(10 * time.Second):
e.Server.Stop() // trigger a shutdown
Expand Down

0 comments on commit adba082

Please sign in to comment.