From 947701a32c055e8fb4c112529028fed3492150ac Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 4 Sep 2023 12:03:43 +0800 Subject: [PATCH] client: backport ErrClientTSOStreamClosed (#7021) ref pingcap/tidb#46540 Signed-off-by: lance6716 --- client/client.go | 16 ++++++++++++++-- client/errs/errno.go | 1 + 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/client/client.go b/client/client.go index cb4d3a1ba93..9d95d0edbee 100644 --- a/client/client.go +++ b/client/client.go @@ -17,6 +17,7 @@ package pd import ( "context" "fmt" + "io" "math/rand" "strings" "sync" @@ -1090,14 +1091,22 @@ func (c *client) processTSORequests(stream pdpb.PD_TsoClient, dcLocation string, } if err := stream.Send(req); err != nil { - err = errors.WithStack(err) + if err == io.EOF { + err = errs.ErrClientTSOStreamClosed + } else { + err = errors.WithStack(err) + } c.finishTSORequest(requests, 0, 0, 0, err) return err } tsoBatchSendLatency.Observe(float64(time.Since(tbc.batchStartTime))) resp, err := stream.Recv() if err != nil { - err = errors.WithStack(err) + if err == io.EOF { + err = errs.ErrClientTSOStreamClosed + } else { + err = errors.WithStack(err) + } c.finishTSORequest(requests, 0, 0, 0, err) return err } @@ -1810,6 +1819,9 @@ func addrsToUrls(addrs []string) []string { // IsLeaderChange will determine whether there is a leader change. func IsLeaderChange(err error) bool { + if err == errs.ErrClientTSOStreamClosed { + return true + } errMsg := err.Error() return strings.Contains(errMsg, errs.NotLeaderErr) || strings.Contains(errMsg, errs.MismatchLeaderErr) } diff --git a/client/errs/errno.go b/client/errs/errno.go index 14bcb933046..6a490039921 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -27,6 +27,7 @@ const ( // client errors var ( ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream")) + ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed")) ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout")) ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO")) ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader"))