Skip to content

Commit

Permalink
Fixed missing calls to inotify_rm_watch, when removing watch descript…
Browse files Browse the repository at this point in the history
…or either by id or by path; AddWatch() now returns watch descriptor
  • Loading branch information
illarion committed Oct 11, 2024
1 parent e6476c2 commit 01b4d75
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 32 deletions.
3 changes: 2 additions & 1 deletion dirwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func NewDirWatcher(ctx context.Context, fileMask uint32, root string) (*DirWatch

return nil
}
return i.AddWatch(path, IN_ALL_EVENTS)
_, err = i.AddWatch(path, IN_ALL_EVENTS)
return err
})

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion filewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewFileWatcher(ctx context.Context, mask uint32, files ...string) (*FileWat
expectedPaths := make(map[string]bool)

for _, file := range files {
err := inotify.AddWatch(filepath.Dir(file), mask)
_, err := inotify.AddWatch(filepath.Dir(file), mask)
if err != nil {
return nil, err
}
Expand Down
131 changes: 101 additions & 30 deletions inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,25 @@ const (
maxEvents = 1024
)

var TimeoutError = errors.New("Inotify timeout")
type addWatchResult struct {
wd int
err error
}

type addWatchRequest struct {
pathName string
mask uint32
result chan addWatchResult
}

type rmWdRequest struct {
wd int
ignored bool // if true, the watch was removed automatically
result chan error
}

type rmPathRequest struct {
pathName string
result chan error
}

Expand All @@ -40,8 +54,8 @@ type Inotify struct {
ctx context.Context
done chan struct{}
addWatchIn chan addWatchRequest
rmByWdIn chan int
rmByPathIn chan string
rmByWdIn chan rmWdRequest
rmByPathIn chan rmPathRequest
eventsOut chan eventItem

readMutex sync.Mutex
Expand All @@ -56,14 +70,12 @@ func NewInotify(ctx context.Context) (*Inotify, error) {

file := os.NewFile(uintptr(fd), "inotify")

//ctx, cancel := context.WithCancel(ctx)

inotify := &Inotify{
ctx: ctx,
done: make(chan struct{}),
addWatchIn: make(chan addWatchRequest),
rmByWdIn: make(chan int),
rmByPathIn: make(chan string),
rmByWdIn: make(chan rmWdRequest),
rmByPathIn: make(chan rmPathRequest),
eventsOut: make(chan eventItem, maxEvents),
}

Expand All @@ -80,16 +92,14 @@ func NewInotify(ctx context.Context) (*Inotify, error) {
go func() {
//defer cancel()
<-ctx.Done()
file.Close()
//file.Close()
wg.Done()
}()

wg.Add(1)
// read events goroutine. Only this goroutine can read or close the inotify file descriptor
go func() {
//defer cancel()
defer wg.Done()
//defer file.Close()
defer close(inotify.eventsOut)

// reusable buffers for reading inotify events. Make sure they're not
Expand Down Expand Up @@ -193,13 +203,28 @@ func NewInotify(ctx context.Context) (*Inotify, error) {

// remove watch

result := make(chan error)

select {
case <-ctx.Done():
return
case inotify.rmByWdIn <- inotifyEvent.Wd:
case inotify.rmByWdIn <- rmWdRequest{
wd: int(event.Wd),
ignored: true,
result: result,
}:
case <-time.After(1 * time.Second):
}

select {
case <-ctx.Done():
return
case err := <-result:
if err != nil {
// TODO log error
}
}

continue

}
Expand Down Expand Up @@ -240,7 +265,10 @@ func NewInotify(ctx context.Context) (*Inotify, error) {
case req := <-inotify.addWatchIn:
// Send error to addWatch requests
select {
case req.result <- errors.New("Inotify instance closed"):
case req.result <- addWatchResult{
wd: 0,
err: errors.New("Inotify instance closed"),
}:
default:
}
case <-inotify.rmByWdIn:
Expand Down Expand Up @@ -268,23 +296,40 @@ func NewInotify(ctx context.Context) (*Inotify, error) {
paths[wd] = req.pathName
}
select {
case req.result <- err:
case req.result <- addWatchResult{wd: wd, err: err}:
case <-ctx.Done():
}
case wd := <-inotify.rmByWdIn:
pathName, ok := paths[wd]
case req := <-inotify.rmByWdIn:
pathName, ok := paths[req.wd]
if !ok {
continue
}

if !req.ignored {
_, err = syscallf.InotifyRmWatch(fd, req.wd)
}

delete(watches, pathName)
delete(paths, wd)
case pathName := <-inotify.rmByPathIn:
wd, ok := watches[pathName]
delete(paths, req.wd)

select {
case req.result <- err:
case <-ctx.Done():
}
case req := <-inotify.rmByPathIn:
wd, ok := watches[req.pathName]
if !ok {
continue
}
delete(watches, pathName)
_, err := syscallf.InotifyRmWatch(fd, wd)

delete(watches, req.pathName)
delete(paths, wd)

select {
case req.result <- err:
case <-ctx.Done():
}
case req := <-getPathIn:
wd := paths[req.wd]
select {
Expand All @@ -310,45 +355,71 @@ func (i *Inotify) Done() <-chan struct{} {
}

// AddWatch adds given path to list of watched files / folders
func (i *Inotify) AddWatch(pathName string, mask uint32) error {
func (i *Inotify) AddWatch(pathName string, mask uint32) (int, error) {

req := addWatchRequest{
pathName: pathName,
mask: mask,
result: make(chan error),
result: make(chan addWatchResult),
}

select {
case <-i.ctx.Done():
return i.ctx.Err()
return 0, i.ctx.Err()
case i.addWatchIn <- req:

select {
case <-i.ctx.Done():
return i.ctx.Err()
case err := <-req.result:
return err
return 0, i.ctx.Err()
case result := <-req.result:
return result.wd, result.err
}
}
}

// RmWd removes watch by watch descriptor
func (i *Inotify) RmWd(wd int) error {

req := rmWdRequest{
wd: wd,
ignored: false,
result: make(chan error),
}

select {
case <-i.ctx.Done():
return i.ctx.Err()
case i.rmByWdIn <- wd:
return nil
case i.rmByWdIn <- req:
}

select {
case <-i.ctx.Done():
return i.ctx.Err()
case err := <-req.result:
return err
}

}

// RmWatch removes watch by pathName
func (i *Inotify) RmWatch(pathName string) error {

req := rmPathRequest{
pathName: pathName,
result: make(chan error),
}

select {
case <-i.ctx.Done():
return i.ctx.Err()
case i.rmByPathIn <- req:
}

select {
case <-i.ctx.Done():
return i.ctx.Err()
case i.rmByPathIn <- pathName:
return nil
case err := <-req.result:
return err
}
}

Expand Down Expand Up @@ -421,7 +492,7 @@ func (i *Inotify) Read() ([]InotifyEvent, error) {
}

// ReadDeadline waits for InotifyEvents until deadline is reached, or context is cancelled. If
// deadline is reached, TimeoutError is returned.
// deadline is reached, it will return all events read until that point.
func (i *Inotify) ReadDeadline(deadline time.Time) ([]InotifyEvent, error) {
i.readMutex.Lock()
defer i.readMutex.Unlock()
Expand Down

0 comments on commit 01b4d75

Please sign in to comment.