Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Oct 9, 2024
1 parent d5389fc commit 18a8c02
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 13 deletions.
4 changes: 4 additions & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
RetryTimeoutErr = "retry timeout"
// NotPrimaryErr indicates the non-primary member received the requests which should be received by primary.
NotPrimaryErr = "not primary"
// NotFoundTSOErr indicates the tso address is not found.
NotFoundTSOErr = "not found tso address"
// MaximumRetriesExceededErr indicates the maximum retries exceeded.
MaximumRetriesExceededErr = "maximum number of retries exceeded"
)

// client errors
Expand Down
4 changes: 2 additions & 2 deletions client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func IsServiceModeChange(err error) bool {
return false
}
errMsg := err.Error()
return strings.Contains(errMsg, "not found tso address") ||
strings.Contains(errMsg, "maximum number of retries exceeded")
return strings.Contains(errMsg, NotFoundTSOErr) ||
strings.Contains(errMsg, MaximumRetriesExceededErr)
}

// ZapError is used to make the log output easier.
Expand Down
9 changes: 6 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,12 @@ type RaftCluster struct {
regionSyncer *syncer.RegionSyncer
changedRegions chan *core.RegionInfo
keyspaceGroupManager *keyspace.GroupManager
independentServices sync.Map
hbstreams *hbstream.HeartbeatStreams
tsoAllocator *tso.AllocatorManager
// independentServices is a map of services that are independent of the PD server.
// If a service is not in this map, the PD server will provide the service itself.
// Otherwise, the service will be provided by the corresponding micro-service.
independentServices sync.Map
hbstreams *hbstream.HeartbeatStreams
tsoAllocator *tso.AllocatorManager

// heartbeatRunner is used to process the subtree update task asynchronously.
heartbeatRunner ratelimit.Runner
Expand Down
29 changes: 21 additions & 8 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,10 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() {
suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader())
}

// TestTSOServiceSwitch1 tests the switching behavior of the TSO service when TSO fallback is enabled.
// The test ensures that initially, the TSO service is provided by PD. When a TSO server is started,
// the service switches to the TSO server.
// If the TSO server is stopped, the service should switch back to being provided by PD.
func TestTSOServiceSwitch1(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`))
Expand Down Expand Up @@ -615,7 +619,7 @@ func TestTSOServiceSwitch1(t *testing.T) {
ch := make(chan struct{})
ch1 := make(chan struct{})
wg.Add(1)
go func(ctx context.Context, wg *sync.WaitGroup, ch, ch1 chan struct{}) {
go func() {
defer wg.Done()
var lastPhysical, lastLogical int64
for {
Expand All @@ -641,20 +645,25 @@ func TestTSOServiceSwitch1(t *testing.T) {
t.Log(err)
}
}
}(ctx, &wg, ch, ch1)
ch1 <- struct{}{}
<-ch
}()
waitOneTs := func() {
ch1 <- struct{}{}
<-ch
}
waitOneTs()
tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, backendEndpoints)
re.NoError(err)
tsoCluster.WaitForDefaultPrimaryServing(re)
ch1 <- struct{}{}
<-ch
waitOneTs()
tsoCluster.Destroy()
ch1 <- struct{}{}
<-ch
waitOneTs()
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode"))
}

// TestTSOServiceSwitch2 tests the behavior of TSO service switching when TSO fallback is enabled.
// Initially, the TSO service should be provided by PD. After starting a TSO server, the service should switch to the TSO server.
// When the TSO server is stopped, the PD should resume providing the TSO service if fallback is enabled.
// If fallback is disabled, the PD should not provide TSO service after the TSO server is stopped.
func TestTSOServiceSwitch2(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`))
Expand Down Expand Up @@ -727,6 +736,10 @@ func TestTSOServiceSwitch2(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode"))
}

// TestTSOServiceSwitch3 tests the behavior of TSO service switching under different configurations.
// This test verifies that after disabling TSO fallback, the PD should not provide TSO service.
// Then, it starts a TSO server and verifies that the TSO service is provided by this server.
// Finally, it stops the TSO server and verifies that the PD no longer provides TSO service.
func TestTSOServiceSwitch3(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 18a8c02

Please sign in to comment.