Skip to content

Commit

Permalink
Fixes #5 - DirWatcher & FileWatcher have method Done(), allowing call…
Browse files Browse the repository at this point in the history
…er to wait for finalization of internal goroutines
  • Loading branch information
illarion committed Sep 11, 2024
1 parent 714973e commit 0fcf3d0
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 24 deletions.
46 changes: 36 additions & 10 deletions dirwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,27 @@ import (
"context"
"os"
"path/filepath"
"sync"
)

// DirWatcher recursively watches the given root folder, waiting for file events.
// Events can be masked by providing fileMask. DirWatcher does not generate events for
// folders or subfolders.
type DirWatcher struct {
context.Context
C chan FileEvent
C chan FileEvent
done chan struct{}
}

// NewDirWatcher creates DirWatcher recursively waiting for events in the given root folder and
// emitting FileEvents in channel C, that correspond to fileMask. Folder events are ignored (having IN_ISDIR set to 1)
func NewDirWatcher(ctx context.Context, fileMask uint32, root string) (*DirWatcher, error) {
ctx, cancel := context.WithCancel(ctx)
dw := &DirWatcher{
Context: ctx,
C: make(chan FileEvent),
C: make(chan FileEvent),
done: make(chan struct{}),
}

i, err := NewInotify(ctx)
if err != nil {
cancel()
return nil, err
}

Expand Down Expand Up @@ -53,21 +52,36 @@ func NewDirWatcher(ctx context.Context, fileMask uint32, root string) (*DirWatch
})

if err != nil {
cancel()
return nil, err
}

events := make(chan FileEvent)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer cancel()
defer wg.Done()

for _, event := range queue {
events <- event
select {
case <-ctx.Done():
close(events)
return
case events <- event:

}
}
queue = nil

for {

select {
case <-ctx.Done():
close(events)
return
default:
}

raw, err := i.Read()
if err != nil {
close(events)
Expand Down Expand Up @@ -129,7 +143,11 @@ func NewDirWatcher(ctx context.Context, fileMask uint32, root string) (*DirWatch
}
}()

wg.Add(1)
go func() {
defer wg.Done()
defer close(dw.C)

for {
select {
case <-ctx.Done():
Expand All @@ -139,7 +157,6 @@ func NewDirWatcher(ctx context.Context, fileMask uint32, root string) (*DirWatch
dw.C <- FileEvent{
Eof: true,
}
cancel()
return
}

Expand All @@ -153,6 +170,15 @@ func NewDirWatcher(ctx context.Context, fileMask uint32, root string) (*DirWatch
}
}()

go func() {
wg.Wait()
close(dw.done)
}()

return dw, nil
}

// Done returns a channel that is closed when DirWatcher is done
func (dw *DirWatcher) Done() <-chan struct{} {
return dw.done
}
13 changes: 10 additions & 3 deletions dirwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
)

func TestDirWatcher(t *testing.T) {
Expand Down Expand Up @@ -79,7 +80,7 @@ func TestDirWatcher(t *testing.T) {

})

t.Run("ClosedDirwatcherHasDoneContext", func(t *testing.T) {
t.Run("ClosedDirwatcherBecomesDone", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -88,11 +89,17 @@ func TestDirWatcher(t *testing.T) {
t.Error(err)
}

go func() {
for e := range dw.C {
t.Logf("Event received: %v", e)
}
}()

cancel()

select {
case <-dw.Context.Done():
default:
case <-dw.Done():
case <-time.After(5 * time.Second):
t.Fail()
}
})
Expand Down
39 changes: 29 additions & 10 deletions filewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,26 @@ package gonotify
import (
"context"
"path/filepath"
"sync"
)

// FileWatcher waits for events generated by filesystem for a specific list of file paths, including
// IN_CREATE for not yet existing files and IN_DELETE for removed.
type FileWatcher struct {
context.Context
C chan FileEvent
C chan FileEvent
done chan struct{}
}

// NewFileWatcher creates FileWatcher with provided inotify mask and list of files to wait events for.
func NewFileWatcher(ctx context.Context, mask uint32, files ...string) (*FileWatcher, error) {

ctx, cancel := context.WithCancel(ctx)
f := &FileWatcher{
Context: ctx,
C: make(chan FileEvent),
C: make(chan FileEvent),
done: make(chan struct{}),
}

inotify, err := NewInotify(ctx)
if err != nil {
cancel()
return nil, err
}

Expand All @@ -32,19 +31,27 @@ func NewFileWatcher(ctx context.Context, mask uint32, files ...string) (*FileWat
for _, file := range files {
err := inotify.AddWatch(filepath.Dir(file), mask)
if err != nil {
cancel()
return nil, err
}
expectedPaths[file] = true
}

events := make(chan FileEvent)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer cancel()
defer wg.Done()
for {
raw, err := inotify.Read()

select {
case <-ctx.Done():
close(events)
return
default:
}

raw, err := inotify.Read()
if err != nil {
close(events)
return
Expand All @@ -62,8 +69,10 @@ func NewFileWatcher(ctx context.Context, mask uint32, files ...string) (*FileWat
}
}()

wg.Add(1)
go func() {
defer cancel()
defer wg.Done()
defer close(f.C)
for {
select {
case <-ctx.Done():
Expand All @@ -86,5 +95,15 @@ func NewFileWatcher(ctx context.Context, mask uint32, files ...string) (*FileWat
}
}()

go func() {
wg.Wait()
close(f.done)
}()

return f, nil
}

// Done returns a channel that is closed when the FileWatcher is done.
func (f *FileWatcher) Done() <-chan struct{} {
return f.done
}
9 changes: 8 additions & 1 deletion filewatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
)

func TestFileWatcher(t *testing.T) {
Expand Down Expand Up @@ -70,11 +71,17 @@ func TestFileWatcher(t *testing.T) {
t.Error(err)
}

go func() {
for e := range fw.C {
t.Logf("Event received: %v", e)
}
}()

cancel()

select {
case <-fw.Done():
default:
case <-time.After(5 * time.Second):
t.Fail()
}

Expand Down

0 comments on commit 0fcf3d0

Please sign in to comment.