From 2af050df6b5fba0d89a7347b22dc77292b0a98ca Mon Sep 17 00:00:00 2001 From: Fata Nugraha Date: Wed, 22 Jan 2025 00:21:52 +0800 Subject: [PATCH] Use poll based file watcher fsnotify/fsnotify can't watch a folder that contains a symlink into a socket or named pipe. Use poll-based mechanism to watch the file for the time being until we find a better way or fix the issue in the upstream. Signed-off-by: Fata Nugraha --- go.mod | 4 +- pkg/services/dns/dns_config_unix.go | 10 +-- pkg/services/dns/hosts_file.go | 13 +-- pkg/services/dns/hosts_file_test.go | 6 +- pkg/utils/filewatcher.go | 79 +++++++----------- pkg/utils/filewatcher_test.go | 123 +++++++++++++++++----------- 6 files changed, 115 insertions(+), 120 deletions(-) diff --git a/go.mod b/go.mod index ad35ef70..799479b9 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/containers/winquit v1.1.0 github.com/coreos/stream-metadata-go v0.4.5 github.com/dustin/go-humanize v1.0.1 - github.com/fsnotify/fsnotify v1.8.0 github.com/google/gopacket v1.1.19 github.com/insomniacslk/dhcp v0.0.0-20240710054256-ddd8a41251c9 github.com/linuxkit/virtsock v0.0.0-20220523201153-1a23e78aa7a2 @@ -26,6 +25,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/vishvananda/netlink v1.3.0 golang.org/x/crypto v0.32.0 + golang.org/x/mod v0.22.0 golang.org/x/sync v0.10.0 golang.org/x/sys v0.29.0 gvisor.dev/gvisor v0.0.0-20240916094835-a174eb65023f @@ -33,6 +33,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/google/btree v1.1.2 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/mdlayher/socket v0.4.1 // indirect @@ -43,7 +44,6 @@ require ( github.com/u-root/uio v0.0.0-20240224005618-d2acac8f3701 // indirect github.com/vishvananda/netns v0.0.4 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect - golang.org/x/mod v0.22.0 // indirect golang.org/x/net v0.33.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/pkg/services/dns/dns_config_unix.go b/pkg/services/dns/dns_config_unix.go index c1e90487..5fdf3659 100644 --- a/pkg/services/dns/dns_config_unix.go +++ b/pkg/services/dns/dns_config_unix.go @@ -16,14 +16,8 @@ func (r *dnsConfig) init() error { return err } - w, err := utils.NewFileWatcher(etcResolvConfPath) - if err != nil { - return err - } - - if err := w.Start(func() { _ = r.refreshNameservers() }); err != nil { - return err - } + w := utils.NewFileWatcher(etcResolvConfPath) + w.Start(func() { _ = r.refreshNameservers() }) return nil } diff --git a/pkg/services/dns/hosts_file.go b/pkg/services/dns/hosts_file.go index 106129ee..23c528e9 100644 --- a/pkg/services/dns/hosts_file.go +++ b/pkg/services/dns/hosts_file.go @@ -35,17 +35,8 @@ func NewHostsFile(hostsPath string) (*HostsFile, error) { } func (h *HostsFile) startWatch() error { - watcher, err := utils.NewFileWatcher(h.hostsFilePath) - if err != nil { - log.Errorf("Hosts file adding watcher error: %s", err) - return err - } - - if err := watcher.Start(h.updateHostsFile); err != nil { - log.Errorf("Hosts file adding watcher error: %s", err) - return err - } - + watcher := utils.NewFileWatcher(h.hostsFilePath) + watcher.Start(h.updateHostsFile) return nil } diff --git a/pkg/services/dns/hosts_file_test.go b/pkg/services/dns/hosts_file_test.go index 4d2f6991..9eb88c94 100644 --- a/pkg/services/dns/hosts_file_test.go +++ b/pkg/services/dns/hosts_file_test.go @@ -21,18 +21,20 @@ func TestHostsFile(t *testing.T) { } func TestReloadingHostsFile(t *testing.T) { + t.Parallel() + hostsFile := filepath.Join(t.TempDir(), "hosts") assert.NoError(t, os.WriteFile(hostsFile, []byte(`127.0.0.1 entry1`), 0600)) hosts, err := NewHostsFile(hostsFile) - time.Sleep(time.Second) + time.Sleep(6 * time.Second) assert.NoError(t, err) ip, err := hosts.LookupByHostname("entry1") assert.NoError(t, err) assert.Equal(t, "127.0.0.1", ip.String()) assert.NoError(t, os.WriteFile(hostsFile, []byte(`127.0.0.1 entry2 foobar`), 0600)) - time.Sleep(time.Second) + time.Sleep(6 * time.Second) ipBar, err := hosts.LookupByHostname("foobar") assert.NoError(t, err) diff --git a/pkg/utils/filewatcher.go b/pkg/utils/filewatcher.go index 0993d38e..4f41ee7c 100644 --- a/pkg/utils/filewatcher.go +++ b/pkg/utils/filewatcher.go @@ -1,84 +1,63 @@ package utils import ( - "fmt" - "path/filepath" + "os" "time" - - "github.com/fsnotify/fsnotify" ) // FileWatcher is an utility that type FileWatcher struct { - w *fsnotify.Watcher path string - writeGracePeriod time.Duration - timer *time.Timer + closeCh chan struct{} + pollInterval time.Duration } -func NewFileWatcher(path string) (*FileWatcher, error) { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return nil, err +func NewFileWatcher(path string) *FileWatcher { + return &FileWatcher{ + path: path, + pollInterval: 5 * time.Second, // 5s is the default inode cache timeout in linux. + closeCh: make(chan struct{}), } - - return &FileWatcher{w: watcher, path: path, writeGracePeriod: 200 * time.Millisecond}, nil } -func (fw *FileWatcher) Start(changeHandler func()) error { - // Ensure that the target that we're watching is not a symlink as we won't get any events when we're watching - // a symlink. - fileRealPath, err := filepath.EvalSymlinks(fw.path) - if err != nil { - return fmt.Errorf("adding watcher failed: %s", err) - } - - // watch the directory instead of the individual file to ensure the notification still works when the file is modified - // through moving/renaming rather than writing into it directly (like what most modern editor does by default). - // ref: https://github.com/fsnotify/fsnotify/blob/a9bc2e01792f868516acf80817f7d7d7b3315409/README.md#watching-a-file-doesnt-work-well - if err = fw.w.Add(filepath.Dir(fileRealPath)); err != nil { - return fmt.Errorf("adding watcher failed: %s", err) - } +func (fw *FileWatcher) Start(changeHandler func()) { + prevModTime := fw.fileModTime(fw.path) + // use polling-based approach to detect file changes + // we can't use fsnotify/fsnotify due to issues with symlink+socket. see #462. go func() { for { select { - case _, ok := <-fw.w.Errors: - if !ok { - return // watcher is closed. - } - case event, ok := <-fw.w.Events: + case _, ok := <-fw.closeCh: if !ok { return // watcher is closed. } + case <-time.After(fw.pollInterval): + } - if event.Name != fileRealPath { - continue // we don't care about this file. - } + modTime := fw.fileModTime(fw.path) + if modTime.IsZero() { + continue // file does not exists + } - // Create may not always followed by Write e.g. when we replace the file with mv. - if event.Op.Has(fsnotify.Create) || event.Op.Has(fsnotify.Write) { - // as per the documentation, receiving Write does not mean that the write is finished. - // we try our best here to ignore "unfinished" write by assuming that after [writeGracePeriod] of - // inactivity the write has been finished. - fw.debounce(changeHandler) - } + if !prevModTime.Equal(modTime) { + changeHandler() + prevModTime = modTime } } }() - - return nil } -func (fw *FileWatcher) debounce(fn func()) { - if fw.timer != nil { - fw.timer.Stop() +func (fw *FileWatcher) fileModTime(path string) time.Time { + info, err := os.Stat(path) + if err != nil { + return time.Time{} } - fw.timer = time.AfterFunc(fw.writeGracePeriod, fn) + return info.ModTime() } -func (fw *FileWatcher) Stop() error { - return fw.w.Close() +func (fw *FileWatcher) Stop() { + close(fw.closeCh) } diff --git a/pkg/utils/filewatcher_test.go b/pkg/utils/filewatcher_test.go index a13950b9..20a8990a 100644 --- a/pkg/utils/filewatcher_test.go +++ b/pkg/utils/filewatcher_test.go @@ -11,57 +11,86 @@ import ( ) func TestFileWatcher(t *testing.T) { - dir := t.TempDir() - path := filepath.Join(dir, "file.txt") - _ = os.WriteFile(path, []byte("1"), 0o600) - - fw, err := NewFileWatcher(path) - fw.writeGracePeriod = 50 * time.Millisecond // reduce the delay to make the test runs faster. - assert.NoError(t, err) - _ = fw.w.Add(path) - - var numTriggered atomic.Int64 - assertNumTriggered := func(expected int) { - time.Sleep(fw.writeGracePeriod + 50*time.Millisecond) - assert.Equal(t, int64(expected), numTriggered.Load()) + if testing.Short() { + // we can't really speed up the test as we need to wait for the + // inode cache to expire so that we can read the latest + // file's modtime. + t.Skip("skipping test in short mode.") } + t.Parallel() - _ = fw.Start(func() { - numTriggered.Add(1) - }) + assertSpec := func(t *testing.T, watchedPath, filePath string) { + t.Helper() - // CASE: can detect changes to the file. - if err := os.WriteFile(path, []byte("2"), 0o600); err != nil { - panic(err) - } - assertNumTriggered(1) + fw := NewFileWatcher(watchedPath) - // CASE: can detect "swap"-based file modification. - tmpFile := filepath.Join(dir, "tmp.txt") - if err := os.WriteFile(tmpFile, []byte("lol"), 0o600); err != nil { - panic(err) - } - if err := os.Rename(tmpFile, path); err != nil { - panic(err) - } - assertNumTriggered(2) + var numTriggered atomic.Int64 + assertNumTriggered := func(expected int) { + time.Sleep(fw.pollInterval + 200*time.Millisecond) + assert.Equal(t, int64(expected), numTriggered.Load()) + } + + fw.Start(func() { + numTriggered.Add(1) + }) + + // CASE: can detect changes to the file. + if err := os.WriteFile(filePath, []byte("2"), 0o600); err != nil { + panic(err) + } + assertNumTriggered(1) + + // CASE: can detect "swap"-based file modification. + tmpFile := filepath.Join(filepath.Dir(filePath), "tmp.txt") + if err := os.WriteFile(tmpFile, []byte("lol"), 0o600); err != nil { + panic(err) + } + if err := os.Rename(tmpFile, filePath); err != nil { + panic(err) + } + assertNumTriggered(2) + + // CASE: combine multiple partial writes into single event. + fd, err := os.OpenFile(filePath, os.O_WRONLY|os.O_TRUNC, 0o600) + if err != nil { + panic(err) + } + // we assume these writes happens in less than 50ms. + _, _ = fd.Write([]byte("a")) + _ = fd.Sync() + _, _ = fd.Write([]byte("b")) + fd.Close() + assertNumTriggered(3) + + // CASE: closed file watcher should not call the callback after Stop() is called. + fw.Stop() + if err := os.WriteFile(filePath, []byte("2"), 0o600); err != nil { + panic(err) + } + assertNumTriggered(3) // does not change. - // CASE: combine multiple partial writes into single event. - fd, err := os.OpenFile(path, os.O_WRONLY|os.O_TRUNC, 0o600) - if err != nil { - panic(err) - } - // we assume these writes happens in less than 50ms. - _, _ = fd.Write([]byte("a")) - _ = fd.Sync() - _, _ = fd.Write([]byte("b")) - fd.Close() - assertNumTriggered(3) - - // CASE: closed file watcher should not call the callback after Stop() is called. - assert.NoError(t, fw.Stop()) - if err := os.WriteFile(path, []byte("2"), 0o600); err != nil { - panic(err) } - assertNumTriggered(3) // does not change. + + t.Run("normal file", func(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + path := filepath.Join(dir, "file.txt") + _ = os.WriteFile(path, []byte("1"), 0o600) + + assertSpec(t, path, path) + }) + + t.Run("symlink", func(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + path := filepath.Join(dir, "file.txt") + _ = os.WriteFile(path, []byte("1"), 0o600) + + symlinkPath := filepath.Join(dir, "symlink.txt") + os.Symlink(path, symlinkPath) + + assertSpec(t, symlinkPath, path) + }) }