Skip to content
This repository has been archived by the owner on Jul 21, 2021. It is now read-only.

added a validation to check whether lock exists in case of network pa… #230

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 67 additions & 9 deletions zk/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ var (

// Lock is a mutual exclusion lock.
type Lock struct {
c *Conn
path string
acl []ACL
lockPath string
seq int
c *Conn
path string
acl []ACL
lockPath string
seq int
attemptedLockPath string
}

// NewLock creates a new lock instance using the provided connection, path, and acl.
Expand All @@ -47,13 +48,29 @@ func (l *Lock) Lock() error {
return ErrDeadlock
}

if l.attemptedLockPath != "" {
// Check whether lock has been acquired previously and it still exists
if lockExists(l.c, l.path, l.attemptedLockPath) {
l.lockPath = l.attemptedLockPath
return nil
}
}

prefix := fmt.Sprintf("%s/lock-", l.path)

path := ""
var err error
tryLock:
for i := 0; i < 3; i++ {
path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl)
if err == ErrNoNode {

if path != "" {
// Store the path of newly created sequential ephemeral znode
l.attemptedLockPath = path
}

switch err {
case ErrNoNode:
// Create parent node.
parts := strings.Split(l.path, "/")
pth := ""
Expand All @@ -72,9 +89,9 @@ func (l *Lock) Lock() error {
return err
}
}
} else if err == nil {
break
} else {
case nil:
continuum-Nikhil-Bhide marked this conversation as resolved.
Show resolved Hide resolved
break tryLock
default:
return err
}
}
Expand Down Expand Up @@ -144,7 +161,48 @@ func (l *Lock) Unlock() error {
if err := l.c.Delete(l.lockPath, -1); err != nil {
return err
}
// Perform clean up
l.lockPath = ""
l.seq = 0
l.attemptedLockPath = ""

return nil
}

//Check whether lock got created and response was lost because of network partition failure.
//It queries zookeeper and scans existing sequential ephemeral znodes under the parent path
//It finds out that previously requested sequence number corresponds to child having lowest sequence number
func lockExists(c *Conn, rootPath string, znodePath string) bool {
seq, err := parseSeq(znodePath)
if err != nil {
return false
}

//Scan the existing znodes if there are any
children, _, err := c.Children(rootPath)
if err != nil {
return false
}

lowestSeq := seq
prevSeq := -1
for _, p := range children {
s, err := parseSeq(p)
if err != nil {
return false
}
if s < lowestSeq {
lowestSeq = s
}
if s < seq && s > prevSeq {
prevSeq = s
}
}

if seq == lowestSeq {
// Acquired the lock
return true
}

return false
}
124 changes: 124 additions & 0 deletions zk/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,127 @@ func TestMultiLevelLock(t *testing.T) {
t.Fatal(err)
}
}

// This tests whether locking contention does not result into deadlock in case of network partition failure
// this is simulated by cleaning lock_path attribute from lock object
func TestLockExists(t *testing.T) {
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()

zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

acls := WorldACL(PermAll)

l := NewLock(zk, "/test", acls)
if err := l.Lock(); err != nil {
t.Fatal(err)
}

//clean off lockPath
l.lockPath = ""

// try to acquire lock
if err := l.Lock(); err != nil {
t.Fatal(err)
}

//try to release the lock
if err := l.Unlock(); err != nil {
t.Fatal(err)
}
}

// This tests whether lock path is populated correctly when lock is acquired
func TestLockPathExists(t *testing.T) {
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()

zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

acls := WorldACL(PermAll)

l := NewLock(zk, "/test", acls)
if err := l.Lock(); err != nil {
t.Fatal(err)
}

// Create test data set
type args struct {
c *Conn
rootPath string
znodePath string
}
tests := []struct {
name string
args args
want bool
}{
{"lock_path_populated_success", args{
zk,
"/test",
l.attemptedLockPath,
},
true,
},
{"lock_path_populated_failure", args{
zk,
"/test_no_lock",
l.attemptedLockPath,
},
false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := lockExists(tt.args.c, tt.args.rootPath, tt.args.znodePath); got != tt.want {
t.Errorf("TestLockPathExists() = %v, want %v", got, tt.want)
}
})
}

if err := l.Unlock(); err != nil {
t.Fatal(err)
}
}

// This tests whether lock path is cleaned off correctly when lock is acquired
func TestLockPathCleaning(t *testing.T) {
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()

zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

acls := WorldACL(PermAll)

l := NewLock(zk, "/test", acls)
if err := l.Lock(); err != nil {
t.Fatal(err)
}

if err := l.Unlock(); err != nil {
t.Fatal(err)
}

}