From 5167a626d1e1eb4b3cabf36571248d06f03ceef2 Mon Sep 17 00:00:00 2001 From: Justin Cichra Date: Wed, 21 Aug 2024 15:00:56 -0400 Subject: [PATCH] etcdserver: add auto compaction interval option With enough key revisions per second, the constant 5 minute interval on revision compaction isn't fast enough to keep up with growth. Make the interval configurable, which will override the default behavior. Also pass the configuration through to periodic compaction for finer control of when compaction happens based on time. Fixes: https://github.com/etcd-io/etcd/issues/18471 Signed-off-by: Justin Cichra --- server/config/config.go | 1 + server/embed/config.go | 4 ++ server/embed/config_test.go | 29 +++++++++ server/embed/etcd.go | 13 ++++ server/etcdmain/help.go | 2 + .../etcdserver/api/v3compactor/compactor.go | 5 +- server/etcdserver/api/v3compactor/periodic.go | 24 ++++--- .../api/v3compactor/periodic_test.go | 63 +++++++++++++++++-- server/etcdserver/api/v3compactor/revision.go | 14 +++-- .../api/v3compactor/revision_test.go | 18 +++--- server/etcdserver/server.go | 2 +- 11 files changed, 146 insertions(+), 29 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 5f3a0d8e9f51..0c73797912c7 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -117,6 +117,7 @@ type ServerConfig struct { AutoCompactionRetention time.Duration AutoCompactionMode string + AutoCompactionInterval time.Duration CompactionBatchLimit int CompactionSleepInterval time.Duration QuotaBackendBytes int64 diff --git a/server/embed/config.go b/server/embed/config.go index b10c5dc52c6e..6cd3733165e2 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -272,6 +272,9 @@ type Config struct { // If no time unit is provided and compaction mode is 'periodic', // the unit defaults to hour. For example, '5' translates into 5-hour. 
AutoCompactionRetention string `json:"auto-compaction-retention"` + // AutoCompactionInterval is the delay between compaction runs. + // If no interval is specified, 'periodic' mode defaults to the retention period (capped at one hour) and 'revision' mode defaults to 5 minutes. + AutoCompactionInterval string `json:"auto-compaction-interval"` // GRPCKeepAliveMinTime is the minimum interval that a client should // wait before pinging server. When client pings "too fast", server @@ -724,6 +727,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) { fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.") fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.") + fs.StringVar(&cfg.AutoCompactionInterval, "auto-compaction-interval", "", "Auto compaction interval for mvcc key value store. Default is based on mode selected.") // pprof profiler via HTTP fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. 
Address is at client URL + \"/debug/pprof/\"") diff --git a/server/embed/config_test.go b/server/embed/config_test.go index cf18a5ec55cf..039011b42298 100644 --- a/server/embed/config_test.go +++ b/server/embed/config_test.go @@ -483,6 +483,35 @@ func TestAutoCompactionModeParse(t *testing.T) { } } +func TestAutoCompactionIntervalParse(t *testing.T) { + tests := []struct { + interval string + werr bool + wdur time.Duration + }{ + {"", false, 0}, + {"1", true, 0}, + {"1h", false, time.Hour}, + {"1s", false, time.Second}, + {"a", true, 0}, + {"-1", true, 0}, + } + + hasErr := func(err error) bool { + return err != nil + } + + for i, tt := range tests { + dur, err := parseCompactionInterval(tt.interval) + if hasErr(err) != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + if dur != tt.wdur { + t.Errorf("#%d: duration = %s, want %s", i, dur, tt.wdur) + } + } +} + func TestPeerURLsMapAndTokenFromSRV(t *testing.T) { defer func() { getCluster = srv.GetCluster }() diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 2970a804d110..78283e71da07 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -164,6 +164,11 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { return e, err } + autoCompactionInterval, err := parseCompactionInterval(cfg.AutoCompactionInterval) + if err != nil { + return e, err + } + backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType) srvcfg := config.ServerConfig{ @@ -188,6 +193,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { InitialElectionTickAdvance: cfg.InitialElectionTickAdvance, AutoCompactionRetention: autoCompactionRetention, AutoCompactionMode: cfg.AutoCompactionMode, + AutoCompactionInterval: autoCompactionInterval, QuotaBackendBytes: cfg.QuotaBackendBytes, BackendBatchLimit: cfg.BackendBatchLimit, BackendFreelistType: backendFreelistType, @@ -892,6 +898,13 @@ func (e *Etcd) GetLogger() *zap.Logger { return l } +func parseCompactionInterval(interval string) (ret 
time.Duration, err error) { + if interval == "" { + return ret, nil + } + return time.ParseDuration(interval) +} + func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) { h, err := strconv.Atoi(retention) if err == nil && h >= 0 { diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 23c531b720c3..abf1e088f196 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -168,6 +168,8 @@ Clustering: Auto compaction retention length. 0 means disable auto compaction. --auto-compaction-mode 'periodic' Interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention. + --auto-compaction-interval '' + Auto compaction interval. Empty means use default based on mode selected. --v2-deprecation '` + string(cconfig.V2DeprDefault) + `' Phase of v2store deprecation. Allows to opt-in for higher compatibility mode. 
Supported values: diff --git a/server/etcdserver/api/v3compactor/compactor.go b/server/etcdserver/api/v3compactor/compactor.go index f916e71141bf..374cd44f7436 100644 --- a/server/etcdserver/api/v3compactor/compactor.go +++ b/server/etcdserver/api/v3compactor/compactor.go @@ -56,6 +56,7 @@ func New( lg *zap.Logger, mode string, retention time.Duration, + interval time.Duration, rg RevGetter, c Compactable, ) (Compactor, error) { @@ -64,9 +65,9 @@ func New( } switch mode { case ModePeriodic: - return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil + return newPeriodic(lg, clockwork.NewRealClock(), retention, interval, rg, c), nil case ModeRevision: - return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil + return newRevision(lg, clockwork.NewRealClock(), int64(retention), interval, rg, c), nil default: return nil, fmt.Errorf("unsupported compaction mode %s", mode) } diff --git a/server/etcdserver/api/v3compactor/periodic.go b/server/etcdserver/api/v3compactor/periodic.go index 98fbc381bb87..f84dddc31ae6 100644 --- a/server/etcdserver/api/v3compactor/periodic.go +++ b/server/etcdserver/api/v3compactor/periodic.go @@ -29,9 +29,10 @@ import ( // Periodic compacts the log by purging revisions older than // the configured retention time. type Periodic struct { - lg *zap.Logger - clock clockwork.Clock - period time.Duration + lg *zap.Logger + clock clockwork.Clock + period time.Duration + interval time.Duration rg RevGetter c Compactable @@ -47,13 +48,14 @@ type Periodic struct { // newPeriodic creates a new instance of Periodic compactor that purges // the log older than h Duration. 
-func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic { +func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, interval time.Duration, rg RevGetter, c Compactable) *Periodic { pc := &Periodic{ - lg: lg, - clock: clock, - period: h, - rg: rg, - c: c, + lg: lg, + clock: clock, + period: h, + interval: interval, + rg: rg, + c: c, } // revs won't be longer than the retentions. pc.revs = make([]int64, 0, pc.getRetentions()) @@ -161,11 +163,15 @@ func (pc *Periodic) Run() { }() } +// if a non-zero static interval is configured, compact every interval duration and ignore the rules below. // if given compaction period x is <1-hour, compact every x duration. // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute) // if given compaction period x is >1-hour, compact every hour. // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour) func (pc *Periodic) getCompactInterval() time.Duration { + if pc.interval != 0 { + return pc.interval + } itv := pc.period if itv > time.Hour { itv = time.Hour diff --git a/server/etcdserver/api/v3compactor/periodic_test.go b/server/etcdserver/api/v3compactor/periodic_test.go index 5053482a807c..88044d0a4d30 100644 --- a/server/etcdserver/api/v3compactor/periodic_test.go +++ b/server/etcdserver/api/v3compactor/periodic_test.go @@ -30,12 +30,13 @@ import ( func TestPeriodicHourly(t *testing.T) { retentionHours := 2 retentionDuration := time.Duration(retentionHours) * time.Hour + intervalDuration := time.Duration(0) fc := clockwork.NewFakeClock() // TODO: Do not depand or real time (Recorder.Wait) in unit tests. 
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} - tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) + tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable) tb.Run() defer tb.Stop() @@ -82,11 +83,12 @@ func TestPeriodicHourly(t *testing.T) { func TestPeriodicMinutes(t *testing.T) { retentionMinutes := 5 retentionDuration := time.Duration(retentionMinutes) * time.Minute + intervalDuration := time.Duration(0) fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} - tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) + tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable) tb.Run() defer tb.Stop() @@ -129,12 +131,64 @@ func TestPeriodicMinutes(t *testing.T) { } } +func TestPeriodicMinutesWithInterval(t *testing.T) { + retentionMinutes := 10 + retentionDuration := time.Duration(retentionMinutes) * time.Minute + intervalDuration := 2 * time.Minute + + fc := clockwork.NewFakeClock() + rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} + compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} + tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable) + + tb.Run() + defer tb.Stop() + + // compaction doesn't happen til 10 minutes elapse + for i := 0; i < retentionMinutes; i++ { + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) + } + + // very first compaction + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + expectedRevision := int64(1) + if !reflect.DeepEqual(a[0].Params[0], 
&pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + + for i := 0; i < 10; i++ { + // advance 20 minutes, one revision for each minute + for j := 0; j < 20; j++ { + rg.Wait(1) + fc.Advance(1 * time.Minute) + } + + // compact + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + + // the expected revision is the current revision minus the retention duration + // since we made a revision every minute + expectedRevision := rg.rev - int64(retentionDuration.Minutes()) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + } +} + func TestPeriodicPause(t *testing.T) { fc := clockwork.NewFakeClock() retentionDuration := time.Hour + intervalDuration := time.Duration(0) rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} - tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) + tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable) tb.Run() tb.Pause() @@ -177,11 +231,12 @@ func TestPeriodicPause(t *testing.T) { func TestPeriodicSkipRevNotChange(t *testing.T) { retentionMinutes := 5 retentionDuration := time.Duration(retentionMinutes) * time.Minute + intervalDuration := time.Duration(0) fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} - tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) + tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable) tb.Run() defer 
tb.Stop() diff --git a/server/etcdserver/api/v3compactor/revision.go b/server/etcdserver/api/v3compactor/revision.go index 326ac211d0b9..b225de5ed1a9 100644 --- a/server/etcdserver/api/v3compactor/revision.go +++ b/server/etcdserver/api/v3compactor/revision.go @@ -33,6 +33,7 @@ type Revision struct { clock clockwork.Clock retention int64 + interval time.Duration rg RevGetter c Compactable @@ -46,11 +47,16 @@ type Revision struct { // newRevision creates a new instance of Revisonal compactor that purges // the log older than retention revisions from the current revision. -func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision { +func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, interval time.Duration, rg RevGetter, c Compactable) *Revision { + // default revision interval to 5 minutes + if interval == 0 { + interval = time.Minute * 5 + } rc := &Revision{ lg: lg, clock: clock, retention: retention, + interval: interval, rg: rg, c: c, } @@ -58,8 +64,6 @@ func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevG return rc } -const revInterval = 5 * time.Minute - // Run runs revision-based compactor. 
func (rc *Revision) Run() { prev := int64(0) @@ -68,7 +72,7 @@ func (rc *Revision) Run() { select { case <-rc.ctx.Done(): return - case <-rc.clock.After(revInterval): + case <-rc.clock.After(rc.interval): rc.mu.Lock() p := rc.paused rc.mu.Unlock() @@ -102,7 +106,7 @@ func (rc *Revision) Run() { "failed auto revision compaction", zap.Int64("revision", rev), zap.Int64("revision-compaction-retention", rc.retention), - zap.Duration("retry-interval", revInterval), + zap.Duration("retry-interval", rc.interval), zap.Error(err), ) } diff --git a/server/etcdserver/api/v3compactor/revision_test.go b/server/etcdserver/api/v3compactor/revision_test.go index 54e25f2b88ce..981525deaeb3 100644 --- a/server/etcdserver/api/v3compactor/revision_test.go +++ b/server/etcdserver/api/v3compactor/revision_test.go @@ -26,22 +26,24 @@ import ( "go.etcd.io/etcd/client/pkg/v3/testutil" ) +const testRevInterval = 5 * time.Minute + func TestRevision(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} - tb := newRevision(zaptest.NewLogger(t), fc, 10, rg, compactable) + tb := newRevision(zaptest.NewLogger(t), fc, 10, testRevInterval, rg, compactable) tb.Run() defer tb.Stop() - fc.Advance(revInterval) + fc.Advance(testRevInterval) rg.Wait(1) // nothing happens rg.SetRev(99) // will be 100 expectedRevision := int64(90) - fc.Advance(revInterval) + fc.Advance(testRevInterval) rg.Wait(1) a, err := compactable.Wait(1) if err != nil { @@ -58,7 +60,7 @@ func TestRevision(t *testing.T) { rg.SetRev(199) // will be 200 expectedRevision = int64(190) - fc.Advance(revInterval) + fc.Advance(testRevInterval) rg.Wait(1) a, err = compactable.Wait(1) if err != nil { @@ -73,15 +75,15 @@ func TestRevisionPause(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100 compactable 
:= &fakeCompactable{testutil.NewRecorderStream()} - tb := newRevision(zaptest.NewLogger(t), fc, 10, rg, compactable) + tb := newRevision(zaptest.NewLogger(t), fc, 10, testRevInterval, rg, compactable) tb.Run() tb.Pause() // tb will collect 3 hours of revisions but not compact since paused - n := int(time.Hour / revInterval) + n := int(time.Hour / testRevInterval) for i := 0; i < 3*n; i++ { - fc.Advance(revInterval) + fc.Advance(testRevInterval) } // tb ends up waiting for the clock @@ -95,7 +97,7 @@ func TestRevisionPause(t *testing.T) { tb.Resume() // unblock clock, will kick off a compaction at hour 3:05 - fc.Advance(revInterval) + fc.Advance(testRevInterval) rg.Wait(1) a, err := compactable.Wait(1) if err != nil { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 0600a31b8960..b2d06c65d0cf 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -383,7 +383,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() if num := cfg.AutoCompactionRetention; num != 0 { - srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv) + srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, cfg.AutoCompactionInterval, srv.kv, srv) if err != nil { return nil, err }