Skip to content
This repository has been archived by the owner on Jul 21, 2021. It is now read-only.

added a validation to check whether lock exists in case of network pa… #230

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 62 additions & 5 deletions zk/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var (
ErrDeadlock = errors.New("zk: trying to acquire a lock twice")
// ErrNotLocked is returned by Unlock when trying to release a lock that has not first be acquired.
ErrNotLocked = errors.New("zk: not locked")

)

// Lock is a mutual exclusion lock.
Expand All @@ -23,6 +24,11 @@ type Lock struct {
seq int
}

// Initializing a map using the built-in make() function
// This map stores the lock_path of last successfully requested sequential ephemeral znode queued
// In case of any conflict, the sequence number is used to check whether lock has been acquired
var mapEphermeralSequenceLockPath = make(map[string]string)
continuum-Nikhil-Bhide marked this conversation as resolved.
Show resolved Hide resolved

// NewLock creates a new lock instance using the provided connection, path, and acl.
// The path must be a node that is only used by this lock. A lock instances starts
// unlocked until Lock() is called.
Expand All @@ -47,13 +53,23 @@ func (l *Lock) Lock() error {
return ErrDeadlock
}

if seqZnodePath, ok := mapEphermeralSequenceLockPath[l.path]; ok && seqZnodePath != "" {
// Check whether lock has been acquired previously and it still exists
if(lockExists(l.c,l.path,seqZnodePath)) {
return nil
}
}

prefix := fmt.Sprintf("%s/lock-", l.path)

path := ""
var err error
for i := 0; i < 3; i++ {
tryLock: for i := 0; i < 3; i++ {
continuum-Nikhil-Bhide marked this conversation as resolved.
Show resolved Hide resolved
path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl)
if err == ErrNoNode {
// Store the path of newly created sequential ephemeral znode against the parent znode path
mapEphermeralSequenceLockPath[l.path] = path
switch err {
case ErrNoNode:
// Create parent node.
parts := strings.Split(l.path, "/")
pth := ""
Expand All @@ -72,9 +88,9 @@ func (l *Lock) Lock() error {
return err
}
}
} else if err == nil {
break
} else {
case nil:
continuum-Nikhil-Bhide marked this conversation as resolved.
Show resolved Hide resolved
break tryLock
default:
return err
}
}
Expand Down Expand Up @@ -146,5 +162,46 @@ func (l *Lock) Unlock() error {
}
l.lockPath = ""
l.seq = 0
// Remove the entry of path of newly created sequential ephemeral znode
// this was stored against the parent znode path
delete(mapEphermeralSequenceLockPath,l.path)
continuum-Nikhil-Bhide marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

//Check whether lock got created and response was lost because of network partition failure.
//It queries zookeeper and scans existing sequential ephemeral znodes under the parent path
//It finds out that previously requested sequence number corresponds to child having lowest sequence number
func lockExists(c *Conn, rootPath string, znodePath string) bool {
seq, err := parseSeq(znodePath)
if err != nil {
return false
}

//Scan the existing znodes if there are any
children, _, err := c.Children(rootPath)
if err != nil {
return false
}

lowestSeq := seq
prevSeq := -1
for _, p := range children {
s, err := parseSeq(p)
if err != nil {
return false
}
if s < lowestSeq {
lowestSeq = s
}
if s < seq && s > prevSeq {
prevSeq = s
}
}

if seq == lowestSeq {
// Acquired the lock
return true
} else {
continuum-Nikhil-Bhide marked this conversation as resolved.
Show resolved Hide resolved
return false
}
}