From 6b75b34db81fd166ca3ec77508bb3cc0d7d210f2 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 10 Dec 2024 16:53:14 +0800 Subject: [PATCH] change name to write_throttle_ratio Signed-off-by: ekexium --- metrics/metrics.go | 11 +++++++++++ tikv/kv.go | 13 ++++++++----- txnkv/transaction/txn.go | 19 ++++++++++--------- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index ce247600c..a566473c0 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -117,6 +117,7 @@ var ( TiKVPipelinedFlushDuration prometheus.Histogram TiKVValidateReadTSFromPDCount prometheus.Counter TiKVLowResolutionTSOUpdateIntervalSecondsGauge prometheus.Gauge + TiKVPipelinedFlushThrottleSecondsHistogram prometheus.Histogram ) // Label constants. @@ -852,6 +853,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "The actual working update interval for the low resolution TSO. As there are adaptive mechanism internally, this value may differ from the config.", }) + TiKVPipelinedFlushThrottleSecondsHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "pipelined_flush_throttle_seconds", + Help: "Throttle durations of pipelined flushes.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 18h + }) + initShortcuts() } @@ -948,6 +958,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVPipelinedFlushDuration) prometheus.MustRegister(TiKVValidateReadTSFromPDCount) prometheus.MustRegister(TiKVLowResolutionTSOUpdateIntervalSecondsGauge) + prometheus.MustRegister(TiKVPipelinedFlushThrottleSecondsHistogram) } // readCounter reads the value of a prometheus.Counter. diff --git a/tikv/kv.go b/tikv/kv.go index 5a8dbb673..9828683c1 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -83,7 +83,7 @@ const ( defaultPipelinedFlushConcurrency = 128 defaultPipelinedResolveLockConcurrency = 8 - defaultPipelinedWriteSpeedRatio = 1.0 + defaultPipelinedWriteThrottleRatio = 0.0 ) func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) { @@ -955,20 +955,23 @@ func WithDefaultPipelinedTxn() TxnOption { Enable: true, FlushConcurrency: defaultPipelinedFlushConcurrency, ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency, - WriteSpeedRatio: defaultPipelinedWriteSpeedRatio, + WriteThrottleRatio: defaultPipelinedWriteThrottleRatio, } } } // WithPipelinedTxn creates pipelined txn with specified parameters -func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int, - writeSpeedRatio float64) TxnOption { +func WithPipelinedTxn( + flushConcurrency, + resolveLockConcurrency int, + writeThrottleRatio float64, +) TxnOption { return func(st *transaction.TxnOptions) { st.PipelinedTxn = transaction.PipelinedTxnOptions{ Enable: true, FlushConcurrency: flushConcurrency, ResolveLockConcurrency: resolveLockConcurrency, - WriteSpeedRatio: writeSpeedRatio, + WriteThrottleRatio: writeThrottleRatio, } } } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 23addcf4d..8605d79f6 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -115,8 +115,8 @@ type PipelinedTxnOptions struct { Enable bool FlushConcurrency int ResolveLockConcurrency int - // (0,1], 1 = no sleep - WriteSpeedRatio float64 + // [0,1), 0 = no sleep, 1 = no write + WriteThrottleRatio float64 } // TxnOptions indicates the option when beginning a transaction. @@ -184,7 +184,7 @@ type KVTxn struct { pipelinedCancel context.CancelFunc pipelinedFlushConcurrency int pipelinedResolveLockConcurrency int - writeSpeedRatio float64 + writeThrottleRatio float64 // flushBatchDurationEWMA is read before each flush, and written after each flush => no race flushBatchDurationEWMA ewma.MovingAverage } @@ -212,7 +212,7 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, } newTiKVTxn.pipelinedFlushConcurrency = options.PipelinedTxn.FlushConcurrency newTiKVTxn.pipelinedResolveLockConcurrency = options.PipelinedTxn.ResolveLockConcurrency - newTiKVTxn.writeSpeedRatio = options.PipelinedTxn.WriteSpeedRatio + newTiKVTxn.writeThrottleRatio = options.PipelinedTxn.WriteThrottleRatio if err := newTiKVTxn.InitPipelinedMemDB(); err != nil { return nil, err } @@ -666,10 +666,10 @@ func (txn *KVTxn) InitPipelinedMemDB() error { } func (txn *KVTxn) throttle() { - if txn.writeSpeedRatio > 1 || txn.writeSpeedRatio <= 0 { + if txn.writeThrottleRatio >= 1 || txn.writeThrottleRatio < 0 { logutil.BgLogger().Error( "[pipelined dml] invalid write speed ratio", - zap.Float64("writeSpeedRatio", txn.writeSpeedRatio), + zap.Float64("writeThrottleRatio", txn.writeThrottleRatio), zap.Uint64("session", txn.committer.sessionID), zap.Uint64("startTS", txn.startTS), ) @@ -677,15 +677,16 @@ func (txn *KVTxn) throttle() { } expectedFlushMs := txn.flushBatchDurationEWMA.Value() - // T_sleep / (T_sleep + T_flush) = 1 - writeSpeedRatio - sleepMs := int((1.0 - txn.writeSpeedRatio) / txn.writeSpeedRatio * expectedFlushMs) + // T_sleep / (T_sleep + T_flush) = writeThrottleRatio + sleepMs := int(txn.writeThrottleRatio / (1.0 - txn.writeThrottleRatio) * expectedFlushMs) + metrics.TiKVPipelinedFlushThrottleSecondsHistogram.Observe(float64(sleepMs) / 1000) if sleepMs == 0 { return } logutil.BgLogger().Info( "[pipelined dml] throttle", zap.Int("sleepMs", sleepMs), - zap.Float64("writeSpeedRatio", txn.writeSpeedRatio), + zap.Float64("writeThrottleRatio", txn.writeThrottleRatio), zap.Uint64("session", txn.committer.sessionID), zap.Uint64("startTS", txn.startTS), )