From f42c1facc2730bab51dfeb80a5b2775c7cad48b4 Mon Sep 17 00:00:00 2001 From: nikhilbhide Date: Sat, 8 Feb 2020 11:25:37 +0530 Subject: [PATCH 01/12] added a validation to check whether lock exists in case of network partition failure --- zk/lock.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/zk/lock.go b/zk/lock.go index 3c35a427..0d75f27c 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -12,6 +12,7 @@ var ( ErrDeadlock = errors.New("zk: trying to acquire a lock twice") // ErrNotLocked is returned by Unlock when trying to release a lock that has not first be acquired. ErrNotLocked = errors.New("zk: not locked") + ) // Lock is a mutual exclusion lock. @@ -23,6 +24,8 @@ type Lock struct { seq int } +var mapEphermeralSequenceLockPath = make(map[string]string) + // NewLock creates a new lock instance using the provided connection, path, and acl. // The path must be a node that is only used by this lock. A lock instances starts // unlocked until Lock() is called. @@ -43,6 +46,12 @@ func parseSeq(path string) (int, error) { // is acquired or an error occurs. If this instance already has the lock // then ErrDeadlock is returned. func (l *Lock) Lock() error { + if(mapEphermeralSequenceLockPath[l.path]!="") { + if(lockExists(l.c,l.path,mapEphermeralSequenceLockPath[l.path])) { + return nil + } + } + if l.lockPath != "" { return ErrDeadlock } @@ -53,6 +62,7 @@ func (l *Lock) Lock() error { var err error for i := 0; i < 3; i++ { path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl) + mapEphermeralSequenceLockPath[l.path] = path if err == ErrNoNode { // Create parent node. parts := strings.Split(l.path, "/") @@ -146,5 +156,46 @@ func (l *Lock) Unlock() error { } l.lockPath = "" l.seq = 0 + delete(mapEphermeralSequenceLockPath,l.path) return nil } + +//Checks whether lock got created and response was lost because of network partition failure. +//It queries zookeeper and scans existing sequential ephemeral znodes under the parent path +//It finds out that previously requested sequence number corresponds to child having lowest sequence number +func lockExists(c *Conn, rootPath string, znodePath string) bool { + seq, err := parseSeq(znodePath) + if err != nil { + return false + } + + //scans the existing znodes if there are any + for { + children, _, err := c.Children(rootPath) + if err != nil { + return false + } + + lowestSeq := seq + prevSeq := -1 + for _, p := range children { + s, err := parseSeq(p) + if err != nil { + return false + } + if s < lowestSeq { + lowestSeq = s + } + if s < seq && s > prevSeq { + prevSeq = s + } + } + + if seq == lowestSeq { + // Acquired the lock + return true + } else { + return false + } + } +} \ No newline at end of file From 3b972648058f0505e859702460ce3e0986378869 Mon Sep 17 00:00:00 2001 From: nikhilbhide Date: Sat, 8 Feb 2020 12:32:57 +0530 Subject: [PATCH 02/12] Fix issues found by deepsource.io (#229) --- zk/lock.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/zk/lock.go b/zk/lock.go index 0d75f27c..e4c0b414 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -63,7 +63,8 @@ func (l *Lock) Lock() error { for i := 0; i < 3; i++ { path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl) mapEphermeralSequenceLockPath[l.path] = path - if err == ErrNoNode { + switch err { + case ErrNoNode: // Create parent node. parts := strings.Split(l.path, "/") pth := "" @@ -82,9 +83,9 @@ func (l *Lock) Lock() error { return err } } - } else if err == nil { + case nil: break - } else { + default: return err } } From 34593e327dde1987bbb03a686d3d62bb8dbe8611 Mon Sep 17 00:00:00 2001 From: nikhilbhide Date: Sun, 9 Feb 2020 15:31:25 +0530 Subject: [PATCH 03/12] fixed issues pertaining to code structure and formatting --- zk/lock.go | 65 +++++++++++++++++++++++++++++------------------------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/zk/lock.go b/zk/lock.go index e4c0b414..d834e5fa 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -24,6 +24,9 @@ type Lock struct { seq int } +// Initializing a map using the built-in make() function +// This map stores the lock_path of last successfully requested sequential ephemeral znode queued +// In case of any conflict, the sequence number is used to check whether lock has been acquired var mapEphermeralSequenceLockPath = make(map[string]string) // NewLock creates a new lock instance using the provided connection, path, and acl. @@ -46,22 +49,24 @@ func parseSeq(path string) (int, error) { // is acquired or an error occurs. If this instance already has the lock // then ErrDeadlock is returned. func (l *Lock) Lock() error { - if(mapEphermeralSequenceLockPath[l.path]!="") { - if(lockExists(l.c,l.path,mapEphermeralSequenceLockPath[l.path])) { - return nil - } - } - if l.lockPath != "" { return ErrDeadlock } + if seqZnodePath, ok := mapEphermeralSequenceLockPath[l.path]; ok && seqZnodePath != "" { + // Check whether lock has been acquired previously and it still exists + if(lockExists(l.c,l.path,seqZnodePath)) { + return nil + } + } + prefix := fmt.Sprintf("%s/lock-", l.path) path := "" var err error - for i := 0; i < 3; i++ { + tryLock: for i := 0; i < 3; i++ { path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl) + // Store the path of newly created sequential ephemeral znode against the parent znode path mapEphermeralSequenceLockPath[l.path] = path switch err { case ErrNoNode: @@ -84,7 +89,7 @@ func (l *Lock) Lock() error { } } case nil: - break + break tryLock default: return err } @@ -157,6 +162,8 @@ func (l *Lock) Unlock() error { } l.lockPath = "" l.seq = 0 + // Remove the entry of path of newly created sequential ephemeral znode + // this was stored against the parent znode path delete(mapEphermeralSequenceLockPath,l.path) return nil } @@ -171,32 +178,30 @@ func lockExists(c *Conn, rootPath string, znodePath string) bool { } //scans the existing znodes if there are any - for { - children, _, err := c.Children(rootPath) + children, _, err := c.Children(rootPath) + if err != nil { + return false + } + + lowestSeq := seq + prevSeq := -1 + for _, p := range children { + s, err := parseSeq(p) if err != nil { return false } - - lowestSeq := seq - prevSeq := -1 - for _, p := range children { - s, err := parseSeq(p) - if err != nil { - return false - } - if s < lowestSeq { - lowestSeq = s - } - if s < seq && s > prevSeq { - prevSeq = s - } + if s < lowestSeq { + lowestSeq = s } - - if seq == lowestSeq { - // Acquired the lock - return true - } else { - return false + if s < seq && s > prevSeq { + prevSeq = s } } + + if seq == lowestSeq { + // Acquired the lock + return true + } else { + return false + } } \ No newline at end of file From 7f719093660e0f6402696de88b52b328015c6e61 Mon Sep 17 00:00:00 2001 From: nikhilbhide Date: Sun, 9 Feb 2020 15:32:16 +0530 Subject: [PATCH 04/12] fixed issues pertaining to code structure and formatting --- zk/lock.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zk/lock.go b/zk/lock.go index d834e5fa..66bf6d80 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -168,7 +168,7 @@ func (l *Lock) Unlock() error { return nil } -//Checks whether lock got created and response was lost because of network partition failure. +//Check whether lock got created and response was lost because of network partition failure. //It queries zookeeper and scans existing sequential ephemeral znodes under the parent path //It finds out that previously requested sequence number corresponds to child having lowest sequence number func lockExists(c *Conn, rootPath string, znodePath string) bool { @@ -177,7 +177,7 @@ func lockExists(c *Conn, rootPath string, znodePath string) bool { return false } - //scans the existing znodes if there are any + //Scan the existing znodes if there are any children, _, err := c.Children(rootPath) if err != nil { return false From 33260f00422c0713f27bd7d1d034b735d2909311 Mon Sep 17 00:00:00 2001 From: nikhilbhide Date: Sun, 9 Feb 2020 15:42:09 +0530 Subject: [PATCH 05/12] renamed map to hold lock paths by path and formatted code --- zk/lock.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/zk/lock.go b/zk/lock.go index 66bf6d80..b6a69b5d 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -12,7 +12,6 @@ var ( ErrDeadlock = errors.New("zk: trying to acquire a lock twice") // ErrNotLocked is returned by Unlock when trying to release a lock that has not first be acquired. ErrNotLocked = errors.New("zk: not locked") - ) // Lock is a mutual exclusion lock. @@ -27,7 +26,7 @@ type Lock struct { // Initializing a map using the built-in make() function // This map stores the lock_path of last successfully requested sequential ephemeral znode queued // In case of any conflict, the sequence number is used to check whether lock has been acquired -var mapEphermeralSequenceLockPath = make(map[string]string) +var lockPathsByPath = make(map[string]string) // NewLock creates a new lock instance using the provided connection, path, and acl. // The path must be a node that is only used by this lock. A lock instances starts @@ -53,9 +52,9 @@ func (l *Lock) Lock() error { return ErrDeadlock } - if seqZnodePath, ok := mapEphermeralSequenceLockPath[l.path]; ok && seqZnodePath != "" { + if seqZnodePath, ok := lockPathsByPath[l.path]; ok && seqZnodePath != "" { // Check whether lock has been acquired previously and it still exists - if(lockExists(l.c,l.path,seqZnodePath)) { + if lockExists(l.c, l.path, seqZnodePath) { return nil } } @@ -64,10 +63,11 @@ func (l *Lock) Lock() error { path := "" var err error - tryLock: for i := 0; i < 3; i++ { +tryLock: + for i := 0; i < 3; i++ { path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl) // Store the path of newly created sequential ephemeral znode against the parent znode path - mapEphermeralSequenceLockPath[l.path] = path + lockPathsByPath[l.path] = path switch err { case ErrNoNode: // Create parent node. @@ -164,7 +164,7 @@ func (l *Lock) Unlock() error { l.seq = 0 // Remove the entry of path of newly created sequential ephemeral znode // this was stored against the parent znode path - delete(mapEphermeralSequenceLockPath,l.path) + delete(lockPathsByPath, l.path) return nil } @@ -204,4 +204,4 @@ func lockExists(c *Conn, rootPath string, znodePath string) bool { } else { return false } -} \ No newline at end of file +} From bdaf4887ebae7623bbfcf9284b08bd0a963e6e2b Mon Sep 17 00:00:00 2001 From: nikhilbhide Date: Sun, 9 Feb 2020 15:50:14 +0530 Subject: [PATCH 06/12] Removed redundant else loop --- zk/lock.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zk/lock.go b/zk/lock.go index b6a69b5d..baae83f6 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -201,7 +201,7 @@ func lockExists(c *Conn, rootPath string, znodePath string) bool { if seq == lowestSeq { // Acquired the lock return true - } else { - return false } + + return false } From a6c6e74a257496f3ab98da324d577ab02a33445b Mon Sep 17 00:00:00 2001 From: nikhilbhide Date: Sun, 9 Feb 2020 16:07:57 +0530 Subject: [PATCH 07/12] Fixed race condition in accessing map of lockPathsByPath --- zk/lock.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/zk/lock.go b/zk/lock.go index baae83f6..55bc2944 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "sync" ) var ( @@ -26,7 +27,10 @@ type Lock struct { // Initializing a map using the built-in make() function // This map stores the lock_path of last successfully requested sequential ephemeral znode queued // In case of any conflict, the sequence number is used to check whether lock has been acquired -var lockPathsByPath = make(map[string]string) +var ( + lockPathsByPath = make(map[string]string) + lockPathsByPathLock sync.Mutex +) // NewLock creates a new lock instance using the provided connection, path, and acl. // The path must be a node that is only used by this lock. A lock instances starts @@ -66,8 +70,12 @@ func (l *Lock) Lock() error { tryLock: for i := 0; i < 3; i++ { path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl) + // Store the path of newly created sequential ephemeral znode against the parent znode path + lockPathsByPathLock.Lock() lockPathsByPath[l.path] = path + lockPathsByPathLock.Unlock() + switch err { case ErrNoNode: // Create parent node. @@ -164,7 +172,9 @@ func (l *Lock) Unlock() error { l.seq = 0 // Remove the entry of path of newly created sequential ephemeral znode // this was stored against the parent znode path + lockPathsByPathLock.Lock() delete(lockPathsByPath, l.path) + lockPathsByPathLock.Unlock() return nil } From 3853affe94928ca20d927a954d55cf6fa638cd36 Mon Sep 17 00:00:00 2001 From: nikhil Date: Mon, 10 Feb 2020 13:17:34 +0530 Subject: [PATCH 08/12] Replaced the map variable lockPathsByPath with string variable as there is no possibility to use same lock object to obtain lock for different paths --- zk/lock.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/zk/lock.go b/zk/lock.go index 55bc2944..c80bc168 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -22,6 +22,7 @@ type Lock struct { acl []ACL lockPath string seq int + attemptedLockPath string } // Initializing a map using the built-in make() function @@ -56,9 +57,9 @@ func (l *Lock) Lock() error { return ErrDeadlock } - if seqZnodePath, ok := lockPathsByPath[l.path]; ok && seqZnodePath != "" { + if l.attemptedLockPath!="" { // Check whether lock has been acquired previously and it still exists - if lockExists(l.c, l.path, seqZnodePath) { + if lockExists(l.c, l.path, l.attemptedLockPath) { return nil } } @@ -71,10 +72,10 @@ tryLock: for i := 0; i < 3; i++ { path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl) - // Store the path of newly created sequential ephemeral znode against the parent znode path - lockPathsByPathLock.Lock() - lockPathsByPath[l.path] = path - lockPathsByPathLock.Unlock() + if(path!="") { + // Store the path of newly created sequential ephemeral znode + l.attemptedLockPath = path + } switch err { case ErrNoNode: @@ -168,13 +169,11 @@ func (l *Lock) Unlock() error { if err := l.c.Delete(l.lockPath, -1); err != nil { return err } + // Perform clean up l.lockPath = "" l.seq = 0 - // Remove the entry of path of newly created sequential ephemeral znode - // this was stored against the parent znode path - lockPathsByPathLock.Lock() - delete(lockPathsByPath, l.path) - lockPathsByPathLock.Unlock() + l.attemptedLockPath = "" + return nil } From ac9f39b4bfa5884603cbfa922b33e5d9c38baa30 Mon Sep 17 00:00:00 2001 From: nikhil Date: Tue, 11 Feb 2020 11:05:11 +0530 Subject: [PATCH 09/12] removed redundant variables --- zk/lock.go | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/zk/lock.go b/zk/lock.go index c80bc168..05a1b1d7 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -5,7 +5,6 @@ import ( "fmt" "strconv" "strings" - "sync" ) var ( @@ -17,22 +16,14 @@ var ( // Lock is a mutual exclusion lock. type Lock struct { - c *Conn - path string - acl []ACL - lockPath string - seq int + c *Conn + path string + acl []ACL + lockPath string + seq int attemptedLockPath string } -// Initializing a map using the built-in make() function -// This map stores the lock_path of last successfully requested sequential ephemeral znode queued -// In case of any conflict, the sequence number is used to check whether lock has been acquired -var ( - lockPathsByPath = make(map[string]string) - lockPathsByPathLock sync.Mutex -) - // NewLock creates a new lock instance using the provided connection, path, and acl. // The path must be a node that is only used by this lock. A lock instances starts // unlocked until Lock() is called. @@ -57,7 +48,7 @@ func (l *Lock) Lock() error { return ErrDeadlock } - if l.attemptedLockPath!="" { + if l.attemptedLockPath != "" { // Check whether lock has been acquired previously and it still exists if lockExists(l.c, l.path, l.attemptedLockPath) { return nil @@ -72,7 +63,7 @@ tryLock: for i := 0; i < 3; i++ { path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl) - if(path!="") { + if path != "" { // Store the path of newly created sequential ephemeral znode l.attemptedLockPath = path } @@ -172,7 +163,7 @@ func (l *Lock) Unlock() error { // Perform clean up l.lockPath = "" l.seq = 0 - l.attemptedLockPath = "" + l.attemptedLockPath = "" return nil } From 817c76557e393afd0fa2f2d929557099232deb7c Mon Sep 17 00:00:00 2001 From: nikhil Date: Tue, 11 Feb 2020 11:08:28 +0530 Subject: [PATCH 10/12] populated lockPath in case of lock is identified by looking upon attemptedLockPath --- zk/lock.go | 1 + 1 file changed, 1 insertion(+) diff --git a/zk/lock.go b/zk/lock.go index 05a1b1d7..14f1867b 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -51,6 +51,7 @@ func (l *Lock) Lock() error { if l.attemptedLockPath != "" { // Check whether lock has been acquired previously and it still exists if lockExists(l.c, l.path, l.attemptedLockPath) { + l.lockPath = l.attemptedLockPath return nil } } From 3307c27303da7dbb7a8885083cef4e6da53bb13e Mon Sep 17 00:00:00 2001 From: nikhil Date: Tue, 11 Feb 2020 11:14:20 +0530 Subject: [PATCH 11/12] added unit tests for lockExists --- zk/lock_test.go | 124 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/zk/lock_test.go b/zk/lock_test.go index 77dce9b7..5853227f 100644 --- a/zk/lock_test.go +++ b/zk/lock_test.go @@ -93,3 +93,127 @@ func TestMultiLevelLock(t *testing.T) { t.Fatal(err) } } + +// This tests whether locking contention does not result into deadlock in case of network partition failure +// this is simulated by cleaning lock_path attribute from lock object +func TestLockExists(t *testing.T) { + ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + acls := WorldACL(PermAll) + + l := NewLock(zk, "/test", acls) + if err := l.Lock(); err != nil { + t.Fatal(err) + } + + //clean off lockPath + l.lockPath = "" + + // try to acquire lock + if err := l.Lock(); err != nil { + t.Fatal(err) + } + + //try to release the lock + if err := l.Unlock(); err != nil { + t.Fatal(err) + } +} + +// This tests whether lock path is populated correctly when lock is acquired +func TestLockPathExists(t *testing.T) { + ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + acls := WorldACL(PermAll) + + l := NewLock(zk, "/test", acls) + if err := l.Lock(); err != nil { + t.Fatal(err) + } + + // Create test data set + type args struct { + c *Conn + rootPath string + znodePath string + } + tests := []struct { + name string + args args + want bool + }{ + {"lock_path_populated_success", args{ + zk, + "test", + l.attemptedLockPath, + }, + true, + }, + {"lock_path_populated_failure", args{ + zk, + "test_no_lock", + l.attemptedLockPath, + }, + false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := lockExists(tt.args.c, tt.args.rootPath, tt.args.znodePath); got != tt.want { + t.Errorf("TestLockPathExists() = %v, want %v", got, tt.want) + } + }) + } + + if err := l.Unlock(); err != nil { + t.Fatal(err) + } +} + +// This tests whether lock path is cleaned off correctly when lock is acquired +func TestLockPathCleaning(t *testing.T) { + ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + acls := WorldACL(PermAll) + + l := NewLock(zk, "/test", acls) + if err := l.Lock(); err != nil { + t.Fatal(err) + } + + if err := l.Unlock(); err != nil { + t.Fatal(err) + } + +} From 628889403ed863193ce86236f1ddccf5738c3e15 Mon Sep 17 00:00:00 2001 From: nikhil Date: Tue, 11 Feb 2020 11:26:32 +0530 Subject: [PATCH 12/12] fixed test case --- zk/lock_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zk/lock_test.go b/zk/lock_test.go index 5853227f..c89c6dfd 100644 --- a/zk/lock_test.go +++ b/zk/lock_test.go @@ -164,14 +164,14 @@ func TestLockPathExists(t *testing.T) { }{ {"lock_path_populated_success", args{ zk, - "test", + "/test", l.attemptedLockPath, }, true, }, {"lock_path_populated_failure", args{ zk, - "test_no_lock", + "/test_no_lock", l.attemptedLockPath, }, false,