Skip to content

Commit

Permalink
Use poll-based file watcher
Browse files Browse the repository at this point in the history
fsnotify/fsnotify can't watch a folder that contains a symlink to
a socket or named pipe.

Use a poll-based mechanism to watch the file for the time being, until
we find a better way or the issue is fixed upstream.

Signed-off-by: Fata Nugraha <[email protected]>
  • Loading branch information
fatanugraha committed Jan 22, 2025
1 parent 31b50f3 commit 2af050d
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 120 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/containers/winquit v1.1.0
github.com/coreos/stream-metadata-go v0.4.5
github.com/dustin/go-humanize v1.0.1
github.com/fsnotify/fsnotify v1.8.0
github.com/google/gopacket v1.1.19
github.com/insomniacslk/dhcp v0.0.0-20240710054256-ddd8a41251c9
github.com/linuxkit/virtsock v0.0.0-20220523201153-1a23e78aa7a2
Expand All @@ -26,13 +25,15 @@ require (
github.com/stretchr/testify v1.10.0
github.com/vishvananda/netlink v1.3.0
golang.org/x/crypto v0.32.0
golang.org/x/mod v0.22.0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.29.0
gvisor.dev/gvisor v0.0.0-20240916094835-a174eb65023f
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/mdlayher/socket v0.4.1 // indirect
Expand All @@ -43,7 +44,6 @@ require (
github.com/u-root/uio v0.0.0-20240224005618-d2acac8f3701 // indirect
github.com/vishvananda/netns v0.0.4 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.5.0 // indirect
Expand Down
10 changes: 2 additions & 8 deletions pkg/services/dns/dns_config_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,8 @@ func (r *dnsConfig) init() error {
return err
}

w, err := utils.NewFileWatcher(etcResolvConfPath)
if err != nil {
return err
}

if err := w.Start(func() { _ = r.refreshNameservers() }); err != nil {
return err
}
w := utils.NewFileWatcher(etcResolvConfPath)
w.Start(func() { _ = r.refreshNameservers() })

return nil
}
Expand Down
13 changes: 2 additions & 11 deletions pkg/services/dns/hosts_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,8 @@ func NewHostsFile(hostsPath string) (*HostsFile, error) {
}

func (h *HostsFile) startWatch() error {
watcher, err := utils.NewFileWatcher(h.hostsFilePath)
if err != nil {
log.Errorf("Hosts file adding watcher error: %s", err)
return err
}

if err := watcher.Start(h.updateHostsFile); err != nil {
log.Errorf("Hosts file adding watcher error: %s", err)
return err
}

watcher := utils.NewFileWatcher(h.hostsFilePath)
watcher.Start(h.updateHostsFile)
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/services/dns/hosts_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ func TestHostsFile(t *testing.T) {
}

func TestReloadingHostsFile(t *testing.T) {
t.Parallel()

hostsFile := filepath.Join(t.TempDir(), "hosts")
assert.NoError(t, os.WriteFile(hostsFile, []byte(`127.0.0.1 entry1`), 0600))

hosts, err := NewHostsFile(hostsFile)
time.Sleep(time.Second)
time.Sleep(6 * time.Second)
assert.NoError(t, err)
ip, err := hosts.LookupByHostname("entry1")
assert.NoError(t, err)
assert.Equal(t, "127.0.0.1", ip.String())

assert.NoError(t, os.WriteFile(hostsFile, []byte(`127.0.0.1 entry2 foobar`), 0600))
time.Sleep(time.Second)
time.Sleep(6 * time.Second)

ipBar, err := hosts.LookupByHostname("foobar")
assert.NoError(t, err)
Expand Down
79 changes: 29 additions & 50 deletions pkg/utils/filewatcher.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,63 @@
package utils

import (
"fmt"
"path/filepath"
"os"
"time"

"github.com/fsnotify/fsnotify"
)

// FileWatcher is a utility that watches a file and invokes a callback when the file changes.
type FileWatcher struct {
w *fsnotify.Watcher
path string

writeGracePeriod time.Duration
timer *time.Timer
closeCh chan struct{}
pollInterval time.Duration
}

func NewFileWatcher(path string) (*FileWatcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
func NewFileWatcher(path string) *FileWatcher {
return &FileWatcher{
path: path,
pollInterval: 5 * time.Second, // 5s is the default inode cache timeout in linux.
closeCh: make(chan struct{}),
}

return &FileWatcher{w: watcher, path: path, writeGracePeriod: 200 * time.Millisecond}, nil
}

func (fw *FileWatcher) Start(changeHandler func()) error {
// Ensure that the target that we're watching is not a symlink as we won't get any events when we're watching
// a symlink.
fileRealPath, err := filepath.EvalSymlinks(fw.path)
if err != nil {
return fmt.Errorf("adding watcher failed: %s", err)
}

// watch the directory instead of the individual file to ensure the notification still works when the file is modified
// through moving/renaming rather than writing into it directly (as most modern editors do by default).
// ref: https://github.com/fsnotify/fsnotify/blob/a9bc2e01792f868516acf80817f7d7d7b3315409/README.md#watching-a-file-doesnt-work-well
if err = fw.w.Add(filepath.Dir(fileRealPath)); err != nil {
return fmt.Errorf("adding watcher failed: %s", err)
}
func (fw *FileWatcher) Start(changeHandler func()) {
prevModTime := fw.fileModTime(fw.path)

// use polling-based approach to detect file changes
// we can't use fsnotify/fsnotify due to issues with symlink+socket. see #462.
go func() {
for {
select {
case _, ok := <-fw.w.Errors:
if !ok {
return // watcher is closed.
}
case event, ok := <-fw.w.Events:
case _, ok := <-fw.closeCh:
if !ok {
return // watcher is closed.
}
case <-time.After(fw.pollInterval):
}

if event.Name != fileRealPath {
continue // we don't care about this file.
}
modTime := fw.fileModTime(fw.path)
if modTime.IsZero() {
continue // file does not exists
}

// Create may not always be followed by Write, e.g. when we replace the file with mv.
if event.Op.Has(fsnotify.Create) || event.Op.Has(fsnotify.Write) {
// as per the documentation, receiving Write does not mean that the write is finished.
// we try our best here to ignore "unfinished" write by assuming that after [writeGracePeriod] of
// inactivity the write has been finished.
fw.debounce(changeHandler)
}
if !prevModTime.Equal(modTime) {
changeHandler()
prevModTime = modTime
}
}
}()

return nil
}

func (fw *FileWatcher) debounce(fn func()) {
if fw.timer != nil {
fw.timer.Stop()
func (fw *FileWatcher) fileModTime(path string) time.Time {
info, err := os.Stat(path)
if err != nil {
return time.Time{}
}

fw.timer = time.AfterFunc(fw.writeGracePeriod, fn)
return info.ModTime()
}

func (fw *FileWatcher) Stop() error {
return fw.w.Close()
func (fw *FileWatcher) Stop() {
close(fw.closeCh)
}
123 changes: 76 additions & 47 deletions pkg/utils/filewatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,57 +11,86 @@ import (
)

func TestFileWatcher(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "file.txt")
_ = os.WriteFile(path, []byte("1"), 0o600)

fw, err := NewFileWatcher(path)
fw.writeGracePeriod = 50 * time.Millisecond // reduce the delay to make the test runs faster.
assert.NoError(t, err)
_ = fw.w.Add(path)

var numTriggered atomic.Int64
assertNumTriggered := func(expected int) {
time.Sleep(fw.writeGracePeriod + 50*time.Millisecond)
assert.Equal(t, int64(expected), numTriggered.Load())
if testing.Short() {
// we can't really speed up the test, as we need to wait for the
// inode cache to expire so that we can read the file's latest
// modtime.
t.Skip("skipping test in short mode.")
}
t.Parallel()

_ = fw.Start(func() {
numTriggered.Add(1)
})
assertSpec := func(t *testing.T, watchedPath, filePath string) {
t.Helper()

// CASE: can detect changes to the file.
if err := os.WriteFile(path, []byte("2"), 0o600); err != nil {
panic(err)
}
assertNumTriggered(1)
fw := NewFileWatcher(watchedPath)

// CASE: can detect "swap"-based file modification.
tmpFile := filepath.Join(dir, "tmp.txt")
if err := os.WriteFile(tmpFile, []byte("lol"), 0o600); err != nil {
panic(err)
}
if err := os.Rename(tmpFile, path); err != nil {
panic(err)
}
assertNumTriggered(2)
var numTriggered atomic.Int64
assertNumTriggered := func(expected int) {
time.Sleep(fw.pollInterval + 200*time.Millisecond)
assert.Equal(t, int64(expected), numTriggered.Load())
}

fw.Start(func() {
numTriggered.Add(1)
})

// CASE: can detect changes to the file.
if err := os.WriteFile(filePath, []byte("2"), 0o600); err != nil {
panic(err)
}
assertNumTriggered(1)

// CASE: can detect "swap"-based file modification.
tmpFile := filepath.Join(filepath.Dir(filePath), "tmp.txt")
if err := os.WriteFile(tmpFile, []byte("lol"), 0o600); err != nil {
panic(err)
}
if err := os.Rename(tmpFile, filePath); err != nil {
panic(err)
}
assertNumTriggered(2)

// CASE: combine multiple partial writes into single event.
fd, err := os.OpenFile(filePath, os.O_WRONLY|os.O_TRUNC, 0o600)
if err != nil {
panic(err)
}
// we assume these writes happen in less than 50ms.
_, _ = fd.Write([]byte("a"))
_ = fd.Sync()
_, _ = fd.Write([]byte("b"))
fd.Close()
assertNumTriggered(3)

// CASE: closed file watcher should not call the callback after Stop() is called.
fw.Stop()
if err := os.WriteFile(filePath, []byte("2"), 0o600); err != nil {
panic(err)
}
assertNumTriggered(3) // does not change.

// CASE: combine multiple partial writes into single event.
fd, err := os.OpenFile(path, os.O_WRONLY|os.O_TRUNC, 0o600)
if err != nil {
panic(err)
}
// we assume these writes happen in less than 50ms.
_, _ = fd.Write([]byte("a"))
_ = fd.Sync()
_, _ = fd.Write([]byte("b"))
fd.Close()
assertNumTriggered(3)

// CASE: closed file watcher should not call the callback after Stop() is called.
assert.NoError(t, fw.Stop())
if err := os.WriteFile(path, []byte("2"), 0o600); err != nil {
panic(err)
}
assertNumTriggered(3) // does not change.

t.Run("normal file", func(t *testing.T) {
t.Parallel()

dir := t.TempDir()
path := filepath.Join(dir, "file.txt")
_ = os.WriteFile(path, []byte("1"), 0o600)

assertSpec(t, path, path)
})

t.Run("symlink", func(t *testing.T) {
t.Parallel()

dir := t.TempDir()
path := filepath.Join(dir, "file.txt")
_ = os.WriteFile(path, []byte("1"), 0o600)

symlinkPath := filepath.Join(dir, "symlink.txt")
os.Symlink(path, symlinkPath)

Check failure on line 92 in pkg/utils/filewatcher_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Symlink` is not checked (errcheck)

assertSpec(t, symlinkPath, path)
})
}

0 comments on commit 2af050d

Please sign in to comment.