Skip to content

Commit

Permalink
Add Shutdown method
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalif committed Oct 4, 2023
1 parent e14dcc8 commit bee8a9b
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 9 deletions.
35 changes: 27 additions & 8 deletions internal/goofys.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type Goofys struct {
bufferPool *BufferPool
wantFree int32

shutdown int32
shutdownCh chan struct{}

// A lock protecting the state of the file system struct itself (distinct
// from per-inode locks). Should be always taken after any inode locks.
mu sync.RWMutex
Expand Down Expand Up @@ -286,6 +289,7 @@ func newGoofys(ctx context.Context, bucket string, flags *cfg.FlagStorage,
flags: flags,
umask: 0122,
lfru: NewLFRU(flags.CachePopularThreshold, flags.CacheMaxHits, flags.CacheAgeInterval, flags.CacheAgeDecrement),
shutdownCh: make(chan struct{}),
zeroBuf: make([]byte, 1048576),
inflightChanges: make(map[string]int),
inflightListings: make(map[int]map[string]bool),
Expand Down Expand Up @@ -360,8 +364,6 @@ func newGoofys(ctx context.Context, bucket string, flags *cfg.FlagStorage,

fs.fileHandles = make(map[fuseops.HandleID]*FileHandle)

// FIXME: Add Shutdown() method to cleanup these goroutines

fs.flusherCond = sync.NewCond(&fs.flusherMu)
go fs.Flusher()
if fs.flags.StatsInterval > 0 {
Expand All @@ -378,6 +380,15 @@ func newGoofys(ctx context.Context, bucket string, flags *cfg.FlagStorage,
return fs, nil
}

func (fs *Goofys) Shutdown() {
atomic.StoreInt32(&fs.shutdown, 1)
close(fs.shutdownCh)
fs.WakeupFlusher()
if fs.diskFdCond != nil {
fs.diskFdCond.Broadcast()
}
}

// from https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-golang
func RandStringBytesMaskImprSrc(n int) string {
const letterBytes = "abcdefghijklmnopqrstuvwxyz0123456789"
Expand Down Expand Up @@ -446,8 +457,12 @@ func (fs *Goofys) AddFileHandle(fh *FileHandle) fuseops.HandleID {
}

func (fs *Goofys) StatPrinter() {
for {
time.Sleep(fs.flags.StatsInterval)
for atomic.LoadInt32(&fs.shutdown) == 0 {
select {
case <-time.After(fs.flags.StatsInterval):
case <-fs.shutdownCh:
return
}
now := time.Now()
d := now.Sub(fs.stats.ts).Seconds()
reads := atomic.SwapInt64(&fs.stats.reads, 0)
Expand Down Expand Up @@ -484,7 +499,7 @@ func (fs *Goofys) StatPrinter() {
// Close unneeded cache FDs
func (fs *Goofys) FDCloser() {
fs.diskFdMu.Lock()
for {
for atomic.LoadInt32(&fs.shutdown) == 0 {
rmFdItem := fs.lfru.Pick(nil)
for fs.flags.MaxDiskCacheFD > 0 && fs.diskFdCount > fs.flags.MaxDiskCacheFD && rmFdItem != nil {
fs.diskFdMu.Unlock()
Expand Down Expand Up @@ -702,7 +717,7 @@ func (fs *Goofys) ScheduleRetryFlush() {
func (fs *Goofys) Flusher() {
var inodes []fuseops.InodeID
again := false
for {
for atomic.LoadInt32(&fs.shutdown) == 0 {
if !again {
fs.flusherMu.Lock()
if fs.flushPending == 0 {
Expand Down Expand Up @@ -805,9 +820,13 @@ func (fs *Goofys) EvictEntry(id fuseops.InodeID) bool {
func (fs *Goofys) MetaEvictor() {
retry := false
var seen map[fuseops.InodeID]bool
for {
for atomic.LoadInt32(&fs.shutdown) == 0 {
if !retry {
time.Sleep(1 * time.Second)
select {
case <-time.After(1 * time.Second):
case <-fs.shutdownCh:
return
}
seen = make(map[fuseops.InodeID]bool)
}
// Try to keep the number of cached inodes under control %)
Expand Down
1 change: 1 addition & 0 deletions internal/goofys_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ func (s *GoofysTest) TearDownTest(t *C) {

if s.fs != nil {
s.fs.SyncFS(nil)
s.fs.Shutdown()
}

for _, cloud := range s.removeBucket {
Expand Down
6 changes: 5 additions & 1 deletion internal/goofys_fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,11 +857,14 @@ func (fs *GoofysFuse) SetConnection(conn *fuse.Connection) {

type FuseMfsWrapper struct {
*fuse.MountedFileSystem
fs *Goofys
mountPoint string
}

func (m *FuseMfsWrapper) Unmount() error {
return TryUnmount(m.mountPoint)
err := TryUnmount(m.mountPoint)
m.fs.Shutdown()
return err
}

func convertFuseOptions(flags *cfg.FlagStorage) map[string]string {
Expand Down Expand Up @@ -933,6 +936,7 @@ func mountFuseFS(fs *Goofys) (mfs MountedFS, err error) {

mfs = &FuseMfsWrapper{
MountedFileSystem: fuseMfs,
fs: fs,
mountPoint: fs.flags.MountPoint,
}

Expand Down
4 changes: 4 additions & 0 deletions internal/goofys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,10 @@ func (s *GoofysTest) TestPutMimeType(t *C) {
}

func (s *GoofysTest) TestBucketPrefixSlash(t *C) {
s.fs.Shutdown()

s.fs, _ = NewGoofys(context.Background(), s.fs.bucket+":dir2", s.fs.flags)
defer s.fs.Shutdown()
t.Assert(s.getRoot(t).dir.mountPrefix, Equals, "dir2/")

s.fs, _ = NewGoofys(context.Background(), s.fs.bucket+":dir2///", s.fs.flags)
Expand Down Expand Up @@ -1363,6 +1366,7 @@ func (s *GoofysTest) anonymous(t *C) {
t.Skip("cloud does not support canned ACL")
}

s.fs.Shutdown()
s.fs, _ = NewGoofys(context.Background(), bucket, s.fs.flags)
t.Assert(s.fs, NotNil)

Expand Down
1 change: 1 addition & 0 deletions internal/goofys_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (s *GoofysTest) TestIssue231(t *C) {
func (s *GoofysTest) TestFuseWithPrefix(t *C) {
mountPoint := s.tmp + "/mnt" + s.fs.bucket

s.fs.Shutdown()
s.fs, _ = NewGoofys(context.Background(), s.fs.bucket+":testprefix", s.fs.flags)

s.runFuseTest(t, mountPoint, true, s.tmp+"/fuse-test.sh", mountPoint)
Expand Down
1 change: 1 addition & 0 deletions internal/goofys_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,5 +1028,6 @@ func (fs *GoofysWin) Unmount() error {
if !r {
return fmt.Errorf("unmounting failed")
}
fs.Shutdown()
return nil
}

0 comments on commit bee8a9b

Please sign in to comment.