From bee8a9babaae36d39654fb86c18aa72c7452c6f7 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 5 Oct 2023 01:38:55 +0300 Subject: [PATCH] Add Shutdown method --- internal/goofys.go | 35 ++++++++++++++++++++++++++-------- internal/goofys_common_test.go | 1 + internal/goofys_fuse.go | 6 +++++- internal/goofys_test.go | 4 ++++ internal/goofys_unix_test.go | 1 + internal/goofys_windows.go | 1 + 6 files changed, 39 insertions(+), 9 deletions(-) diff --git a/internal/goofys.go b/internal/goofys.go index ca9ca843..de5281ab 100644 --- a/internal/goofys.go +++ b/internal/goofys.go @@ -59,6 +59,9 @@ type Goofys struct { bufferPool *BufferPool wantFree int32 + shutdown int32 + shutdownCh chan struct{} + // A lock protecting the state of the file system struct itself (distinct // from per-inode locks). Should be always taken after any inode locks. mu sync.RWMutex @@ -286,6 +289,7 @@ func newGoofys(ctx context.Context, bucket string, flags *cfg.FlagStorage, flags: flags, umask: 0122, lfru: NewLFRU(flags.CachePopularThreshold, flags.CacheMaxHits, flags.CacheAgeInterval, flags.CacheAgeDecrement), + shutdownCh: make(chan struct{}), zeroBuf: make([]byte, 1048576), inflightChanges: make(map[string]int), inflightListings: make(map[int]map[string]bool), @@ -360,8 +364,6 @@ func newGoofys(ctx context.Context, bucket string, flags *cfg.FlagStorage, fs.fileHandles = make(map[fuseops.HandleID]*FileHandle) - // FIXME: Add Shutdown() method to cleanup these goroutines - fs.flusherCond = sync.NewCond(&fs.flusherMu) go fs.Flusher() if fs.flags.StatsInterval > 0 { @@ -378,6 +380,15 @@ func newGoofys(ctx context.Context, bucket string, flags *cfg.FlagStorage, return fs, nil } +func (fs *Goofys) Shutdown() { + atomic.StoreInt32(&fs.shutdown, 1) + close(fs.shutdownCh) + fs.WakeupFlusher() + if fs.diskFdCond != nil { + fs.diskFdCond.Broadcast() + } +} + // from https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-golang func RandStringBytesMaskImprSrc(n int) string { const letterBytes = "abcdefghijklmnopqrstuvwxyz0123456789" @@ -446,8 +457,12 @@ func (fs *Goofys) AddFileHandle(fh *FileHandle) fuseops.HandleID { } func (fs *Goofys) StatPrinter() { - for { - time.Sleep(fs.flags.StatsInterval) + for atomic.LoadInt32(&fs.shutdown) == 0 { + select { + case <-time.After(fs.flags.StatsInterval): + case <-fs.shutdownCh: + return + } now := time.Now() d := now.Sub(fs.stats.ts).Seconds() reads := atomic.SwapInt64(&fs.stats.reads, 0) @@ -484,7 +499,7 @@ func (fs *Goofys) StatPrinter() { // Close unneeded cache FDs func (fs *Goofys) FDCloser() { fs.diskFdMu.Lock() - for { + for atomic.LoadInt32(&fs.shutdown) == 0 { rmFdItem := fs.lfru.Pick(nil) for fs.flags.MaxDiskCacheFD > 0 && fs.diskFdCount > fs.flags.MaxDiskCacheFD && rmFdItem != nil { fs.diskFdMu.Unlock() @@ -702,7 +717,7 @@ func (fs *Goofys) ScheduleRetryFlush() { func (fs *Goofys) Flusher() { var inodes []fuseops.InodeID again := false - for { + for atomic.LoadInt32(&fs.shutdown) == 0 { if !again { fs.flusherMu.Lock() if fs.flushPending == 0 { @@ -805,9 +820,13 @@ func (fs *Goofys) EvictEntry(id fuseops.InodeID) bool { func (fs *Goofys) MetaEvictor() { retry := false var seen map[fuseops.InodeID]bool - for { + for atomic.LoadInt32(&fs.shutdown) == 0 { if !retry { - time.Sleep(1 * time.Second) + select { + case <-time.After(1 * time.Second): + case <-fs.shutdownCh: + return + } seen = make(map[fuseops.InodeID]bool) } // Try to keep the number of cached inodes under control %) diff --git a/internal/goofys_common_test.go b/internal/goofys_common_test.go index f4dec1e9..6ee96b81 100644 --- a/internal/goofys_common_test.go +++ b/internal/goofys_common_test.go @@ -345,6 +345,7 @@ func (s *GoofysTest) TearDownTest(t *C) { if s.fs != nil { s.fs.SyncFS(nil) + s.fs.Shutdown() } for _, cloud := range s.removeBucket { diff --git a/internal/goofys_fuse.go b/internal/goofys_fuse.go index 6419ddec..91251747 100644 --- a/internal/goofys_fuse.go +++ b/internal/goofys_fuse.go @@ -857,11 +857,14 @@ func (fs *GoofysFuse) SetConnection(conn *fuse.Connection) { type FuseMfsWrapper struct { *fuse.MountedFileSystem + fs *Goofys mountPoint string } func (m *FuseMfsWrapper) Unmount() error { - return TryUnmount(m.mountPoint) + err := TryUnmount(m.mountPoint) + m.fs.Shutdown() + return err } func convertFuseOptions(flags *cfg.FlagStorage) map[string]string { @@ -933,6 +936,7 @@ func mountFuseFS(fs *Goofys) (mfs MountedFS, err error) { mfs = &FuseMfsWrapper{ MountedFileSystem: fuseMfs, + fs: fs, mountPoint: fs.flags.MountPoint, } diff --git a/internal/goofys_test.go b/internal/goofys_test.go index 42a03574..dbe7983e 100644 --- a/internal/goofys_test.go +++ b/internal/goofys_test.go @@ -1310,7 +1310,10 @@ func (s *GoofysTest) TestPutMimeType(t *C) { } func (s *GoofysTest) TestBucketPrefixSlash(t *C) { + s.fs.Shutdown() + s.fs, _ = NewGoofys(context.Background(), s.fs.bucket+":dir2", s.fs.flags) + defer s.fs.Shutdown() t.Assert(s.getRoot(t).dir.mountPrefix, Equals, "dir2/") s.fs, _ = NewGoofys(context.Background(), s.fs.bucket+":dir2///", s.fs.flags) @@ -1363,6 +1366,7 @@ func (s *GoofysTest) anonymous(t *C) { t.Skip("cloud does not support canned ACL") } + s.fs.Shutdown() s.fs, _ = NewGoofys(context.Background(), bucket, s.fs.flags) t.Assert(s.fs, NotNil) diff --git a/internal/goofys_unix_test.go b/internal/goofys_unix_test.go index bcb9aca7..2a69f2de 100644 --- a/internal/goofys_unix_test.go +++ b/internal/goofys_unix_test.go @@ -216,6 +216,7 @@ func (s *GoofysTest) TestIssue231(t *C) { func (s *GoofysTest) TestFuseWithPrefix(t *C) { mountPoint := s.tmp + "/mnt" + s.fs.bucket + s.fs.Shutdown() s.fs, _ = NewGoofys(context.Background(), s.fs.bucket+":testprefix", s.fs.flags) s.runFuseTest(t, mountPoint, true, s.tmp+"/fuse-test.sh", mountPoint) diff --git a/internal/goofys_windows.go b/internal/goofys_windows.go index 0284bfea..b064944f 100644 --- a/internal/goofys_windows.go +++ b/internal/goofys_windows.go @@ -1028,5 +1028,6 @@ func (fs *GoofysWin) Unmount() error { if !r { return fmt.Errorf("unmounting failed") } + fs.Shutdown() return nil }