From 2c208976551a763c0656937adb73bc2362b6f9fc Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Thu, 28 Mar 2024 11:05:18 +0800 Subject: [PATCH 1/8] retry, client: append errors for backoffer (#7896) ref tikv/pd#7894 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/go.mod | 2 +- client/retry/backoff.go | 21 ++++++++++++++++----- client/retry/backoff_test.go | 1 + 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/client/go.mod b/client/go.mod index 6a9d29a31847..9b2cb87f75e4 100644 --- a/client/go.mod +++ b/client/go.mod @@ -16,6 +16,7 @@ require ( github.com/stretchr/testify v1.8.2 go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.1.11 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 google.golang.org/grpc v1.59.0 @@ -33,7 +34,6 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.46.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/client/retry/backoff.go b/client/retry/backoff.go index e79d0e3e4eb8..6c2930989718 100644 --- a/client/retry/backoff.go +++ b/client/retry/backoff.go @@ -20,8 +20,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "go.uber.org/multierr" ) +const maxRecordErrorCount = 20 + // Backoffer is a backoff policy for retrying operations. type Backoffer struct { // base defines the initial time interval to wait before each retry. @@ -34,6 +37,7 @@ type Backoffer struct { // By default, all errors are retryable. retryableChecker func(err error) bool + attempt int next time.Duration currentTotal time.Duration } @@ -45,11 +49,16 @@ func (bo *Backoffer) Exec( ) error { defer bo.resetBackoff() var ( - err error - after *time.Timer + allErrors error + after *time.Timer ) for { - err = fn() + err := fn() + bo.attempt++ + if bo.attempt < maxRecordErrorCount { + // multierr.Append will ignore nil error. + allErrors = multierr.Append(allErrors, err) + } if !bo.isRetryable(err) { break } @@ -62,7 +71,7 @@ func (bo *Backoffer) Exec( select { case <-ctx.Done(): after.Stop() - return errors.Trace(ctx.Err()) + return multierr.Append(allErrors, errors.Trace(ctx.Err())) case <-after.C: failpoint.Inject("backOffExecute", func() { testBackOffExecuteFlag = true @@ -77,7 +86,7 @@ func (bo *Backoffer) Exec( } } } - return err + return allErrors } // InitialBackoffer make the initial state for retrying. @@ -102,6 +111,7 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer { }, next: base, currentTotal: 0, + attempt: 0, } } @@ -141,6 +151,7 @@ func (bo *Backoffer) exponentialInterval() time.Duration { func (bo *Backoffer) resetBackoff() { bo.next = bo.base bo.currentTotal = 0 + bo.attempt = 0 } // Only used for test. 
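With this change, Backoffer.Exec aggregates the error from every recorded attempt (up to maxRecordErrorCount) via multierr instead of returning only the last one, and multierr joins the individual messages with "; " when the combined error is printed. A minimal caller-side sketch of the new behavior, assuming only the exported retry.InitialBackoffer and Backoffer.Exec shown above; the durations, retry budget, and error messages are illustrative:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

func main() {
	// Start at 100ms, grow exponentially up to 300ms per attempt,
	// and stop retrying once roughly 1s has been spent in total.
	bo := retry.InitialBackoffer(100*time.Millisecond, 300*time.Millisecond, time.Second)
	attempt := 0
	err := bo.Exec(context.Background(), func() error {
		attempt++
		return fmt.Errorf("attempt %d failed", attempt)
	})
	// err now carries every recorded attempt, e.g.
	// "attempt 1 failed; attempt 2 failed; ...", not just the last one.
	fmt.Println(err)
}

The test below relies on exactly this joining behavior when it asserts that the returned error contains "test; test; test; test".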
diff --git a/client/retry/backoff_test.go b/client/retry/backoff_test.go index 3dd983f2afaa..32a42d083bde 100644 --- a/client/retry/backoff_test.go +++ b/client/retry/backoff_test.go @@ -84,6 +84,7 @@ func TestBackoffer(t *testing.T) { return expectedErr }) re.InDelta(total, time.Since(start), float64(250*time.Millisecond)) + re.ErrorContains(err, "test; test; test; test") re.ErrorIs(err, expectedErr) re.Equal(4, execCount) re.True(isBackofferReset(bo)) From 87db551ea32163c3acd54581cebce786c803ccb3 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 29 Mar 2024 14:57:49 +0800 Subject: [PATCH 2/8] config: fix `trace-region-flow` displayed incorrectly (#7993) close tikv/pd#7917 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/config/config.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 6be949e8a62c..264c72bc1eac 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -548,9 +548,6 @@ func (c *PDServerConfig) adjust(meta *configutil.ConfigMetaData) error { if !meta.IsDefined("dashboard-address") { c.DashboardAddress = defaultDashboardAddress } - if !meta.IsDefined("trace-region-flow") { - c.TraceRegionFlow = defaultTraceRegionFlow - } if !meta.IsDefined("flow-round-by-digit") { configutil.AdjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit) } From a44b208bfa135845b4b0e4a7ee47694c1dd4ed3e Mon Sep 17 00:00:00 2001 From: Hu# Date: Fri, 29 Mar 2024 16:20:02 +0800 Subject: [PATCH 3/8] tests/member: change leader priority check ticker to reduce test time (#8001) ref tikv/pd#7969 Signed-off-by: husharp --- tests/server/member/member_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index 6d4cbb3a6a46..7aadc2772e8a 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/utils/assertutil" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" @@ -151,7 +152,9 @@ func TestLeaderPriority(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) { + conf.LeaderPriorityCheckInterval = typeutil.NewDuration(time.Second) + }) defer cluster.Destroy() re.NoError(err) From 945e29c033cdb7206496ca796617228c300f8aea Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Mon, 1 Apr 2024 10:33:15 +0800 Subject: [PATCH 4/8] cluster: dynamic progress time window for offline scene (#7722) close tikv/pd#7726 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/progress/progress.go | 101 +++++++++++++++++++------ pkg/progress/progress_test.go | 136 ++++++++++++++++++++++++++++++++-- pkg/schedule/coordinator.go | 22 +++++- server/cluster/cluster.go | 18 +++-- 4 files changed, 239 insertions(+), 38 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 9354d21b0e94..855aa793a83e 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -24,8 +24,14 @@ import ( "github.com/tikv/pd/pkg/utils/syncutil" ) -// speedStatisticalWindow is the speed 
calculation window -const speedStatisticalWindow = 10 * time.Minute +const ( + // maxSpeedCalculationWindow is the maximum size of the time window used to calculate the speed, + // but not all of the data in it is necessarily used to calculate the speed; + // which data is used depends on the patrol region duration. + maxSpeedCalculationWindow = 2 * time.Hour + // minSpeedCalculationWindow is the minimum size of the speed calculation window. + minSpeedCalculationWindow = 10 * time.Minute +) // Manager is used to maintain the progresses we care about. type Manager struct { @@ -46,12 +52,28 @@ type progressIndicator struct { remaining float64 // We use a fixed interval's history to calculate the latest average speed. history *list.List - // We use speedStatisticalWindow / updateInterval to get the windowLengthLimit. - // Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4]. + // We use (maxSpeedCalculationWindow / updateInterval + 1) to get the windowCapacity. + // Assume that the windowCapacity is 4 and the init value is 1. After updating 3 times with 2, 3, 4 separately, the window will become [1, 2, 3, 4]. // Then we update it again with 5, the window will become [2, 3, 4, 5]. - windowLengthLimit int - updateInterval time.Duration - lastSpeed float64 + windowCapacity int + // windowLength is used to determine which data will be computed. + // Assume that the windowLength is 2 and the init value is 1. The values that will be calculated are [1]. + // After updating 3 times with 2, 3, 4 separately, the values that will be calculated are [3,4] and the values in the queue are [(1,2),3,4]. + // It helps us avoid abrupt jumps in the calculation results when patrol-region-interval changes. + windowLength int + // front is the first element that should be used. + // currentWindowLength indicates where the front currently is in the queue. + // Assume that the windowLength is 2 and the init value is 1. The front is [1] and currentWindowLength is 1. + // After updating 3 times with 2, 3, 4 separately, + // the front is [3], the currentWindowLength is 2, and the values in the queue are [(1,2),3,4] + // ^ front + // - - currentWindowLength = len([3,4]) = 2 + // We will always keep the currentWindowLength equal to windowLength as long as the actual size is enough. + front *list.Element + currentWindowLength int + + updateInterval time.Duration + lastSpeed float64 } // Reset resets the progress manager. @@ -62,52 +84,89 @@ func (m *Manager) Reset() { m.progresses = make(map[string]*progressIndicator) } +// Option is a functional option that configures a progressIndicator. +type Option func(*progressIndicator) + +// WindowDurationOption changes the speed calculation window size, clamping it to [minSpeedCalculationWindow, maxSpeedCalculationWindow]. +func WindowDurationOption(dur time.Duration) func(*progressIndicator) { + return func(pi *progressIndicator) { + if dur < minSpeedCalculationWindow { + dur = minSpeedCalculationWindow + } else if dur > maxSpeedCalculationWindow { + dur = maxSpeedCalculationWindow + } + pi.windowLength = int(dur/pi.updateInterval) + 1 + } +} + // AddProgress adds a progress into manager if it doesn't exist.
-func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) { +func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration, opts ...Option) (exist bool) { m.Lock() defer m.Unlock() history := list.New() history.PushBack(current) if _, exist = m.progresses[progress]; !exist { - m.progresses[progress] = &progressIndicator{ - total: total, - remaining: total, - history: history, - windowLengthLimit: int(speedStatisticalWindow / updateInterval), - updateInterval: updateInterval, + pi := &progressIndicator{ + total: total, + remaining: total, + history: history, + windowCapacity: int(maxSpeedCalculationWindow/updateInterval) + 1, + windowLength: int(minSpeedCalculationWindow / updateInterval), + updateInterval: updateInterval, + } + for _, op := range opts { + op(pi) } + m.progresses[progress] = pi + pi.front = history.Front() + pi.currentWindowLength = 1 } return } // UpdateProgress updates the progress if it exists. -func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) { +func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool, opts ...Option) { m.Lock() defer m.Unlock() if p, exist := m.progresses[progress]; exist { + for _, op := range opts { + op(p) + } p.remaining = remaining if p.total < remaining { p.total = remaining } - if p.history.Len() > p.windowLengthLimit { + p.history.PushBack(current) + p.currentWindowLength++ + + // try to move `front` into correct place. + for p.currentWindowLength > p.windowLength { + p.front = p.front.Next() + p.currentWindowLength-- + } + for p.currentWindowLength < p.windowLength && p.front.Prev() != nil { + p.front = p.front.Prev() + p.currentWindowLength++ + } + + for p.history.Len() > p.windowCapacity { p.history.Remove(p.history.Front()) } - p.history.PushBack(current) // It means it just init and we haven't update the progress if p.history.Len() <= 1 { p.lastSpeed = 0 } else if isInc { // the value increases, e.g., [1, 2, 3] - p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) / - (float64(p.history.Len()-1) * p.updateInterval.Seconds()) + p.lastSpeed = (current - p.front.Value.(float64)) / + (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) } else { // the value decreases, e.g., [3, 2, 1] - p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) / - (float64(p.history.Len()-1) * p.updateInterval.Seconds()) + p.lastSpeed = (p.front.Value.(float64) - current) / + (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) } if p.lastSpeed < 0 { p.lastSpeed = 0 diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go index 8f8b0ebcb565..a7b159bc9071 100644 --- a/pkg/progress/progress_test.go +++ b/pkg/progress/progress_test.go @@ -24,7 +24,6 @@ import ( ) func TestProgress(t *testing.T) { - t.Parallel() re := require.New(t) n := "test" m := NewManager() @@ -41,15 +40,13 @@ func TestProgress(t *testing.T) { p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.7, p) - // 30/(70/1s+) > 30/70 - re.Greater(ls, 30.0/70.0) - // 70/1s+ > 70 - re.Less(cs, 70.0) + re.Less(math.Abs(ls-30.0/7.0), 1e-6) + re.Less(math.Abs(cs-7), 1e-6) // there is no scheduling - for i := 0; i < 100; i++ { + for i := 0; i < 1000; i++ { m.UpdateProgress(n, 30, 30, false) } - re.Equal(61, m.progresses[n].history.Len()) + re.Equal(721, m.progresses[n].history.Len()) p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.7, 
p) @@ -70,7 +67,6 @@ func TestProgress(t *testing.T) { } func TestAbnormal(t *testing.T) { - t.Parallel() re := require.New(t) n := "test" m := NewManager() @@ -95,3 +91,127 @@ func TestAbnormal(t *testing.T) { re.Equal(0.0, ls) re.Equal(0.0, cs) } + +func TestProgressWithDynamicWindow(t *testing.T) { + // The full capacity of queue is 721. + re := require.New(t) + n := "test" + m := NewManager() + re.False(m.AddProgress(n, 100, 100, 10*time.Second)) + p, ls, cs, err := m.Status(n) + re.NoError(err) + re.Equal(0.0, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + time.Sleep(time.Second) + re.True(m.AddProgress(n, 100, 100, 10*time.Second)) + + m.UpdateProgress(n, 31, 31, false) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.69, p) + re.Less(math.Abs(ls-31.0/6.9), 1e-6) + re.Less(math.Abs(cs-6.9), 1e-6) + re.Equal(2, m.progresses[n].currentWindowLength) + re.Equal(100.0, m.progresses[n].front.Value.(float64)) + + m.UpdateProgress(n, 30, 30, false, WindowDurationOption(time.Minute*20)) + re.Equal(3, m.progresses[n].currentWindowLength) + re.Equal(100.0, m.progresses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.7, p) + re.Less(math.Abs(ls-30.0/(7.0/2)), 1e-6) + re.Less(math.Abs(cs-3.5), 1e-6) + + for i := 0; i < 1000; i++ { + m.UpdateProgress(n, 30, 30, false) + } + re.Equal(721, m.progresses[n].history.Len()) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.7, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20)) + re.Equal(121, m.progresses[n].currentWindowLength) + re.Equal(30.0, m.progresses[n].front.Value.(float64)) + re.Equal(721, m.progresses[n].history.Len()) + + for i := 0; i < 60; i++ { + m.UpdateProgress(n, 28, 28, false) + } + re.Equal(721, m.progresses[n].history.Len()) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.72, p) + re.Equal(float64(28/(2./120)*10.), ls) + re.Equal(float64(2./120/10.), cs) + + m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10)) + re.Equal(721, m.progresses[n].history.Len()) + re.Equal(61, m.progresses[n].currentWindowLength) + re.Equal(28.0, m.progresses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.72, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + + m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*20)) + re.Equal(121, m.progresses[n].currentWindowLength) + re.Equal(30.0, m.progresses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.72, p) + re.Equal(float64(28/(2./120)*10.), ls) + re.Equal(float64(2./120/10.), cs) + + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*12)) + re.Equal(73, m.progresses[n].currentWindowLength) + re.Equal(30.0, m.progresses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(float64(1/(29./72)*10.), ls) + re.Equal(float64(29./72/10.), cs) + + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5)) + re.Equal(61, m.progresses[n].currentWindowLength) + re.Equal(28.0, m.progresses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(float64(1/(27./60)*10.), ls) + re.Equal(float64(27./60/10.), cs) + + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180)) + p, ls, cs, err = m.Status(n) + re.Equal(721, m.progresses[n].currentWindowLength) + re.Equal(30.0, 
m.progresses[n].front.Value.(float64)) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(float64(1/(29./720)*10.), ls) + re.Equal(float64(29./720/10.), cs) + for i := 0; i < 2000; i++ { + m.UpdateProgress(n, 1, 1, false) + } + re.Equal(721, m.progresses[n].history.Len()) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + + ps := m.GetProgresses(func(p string) bool { + return strings.Contains(p, n) + }) + re.Len(ps, 1) + re.Equal(n, ps[0]) + ps = m.GetProgresses(func(p string) bool { + return strings.Contains(p, "a") + }) + re.Empty(ps) + re.True(m.RemoveProgress(n)) + re.False(m.RemoveProgress(n)) +} diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index c7c77cabf3f4..35d9c2029a1b 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -74,6 +74,7 @@ type Coordinator struct { cancel context.CancelFunc schedulersInitialized bool + patrolRegionsDuration time.Duration cluster sche.ClusterInformer prepareChecker *prepareChecker @@ -110,6 +111,22 @@ func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbS } } +// GetPatrolRegionsDuration returns the duration of the last patrol region round. +func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { + if c == nil { + return 0 + } + c.RLock() + defer c.RUnlock() + return c.patrolRegionsDuration +} + +func (c *Coordinator) setPatrolRegionsDuration(dur time.Duration) { + c.Lock() + defer c.Unlock() + c.patrolRegionsDuration = dur +} + // markSchedulersInitialized marks the scheduler initialization is finished. func (c *Coordinator) markSchedulersInitialized() { c.Lock() @@ -157,6 +174,7 @@ func (c *Coordinator) PatrolRegions() { ticker.Reset(c.cluster.GetCheckerConfig().GetPatrolRegionInterval()) case <-c.ctx.Done(): patrolCheckRegionsGauge.Set(0) + c.setPatrolRegionsDuration(0) log.Info("patrol regions has been stopped") return } @@ -178,7 +196,9 @@ func (c *Coordinator) PatrolRegions() { // Updates the label level isolation statistics. 
c.cluster.UpdateRegionsLabelLevelStats(regions) if len(key) == 0 { - patrolCheckRegionsGauge.Set(time.Since(start).Seconds()) + dur := time.Since(start) + patrolCheckRegionsGauge.Set(dur.Seconds()) + c.setPatrolRegionsDuration(dur) start = time.Now() } failpoint.Inject("break-patrol", func() { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 354e12020e3b..1a8de44b0bc2 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1409,7 +1409,7 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro if err == nil { regionSize := float64(c.core.GetStoreRegionSize(storeID)) c.resetProgress(storeID, store.GetAddress()) - c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval) + c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration())) // record the current store limit in memory c.prevStoreLimit[storeID] = map[storelimit.Type]float64{ storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer), @@ -1935,21 +1935,23 @@ func updateTopology(topology map[string]any, sortedLabels []*metapb.StoreLabel) func (c *RaftCluster) updateProgress(storeID uint64, storeAddress, action string, current, remaining float64, isInc bool) { storeLabel := strconv.FormatUint(storeID, 10) - var progress string + var progressName string + var opts []progress.Option switch action { case removingAction: - progress = encodeRemovingProgressKey(storeID) + progressName = encodeRemovingProgressKey(storeID) + opts = []progress.Option{progress.WindowDurationOption(c.coordinator.GetPatrolRegionsDuration())} case preparingAction: - progress = encodePreparingProgressKey(storeID) + progressName = encodePreparingProgressKey(storeID) } - if exist := c.progressManager.AddProgress(progress, current, remaining, nodeStateCheckJobInterval); !exist { + if exist := c.progressManager.AddProgress(progressName, current, remaining, nodeStateCheckJobInterval, opts...); !exist { return } - c.progressManager.UpdateProgress(progress, current, remaining, isInc) - process, ls, cs, err := c.progressManager.Status(progress) + c.progressManager.UpdateProgress(progressName, current, remaining, isInc, opts...) 
+ process, ls, cs, err := c.progressManager.Status(progressName) if err != nil { - log.Error("get progress status failed", zap.String("progress", progress), zap.Float64("remaining", remaining), errs.ZapError(err)) + log.Error("get progress status failed", zap.String("progress", progressName), zap.Float64("remaining", remaining), errs.ZapError(err)) return } storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process) From 75b8a4b682952ba06de99899e8d95dd66c66ac58 Mon Sep 17 00:00:00 2001 From: ystaticy Date: Mon, 1 Apr 2024 11:57:15 +0800 Subject: [PATCH 5/8] client/http: change the keyspace config name for the GC management type (#8000) ref tikv/pd#8002 Change `safe_point_version` to `gc_management_type` in the keyspace meta config Signed-off-by: y_static_y@sina.com Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/http/interface.go | 15 ++++++++----- client/http/request_info.go | 2 +- client/http/types.go | 15 ++++++++----- tests/integrations/client/http_client_test.go | 22 ++++++++++--------- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/client/http/interface.go b/client/http/interface.go index 6d1bb4131450..13d684e648b5 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -98,7 +98,12 @@ type Client interface { DeleteOperators(context.Context) error /* Keyspace interface */ - UpdateKeyspaceSafePointVersion(ctx context.Context, keyspaceName string, keyspaceSafePointVersion *KeyspaceSafePointVersionConfig) error + + // UpdateKeyspaceGCManagementType updates the `gc_management_type` in the keyspace meta config. + // If `gc_management_type` is `global_gc`, it means the current keyspace requires a TiDB without `keyspace-name` + // configured to run a global GC worker to calculate a global GC safe point. + // If `gc_management_type` is `keyspace_level_gc`, it means the current keyspace can calculate the GC safe point on its own. + UpdateKeyspaceGCManagementType(ctx context.Context, keyspaceName string, keyspaceGCManagementType *KeyspaceGCManagementTypeConfig) error GetKeyspaceMetaByName(ctx context.Context, keyspaceName string) (*keyspacepb.KeyspaceMeta, error) /* Client-related methods */ @@ -921,14 +926,14 @@ func (c *client) DeleteOperators(ctx context.Context) error { WithMethod(http.MethodDelete)) } -// UpdateKeyspaceSafePointVersion patches the keyspace config. -func (c *client) UpdateKeyspaceSafePointVersion(ctx context.Context, keyspaceName string, keyspaceSafePointVersion *KeyspaceSafePointVersionConfig) error { - keyspaceConfigPatchJSON, err := json.Marshal(keyspaceSafePointVersion) +// UpdateKeyspaceGCManagementType patches the keyspace config. +func (c *client) UpdateKeyspaceGCManagementType(ctx context.Context, keyspaceName string, keyspaceGCManagementType *KeyspaceGCManagementTypeConfig) error { + keyspaceConfigPatchJSON, err := json.Marshal(keyspaceGCManagementType) if err != nil { return errors.Trace(err) } return c.request(ctx, newRequestInfo(). - WithName(UpdateKeyspaceSafePointVersionName). + WithName(UpdateKeyspaceGCManagementTypeName). WithURI(GetUpdateKeyspaceConfigURL(keyspaceName)). WithMethod(http.MethodPatch).
WithBody(keyspaceConfigPatchJSON)) diff --git a/client/http/request_info.go b/client/http/request_info.go index d63cd534a5eb..93a4ecf53071 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -78,7 +78,7 @@ const ( setSnapshotRecoveringMarkName = "SetSnapshotRecoveringMark" deleteSnapshotRecoveringMarkName = "DeleteSnapshotRecoveringMark" deleteOperators = "DeleteOperators" - UpdateKeyspaceSafePointVersionName = "UpdateKeyspaceSafePointVersion" + UpdateKeyspaceGCManagementTypeName = "UpdateKeyspaceGCManagementType" GetKeyspaceMetaByNameName = "GetKeyspaceMetaByName" ) diff --git a/client/http/types.go b/client/http/types.go index 91a2463ff644..31b2bfdaea7e 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -624,14 +624,17 @@ type MicroServiceMember struct { StartTimestamp int64 `json:"start-timestamp"` } -// KeyspaceSafePointVersion represents parameters needed to modify the safe point version. -type KeyspaceSafePointVersion struct { - SafePointVersion string `json:"safe_point_version,omitempty"` +// KeyspaceGCManagementType represents parameters needed to modify the GC management type. +// If `gc_management_type` is `global_gc`, it means the current keyspace requires a TiDB without `keyspace-name` +// configured to run a global GC worker to calculate a global GC safe point. +// If `gc_management_type` is `keyspace_level_gc`, it means the current keyspace can calculate the GC safe point on its own. +type KeyspaceGCManagementType struct { + GCManagementType string `json:"gc_management_type,omitempty"` } -// KeyspaceSafePointVersionConfig represents parameters needed to modify target keyspace's configs. -type KeyspaceSafePointVersionConfig struct { - Config KeyspaceSafePointVersion `json:"config"` +// KeyspaceGCManagementTypeConfig represents parameters needed to modify the target keyspace's configs. +type KeyspaceGCManagementTypeConfig struct { + Config KeyspaceGCManagementType `json:"config"` } // tempKeyspaceMeta is the keyspace meta struct that returned from the http interface.
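Putting the renamed types and method together, a minimal sketch of switching a keyspace to keyspace-level GC through the HTTP client; the source tag, PD address, and keyspace name are placeholders rather than values taken from this patch:

package main

import (
	"context"
	"log"

	pd "github.com/tikv/pd/client/http"
)

func main() {
	// Placeholder PD endpoint and source tag.
	client := pd.NewClient("keyspace-gc-example", []string{"http://127.0.0.1:2379"})
	defer client.Close()

	cfg := &pd.KeyspaceGCManagementTypeConfig{
		Config: pd.KeyspaceGCManagementType{
			// `keyspace_level_gc` lets the keyspace calculate its own GC
			// safe point; `global_gc` defers to a global GC worker instead.
			GCManagementType: "keyspace_level_gc",
		},
	}
	if err := client.UpdateKeyspaceGCManagementType(context.Background(), "DEFAULT", cfg); err != nil {
		log.Fatal(err)
	}
}

The integration test below exercises the same flow against the DEFAULT keyspace and then reads the value back via GetKeyspaceMetaByName.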
diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 5945b70bb2c2..9efbc5878476 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -784,28 +784,30 @@ func (suite *httpClientTestSuite) TestRedirectWithMetrics() { c.Close() } -func (suite *httpClientTestSuite) TestUpdateKeyspaceSafePointVersion() { - suite.RunTestInTwoModes(suite.checkUpdateKeyspaceSafePointVersion) +func (suite *httpClientTestSuite) TestUpdateKeyspaceGCManagementType() { + suite.RunTestInTwoModes(suite.checkUpdateKeyspaceGCManagementType) } -func (suite *httpClientTestSuite) checkUpdateKeyspaceSafePointVersion(mode mode, client pd.Client) { +func (suite *httpClientTestSuite) checkUpdateKeyspaceGCManagementType(mode mode, client pd.Client) { re := suite.Require() env := suite.env[mode] keyspaceName := "DEFAULT" - safePointVersion := "v2" + expectGCManagementType := "keyspace_level_gc" - keyspaceSafePointVersionConfig := pd.KeyspaceSafePointVersionConfig{ - Config: pd.KeyspaceSafePointVersion{ - SafePointVersion: safePointVersion, + keyspaceGCManagementTypeConfig := pd.KeyspaceGCManagementTypeConfig{ + Config: pd.KeyspaceGCManagementType{ + GCManagementType: expectGCManagementType, }, } - err := client.UpdateKeyspaceSafePointVersion(env.ctx, keyspaceName, &keyspaceSafePointVersionConfig) + err := client.UpdateKeyspaceGCManagementType(env.ctx, keyspaceName, &keyspaceGCManagementTypeConfig) re.NoError(err) keyspaceMetaRes, err := client.GetKeyspaceMetaByName(env.ctx, keyspaceName) re.NoError(err) - val, ok := keyspaceMetaRes.Config["safe_point_version"] + val, ok := keyspaceMetaRes.Config["gc_management_type"] + + // Check that the expected key and value are present in the keyspace meta config. re.True(ok) - re.Equal(safePointVersion, val) + re.Equal(expectGCManagementType, val) } From f5599403e2e1c0522504f6f1e8892b7521885710 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 1 Apr 2024 13:24:15 +0800 Subject: [PATCH 6/8] config: remove defaultTraceRegionFlow (#8009) ref tikv/pd#7917 Remove unused `defaultTraceRegionFlow`.
Signed-off-by: JmPotato --- server/config/config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/config/config.go b/server/config/config.go index 264c72bc1eac..93aa3bb87d38 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -207,7 +207,6 @@ const ( defaultLeaderPriorityCheckInterval = time.Minute defaultUseRegionStorage = true - defaultTraceRegionFlow = true defaultFlowRoundByDigit = 3 // KB maxTraceFlowRoundByDigit = 5 // 0.1 MB defaultMaxResetTSGap = 24 * time.Hour From 6fe44d75b200cb406ff1cbc19f4a62400aa1db8e Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 2 Apr 2024 11:28:47 +0800 Subject: [PATCH 7/8] client: support specifying target member (#7909) ref tikv/pd#7905 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- .gitignore | 1 + client/errs/errno.go | 1 + client/grpcutil/grpcutil.go | 10 +++++----- client/http/client.go | 19 ++++++++++++++++++- client/http/client_test.go | 16 +++++++++++++++- client/http/interface.go | 5 ++++- client/http/request_info.go | 7 +++++++ client/mock_pd_service_discovery.go | 2 +- 8 files changed, 52 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 748d24872b68..b9be6099e244 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ coverage.xml coverage *.txt go.work* +embedded_assets_handler.go diff --git a/client/errs/errno.go b/client/errs/errno.go index c095bbe4b4af..c3f5c27275ac 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -51,6 +51,7 @@ var ( ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo")) ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember")) ErrClientNoAvailableMember = errors.Normalize("no available member", errors.RFCCodeText("PD:client:ErrClientNoAvailableMember")) + ErrClientNoTargetMember = errors.Normalize("no target member", errors.RFCCodeText("PD:client:ErrClientNoTargetMember")) ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal")) ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse")) ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint")) diff --git a/client/grpcutil/grpcutil.go b/client/grpcutil/grpcutil.go index fb9e84f0ca1a..0e987825c028 100644 --- a/client/grpcutil/grpcutil.go +++ b/client/grpcutil/grpcutil.go @@ -124,17 +124,17 @@ func getValueFromMetadata(ctx context.Context, key string, f func(context.Contex // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr. // Returns the old one if's already existed in the clientConns; otherwise creates a new one and returns it. -func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tls.Config, opt ...grpc.DialOption) (*grpc.ClientConn, error) { - conn, ok := clientConns.Load(addr) +func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, url string, tlsCfg *tls.Config, opt ...grpc.DialOption) (*grpc.ClientConn, error) { + conn, ok := clientConns.Load(url) if ok { // TODO: check the connection state. return conn.(*grpc.ClientConn), nil } dCtx, cancel := context.WithTimeout(ctx, dialTimeout) defer cancel() - cc, err := GetClientConn(dCtx, addr, tlsCfg, opt...) 
+ cc, err := GetClientConn(dCtx, url, tlsCfg, opt...) failpoint.Inject("unreachableNetwork2", func(val failpoint.Value) { - if val, ok := val.(string); ok && val == addr { + if val, ok := val.(string); ok && val == url { cc = nil err = errors.Errorf("unreachable network") } @@ -142,7 +142,7 @@ func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string if err != nil { return nil, err } - conn, loaded := clientConns.LoadOrStore(addr, cc) + conn, loaded := clientConns.LoadOrStore(url, cc) if !loaded { // Successfully stored the connection. return cc, nil diff --git a/client/http/client.go b/client/http/client.go index 18802346a4c5..30144ebe2c50 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -129,8 +129,13 @@ func (ci *clientInner) requestWithRetry( if len(clients) == 0 { return errs.ErrClientNoAvailableMember } + skipNum := 0 for _, cli := range clients { url := cli.GetURL() + if reqInfo.targetURL != "" && reqInfo.targetURL != url { + skipNum++ + continue + } statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...) if err == nil || noNeedRetry(statusCode) { return err @@ -138,6 +143,9 @@ func (ci *clientInner) requestWithRetry( log.Debug("[pd] request url failed", zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err)) } + if skipNum == len(clients) { + return errs.ErrClientNoTargetMember + } return err } if reqInfo.bo == nil { @@ -244,6 +252,7 @@ type client struct { callerID string respHandler respHandleFunc bo *retry.Backoffer + targetURL string } // ClientOption configures the HTTP client. @@ -343,6 +352,13 @@ func (c *client) WithBackoffer(bo *retry.Backoffer) Client { return &newClient } +// WithTargetURL sets and returns a new client with the given target URL. +func (c *client) WithTargetURL(targetURL string) Client { + newClient := *c + newClient.targetURL = targetURL + return &newClient +} + // Header key definition constants. const ( pdAllowFollowerHandleKey = "PD-Allow-Follower-Handle" @@ -363,7 +379,8 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts . return c.inner.requestWithRetry(ctx, reqInfo. WithCallerID(c.callerID). WithRespHandler(c.respHandler). - WithBackoffer(c.bo), + WithBackoffer(c.bo). + WithTargetURL(c.targetURL), headerOpts...) 
} diff --git a/client/http/client_test.go b/client/http/client_test.go index 49faefefaec0..8769fa53f9ae 100644 --- a/client/http/client_test.go +++ b/client/http/client_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/retry" "go.uber.org/atomic" ) @@ -49,7 +50,7 @@ func TestPDAllowFollowerHandleHeader(t *testing.T) { re.Equal(2, checked) } -func TestCallerID(t *testing.T) { +func TestWithCallerID(t *testing.T) { re := require.New(t) checked := 0 expectedVal := atomic.NewString(defaultCallerID) @@ -96,3 +97,16 @@ func TestWithBackoffer(t *testing.T) { re.InDelta(3*time.Second, time.Since(start), float64(250*time.Millisecond)) re.ErrorIs(err, context.DeadlineExceeded) } + +func TestWithTargetURL(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := newClientWithMockServiceDiscovery("test-with-target-url", []string{"http://127.0.0.1", "http://127.0.0.2", "http://127.0.0.3"}) + defer c.Close() + + _, err := c.WithTargetURL("http://127.0.0.4").GetStatus(ctx) + re.ErrorIs(err, errs.ErrClientNoTargetMember) + _, err = c.WithTargetURL("http://127.0.0.2").GetStatus(ctx) + re.ErrorContains(err, "connect: connection refused") +} diff --git a/client/http/interface.go b/client/http/interface.go index 13d684e648b5..7b15291d9e7e 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -116,6 +116,8 @@ type Client interface { WithRespHandler(func(resp *http.Response, res any) error) Client // WithBackoffer sets and returns a new client with the given backoffer. WithBackoffer(*retry.Backoffer) Client + // WithTargetURL sets and returns a new client with the given target URL. + WithTargetURL(string) Client // Close gracefully closes the HTTP client. Close() } @@ -472,7 +474,8 @@ func (c *client) GetStatus(ctx context.Context) (*State, error) { WithName(getStatusName). WithURI(Status). WithMethod(http.MethodGet). - WithResp(&status)) + WithResp(&status), + WithAllowFollowerHandle()) if err != nil { return nil, err } diff --git a/client/http/request_info.go b/client/http/request_info.go index 93a4ecf53071..0ce7072d1ba0 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -91,6 +91,7 @@ type requestInfo struct { res any respHandler respHandleFunc bo *retry.Backoffer + targetURL string } // newRequestInfo creates a new request info. @@ -146,6 +147,12 @@ func (ri *requestInfo) WithBackoffer(bo *retry.Backoffer) *requestInfo { return ri } +// WithTargetURL sets the target URL of the request. 
+func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo { + ri.targetURL = targetURL + return ri +} + func (ri *requestInfo) getURL(addr string) string { return fmt.Sprintf("%s%s", addr, ri.uri) } diff --git a/client/mock_pd_service_discovery.go b/client/mock_pd_service_discovery.go index b33c8405af9e..17613a2f9e44 100644 --- a/client/mock_pd_service_discovery.go +++ b/client/mock_pd_service_discovery.go @@ -41,7 +41,7 @@ func NewMockPDServiceDiscovery(urls []string, tlsCfg *tls.Config) *mockPDService func (m *mockPDServiceDiscovery) Init() error { m.clients = make([]ServiceClient, 0, len(m.urls)) for _, url := range m.urls { - m.clients = append(m.clients, newPDServiceClient(url, url, nil, false)) + m.clients = append(m.clients, newPDServiceClient(url, m.urls[0], nil, false)) } return nil } From a2b0e3c6fa723879dc218d0e7ba36b0c1c185108 Mon Sep 17 00:00:00 2001 From: TonsnakeLin <87681388+TonsnakeLin@users.noreply.github.com> Date: Wed, 3 Apr 2024 11:16:47 +0800 Subject: [PATCH 8/8] schedule: add check action when polling the operators from opNotifierQueue (#8010) close tikv/pd#7992 Signed-off-by: TonsnakeLin Co-authored-by: TonsnakeLin Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/schedule/operator/operator_controller.go | 30 ++++- .../operator/operator_controller_test.go | 125 ++++++++++++++++++ 2 files changed, 152 insertions(+), 3 deletions(-) diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index 07cafb9c566e..f5e86f812c98 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -217,10 +217,12 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { if !ok || op == nil { return nil, true } - r = oc.cluster.GetRegion(regionID) - if r == nil { + // Check the operator lightly. The op can't be dispatched in some scenarios. + var reason CancelReasonType + r, reason = oc.checkOperatorLightly(op) + if len(reason) != 0 { _ = oc.removeOperatorLocked(op) - if op.Cancel(RegionNotFound) { + if op.Cancel(reason) { log.Warn("remove operator because region disappeared", zap.Uint64("region-id", op.RegionID()), zap.Stringer("operator", op)) @@ -301,6 +303,7 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int { if isMerge { // count two merge operators as one, so wopStatus.ops[desc] should // not be updated here + // TODO: call checkAddOperator ... i++ added++ oc.wop.PutOperator(ops[i]) @@ -455,6 +458,27 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool return reason != Expired, reason } +// checkOperatorLightly checks whether the op can be dispatched in Controller::pollNeedDispatchRegion. +// The operator can't be dispatched in some scenarios, such as the region having disappeared or changed ... +// `region` is the target region of `op`. +func (oc *Controller) checkOperatorLightly(op *Operator) (*core.RegionInfo, CancelReasonType) { + region := oc.cluster.GetRegion(op.RegionID()) + if region == nil { + operatorCounter.WithLabelValues(op.Desc(), "not-found").Inc() + return nil, RegionNotFound + } + + // This check may be suitable for all kinds of operators, not only merge-region. + // But to be cautious, it currently only takes effect on merge-region. + // If the version of the epoch has changed, the region has been split or merged and the key range has changed. + // A change to the conf_version of the epoch doesn't modify the region key range, so skip it.
+ if (op.Kind()&OpMerge != 0) && region.GetRegionEpoch().GetVersion() > op.RegionEpoch().GetVersion() { + operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc() + return nil, EpochNotMatch + } + return region, "" +} + func isHigherPriorityOperator(new, old *Operator) bool { return new.GetPriorityLevel() > old.GetPriorityLevel() } diff --git a/pkg/schedule/operator/operator_controller_test.go b/pkg/schedule/operator/operator_controller_test.go index 6369dea897ce..643dbda9d737 100644 --- a/pkg/schedule/operator/operator_controller_test.go +++ b/pkg/schedule/operator/operator_controller_test.go @@ -407,6 +407,131 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() { re.False(next) } +// issue #7992 +func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() { + re := suite.Require() + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(suite.ctx, opts) + stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) + controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream) + cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + + source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(source) + target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(target) + + ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) + re.NoError(err) + re.Len(ops, 2) + re.Equal(2, controller.AddWaitingOperator(ops...)) + // Change next push time to now, it's used to make test case faster. + controller.opNotifierQueue[0].time = time.Now() + + // first poll gets source region op. + r, next := controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, source) + + // second poll gets target region op. + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, target) + + // third poll removes the two merge-region ops. + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Len(controller.opNotifierQueue, 1) + re.Empty(controller.operators) + re.Empty(controller.wop.ListOperator()) + re.NotNil(controller.records.Get(101)) + re.NotNil(controller.records.Get(102)) + + // fourth poll removes target region op from opNotifierQueue + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Empty(controller.opNotifierQueue) + + // Add the two ops to waiting operators again. 
+ source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0} + controller.records.ttl.Remove(101) + controller.records.ttl.Remove(102) + ops, err = CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) + re.NoError(err) + re.Equal(2, controller.AddWaitingOperator(ops...)) + // change the target RegionEpoch + // first poll gets source region from opNotifierQueue + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, source) + + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Len(controller.opNotifierQueue, 1) + re.Empty(controller.operators) + re.Empty(controller.wop.ListOperator()) + re.NotNil(controller.records.Get(101)) + re.NotNil(controller.records.Get(102)) + + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Empty(controller.opNotifierQueue) +} + +func (suite *operatorControllerTestSuite) TestCheckOperatorLightly() { + re := suite.Require() + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(suite.ctx, opts) + stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) + controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream) + cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + + source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(source) + target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(target) + + ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) + re.NoError(err) + re.Len(ops, 2) + + // the check succeeds + r, reason := controller.checkOperatorLightly(ops[0]) + re.Empty(reason) + re.Equal(r, source) + + // the check fails because the region disappeared + cluster.RemoveRegion(target) + r, reason = controller.checkOperatorLightly(ops[1]) + re.Nil(r) + re.Equal(reason, RegionNotFound) + + // the check fails because the version of the region epoch changed + cluster.PutRegion(target) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + r, reason = controller.checkOperatorLightly(ops[0]) + re.Nil(r) + re.Equal(reason, EpochNotMatch) +} + func (suite *operatorControllerTestSuite) TestStoreLimit() { re := suite.Require() opt := mockconfig.NewTestOptions()