Skip to content

Commit

Permalink
lib/memfs: stop all Watcher spawn by DirWatcher
Browse files Browse the repository at this point in the history
Previously, the Watcher goroutine will stopped only if the file is
being deleted.
If the DirWatcher stopped manually, by calling Stop method, the
Watcher goroutine will still running in the background.

This changes record all spawned Watcher and stop it when files inside
a deleted directory or when Stop called.
  • Loading branch information
shuLhan committed Nov 7, 2023
1 parent 5b8a716 commit 8b20855
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 15 deletions.
53 changes: 45 additions & 8 deletions lib/memfs/dirwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type DirWatcher struct {
// consumed.
qFileChanges chan NodeState

qrun chan bool
qrun chan struct{}

root *Node
fs *MemFS
Expand All @@ -41,6 +41,10 @@ type DirWatcher struct {
// information.
dirs map[string]*Node

// fileWatcher contains active watcher for file with Node.Path as
// key.
fileWatcher map[string]*Watcher

// This struct embed Options to map the directory to be watched
// into memory.
//
Expand Down Expand Up @@ -95,9 +99,11 @@ func (dw *DirWatcher) init() (err error) {
dw.C = dw.qchanges

dw.qFileChanges = make(chan NodeState, dirWatcherQueueSize)
dw.qrun = make(chan bool, 1)
dw.qrun = make(chan struct{})

dw.dirs = make(map[string]*Node)
dw.fileWatcher = make(map[string]*Watcher)

dw.mapSubdirs(dw.root)

return nil
Expand All @@ -121,7 +127,17 @@ func (dw *DirWatcher) Start() (err error) {

// Stop watching changes on directory.
func (dw *DirWatcher) Stop() {
dw.qrun <- false
// Stop all file watchers.
var watcher *Watcher
for _, watcher = range dw.fileWatcher {
watcher.Stop()
}

select {
case dw.qrun <- struct{}{}:
<-dw.qrun
default:
}
}

// dirsKeys return all the key in field dirs sorted in ascending order.
Expand All @@ -145,8 +161,9 @@ func (dw *DirWatcher) mapSubdirs(node *Node) {
var (
logp = `DirWatcher.mapSubdirs`

child *Node
err error
child *Node
watcher *Watcher
err error
)

for _, child = range node.Childs {
Expand All @@ -157,10 +174,12 @@ func (dw *DirWatcher) mapSubdirs(node *Node) {
dw.mapSubdirs(child)
continue
}
_, err = newWatcher(node, child, dw.Delay, dw.qFileChanges)
watcher, err = newWatcher(node, child, dw.Delay, dw.qFileChanges)
if err != nil {
log.Printf("%s %q: %s", logp, child.SysPath, err)
continue
}
dw.fileWatcher[child.Path] = watcher
}
}

Expand All @@ -172,10 +191,14 @@ func (dw *DirWatcher) onCreated(parent, child *Node) (err error) {
dw.dirsLocker.Unlock()
} else {
// Start watching the file for modification.
_, err = newWatcher(parent, childInfo, dw.Delay, dw.qFileChanges)
var watcher *Watcher

watcher, err = newWatcher(parent, child, dw.Delay, dw.qFileChanges)
if err != nil {
return fmt.Errorf(`onCreated: %w`, err)
}

dw.fileWatcher[child.Path] = watcher
}

var ns = NodeState{
Expand All @@ -194,7 +217,8 @@ func (dw *DirWatcher) onCreated(parent, child *Node) (err error) {
// childs if its a directory.
func (dw *DirWatcher) onDelete(node *Node) {
var (
child *Node
child *Node
watcher *Watcher
)
for _, child = range node.Childs {
if child.IsDir() {
Expand All @@ -217,6 +241,13 @@ func (dw *DirWatcher) onDelete(node *Node) {
dw.dirsLocker.Lock()
delete(dw.dirs, node.Path)
dw.dirsLocker.Unlock()
} else {
// Stop the file watcher.
watcher = dw.fileWatcher[node.Path]
if watcher != nil {
watcher.Stop()
delete(dw.fileWatcher, node.Path)
}
}
dw.fs.RemoveChild(node.Parent, node)

Expand Down Expand Up @@ -470,6 +501,10 @@ func (dw *DirWatcher) start() {
node, err = dw.fs.Get(ns.Node.Path)
if err != nil {
log.Printf("%s: on file changes %s: %s", logp, ns.Node.Path, err)
var watcher = dw.fileWatcher[ns.Node.Path]
if watcher != nil {
watcher.Stop()
}
} else {
ns.Node = *node
switch ns.State {
Expand All @@ -485,6 +520,8 @@ func (dw *DirWatcher) start() {
case <-dw.qrun:
ever = false
ticker.Stop()
// Signal back to the Stop caller.
dw.qrun <- struct{}{}
}
}
}
Expand Down
37 changes: 30 additions & 7 deletions lib/memfs/dirwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

libstrings "github.com/shuLhan/share/lib/strings"
"github.com/shuLhan/share/lib/test"
)

Expand All @@ -28,9 +29,9 @@ func TestDirWatcher_renameDirectory(t *testing.T) {
//
// Create a directory with its content to be watched.
//
// rootDir
// |_ subDir
// |_ subDirFile
// rootDir/
// |_ subDir/
// |_ testfile
//

rootDir = t.TempDir()
Expand Down Expand Up @@ -68,9 +69,18 @@ func TestDirWatcher_renameDirectory(t *testing.T) {
t.Fatal(err)
}

<-dw.C
<-dw.C
<-dw.C
var ns NodeState
ns = <-dw.C // newsubdir created.
t.Logf(`0: %s %s`, ns.State, ns.Node.Path)

ns = <-dw.C // newsubdir/testfile created.
t.Logf(`1: %s %s`, ns.State, ns.Node.Path)

ns = <-dw.C // subdir/testfile removed.
t.Logf(`2: %s %s`, ns.State, ns.Node.Path)

ns = <-dw.C // subdir removed.
t.Logf(`3: %s %s`, ns.State, ns.Node.Path)

dw.Stop()

Expand Down Expand Up @@ -142,9 +152,22 @@ func TestDirWatcher_removeDirSymlink(t *testing.T) {
if err != nil {
t.Fatal(err)
}

var oneOf = []string{`/sub`, `/sub/index.html`}

got = <-dw.C
test.Assert(t, `RemoveAll state`, FileStateDeleted, got.State)
if !libstrings.IsContain(oneOf, got.Node.Path) {
t.Fatalf(`expecting one of %v, got %q`, oneOf, got.Node.Path)
}

got = <-dw.C
test.Assert(t, `RemoveAll state`, FileStateDeleted, got.State)
test.Assert(t, `RemoveAll path`, `/sub/index.html`, got.Node.Path)
if !libstrings.IsContain(oneOf, got.Node.Path) {
t.Fatalf(`expecting one of %v, got %q`, oneOf, got.Node.Path)
}

dw.Stop()
}

func TestDirWatcher_withSymlink(t *testing.T) {
Expand Down

0 comments on commit 8b20855

Please sign in to comment.