Skip to content

Commit

Permalink
check available tso members
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Aug 15, 2024
1 parent 4b7d0a6 commit 8cc0a70
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1508,7 +1508,7 @@ func transferPrimary(c *gin.Context) {
}

if err := mcsutils.TransferPrimary(svr.GetClient(), svr.GetParticipant().GetExpectedPrimaryLease(),
constant.SchedulingServiceName, svr.Name(), newPrimary, 0); err != nil {
constant.SchedulingServiceName, svr.Name(), newPrimary, 0, nil); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,15 @@ func transferPrimary(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
// only members of specific group are valid primary candidates.
group := svr.GetKeyspaceGroupManager().GetKeyspaceGroups()[keyspaceGroupID]
memberMap := make(map[string]bool, len(group.Members))
for _, member := range group.Members {
memberMap[member.Address] = true
}

if err := utils.TransferPrimary(svr.GetClient(), globalAllocator.(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(),
constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID); err != nil {
constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID, memberMap); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/utils/expected_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func watchExpectedPrimary(ctx context.Context,
// TransferPrimary transfers the primary of the specified service.
// keyspaceGroupID is optional, only used for TSO service.
func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName,
oldPrimary, newPrimary string, keyspaceGroupID uint32) error {
oldPrimary, newPrimary string, keyspaceGroupID uint32, tsoMembersMap map[string]bool) error {
if lease == nil {
return errors.New("current lease is nil, please check leadership")
}
Expand All @@ -139,6 +139,10 @@ func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName

var primaryIDs []string
for _, member := range entries {
// only members of specific group are valid primary candidates for TSO service.
if tsoMembersMap != nil && !tsoMembersMap[member.ServiceAddr] {
continue
}
if (newPrimary == "" && member.Name != oldPrimary) || (newPrimary != "" && member.Name == newPrimary) {
primaryIDs = append(primaryIDs, member.ServiceAddr)
}
Expand Down
51 changes: 48 additions & 3 deletions tests/integrations/mcs/members/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pdClient "github.com/tikv/pd/client/http"
bs "github.com/tikv/pd/pkg/basicserver"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
Expand All @@ -42,6 +46,9 @@ type memberTestSuite struct {
backendEndpoints string
pdClient pdClient.Client

// We only test `DefaultKeyspaceGroupID` here.
// tsoAvailMembers is used to check the tso members which in the DefaultKeyspaceGroupID.
tsoAvailMembers map[string]bool
tsoNodes map[string]bs.Server
schedulingNodes map[string]bs.Server
}
Expand All @@ -52,6 +59,7 @@ func TestMemberTestSuite(t *testing.T) {

func (suite *memberTestSuite) SetupTest() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
suite.ctx = ctx
cluster, err := tests.NewTestAPICluster(suite.ctx, 1)
Expand All @@ -66,15 +74,24 @@ func (suite *memberTestSuite) SetupTest() {

// TSO
nodes := make(map[string]bs.Server)
for i := 0; i < constant.DefaultKeyspaceGroupReplicaCount; i++ {
// mock 3 tso nodes, which is more than the default replica count(DefaultKeyspaceGroupReplicaCount).
for i := 0; i < 3; i++ {
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
nodes[s.GetAddr()] = s
suite.cleanupFunc = append(suite.cleanupFunc, func() {
cleanup()
})
}
tests.WaitForPrimaryServing(re, nodes)
primary := tests.WaitForPrimaryServing(re, nodes)
members := mustGetKeyspaceGroupMembers(re, nodes[primary].(*tso.Server))
// Get the tso nodes
suite.tsoNodes = nodes
// We only test `DefaultKeyspaceGroupID` here.
// tsoAvailMembers is used to check the tso members which in the DefaultKeyspaceGroupID.
suite.tsoAvailMembers = make(map[string]bool)
for _, member := range members[constant.DefaultKeyspaceGroupID].Group.Members {
suite.tsoAvailMembers[member.Address] = true
}

// Scheduling
nodes = make(map[string]bs.Server)
Expand All @@ -101,13 +118,15 @@ func (suite *memberTestSuite) TearDownTest() {
suite.pdClient.Close()
}
suite.cluster.Destroy()
re := suite.Require()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

func (suite *memberTestSuite) TestMembers() {
re := suite.Require()
members, err := suite.pdClient.GetMicroServiceMembers(suite.ctx, "tso")
re.NoError(err)
re.Len(members, constant.DefaultKeyspaceGroupReplicaCount)
re.Len(members, 3)

members, err = suite.pdClient.GetMicroServiceMembers(suite.ctx, "scheduling")
re.NoError(err)
Expand Down Expand Up @@ -199,6 +218,9 @@ func (suite *memberTestSuite) TestTransferPrimary() {
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand Down Expand Up @@ -250,6 +272,9 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() {
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand Down Expand Up @@ -301,6 +326,9 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpired() {
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand Down Expand Up @@ -353,6 +381,9 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown(
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand Down Expand Up @@ -387,3 +418,17 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown(
re.NotEqual(newPrimary, onlyPrimary)
}
}

func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember {
httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+"/tso/api/v1/keyspace-groups/members", nil)
re.NoError(err)
httpResp, err := tests.TestDialClient.Do(httpReq)
re.NoError(err)
defer httpResp.Body.Close()
data, err := io.ReadAll(httpResp.Body)
re.NoError(err)
re.Equal(http.StatusOK, httpResp.StatusCode, string(data))
var resp map[uint32]*apis.KeyspaceGroupMember
re.NoError(json.Unmarshal(data, &resp))
return resp
}

0 comments on commit 8cc0a70

Please sign in to comment.