Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Aug 24, 2023
1 parent 670bc4a commit 36e8caf
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 0 deletions.
15 changes: 15 additions & 0 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type ServiceDiscovery interface {
// in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster
// is changed.
AddServiceAddrsSwitchedCallback(callbacks ...func())
// GetBackoffer returns the backoffer.
GetBackoffer() *retry.Backoffer
}

type updateKeyspaceIDFunc func() error
Expand Down Expand Up @@ -272,6 +274,9 @@ func (c *pdServiceDiscovery) reconnectMemberLoop() {
}

func (c *pdServiceDiscovery) waitForReady() error {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()

if e1 := c.waitForLeaderReady(); e1 != nil {
log.Error("[pd.waitForReady] failed to wait for leader ready", errs.ZapError(e1))
return errors.WithStack(e1)
Expand All @@ -282,13 +287,19 @@ func (c *pdServiceDiscovery) waitForReady() error {
}

deadline := time.Now().Add(requestTimeout)
failpoint.Inject("acceleratedRequestTimeout", func() {
deadline = time.Now().Add(500 * time.Millisecond)
})
for {
select {
case <-c.successReConnect:
return nil
case <-time.After(time.Until(deadline)):
log.Error("[pd.waitForReady] timeout")
return errors.New("wait for ready timeout")
case <-ctx.Done():
log.Info("[pd.waitForReady] exit")
return nil
}
}
}
Expand Down Expand Up @@ -767,3 +778,7 @@ func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]*
func (c *pdServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...)
}

func (c *pdServiceDiscovery) GetBackoffer() *retry.Backoffer {
return c.bo
}
5 changes: 5 additions & 0 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -641,3 +642,7 @@ func (c *tsoServiceDiscovery) discoverWithLegacyPath() ([]string, error) {
}
return listenUrls, nil
}

func (c *tsoServiceDiscovery) GetBackoffer() *retry.Backoffer {
panic("unimplemented")
}
52 changes: 52 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mock/mockid"
Expand Down Expand Up @@ -1518,3 +1519,54 @@ func TestClientWatchWithRevision(t *testing.T) {
}
}
}

func (suite *clientTestSuite) TestRetryMemberUpdate() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedRequestTimeout", `return(true)`))
defer func() {
failpoint.Disable("github.com/tikv/pd/client/acceleratedRequestTimeout")
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 3)
re.NoError(err)
defer cluster.Destroy()

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)
defer cli.Close()
innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery })
re.True(ok)

leader := cluster.GetLeader()
waitLeader(re, innerCli.GetServiceDiscovery(), cluster.GetServer(leader).GetConfig().ClientUrls)
memberID := cluster.GetServer(leader).GetLeader().GetMemberId()

re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`))

leader2 := waitLeaderChange(re, cluster, leader, innerCli.GetServiceDiscovery())
re.NotEqual(leader, leader2)

re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader"))

bo := innerCli.GetServiceDiscovery().GetBackoffer()
retryTimes := bo.GetBackoffTimes()[retry.BoMemberUpdate.String()]
re.Greater(retryTimes, 0)
}

func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string, cli pd.ServiceDiscovery) string {
var leader string
testutil.Eventually(re, func() bool {
cli.ScheduleCheckMemberChanged()
leader = cluster.GetLeader()
if leader == old || leader == "" {
return false
}
return true
})
return leader
}

0 comments on commit 36e8caf

Please sign in to comment.