Skip to content

Commit

Permalink
Define a DLocker interface for distributed locker
Browse files Browse the repository at this point in the history
Also updated the contrib/lock example to demo how to use the DLocker

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Oct 25, 2024
1 parent 510b9b1 commit 5526e79
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 10 deletions.
15 changes: 13 additions & 2 deletions client/v3/concurrency/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (m *Mutex) IsOwner() v3.Cmp {
}

func (m *Mutex) Key() string { return m.myKey }
func (m *Mutex) Rev() int64 { return m.myRev }

// Header is the response header received from etcd on acquiring the lock.
func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }
Expand All @@ -171,7 +172,17 @@ func (lm *lockerMutex) Unlock() {
}
}

// NewLocker creates a sync.Locker backed by an etcd mutex.
func NewLocker(s *Session, pfx string) sync.Locker {
// NewLocker creates a DLocker backed by an etcd mutex.
func NewLocker(s *Session, pfx string) DLocker {
return &lockerMutex{NewMutex(s, pfx)}
}

// DLocker represents an object that can be locked and unlocked
// in distributed environment.
type DLocker interface {
sync.Locker
// Rev returns a revision which is monotonically incremental. It can
// be used as a fencing token to prevent expired locker from operating
// the shared resource.
Rev() int64
}
2 changes: 1 addition & 1 deletion contrib/lock/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ If things go well the second client process invoked as `./client 2` finishes soo
After checking this, please hit any key for `./client 1` and resume the process. It will show an output like below:
```
resuming client 1
expected fail to write to storage with old lease version: error: given version (694d82254d5fa305) is different from the existing version (694d82254e18770a)
expected fail to write to storage with old lease version: error: given version (8) is smaller than the existing version (10)
```

[fencing]: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
Expand Down
11 changes: 6 additions & 5 deletions contrib/lock/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,27 @@ func main() {
locker := concurrency.NewLocker(session, "/lock")
locker.Lock()
defer locker.Unlock()
version := session.Lease()
leaseID := session.Lease()
version := locker.Rev()
log.Printf("acquired lock, version: %x", version)

if mode == 1 {
log.Printf("please manually revoke the lease using 'etcdctl lease revoke %x' or wait for it to expire, then start executing client 2 and hit any key...", version)
log.Printf("please manually revoke the lease using 'etcdctl lease revoke %x' or wait for it to expire, then start executing client 2 and hit any key...", leaseID)
reader := bufio.NewReader(os.Stdin)
_, _ = reader.ReadByte()
log.Print("resuming client 1")
} else {
log.Print("this is client 2, continuing\n")
}

err = write("key0", fmt.Sprintf("value from client %x", mode), int64(version))
err = write("key0", fmt.Sprintf("value from client %x", mode), version)
if err != nil {
if mode == 1 {
log.Printf("expected fail to write to storage with old lease version: %s\n", err) // client 1 should show this message
log.Printf("expected fail to write to storage with old version: %s\n", err) // client 1 should show this message
} else {
log.Fatalf("unexpected fail to write to storage: %s\n", err)
}
} else {
log.Printf("successfully write a key to storage using lease %x\n", int64(version))
log.Printf("successfully write a key to storage with version %x\n", version)
}
}
4 changes: 2 additions & 2 deletions contrib/lock/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func handler(w http.ResponseWriter, r *http.Request) {
}
} else if strings.Compare(req.Op, "write") == 0 {
if val, ok := data[req.Key]; ok {
if req.Version != val.version {
writeResponse(response{"", -1, fmt.Sprintf("given version (%x) is different from the existing version (%x)", req.Version, val.version)}, w)
if req.Version < val.version {
writeResponse(response{"", -1, fmt.Sprintf("given version (%d) is smaller than the existing version (%d)", req.Version, val.version)}, w)
} else {
data[req.Key].val = req.Val
data[req.Key].version = req.Version
Expand Down

0 comments on commit 5526e79

Please sign in to comment.