From 0263597ba84f65047948141696be926e53aa2429 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Thu, 20 May 2021 11:31:17 +0800 Subject: [PATCH] Introduce compaction sleep interval flag This is a backporting cherry-pick of the following commits: - add flagsline Signed-off-by: Jalin Wang - etcdserver: rename defaultCompactionSleepInterval var (#18495) etcdserver: rename `minimumBatchInterval` to `defaultCompactionSleepInterval` and `defaultCompactBatchLimit` to `defaultCompactionBatchLimit` Signed-off-by: Jalin Wang (cherry picked from commit 2c53be7c5d91e9b618342d63d2d06c3b265abee4) - test: add CompactionSleepInterval in FakeStore's config After setting the ComparionSleepInterval, we can use time.Ticker instead of time.After to optimize the scheduleComparison(), otherwise it will fail in the 'TestStoreCompact(t)' test. Signed-off-by: guozhao (cherry picked from commit fab8474ef8370a089fe0174e28c197b5b93898df) - add sleep interval (cherry picked from commit 184b0e5d4964f1115590acec50fa5e584a2f7770) Signed-off-by: Jalin Wang --- server/config/config.go | 1 + server/embed/config.go | 8 +++++--- server/embed/etcd.go | 1 + server/etcdmain/config.go | 1 + server/etcdmain/help.go | 2 ++ server/etcdserver/server.go | 8 +++++++- server/mvcc/kvstore.go | 11 ++++++++--- server/mvcc/kvstore_compaction.go | 7 +++++-- server/mvcc/kvstore_test.go | 5 ++++- 9 files changed, 34 insertions(+), 10 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 4f1f21af7d6..bf15b54560e 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -114,6 +114,7 @@ type ServerConfig struct { AutoCompactionRetention time.Duration AutoCompactionMode string CompactionBatchLimit int + CompactionSleepInterval time.Duration QuotaBackendBytes int64 MaxTxnOps uint diff --git a/server/embed/config.go b/server/embed/config.go index fa1834b21ce..66785265cd9 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -334,9 +334,11 @@ type Config struct { // Requires experimental-enable-lease-checkpoint to be enabled. // Deprecated in v3.6. // TODO: Delete in v3.7 - ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"` - ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` - ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` + ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"` + ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + // ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop. + ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"` + ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` // ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request // takes more time than this value. ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"` diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 147114e7922..b2c7fee4482 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -218,6 +218,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist, CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, + CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval, WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime, WarningApplyDuration: cfg.ExperimentalWarningApplyDuration, diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index aff94c2f960..8b22ccf21ee 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -296,6 +296,7 @@ func newConfig() *config { // TODO: delete in v3.7 fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.") fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.") + fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.") fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.") fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.") fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.") diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 1fd278ba9a1..03e81cc8dba 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -268,6 +268,8 @@ Experimental feature: Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled. --experimental-compaction-batch-limit 1000 ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch. + --experimental-compaction-sleep-interval '10ms' + ExperimentalCompactionSleepInterval sets the sleep interval between each compaction batch. --experimental-peer-skip-client-san-verification 'false' Skip verification of SAN field in client certificate for peer connections. --experimental-watch-progress-notify-interval '10m' diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 61033a33bfb..2655d47d670 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -615,10 +615,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { cfg.Logger.Warn("failed to create token provider", zap.Error(err)) return nil, err } - srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + + mvccStoreConfig := mvcc.StoreConfig{ + CompactionBatchLimit: cfg.CompactionBatchLimit, + CompactionSleepInterval: cfg.CompactionSleepInterval, + } + srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) kvindex := ci.ConsistentIndex() srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) + if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 828d81549a9..fb81af6ec74 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -50,10 +50,12 @@ const ( ) var restoreChunkKeys = 10000 // non-const for testing -var defaultCompactBatchLimit = 1000 +var defaultCompactionBatchLimit = 1000 +var defaultCompactionSleepInterval = 10 * time.Millisecond type StoreConfig struct { - CompactionBatchLimit int + CompactionBatchLimit int + CompactionSleepInterval time.Duration } type store struct { @@ -94,7 +96,10 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi lg = zap.NewNop() } if cfg.CompactionBatchLimit == 0 { - cfg.CompactionBatchLimit = defaultCompactBatchLimit + cfg.CompactionBatchLimit = defaultCompactionBatchLimit + } + if cfg.CompactionSleepInterval == 0 { + cfg.CompactionSleepInterval = defaultCompactionSleepInterval } s := &store{ cfg: cfg, diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go index 89defbd9e79..ad8c5cb5bcf 100644 --- a/server/mvcc/kvstore_compaction.go +++ b/server/mvcc/kvstore_compaction.go @@ -39,8 +39,11 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) batchNum := s.cfg.CompactionBatchLimit + batchTicker := time.NewTicker(s.cfg.CompactionSleepInterval) + defer batchTicker.Stop() h := newKVHasher(prevCompactRev, compactMainRev, keep) last := make([]byte, 8+1+8) + for { var rev revision @@ -58,7 +61,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal h.WriteKeyValue(keys[i], values[i]) } - if len(keys) < s.cfg.CompactionBatchLimit { + if len(keys) < batchNum { // gofail: var compactBeforeSetFinishedCompact struct{} rbytes := make([]byte, 8+1+8) revToBytes(revision{main: compactMainRev}, rbytes) @@ -87,7 +90,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) select { - case <-time.After(10 * time.Millisecond): + case <-batchTicker.C: case <-s.stopc: return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal") } diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index f1c0792ad96..612362d737b 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -910,7 +910,10 @@ func newFakeStore() *store { Recorder: &testutil.RecorderBuffered{}, rangeRespc: make(chan rangeResp, 5)}} s := &store{ - cfg: StoreConfig{CompactionBatchLimit: 10000}, + cfg: StoreConfig{ + CompactionBatchLimit: 10000, + CompactionSleepInterval: defaultCompactionSleepInterval, + }, b: b, le: &lease.FakeLessor{}, kvindex: newFakeIndex(),