Skip to content

Commit

Permalink
Merge pull request #1 from inf-rno/addwatch
Browse files Browse the repository at this point in the history
Addwatch Operation
  • Loading branch information
inf-rno authored Jul 20, 2021
2 parents 8866d76 + 1c8f237 commit 87c460f
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 85 deletions.
12 changes: 6 additions & 6 deletions _examples/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import (
"fmt"
"time"

"github.com/go-zookeeper/zk"
"github.com/inf-rno/zk"
)

func main() {
c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second) //*10)
c, _, err := zk.Connect([]string{"127.0.0.1:2181", "127.0.0.1:2182", "127.0.0.1:2183"}, time.Second) //*10)
if err != nil {
panic(err)
}
children, stat, ch, err := c.ChildrenW("/")
ch, err := c.AddWatch("/", true)
if err != nil {
panic(err)
}
fmt.Printf("%+v %+v\n", children, stat)
e := <-ch
fmt.Printf("%+v\n", e)
for e := range ch {
fmt.Printf("%+v\n", e)
}
}
213 changes: 136 additions & 77 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
watchTypeData = iota
watchTypeExist
watchTypeChild
watchTypePersistent
watchTypePersistentRecursive
)

type watchPathType struct {
Expand Down Expand Up @@ -97,13 +99,14 @@ type Conn struct {
requests map[int32]*request // Xid -> pending request
requestsLock sync.Mutex
watchers map[watchPathType][]chan Event
recWatchers map[watchPathType][]chan Event
watchersLock sync.Mutex
closeChan chan struct{} // channel to tell send loop stop

// Debug (used by unit tests)
reconnectLatch chan struct{}
setWatchLimit int
setWatchCallback func([]*setWatchesRequest)
setWatchCallback func([]*setWatches2Request)

// Debug (for recurring re-auth hang)
debugCloseRecvLoop bool
Expand Down Expand Up @@ -200,6 +203,7 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti
sendChan: make(chan *request, sendChanSize),
requests: make(map[int32]*request),
watchers: make(map[watchPathType][]chan Event),
recWatchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
Expand Down Expand Up @@ -535,97 +539,112 @@ func (c *Conn) invalidateWatches(err error) {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()

if len(c.watchers) >= 0 {
for pathType, watchers := range c.watchers {
ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: pathType.path, Err: err}
c.sendEvent(ev) // also publish globally
for _, ch := range watchers {
ch <- ev
close(ch)
f := func(w map[watchPathType][]chan Event) {
if len(w) >= 0 {
for pathType, watchers := range w {
ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: pathType.path, Err: err}
c.sendEvent(ev) // also publish globally
for _, ch := range watchers {
ch <- ev
close(ch)
}
}
}
c.watchers = make(map[watchPathType][]chan Event)
}
f(c.watchers)
c.watchers = make(map[watchPathType][]chan Event)
f(c.recWatchers)
c.recWatchers = make(map[watchPathType][]chan Event)
}

func (c *Conn) sendSetWatches() {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()

if len(c.watchers) == 0 {
return
}
f := func(w map[watchPathType][]chan Event) {
if len(w) == 0 {
return
}

// NB: A ZK server, by default, rejects packets >1mb. So, if we have too
// many watches to reset, we need to break this up into multiple packets
// to avoid hitting that limit. Mirroring the Java client behavior: we are
// conservative in that we limit requests to 128kb (since server limit is
// is actually configurable and could conceivably be configured smaller
// than default of 1mb).
limit := 128 * 1024
if c.setWatchLimit > 0 {
limit = c.setWatchLimit
}
// NB: A ZK server, by default, rejects packets >1mb. So, if we have too
// many watches to reset, we need to break this up into multiple packets
// to avoid hitting that limit. Mirroring the Java client behavior: we are
// conservative in that we limit requests to 128kb (since server limit is
// is actually configurable and could conceivably be configured smaller
// than default of 1mb).
limit := 128 * 1024
if c.setWatchLimit > 0 {
limit = c.setWatchLimit
}

var reqs []*setWatchesRequest
var req *setWatchesRequest
var sizeSoFar int
var reqs []*setWatches2Request
var req *setWatches2Request
var sizeSoFar int

n := 0
for pathType, watchers := range c.watchers {
if len(watchers) == 0 {
continue
}
addlLen := 4 + len(pathType.path)
if req == nil || sizeSoFar+addlLen > limit {
if req != nil {
// add to set of requests that we'll send
reqs = append(reqs, req)
n := 0
for pathType, watchers := range w {
if len(watchers) == 0 {
continue
}
sizeSoFar = 28 // fixed overhead of a set-watches packet
req = &setWatchesRequest{
RelativeZxid: c.lastZxid,
DataWatches: make([]string, 0),
ExistWatches: make([]string, 0),
ChildWatches: make([]string, 0),
addlLen := 4 + len(pathType.path)
if req == nil || sizeSoFar+addlLen > limit {
if req != nil {
// add to set of requests that we'll send
reqs = append(reqs, req)
}
sizeSoFar = 28 // fixed overhead of a set-watches packet
req = &setWatches2Request{
RelativeZxid: c.lastZxid,
DataWatches: make([]string, 0),
ExistWatches: make([]string, 0),
ChildWatches: make([]string, 0),
PersistentWatches: make([]string, 0),
PersistentRecursiveWatches: make([]string, 0),
}
}
sizeSoFar += addlLen
switch pathType.wType {
case watchTypeData:
req.DataWatches = append(req.DataWatches, pathType.path)
case watchTypeExist:
req.ExistWatches = append(req.ExistWatches, pathType.path)
case watchTypeChild:
req.ChildWatches = append(req.ChildWatches, pathType.path)
case watchTypePersistent:
req.PersistentWatches = append(req.PersistentWatches, pathType.path)
case watchTypePersistentRecursive:
req.PersistentRecursiveWatches = append(req.PersistentRecursiveWatches, pathType.path)
}
n++
}
if n == 0 {
return
}
sizeSoFar += addlLen
switch pathType.wType {
case watchTypeData:
req.DataWatches = append(req.DataWatches, pathType.path)
case watchTypeExist:
req.ExistWatches = append(req.ExistWatches, pathType.path)
case watchTypeChild:
req.ChildWatches = append(req.ChildWatches, pathType.path)
if req != nil { // don't forget any trailing packet we were building
reqs = append(reqs, req)
}
n++
}
if n == 0 {
return
}
if req != nil { // don't forget any trailing packet we were building
reqs = append(reqs, req)
}

if c.setWatchCallback != nil {
c.setWatchCallback(reqs)
}
if c.setWatchCallback != nil {
c.setWatchCallback(reqs)
}

go func() {
res := &setWatchesResponse{}
// TODO: Pipeline these so queue all of them up before waiting on any
// response. That will require some investigation to make sure there
// aren't failure modes where a blocking write to the channel of requests
// could hang indefinitely and cause this goroutine to leak...
for _, req := range reqs {
_, err := c.request(opSetWatches, req, res, nil)
if err != nil {
c.logger.Printf("Failed to set previous watches: %v", err)
break
go func() {
res := &setWatches2Response{}
// TODO: Pipeline these so queue all of them up before waiting on any
// response. That will require some investigation to make sure there
// aren't failure modes where a blocking write to the channel of requests
// could hang indefinitely and cause this goroutine to leak...
for _, req := range reqs {
_, err := c.request(opSetWatches2, req, res, nil)
if err != nil {
c.logger.Printf("Failed to set previous watches: %v", err)
break
}
}
}
}()
}()
}
f(c.watchers)
f(c.recWatchers)
}

func (c *Conn) authenticate() error {
Expand Down Expand Up @@ -812,7 +831,7 @@ func (c *Conn) recvLoop(conn net.Conn) error {
Err: nil,
}
c.sendEvent(ev)
wTypes := make([]watchType, 0, 2)
wTypes := []watchType{watchTypePersistent}
switch res.Type {
case EventNodeCreated:
wTypes = append(wTypes, watchTypeExist)
Expand All @@ -827,9 +846,20 @@ func (c *Conn) recvLoop(conn net.Conn) error {
if watchers := c.watchers[wpt]; watchers != nil && len(watchers) > 0 {
for _, ch := range watchers {
ch <- ev
close(ch)
if t != watchTypePersistent {
close(ch)
}
}
if t != watchTypePersistent {
delete(c.watchers, wpt)
}
}
}
for basePath, watchers := range c.recWatchers {
if strings.HasPrefix(res.Path, basePath.path) {
for _, ch := range watchers {
ch <- ev
}
delete(c.watchers, wpt)
}
}
c.watchersLock.Unlock()
Expand Down Expand Up @@ -879,7 +909,11 @@ func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {

ch := make(chan Event, 1)
wpt := watchPathType{path, watchType}
c.watchers[wpt] = append(c.watchers[wpt], ch)
if watchType == watchTypePersistentRecursive {
c.recWatchers[wpt] = append(c.recWatchers[wpt], ch)
} else {
c.watchers[wpt] = append(c.watchers[wpt], ch)
}
return ch
}

Expand Down Expand Up @@ -963,6 +997,31 @@ func (c *Conn) AddAuth(scheme string, auth []byte) error {
return nil
}

// AddWatch creates a persistent, recursive watch at the given path
func (c *Conn) AddWatch(path string, recursive bool) (<-chan Event, error) {
if err := validatePath(path, false); err != nil {
return nil, err
}

var ech <-chan Event
mode := WatchModePersistent
wt := watchTypePersistent
if recursive {
mode = WatchModePersistentRecursive
wt = watchTypePersistentRecursive
}
res := &addWatchResponse{}
_, err := c.request(opAddWatch, &addWatchRequest{Path: path, Mode: mode}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watchType(wt))
}
})
if err != nil {
return nil, err
}
return ech, err
}

// Children returns the children of a znode.
func (c *Conn) Children(path string) ([]string, *Stat, error) {
if err := validatePath(path, false); err != nil {
Expand Down
25 changes: 25 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
opClose = -11
opSetAuth = 100
opSetWatches = 101
opSetWatches2 = 105
opAddWatch = 106
opError = -1
// Not in protocol, used internally
opWatcherEvent = -2
Expand Down Expand Up @@ -100,6 +102,27 @@ func (s State) String() string {
return "Unknown"
}

const (
WatchModePersistent AddWatchMode = 0
WatchModePersistentRecursive AddWatchMode = 1
)

var (
addWatchModeNames = map[AddWatchMode]string{
WatchModePersistent: "WatchModePersistent",
WatchModePersistentRecursive: "WatchModePersistentRecursive",
}
)

type AddWatchMode int32

func (m AddWatchMode) String() string {
if name := addWatchModeNames[m]; name != "" {
return name
}
return "Unknown"
}

type ErrCode int32

var (
Expand Down Expand Up @@ -215,6 +238,8 @@ var (
opClose: "close",
opSetAuth: "setAuth",
opSetWatches: "setWatches",
opSetWatches2: "setWatches2",
opAddWatch: "addWatch",

opWatcherEvent: "watcherEvent",
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/go-zookeeper/zk
module github.com/inf-rno/zk

go 1.13
Loading

0 comments on commit 87c460f

Please sign in to comment.