diff --git a/dirwatcher.go b/dirwatcher.go index 7c2f551..3c11531 100644 --- a/dirwatcher.go +++ b/dirwatcher.go @@ -88,6 +88,13 @@ func NewDirWatcher(ctx context.Context, fileMask uint32, root string) (*DirWatch return } + select { + case <-ctx.Done(): + close(events) + return + default: + } + for _, event := range raw { // Skip ignored events queued from removed watchers @@ -170,7 +177,6 @@ func NewDirWatcher(ctx context.Context, fileMask uint32, root string) (*DirWatch return } } - return case event, ok := <-events: if !ok { select { @@ -197,8 +203,8 @@ func NewDirWatcher(ctx context.Context, fileMask uint32, root string) (*DirWatch }() go func() { - <-i.Done() wg.Wait() + <-i.Done() close(dw.done) }() diff --git a/inotify.go b/inotify.go index e1092ea..7710151 100644 --- a/inotify.go +++ b/inotify.go @@ -19,39 +19,30 @@ const maxEvents = 1024 var TimeoutError = errors.New("Inotify timeout") -type getWatchRequest struct { +type addWatchRequest struct { pathName string - result chan uint32 + mask uint32 + result chan error } -type getPathRequest struct { - wd uint32 - result chan string +type readResponse struct { + events []InotifyEvent + err error } -type addWatchRequest struct { - pathName string - wd uint32 +type readRequest struct { + deadline time.Time + result chan readResponse } // Inotify is the low level wrapper around inotify_init(), inotify_add_watch() and inotify_rm_watch() type Inotify struct { - // ctx is the context of inotify instance - ctx context.Context - // fd is the file descriptor of inotify instance - fd int - - // done channel is closed when the instance has completed - done chan struct{} - - // getPathByWatchIn is the channel for getting path by watch descriptor - getPathByWatchIn chan getPathRequest - // addWatchIn is the channel for adding watch + ctx context.Context + done chan struct{} addWatchIn chan addWatchRequest - // rmByWdIn is the channel for removing watch by watch descriptor - rmByWdIn chan uint32 - // rmByPathIn is the channel for removing watch by path + rmByWdIn chan uint32 rmByPathIn chan string + readIn chan readRequest } // NewInotify creates new inotify instance @@ -62,41 +53,172 @@ func NewInotify(ctx context.Context) (*Inotify, error) { } inotify := &Inotify{ - ctx: ctx, - done: make(chan struct{}), - fd: fd, - getPathByWatchIn: make(chan getPathRequest), - addWatchIn: make(chan addWatchRequest), - rmByWdIn: make(chan uint32), - rmByPathIn: make(chan string), + ctx: ctx, + done: make(chan struct{}), + addWatchIn: make(chan addWatchRequest), + rmByWdIn: make(chan uint32), + rmByPathIn: make(chan string), + readIn: make(chan readRequest), } go func() { defer close(inotify.done) + defer syscall.Close(fd) watches := make(map[string]uint32) paths := make(map[uint32]string) + main: for { select { case <-ctx.Done(): + + // Handle pending requests + draining := true + + for draining { + select { + case req := <-inotify.addWatchIn: + // Send error to addWatch requests + select { + case req.result <- errors.New("Inotify instance closed"): + default: + } + case req := <-inotify.readIn: + // Send error to read requests + select { + case req.result <- readResponse{err: errors.New("Inotify instance closed")}: + default: + } + case <-time.After(200 * time.Millisecond): + draining = false + default: + draining = false + } + } + for _, w := range watches { _, err := syscall.InotifyRmWatch(fd, w) if err != nil { continue } } - syscall.Close(fd) + return case req := <-inotify.addWatchIn: - watches[req.pathName] = req.wd - paths[req.wd] = req.pathName - case req := <-inotify.getPathByWatchIn: - pathName, ok := paths[req.wd] - if ok { - req.result <- pathName + wd, err := syscall.InotifyAddWatch(fd, req.pathName, req.mask) + if err == nil { + wdu32 := uint32(wd) + watches[req.pathName] = wdu32 + paths[wdu32] = req.pathName + } + select { + case req.result <- err: + case <-ctx.Done(): + } + case req := <-inotify.readIn: + { + deadline := req.deadline + response := readResponse{} + + events := make([]InotifyEvent, 0, maxEvents) + buf := make([]byte, maxEvents*(syscall.SizeofInotifyEvent+syscall.NAME_MAX+1)) + + var n int + read: + for { + now := time.Now() + if now.After(deadline) { + response.err = TimeoutError + response.events = events + select { + case req.result <- response: + case <-ctx.Done(): + } + continue main + } + + n, err = syscall.Read(fd, buf) + if err != nil { + if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK { + + // wait for a little bit while + select { + case <-time.After(time.Millisecond * 200): + case <-ctx.Done(): + continue main + } + + continue read + } + response.err = err + response.events = events + select { + case req.result <- response: + case <-ctx.Done(): + } + continue main + } + + if n > 0 { + break read + } + } + + if n < syscall.SizeofInotifyEvent { + response.err = fmt.Errorf("short inotify read, expected at least one SizeofInotifyEvent %d, got %d", syscall.SizeofInotifyEvent, n) + response.events = events + select { + case req.result <- response: + case <-ctx.Done(): + } + continue main + } + + offset := 0 + for offset+syscall.SizeofInotifyEvent <= n { + event := (*syscall.InotifyEvent)(unsafe.Pointer(&buf[offset])) + var name string + { + nameStart := offset + syscall.SizeofInotifyEvent + nameEnd := offset + syscall.SizeofInotifyEvent + int(event.Len) + + if nameEnd > n { + response.err = fmt.Errorf("corrupted inotify event length %d", event.Len) + response.events = events + select { + case req.result <- response: + case <-ctx.Done(): + } + continue main + } + + name = strings.TrimRight(string(buf[nameStart:nameEnd]), "\x00") + offset = nameEnd + } + + watchName, ok := paths[uint32(event.Wd)] + if !ok { + continue + } + + name = filepath.Join(watchName, name) + + events = append(events, InotifyEvent{ + Wd: uint32(event.Wd), + Name: name, + Mask: event.Mask, + Cookie: event.Cookie, + }) + } + + response.err = nil + response.events = events + select { + case req.result <- response: + case <-ctx.Done(): + } } - close(req.result) case wd := <-inotify.rmByWdIn: pathName, ok := paths[wd] if !ok { @@ -125,19 +247,24 @@ func (i *Inotify) Done() <-chan struct{} { // AddWatch adds given path to list of watched files / folders func (i *Inotify) AddWatch(pathName string, mask uint32) error { - w, err := syscall.InotifyAddWatch(i.fd, pathName, mask) - if err != nil { - return err + + req := addWatchRequest{ + pathName: pathName, + mask: mask, + result: make(chan error), } select { case <-i.ctx.Done(): return i.ctx.Err() - case i.addWatchIn <- addWatchRequest{ - pathName: pathName, - wd: uint32(w), - }: - return nil + case i.addWatchIn <- req: + + select { + case <-i.ctx.Done(): + return i.ctx.Err() + case err := <-req.result: + return err + } } } @@ -165,7 +292,7 @@ func (i *Inotify) RmWatch(pathName string) error { // wait forever, until context is cancelled. func (i *Inotify) Read() ([]InotifyEvent, error) { for { - evts, err := i.ReadDeadline(time.Now().Add(time.Millisecond * 200)) + evts, err := i.ReadDeadline(time.Now().Add(time.Millisecond * 300)) if err != nil { if err == TimeoutError { continue @@ -181,109 +308,20 @@ func (i *Inotify) Read() ([]InotifyEvent, error) { // ReadDeadline waits for InotifyEvents until deadline is reached, or context is cancelled. If // deadline is reached, TimeoutError is returned. func (i *Inotify) ReadDeadline(deadline time.Time) ([]InotifyEvent, error) { - events := make([]InotifyEvent, 0, maxEvents) - buf := make([]byte, maxEvents*(syscall.SizeofInotifyEvent+syscall.NAME_MAX+1)) - - var n int - var err error - - fdset := &syscall.FdSet{} - - //main: - for { - if i.ctx.Err() != nil { - return events, i.ctx.Err() - } - - now := time.Now() - - if now.After(deadline) { - return events, TimeoutError - } - - diff := deadline.Sub(now) - - timeout := syscall.NsecToTimeval(diff.Nanoseconds()) - - fdset.Bits[0] = 1 << uint(i.fd) - _, err = syscall.Select(i.fd+1, fdset, nil, nil, &timeout) - if err != nil { - if err == syscall.EINTR { - continue - } - return events, err - } - - if fdset.Bits[0]&(1< 0 { - break - } - } - - if n < syscall.SizeofInotifyEvent { - return events, fmt.Errorf("short inotify read, expected at least one SizeofInotifyEvent %d, got %d", syscall.SizeofInotifyEvent, n) + req := readRequest{ + deadline: deadline, + result: make(chan readResponse), } - offset := 0 - - for offset+syscall.SizeofInotifyEvent <= n { - - event := (*syscall.InotifyEvent)(unsafe.Pointer(&buf[offset])) - var name string - { - nameStart := offset + syscall.SizeofInotifyEvent - nameEnd := offset + syscall.SizeofInotifyEvent + int(event.Len) - - if nameEnd > n { - return events, fmt.Errorf("corrupted inotify event length %d", event.Len) - } - - name = strings.TrimRight(string(buf[nameStart:nameEnd]), "\x00") - offset = nameEnd - } - - req := getPathRequest{ - wd: uint32(event.Wd), - result: make(chan string), - } - + select { + case <-i.ctx.Done(): + return nil, i.ctx.Err() + case i.readIn <- req: select { case <-i.ctx.Done(): - return events, i.ctx.Err() - case i.getPathByWatchIn <- req: - - select { - case <-i.ctx.Done(): - - // drain result channel - for range req.result { - // noop - } - - return events, i.ctx.Err() - case watchName := <-req.result: - name = filepath.Join(watchName, name) - } + return nil, i.ctx.Err() + case resp := <-req.result: + return resp.events, resp.err } - - events = append(events, InotifyEvent{ - Wd: uint32(event.Wd), - Name: name, - Mask: event.Mask, - Cookie: event.Cookie, - }) } - - return events, nil }