Skip to content

Commit

Permalink
skip update member when request reach the limit
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jul 27, 2023
1 parent a034984 commit 65b693d
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 11 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tlsutil"
"github.com/tikv/pd/client/tsoutil"
"github.com/tikv/pd/pkg/errs"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -960,6 +959,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()

var err error
var resp *pdpb.GetRegionResponse
for _, url := range memberURLs {
conn, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(url)
Expand Down
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestGRPCDialOption(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond)
defer cancel()
cli := &pdServiceDiscovery{
checkMembershipCh: make(chan struct{}, 1),
checkMembershipCh: make(chan error, 1),
ctx: ctx,
cancel: cancel,
tlsCfg: &tlsutil.TLSConfig{},
Expand Down
7 changes: 3 additions & 4 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tlsutil"
"github.com/tikv/pd/pkg/errs"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -84,7 +83,7 @@ type ServiceDiscovery interface {
// ScheduleCheckMemberChanged is used to trigger a check to see if there is any membership change
// among the leader/followers in a quorum-based cluster or among the primary/secondaries in a
// primary/secondary configured cluster.
ScheduleCheckMemberChanged()
ScheduleCheckMemberChanged(err error)
// CheckMemberChanged immediately check if there is any membership change among the leader/followers
// in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
CheckMemberChanged() error
Expand Down Expand Up @@ -142,7 +141,7 @@ type pdServiceDiscovery struct {
// leader is updated.
tsoGlobalAllocLeaderUpdatedCb tsoGlobalServAddrUpdatedFunc

checkMembershipCh chan err
checkMembershipCh chan error

wg *sync.WaitGroup
ctx context.Context
Expand Down Expand Up @@ -246,7 +245,7 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
return
case <-ticker.C:
case err := <-c.checkMembershipCh:
if err != nil && strings.Contains(err.Error(), errs.ErrRateLimitExceeded) {
if err != nil && strings.Contains(err.Error(), errs.ErrRateLimitExceeded.Error()) {
continue
}
}
Expand Down
8 changes: 4 additions & 4 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *tsoClient) dispatchRequest(dcLocation string, request *tsoRequest) erro
if !ok {
err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", dcLocation))
log.Error("[tso] dispatch tso request error", zap.String("dc-location", dcLocation), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
c.svcDiscovery.ScheduleCheckMemberChanged(nil)
return err
}
dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request
Expand Down Expand Up @@ -439,7 +439,7 @@ tsoBatchLoop:
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
c.svcDiscovery.ScheduleCheckMemberChanged(nil)
c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
timer.Stop()
continue tsoBatchLoop
Expand Down Expand Up @@ -483,7 +483,7 @@ tsoBatchLoop:
return
default:
}
c.svcDiscovery.ScheduleCheckMemberChanged()
c.svcDiscovery.ScheduleCheckMemberChanged(nil)
log.Error("[tso] getTS error",
zap.String("dc-location", dc),
zap.String("stream-addr", streamAddr),
Expand Down Expand Up @@ -589,7 +589,7 @@ func (c *tsoClient) tryConnectToTSO(
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
c.svcDiscovery.ScheduleCheckMemberChanged()
c.svcDiscovery.ScheduleCheckMemberChanged(nil)
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
Expand Down
2 changes: 1 addition & 1 deletion client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (c *tsoServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn
}

// ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in service endpoints.
func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() {
func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged(err error) {
select {
case c.checkMembershipCh <- struct{}{}:
default:
Expand Down

0 comments on commit 65b693d

Please sign in to comment.