From 59471a73295ac7a430777ff7c40c9a67801fe588 Mon Sep 17 00:00:00 2001 From: Anton Kalpakchiev Date: Tue, 26 Nov 2024 01:40:55 +0100 Subject: [PATCH] Add ability for aggro cache cleanup to only consider Kraken's disk usage --- lib/store/cleanup.go | 130 +++++++++++------- lib/store/cleanup_test.go | 153 ++++++++++++++++------ utils/diskspaceutil/diskspaceutil.go | 16 ++- utils/diskspaceutil/diskspaceutil_test.go | 19 ++- 4 files changed, 225 insertions(+), 93 deletions(-) diff --git a/lib/store/cleanup.go b/lib/store/cleanup.go index c85668aad..2e8d9f10e 100644 --- a/lib/store/cleanup.go +++ b/lib/store/cleanup.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -16,6 +16,7 @@ package store import ( "fmt" "os" + "strconv" "sync" "time" @@ -30,19 +31,15 @@ import ( // CleanupConfig defines configuration for periodically cleaning up idle files. type CleanupConfig struct { - Disabled bool `yaml:"disabled"` - Interval time.Duration `yaml:"interval"` // How often cleanup runs. - TTI time.Duration `yaml:"tti"` // Time to idle based on last access time. - TTL time.Duration `yaml:"ttl"` // Time to live regardless of access. If 0, disables TTL. - AggressiveThreshold int `yaml:"aggressive_threshold"` // The disk util threshold to trigger aggressive cleanup. If 0, disables aggressive cleanup. - AggressiveTTL time.Duration `yaml:"aggressive_ttL"` // Time to live regardless of access if aggressive cleanup is triggered. + Disabled bool `yaml:"disabled"` + Interval time.Duration `yaml:"interval"` // How often cleanup runs. + TTI time.Duration `yaml:"tti"` // Time to idle based on last access time. + TTL time.Duration `yaml:"ttl"` // Time to live regardless of access. If 0, disables TTL. + ExcludeOtherServices bool `yaml:"exclude_other_services"` // Whether to exclude other services from the disk util calculation. + AggressiveThreshold int `yaml:"aggressive_threshold"` // The disk util threshold to trigger aggressive cleanup. If 0, disables aggressive cleanup. + AggressiveTTL time.Duration `yaml:"aggressive_ttL"` // Time to live regardless of access if aggressive cleanup is triggered. } -type ( - // Define a func type for mocking diskSpaceUtil function. - diskSpaceUtilFunc func() (int, error) -) - func (c CleanupConfig) applyDefaults() CleanupConfig { if c.Interval == 0 { c.Interval = 30 * time.Minute @@ -87,6 +84,7 @@ func newCleanupManager(clk clock.Clock, stats tally.Scope) (*cleanupManager, err // on the settings in config. op must set the desired states to clean before addJob // is called. func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp) { + jobStats := m.stats.Tagged(map[string]string{"job": tag}) config = config.applyDefaults() if config.Disabled { log.Warnf("Cleanup disabled for %s", op) @@ -95,26 +93,17 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp if config.TTL == 0 { log.Warnf("TTL disabled for %s", op) } - if config.AggressiveThreshold == 0 { log.Warnf("Aggressive cleanup disabled for %s", op) } ticker := m.clk.Ticker(config.Interval) - usageGauge := m.stats.Tagged(map[string]string{"job": tag}).Gauge("disk_usage") - go func() { for { select { case <-ticker.C: - log.Debugf("Performing cleanup of %s", op) - ttl := m.checkAggressiveCleanup(op, config, diskspaceutil.DiskSpaceUtil) - usage, err := m.scan(op, config.TTI, ttl) - if err != nil { - log.Errorf("Error scanning %s: %s", op, err) - } - usageGauge.Update(float64(usage)) + m.clean(jobStats, config, op) case <-m.stopc: ticker.Stop() return @@ -123,32 +112,94 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp }() } -func (m *cleanupManager) stop() { - m.stopOnce.Do(func() { close(m.stopc) }) -} - -// scan scans the op for idle or expired files. Also returns the total disk usage -// of op. -func (m *cleanupManager) scan( - op base.FileOp, tti time.Duration, ttl time.Duration) (usage int64, err error) { +// clean deletes idle files from op based on the config. +func (m *cleanupManager) clean(jobStats tally.Scope, config CleanupConfig, op base.FileOp) { + log.Debugf("Performing cleanup of %s", op) + ttl := m.calculateTTL(jobStats, op, config, calculateDiskUtil) names, err := op.ListNames() if err != nil { - return 0, fmt.Errorf("list names: %s", err) + log.Errorf("Error cleaning cache: list names: %v", err) } + + var absUsage int64 + for _, name := range names { info, err := op.GetFileStat(name) if err != nil { log.With("name", name).Errorf("Error getting file stat: %s", err) continue } - if ready, err := m.readyForDeletion(op, name, info, tti, ttl); err != nil { + if ready, err := m.readyForDeletion(op, name, info, config.TTI, ttl); err != nil { log.With("name", name).Errorf("Error checking if file expired: %s", err) } else if ready { if err := op.DeleteFile(name); err != nil && err != base.ErrFilePersisted { log.With("name", name).Errorf("Error deleting expired file: %s", err) } } + absUsage += info.Size() + } + + jobStats.Gauge("disk_usage").Update(float64(absUsage)) +} + +type diskUtilFn func(op base.FileOp, c CleanupConfig) (int, error) + +// calculateTTL returns the TTL used for cleanup based on the config and current disk utilization. +func (m *cleanupManager) calculateTTL(jobStats tally.Scope, op base.FileOp, config CleanupConfig, calculateDiskUtil diskUtilFn) time.Duration { + if config.AggressiveThreshold == 0 { + return config.TTL + } + + utilPercent, err := calculateDiskUtil(op, config) + if err != nil { + log.Errorf("Defaulting to normal TTL due to error calculating disk space util of %s: %v", op, err) + return config.TTL + } + jobStats.Tagged(map[string]string{"exclude_other_services": strconv.FormatBool(config.ExcludeOtherServices)}). + Gauge("disk_util").Update(float64(utilPercent)) + + if utilPercent >= config.AggressiveThreshold { + log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, utilPercent) + return config.AggressiveTTL + } + return config.TTL +} + +// calculateDiskUtil calculates the disk space utilization based on the config. +// If 'ExcludeOtherServices' is turned on, only this op's utilization of the filesystem is considered. +func calculateDiskUtil(op base.FileOp, config CleanupConfig) (int, error) { + if config.ExcludeOtherServices { + fsSize, err := diskspaceutil.FileSystemSize() + if err != nil { + return 0, err + } + opSize, err := size(op) + if err != nil { + return 0, err + } + utilPercent := int(100 * opSize / fsSize) + return utilPercent, nil + } + + utilPercent, err := diskspaceutil.FileSystemUtil() + if err != nil { + return 0, err + } + return utilPercent, nil +} + +func size(op base.FileOp) (usage int64, err error) { + names, err := op.ListNames() + if err != nil { + return 0, fmt.Errorf("list names: %s", err) + } + for _, name := range names { + info, err := op.GetFileStat(name) + if err != nil { + log.With("name", name).Errorf("Error getting file stat: %s", err) + continue + } usage += info.Size() } return usage, nil @@ -174,17 +225,6 @@ func (m *cleanupManager) readyForDeletion( return m.clk.Now().Sub(lat.Time) > tti, nil } -func (m *cleanupManager) checkAggressiveCleanup(op base.FileOp, config CleanupConfig, util diskSpaceUtilFunc) time.Duration { - if config.AggressiveThreshold != 0 { - diskspaceutil, err := util() - if err != nil { - log.Errorf("Error checking disk space util %s: %s", op, err) - return config.TTL - } - if diskspaceutil >= config.AggressiveThreshold { - log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, diskspaceutil) - return config.AggressiveTTL - } - } - return config.TTL +func (m *cleanupManager) stop() { + m.stopOnce.Do(func() { close(m.stopc) }) } diff --git a/lib/store/cleanup_test.go b/lib/store/cleanup_test.go index e7f7c76b0..6f2ec3f9c 100644 --- a/lib/store/cleanup_test.go +++ b/lib/store/cleanup_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -14,6 +14,7 @@ package store import ( + // "errors" "errors" "io/ioutil" "os" @@ -50,7 +51,8 @@ func fileOpFixture(clk clock.Clock) (base.FileState, base.FileOp, func()) { func TestCleanupManagerAddJob(t *testing.T) { require := require.New(t) - clk := clock.New() + clk := clock.NewMock() + clk.Set(time.Now()) m, err := newCleanupManager(clk, tally.NoopScope) require.NoError(err) @@ -69,7 +71,7 @@ func TestCleanupManagerAddJob(t *testing.T) { require.NoError(op.CreateFile(name, state, 0)) - time.Sleep(2 * time.Second) + clk.Add(2 * time.Second) _, err = op.GetFileStat(name) require.True(os.IsNotExist(err)) @@ -80,8 +82,10 @@ func TestCleanupManagerDeleteIdleFiles(t *testing.T) { clk := clock.NewMock() clk.Set(time.Now()) - tti := 6 * time.Hour - ttl := 24 * time.Hour + config := CleanupConfig{ + TTI: 6 * time.Hour, + TTL: 24 * time.Hour, + } m, err := newCleanupManager(clk, tally.NoopScope) require.NoError(err) @@ -100,15 +104,14 @@ func TestCleanupManagerDeleteIdleFiles(t *testing.T) { require.NoError(op.CreateFile(name, state, 0)) } - clk.Add(tti + 1) + clk.Add(config.TTI + 1) active := names[50:] for _, name := range active { require.NoError(op.CreateFile(name, state, 0)) } - _, err = m.scan(op, tti, ttl) - require.NoError(err) + m.clean(tally.NoopScope, config, op) for _, name := range idle { _, err := op.GetFileStat(name) @@ -125,8 +128,10 @@ func TestCleanupManagerDeleteExpiredFiles(t *testing.T) { clk := clock.NewMock() clk.Set(time.Now()) - tti := 6 * time.Hour - ttl := 24 * time.Hour + config := CleanupConfig{ + TTI: 6 * time.Hour, + TTL: 24 * time.Hour, + } m, err := newCleanupManager(clk, tally.NoopScope) require.NoError(err) @@ -143,18 +148,16 @@ func TestCleanupManagerDeleteExpiredFiles(t *testing.T) { require.NoError(op.CreateFile(name, state, 0)) } - _, err = m.scan(op, tti, ttl) - require.NoError(err) + m.clean(tally.NoopScope, config, op) for _, name := range names { _, err := op.GetFileStat(name) require.NoError(err) } - clk.Add(ttl + 1) + clk.Add(config.TTL + 1) - _, err = m.scan(op, tti, ttl) - require.NoError(err) + m.clean(tally.NoopScope, config, op) for _, name := range names { _, err := op.GetFileStat(name) @@ -167,8 +170,10 @@ func TestCleanupManagerSkipsPersistedFiles(t *testing.T) { clk := clock.NewMock() clk.Set(time.Now()) - tti := 48 * time.Hour - ttl := 24 * time.Hour + config := CleanupConfig{ + TTI: 48 * time.Hour, + TTL: 24 * time.Hour, + } m, err := newCleanupManager(clk, tally.NoopScope) require.NoError(err) @@ -194,10 +199,9 @@ func TestCleanupManagerSkipsPersistedFiles(t *testing.T) { require.NoError(err) } - clk.Add(tti + 1) + clk.Add(config.TTI + 1) - _, err = m.scan(op, tti, ttl) - require.NoError(err) + m.clean(tally.NoopScope, config, op) for _, name := range idle { _, err := op.GetFileStat(name) @@ -209,7 +213,7 @@ func TestCleanupManagerSkipsPersistedFiles(t *testing.T) { } } -func TestCleanupManageDiskUsage(t *testing.T) { +func TestCleanupManagerSize(t *testing.T) { require := require.New(t) clk := clock.New() @@ -221,41 +225,110 @@ func TestCleanupManageDiskUsage(t *testing.T) { state, op, cleanup := fileOpFixture(clk) defer cleanup() - for i := 0; i < 100; i++ { + s, err := size(op) + require.Nil(err) + require.Equal(int64(0), s) + + for i := 0; i < 10; i++ { require.NoError(op.CreateFile(core.DigestFixture().Hex(), state, 5)) } - usage, err := m.scan(op, time.Hour, time.Hour) - require.NoError(err) - require.Equal(int64(500), usage) + s, err = size(op) + require.Nil(err) + require.Equal(int64(50), s) } -func TestCleanupManagerAggressive(t *testing.T) { +func TestCleanupManagerDiskMetrics(t *testing.T) { require := require.New(t) + clk := clock.New() config := CleanupConfig{ - AggressiveThreshold: 80, - TTL: 10 * time.Second, - AggressiveTTL: 5 * time.Second, + TTI: 1 * time.Hour, + TTL: 1 * time.Hour, } - clk := clock.NewMock() m, err := newCleanupManager(clk, tally.NoopScope) require.NoError(err) defer m.stop() - _, op, cleanup := fileOpFixture(clk) + state, op, cleanup := fileOpFixture(clk) defer cleanup() - require.Equal(m.checkAggressiveCleanup(op, config, func() (int, error) { - return 90, nil - }), 5*time.Second) + for i := 0; i < 100; i++ { + require.NoError(op.CreateFile(core.DigestFixture().Hex(), state, 5)) + } + + testStats := tally.NewTestScope("", map[string]string{}) + m.clean(testStats, config, op) + snapshot := testStats.Snapshot() + usageGauge, ok := snapshot.Gauges()["disk_usage+"] + require.True(ok) + require.Equal(float64(500), usageGauge.Value()) +} - require.Equal(m.checkAggressiveCleanup(op, config, func() (int, error) { - return 60, nil - }), 10*time.Second) +func TestCleanupManagerCalculateTTL(t *testing.T) { + for _, tc := range []struct { + desc string + config CleanupConfig + calculateDiskUtil diskUtilFn + wantTTL time.Duration + }{ + { + desc: "aggressive cleanup disabled", + config: CleanupConfig{ + TTI: 10 * time.Minute, + TTL: 30 * time.Minute, + }, + calculateDiskUtil: nil, + wantTTL: 30 * time.Minute, + }, + { + desc: "aggressive cleanup enabled, disk util below threshold", + config: CleanupConfig{ + TTI: 10 * time.Minute, + TTL: 30 * time.Minute, + + AggressiveThreshold: 50, + AggressiveTTL: 5 * time.Minute, + }, + calculateDiskUtil: func(op base.FileOp, c CleanupConfig) (int, error) { return 49, nil }, + wantTTL: 30 * time.Minute, + }, + { + desc: "aggressive cleanup enabled, disk util passed threshold", + config: CleanupConfig{ + TTI: 10 * time.Minute, + TTL: 30 * time.Minute, + + AggressiveThreshold: 50, + AggressiveTTL: 5 * time.Minute, + }, + calculateDiskUtil: func(op base.FileOp, c CleanupConfig) (int, error) { return 50, nil }, + wantTTL: 5 * time.Minute, + }, + { + desc: "aggressive cleanup enabled, disk util could not be calculated", + config: CleanupConfig{ + TTI: 10 * time.Minute, + TTL: 30 * time.Minute, + + AggressiveThreshold: 50, + AggressiveTTL: 5 * time.Minute, + }, + calculateDiskUtil: func(op base.FileOp, c CleanupConfig) (int, error) { return 0, errors.New("test-error") }, + wantTTL: 30 * time.Minute, + }, + } { + require := require.New(t) + + clk := clock.New() + m, err := newCleanupManager(clk, tally.NoopScope) + require.NoError(err) + defer m.stop() + + _, op, cleanup := fileOpFixture(clk) + defer cleanup() - require.Equal(m.checkAggressiveCleanup(op, config, func() (int, error) { - return 0, errors.New("fake error") - }), 10*time.Second) + require.Equal(tc.wantTTL, m.calculateTTL(tally.NoopScope, op, tc.config, tc.calculateDiskUtil)) + } } diff --git a/utils/diskspaceutil/diskspaceutil.go b/utils/diskspaceutil/diskspaceutil.go index ad9b24106..e73cbabca 100644 --- a/utils/diskspaceutil/diskspaceutil.go +++ b/utils/diskspaceutil/diskspaceutil.go @@ -17,11 +17,10 @@ import ( "syscall" ) -const path = "/" -// Helper method to get disk util. -func DiskSpaceUtil() (int, error) { +func FileSystemUtil() (int, error) { fs := syscall.Statfs_t{} + path := "/" err := syscall.Statfs(path, &fs) if err != nil { return 0, err @@ -31,5 +30,16 @@ func DiskSpaceUtil() (int, error) { diskFree := fs.Bfree * uint64(fs.Bsize) diskUsed := diskAll - diskFree return int(diskUsed * 100 / diskAll), nil +} + +func FileSystemSize() (int64, error) { + fs := syscall.Statfs_t{} + path := "/" + err := syscall.Statfs(path, &fs) + if err != nil { + return 0, err + } + diskAll := fs.Blocks * uint64(fs.Bsize) + return int64(diskAll), nil } diff --git a/utils/diskspaceutil/diskspaceutil_test.go b/utils/diskspaceutil/diskspaceutil_test.go index 7eb427cec..64347dac5 100644 --- a/utils/diskspaceutil/diskspaceutil_test.go +++ b/utils/diskspaceutil/diskspaceutil_test.go @@ -8,10 +8,19 @@ import ( "github.com/uber/kraken/utils/diskspaceutil" ) -func TestParseManifestV2List(t *testing.T) { - util, err := diskspaceutil.DiskSpaceUtil() - require.NoError(t, err) +func TestFileSystemUtil(t *testing.T) { + require := require.New(t) + fsUtil, err := diskspaceutil.FileSystemUtil() + require.NoError(err) - require.Equal(t, true, util > 0) - require.Equal(t, true, util < 100) + require.Equal(true, fsUtil > 0) + require.Equal(true, fsUtil < 100) +} + +func TestFileSystemSize(t *testing.T) { + require := require.New(t) + fsSize, err := diskspaceutil.FileSystemSize() + require.NoError(err) + + require.Equal(true, fsSize > 0) }