diff --git a/server/config/config.go b/server/config/config.go index 4f1f21af7d6f..bf15b54560ee 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -114,6 +114,7 @@ type ServerConfig struct { AutoCompactionRetention time.Duration AutoCompactionMode string CompactionBatchLimit int + CompactionSleepInterval time.Duration QuotaBackendBytes int64 MaxTxnOps uint diff --git a/server/embed/config.go b/server/embed/config.go index fa1834b21ceb..66785265cd97 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -334,9 +334,11 @@ type Config struct { // Requires experimental-enable-lease-checkpoint to be enabled. // Deprecated in v3.6. // TODO: Delete in v3.7 - ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"` - ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` - ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` + ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"` + ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + // ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop. + ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"` + ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` // ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request // takes more time than this value. ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"` diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 147114e7922b..b2c7fee4482c 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -218,6 +218,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist, CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, + CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval, WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime, WarningApplyDuration: cfg.ExperimentalWarningApplyDuration, diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 3eae19cd61e6..7051d4b0a988 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -298,6 +298,7 @@ func newConfig() *config { // TODO: delete in v3.7 fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.") fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.") + fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.") fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.") fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.") fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.") diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 15de292f33f2..45d658323908 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -272,6 +272,8 @@ Experimental feature: Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled. --experimental-compaction-batch-limit 1000 ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch. + --experimental-compaction-sleep-interval '10ms' + ExperimentalCompactionSleepInterval sets the sleep interval between each compaction batch. --experimental-peer-skip-client-san-verification 'false' Skip verification of SAN field in client certificate for peer connections. --experimental-watch-progress-notify-interval '10m' diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 61033a33bfbe..2655d47d6704 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -615,10 +615,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { cfg.Logger.Warn("failed to create token provider", zap.Error(err)) return nil, err } - srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + + mvccStoreConfig := mvcc.StoreConfig{ + CompactionBatchLimit: cfg.CompactionBatchLimit, + CompactionSleepInterval: cfg.CompactionSleepInterval, + } + srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) kvindex := ci.ConsistentIndex() srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) + if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 828d81549a97..fb81af6ec747 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -50,10 +50,12 @@ const ( ) var restoreChunkKeys = 10000 // non-const for testing -var defaultCompactBatchLimit = 1000 +var defaultCompactionBatchLimit = 1000 +var defaultCompactionSleepInterval = 10 * time.Millisecond type StoreConfig struct { - CompactionBatchLimit int + CompactionBatchLimit int + CompactionSleepInterval time.Duration } type store struct { @@ -94,7 +96,10 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi lg = zap.NewNop() } if cfg.CompactionBatchLimit == 0 { - cfg.CompactionBatchLimit = defaultCompactBatchLimit + cfg.CompactionBatchLimit = defaultCompactionBatchLimit + } + if cfg.CompactionSleepInterval == 0 { + cfg.CompactionSleepInterval = defaultCompactionSleepInterval } s := &store{ cfg: cfg, diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go index 89defbd9e794..ad8c5cb5bcfe 100644 --- a/server/mvcc/kvstore_compaction.go +++ b/server/mvcc/kvstore_compaction.go @@ -39,8 +39,11 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) batchNum := s.cfg.CompactionBatchLimit + batchTicker := time.NewTicker(s.cfg.CompactionSleepInterval) + defer batchTicker.Stop() h := newKVHasher(prevCompactRev, compactMainRev, keep) last := make([]byte, 8+1+8) + for { var rev revision @@ -58,7 +61,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal h.WriteKeyValue(keys[i], values[i]) } - if len(keys) < s.cfg.CompactionBatchLimit { + if len(keys) < batchNum { // gofail: var compactBeforeSetFinishedCompact struct{} rbytes := make([]byte, 8+1+8) revToBytes(revision{main: compactMainRev}, rbytes) @@ -87,7 +90,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) select { - case <-time.After(10 * time.Millisecond): + case <-batchTicker.C: case <-s.stopc: return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal") } diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index f1c0792ad96f..612362d737b5 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -910,7 +910,10 @@ func newFakeStore() *store { Recorder: &testutil.RecorderBuffered{}, rangeRespc: make(chan rangeResp, 5)}} s := &store{ - cfg: StoreConfig{CompactionBatchLimit: 10000}, + cfg: StoreConfig{ + CompactionBatchLimit: 10000, + CompactionSleepInterval: defaultCompactionSleepInterval, + }, b: b, le: &lease.FakeLessor{}, kvindex: newFakeIndex(),