Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zookeeper: Make Create+Put on a new znode atomic #148

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 39 additions & 19 deletions store/zookeeper/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ const (
// SOH control character
SOH = "\x01"

defaultTimeout = 10 * time.Second
defaultTimeout = 10 * time.Second

syncRetryLimit = 5

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to choose 5? I don't know but it seems high. How much delay this may introduce?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongluochen No specific reason, except in my test environment 3 retries would always be needed to get the correct value, and I rounded this up to 5.
Note that the sync currently happens on SOH or empty string getting returned, and not on all reads. Also this is to work around older versions of libkv which have this bug. The versions that have this patch will do create+write atomically and should not return SOH or empty so the sync should never be needed.

)

// Zookeeper is the receiver type for
Expand Down Expand Up @@ -66,19 +68,29 @@ func (s *Zookeeper) setTimeout(time time.Duration) {
// Get the value at "key", returns the last modified index
// to use in conjunction to Atomic calls
func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
resp, meta, err := s.client.Get(s.normalize(key))

if err != nil {
if err == zk.ErrNoNode {
return nil, store.ErrKeyNotFound
var resp []byte
var meta *zk.Stat

// To guard against older versions of libkv
// creating and writing to znodes non-atomically,
// We try to resync few times if we read SOH or
// an empty string
for i := 0; i <= syncRetryLimit; i++ {
resp, meta, err = s.client.Get(s.normalize(key))

if err != nil {
if err == zk.ErrNoNode {
return nil, store.ErrKeyNotFound
}
return nil, err
}
return nil, err
}

// FIXME handle very rare cases where Get returns the
// SOH control character instead of the actual value
if string(resp) == SOH {
return s.Get(store.Normalize(key))
if (string(resp) == SOH || string(resp) == "") && i < syncRetryLimit {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel i < syncRetryLimit here is unnecessary.

Copy link
Author

@amirma amirma Mar 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongluochen The i < syncRetryLimit is to prevent a Sync in the very last iteration of the for loop. Without this, we would unnecessarily call sync and then exit the loop. (So effectively we call sync not more than syncRetryLimit number of times).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

if _, err = s.client.Sync(s.normalize(key)); err != nil {
return nil, err
}
}
}

pair = &store.KVPair{
Expand All @@ -91,14 +103,21 @@ func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
}

// createFullPath creates the entire path for a directory
// that does not exist
func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
// that does not exist and sets the value of the last
// znode to data
func (s *Zookeeper) createFullPath(path []string, data []byte, ephemeral bool) error {
for i := 1; i <= len(path); i++ {
newpath := "/" + strings.Join(path[:i], "/")
if i == len(path) && ephemeral {
_, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))

if i == len(path) {
flag := 0
if ephemeral {
flag = zk.FlagEphemeral
}
_, err := s.client.Create(newpath, data, int32(flag), zk.WorldACL(zk.PermAll))
return err
}

_, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Skip if node already exists
Expand All @@ -121,13 +140,14 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro

if !exists {
if opts != nil && opts.TTL > 0 {
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true)
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), value, true)
} else {
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false)
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), value, false)
}
} else {
_, err = s.client.Set(fkey, value, -1)
}

_, err = s.client.Set(fkey, value, -1)
return err
}

Expand Down Expand Up @@ -313,7 +333,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair,
// Create the directory
parts := store.SplitKey(strings.TrimSuffix(key, "/"))
parts = parts[:len(parts)-1]
if err = s.createFullPath(parts, false); err != nil {
if err = s.createFullPath(parts, []byte{}, false); err != nil {
// Failed to create the directory.
return false, nil, err
}
Expand Down