Skip to content

Commit

Permalink
lib/memfs: protect internal [DirWatcher.fileWatcher] with mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
shuLhan committed Feb 3, 2024
1 parent a8b0e94 commit a92b471
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 80 deletions.
174 changes: 103 additions & 71 deletions lib/memfs/dirwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ type DirWatcher struct {

qrun chan struct{}

// The fs field initialized from Root if its nil.
fs *MemFS

// The root Node in fs.
root *Node
fs *MemFS

// dirs contains list of directory and their sub-directories that is
// being watched for changes.
// The map key is relative path to directory and its value is a node
// The map key is relative path to Root and its value is a Node
// information.
dirs map[string]*Node

Expand All @@ -59,38 +62,43 @@ type DirWatcher struct {

// Delay define a duration when the new changes will be fetched from
// system.
// This field is optional, minimum is 100 milli second and default is
// 5 seconds.
// This field is optional, minimum is 100 milli second and default
// is 5 seconds.
Delay time.Duration

// dirsLocker protect adding and removing key in [dirs].
dirsLocker sync.Mutex

// mtxFileWatcher protect adding and removing key in [fileWatcher].
mtxFileWatcher sync.Mutex
}

// init validate and initialized all fields.
// Once initialized the [DirWatcher.dirs] will contains all directories and
// [DirWatcher.fileWatcher] start watching all regular files.
func (dw *DirWatcher) init() (err error) {
var (
logp = "init"

fi fs.FileInfo
)
var logp = `init`

if dw.Delay < 100*time.Millisecond {
dw.Delay = defWatchDelay
}

if dw.fs == nil {
var fi fs.FileInfo

fi, err = os.Stat(dw.Root)
if err != nil {
return fmt.Errorf("%s: %w", logp, err)
return fmt.Errorf(`%s: %w`, logp, err)
}
if !fi.IsDir() {
return fmt.Errorf("%s: %q is not a directory", logp, dw.Root)
return fmt.Errorf(`%s: %q is not a directory`, logp, dw.Root)
}

dw.Options.MaxFileSize = -1

dw.fs, err = New(&dw.Options)
if err != nil {
return fmt.Errorf("%s: %w", logp, err)
return fmt.Errorf(`%s: %w`, logp, err)
}
}
dw.root = dw.fs.Root
Expand All @@ -111,13 +119,11 @@ func (dw *DirWatcher) init() (err error) {

// Start watching changes in directory and its content.
func (dw *DirWatcher) Start() (err error) {
var (
logp = "Start"
)
var logp = `Start`

err = dw.init()
if err != nil {
return fmt.Errorf("%s: %w", logp, err)
return fmt.Errorf(`%s: %w`, logp, err)
}

go dw.start()
Expand All @@ -127,11 +133,7 @@ func (dw *DirWatcher) Start() (err error) {

// Stop watching changes on directory.
func (dw *DirWatcher) Stop() {
// Stop all file watchers.
var watcher *Watcher
for _, watcher = range dw.fileWatcher {
watcher.Stop()
}
dw.stopAllFileWatcher()

select {
case dw.qrun <- struct{}{}:
Expand All @@ -154,16 +156,15 @@ func (dw *DirWatcher) dirsKeys() (keys []string) {
return keys
}

// mapSubdirs iterate each child node recursively and map any sub
// directories into mapSubdirs.
// If its a regular file, start a NewWatcher.
// mapSubdirs iterate each child node recursively and map directories into
// [DirWatcher.dirs].
// If its a regular file, start a new file [Watcher].
func (dw *DirWatcher) mapSubdirs(node *Node) {
var (
logp = `DirWatcher.mapSubdirs`
logp = `mapSubdirs`

child *Node
watcher *Watcher
err error
child *Node
err error
)

for _, child = range node.Childs {
Expand All @@ -174,12 +175,10 @@ func (dw *DirWatcher) mapSubdirs(node *Node) {
dw.mapSubdirs(child)
continue
}
watcher, err = newWatcher(node, child, dw.Delay, dw.qFileChanges)
err = dw.startWatchingFile(node, child)
if err != nil {
log.Printf("%s %q: %s", logp, child.SysPath, err)
continue
log.Printf(`%s: %s`, logp, err)
}
dw.fileWatcher[child.Path] = watcher
}
}

Expand All @@ -190,15 +189,7 @@ func (dw *DirWatcher) onCreated(parent, child *Node) (err error) {
dw.dirs[child.Path] = child
dw.dirsLocker.Unlock()
} else {
// Start watching the file for modification.
var watcher *Watcher

watcher, err = newWatcher(parent, child, dw.Delay, dw.qFileChanges)
if err != nil {
return fmt.Errorf(`onCreated: %w`, err)
}

dw.fileWatcher[child.Path] = watcher
dw.startWatchingFile(parent, child)
}

var ns = NodeState{
Expand Down Expand Up @@ -242,13 +233,7 @@ func (dw *DirWatcher) onDirDeleted(node *Node) {
}

func (dw *DirWatcher) onFileDeleted(node *Node) {
var watcher = dw.fileWatcher[node.Path]
if watcher != nil {
watcher.Stop()
delete(dw.fileWatcher, node.Path)
}

dw.fs.RemoveChild(node.Parent, node)
dw.stopWatchingFile(node)

var ns = NodeState{
State: FileStateDeleted,
Expand Down Expand Up @@ -372,16 +357,17 @@ func (dw *DirWatcher) onRootCreated() {
dw.mapSubdirs(dw.root)
}

// onRootDeleted handle change when the root directory that we watch get
// deleted. It will send deleted event and unmount the root directory from
// memory.
// onRootDeleted handle change when the [DirWatcher.Options.Root] directory
// deleted.
// It will send the [FileStateDeleted] event and unmount the root directory
// from memory.
func (dw *DirWatcher) onRootDeleted() {
var (
ns = NodeState{
Node: *dw.root,
State: FileStateDeleted,
}
)
var ns = NodeState{
Node: *dw.root,
State: FileStateDeleted,
}

dw.stopAllFileWatcher()

dw.fs = nil
dw.root = nil
Expand Down Expand Up @@ -459,27 +445,26 @@ func (dw *DirWatcher) onUpdateMode(node *Node, newInfo os.FileInfo) {

func (dw *DirWatcher) start() {
var (
logp = "DirWatcher"
logp = `DirWatcher`
ticker = time.NewTicker(dw.Delay)
ever = true

node *Node
fi os.FileInfo
ns NodeState
err error
ns NodeState
err error
)

for ever {
for {
select {
case <-ticker.C:
var fi os.FileInfo

fi, err = os.Stat(dw.Root)
if err != nil {
if !os.IsNotExist(err) {
log.Printf("%s: %s", logp, err)
continue
}
if dw.fs != nil {
dw.onRootDeleted()
if os.IsNotExist(err) {
if dw.fs != nil {
dw.onRootDeleted()
}
} else {
log.Printf(`%s: %s`, logp, err)
}
continue
}
Expand All @@ -497,6 +482,8 @@ func (dw *DirWatcher) start() {
dw.processSubdirs()

case ns = <-dw.qFileChanges:
var node *Node

node, err = dw.fs.Get(ns.Node.Path)
if err != nil {
log.Printf("%s: on file changes %s: %s", logp, ns.Node.Path, err)
Expand All @@ -514,14 +501,59 @@ func (dw *DirWatcher) start() {
}

case <-dw.qrun:
ever = false
ticker.Stop()
// Signal back to the Stop caller.
dw.qrun <- struct{}{}
return
}
}
}

func (dw *DirWatcher) startWatchingFile(parent, child *Node) (err error) {
var (
logp = `startWatchingFile`
watcher *Watcher
)

watcher, err = newWatcher(parent, child, dw.Delay, dw.qFileChanges)
if err != nil {
return fmt.Errorf(`%s %q: %s`, logp, child.SysPath, err)
}

dw.mtxFileWatcher.Lock()
dw.fileWatcher[child.Path] = watcher
dw.mtxFileWatcher.Unlock()

return nil
}

func (dw *DirWatcher) stopAllFileWatcher() {
var watcher *Watcher

dw.mtxFileWatcher.Lock()

for _, watcher = range dw.fileWatcher {
watcher.Stop()
dw.fs.RemoveChild(watcher.node.Parent, watcher.node)
}
dw.fileWatcher = map[string]*Watcher{}

dw.mtxFileWatcher.Unlock()
}

func (dw *DirWatcher) stopWatchingFile(node *Node) {
dw.mtxFileWatcher.Lock()

var watcher = dw.fileWatcher[node.Path]
if watcher != nil {
watcher.Stop()
delete(dw.fileWatcher, node.Path)
dw.fs.RemoveChild(node.Parent, node)
}

dw.mtxFileWatcher.Unlock()
}

func (dw *DirWatcher) processSubdirs() {
var (
logp = `processSubdirs`
Expand Down
14 changes: 5 additions & 9 deletions lib/memfs/dirwatcher_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

func ExampleDirWatcher() {
var (
ns memfs.NodeState
rootDir string
err error
)
Expand All @@ -26,9 +25,9 @@ func ExampleDirWatcher() {
log.Fatal(err)
}

// In this example, we watch sub directory "assets" and its contents,
// include only file with .adoc extension and ignoring files with
// .html extension.
// In this example, we watch sub directory "assets" and its
// contents, including only files with ".adoc" extension and
// excluding files with ".html" extension.
var dw = &memfs.DirWatcher{
Options: memfs.Options{
Root: rootDir,
Expand All @@ -48,16 +47,13 @@ func ExampleDirWatcher() {
log.Fatal(err)
}

// Add delay for goroutine to catch up and modtime to changes.
// We try with 100ms but sometimes it stuck on the first <-dw.C.
time.Sleep(200 * time.Millisecond)

fmt.Println(`Deleting the root directory:`)
err = os.Remove(rootDir)
if err != nil {
log.Fatal(err)
}
ns = <-dw.C

var ns = <-dw.C
fmt.Println(`--`, ns.State, ns.Node.Path)

// Create the root directory back with sub directory
Expand Down

0 comments on commit a92b471

Please sign in to comment.