From 18a8c02759676c3990e9120effdcbdd2c8da9ab5 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 9 Oct 2024 16:47:34 +0800 Subject: [PATCH] address comments Signed-off-by: Ryan Leung --- client/errs/errno.go | 4 ++++ client/errs/errs.go | 4 ++-- server/cluster/cluster.go | 9 ++++--- tests/integrations/mcs/tso/server_test.go | 29 ++++++++++++++++------- 4 files changed, 33 insertions(+), 13 deletions(-) diff --git a/client/errs/errno.go b/client/errs/errno.go index 95c6bffdfa4..a35b5373db8 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -34,6 +34,10 @@ const ( RetryTimeoutErr = "retry timeout" // NotPrimaryErr indicates the non-primary member received the requests which should be received by primary. NotPrimaryErr = "not primary" + // NotFoundTSOErr indicates the tso address is not found. + NotFoundTSOErr = "not found tso address" + // MaximumRetriesExceededErr indicates the maximum retries exceeded. + MaximumRetriesExceededErr = "maximum number of retries exceeded" ) // client errors diff --git a/client/errs/errs.go b/client/errs/errs.go index 77623de81d0..a99e5eb7d3a 100644 --- a/client/errs/errs.go +++ b/client/errs/errs.go @@ -44,8 +44,8 @@ func IsServiceModeChange(err error) bool { return false } errMsg := err.Error() - return strings.Contains(errMsg, "not found tso address") || - strings.Contains(errMsg, "maximum number of retries exceeded") + return strings.Contains(errMsg, NotFoundTSOErr) || + strings.Contains(errMsg, MaximumRetriesExceededErr) } // ZapError is used to make the log output easier. diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 31105a646d8..618b83b094a 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -175,9 +175,12 @@ type RaftCluster struct { regionSyncer *syncer.RegionSyncer changedRegions chan *core.RegionInfo keyspaceGroupManager *keyspace.GroupManager - independentServices sync.Map - hbstreams *hbstream.HeartbeatStreams - tsoAllocator *tso.AllocatorManager + // independentServices is a map of services that are independent of the PD server. + // If a service is not in this map, the PD server will provide the service itself. + // Otherwise, the service will be provided by the corresponding micro-service. + independentServices sync.Map + hbstreams *hbstream.HeartbeatStreams + tsoAllocator *tso.AllocatorManager // heartbeatRunner is used to process the subtree update task asynchronously. heartbeatRunner ratelimit.Runner diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 00084738582..b73aaf550fc 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -586,6 +586,10 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() { suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader()) } +// TestTSOServiceSwitch1 tests the switching behavior of the TSO service when TSO fallback is enabled. +// The test ensures that initially, the TSO service is provided by PD. When a TSO server is started, +// the service switches to the TSO server. +// If the TSO server is stopped, the service should switch back to being provided by PD. func TestTSOServiceSwitch1(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`)) @@ -615,7 +619,7 @@ func TestTSOServiceSwitch1(t *testing.T) { ch := make(chan struct{}) ch1 := make(chan struct{}) wg.Add(1) - go func(ctx context.Context, wg *sync.WaitGroup, ch, ch1 chan struct{}) { + go func() { defer wg.Done() var lastPhysical, lastLogical int64 for { @@ -641,20 +645,25 @@ func TestTSOServiceSwitch1(t *testing.T) { t.Log(err) } } - }(ctx, &wg, ch, ch1) - ch1 <- struct{}{} - <-ch + }() + waitOneTs := func() { + ch1 <- struct{}{} + <-ch + } + waitOneTs() tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, backendEndpoints) re.NoError(err) tsoCluster.WaitForDefaultPrimaryServing(re) - ch1 <- struct{}{} - <-ch + waitOneTs() tsoCluster.Destroy() - ch1 <- struct{}{} - <-ch + waitOneTs() re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode")) } +// TestTSOServiceSwitch2 tests the behavior of TSO service switching when TSO fallback is enabled. +// Initially, the TSO service should be provided by PD. After starting a TSO server, the service should switch to the TSO server. +// When the TSO server is stopped, the PD should resume providing the TSO service if fallback is enabled. +// If fallback is disabled, the PD should not provide TSO service after the TSO server is stopped. func TestTSOServiceSwitch2(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`)) @@ -727,6 +736,10 @@ func TestTSOServiceSwitch2(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode")) } +// TestTSOServiceSwitch3 tests the behavior of TSO service switching under different configurations. +// This test verifies that after disabling TSO fallback, the PD should not provide TSO service. +// Then, it starts a TSO server and verifies that the TSO service is provided by this server. +// Finally, it stops the TSO server and verifies that the PD no longer provides TSO service. func TestTSOServiceSwitch3(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background())