Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB committed Mar 28, 2024
1 parent 3fdc914 commit d291db3
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 18 deletions.
12 changes: 0 additions & 12 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,21 +342,9 @@ func (c *tsoClient) processRequests(
count := int64(len(requests))
reqKeyspaceGroupID := c.ServiceDiscovery.GetKeyspaceGroupID()

select {
case <-stream.(*pdTSOStream).stream.Context().Done():
fmt.Println("stream context done before sleep")
default:
fmt.Println("stream context not done before sleep")
}
failpoint.Inject("waitBeforeProcessTSO", func() {
time.Sleep(time.Second * 5)
})
select {
case <-stream.(*pdTSOStream).stream.Context().Done():
fmt.Println("stream context done after sleep")
default:
fmt.Println("stream context not done after sleep")
}

respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.ServiceDiscovery.GetClusterID(), c.ServiceDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
Expand Down
9 changes: 4 additions & 5 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func (d *tsoDispatcher) allowTSOFollowerProxy() bool {
}

func (d *tsoDispatcher) checkAllocator(
dispatcherCtx context.Context,
forwardCtx context.Context,
forwardCancel context.CancelFunc,
dc, forwardedHostTrim, addr, url string,
Expand All @@ -101,15 +100,15 @@ func (d *tsoDispatcher) checkAllocator(
healthCli = healthpb.NewHealthClient(cc)
}
if healthCli != nil {
healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, d.option.timeout)
healthCtx, healthCancel := context.WithTimeout(d.dispatcherCtx, d.option.timeout)
resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
failpoint.Inject("unreachableNetwork", func() {
resp.Status = healthpb.HealthCheckResponse_UNKNOWN
})
healthCancel()
if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
// create a stream of the original allocator
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx, cancel := context.WithCancel(d.dispatcherCtx)
stream, err := d.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, d.option.timeout)
if err == nil && stream != nil {
log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url))
Expand All @@ -119,7 +118,7 @@ func (d *tsoDispatcher) checkAllocator(
}
}
select {
case <-dispatcherCtx.Done():
case <-d.dispatcherCtx.Done():
return
case <-forwardCtx.Done():
return
Expand Down Expand Up @@ -263,7 +262,7 @@ func (d *tsoDispatcher) tryConnectToTSO() error {
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addr := trimHTTPPrefix(backupURL)
// the goroutine is used to check the network and change back to the original stream
go d.checkAllocator(dispatcherCtx, cctx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
go d.checkAllocator(cctx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel})
return nil
Expand Down
3 changes: 2 additions & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ func TestGetTSAfterTransferLeader(t *testing.T) {
newLeader := cluster.WaitLeader()
re.NotEmpty(newLeader)
re.NotEqual(leader, newLeader)
leader = newLeader
leader = cluster.WaitLeader()
re.NotEmpty(leader)
err = cli.GetServiceDiscovery().CheckMemberChanged()
re.NoError(err)

Expand Down

0 comments on commit d291db3

Please sign in to comment.