diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 2432e15c967..ce3617453d2 100755 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -18,7 +18,9 @@ import ( "net/http" "net/url" "strings" + "time" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" @@ -204,20 +206,19 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http clientUrls = append(clientUrls, targetAddr) // Add a header to the response, it is used to mark whether the request has been forwarded to the micro service. w.Header().Add(apiutil.XForwardedToMicroServiceHeader, "true") - } else { - leader := h.s.GetMember().GetLeader() + } else if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) == 0 { + leader := h.waitForLeader(r) if leader == nil { http.Error(w, "no leader", http.StatusServiceUnavailable) return } clientUrls = leader.GetClientUrls() - // Prevent more than one redirection among PD/API servers. - if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 { - log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect)) - http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError) - return - } r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name()) + } else { + // Prevent more than one redirection among PD/API servers. + log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect)) + http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError) + return } urls := make([]url.URL, 0, len(clientUrls)) @@ -233,3 +234,38 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http client := h.s.GetHTTPClient() apiutil.NewCustomReverseProxies(client, urls).ServeHTTP(w, r) } + +const ( + backoffMaxDelay = 3 * time.Second + backoffInterval = 100 * time.Millisecond +) + +// If current server does not have a leader, backoff to increase the chance of success. +func (h *redirector) waitForLeader(r *http.Request) (leader *pdpb.Member) { + var ( + interval = backoffInterval + maxDelay = backoffMaxDelay + curDelay = time.Duration(0) + ) + for { + leader = h.s.GetMember().GetLeader() + if leader != nil { + return + } + select { + case <-time.After(interval): + curDelay += interval + if curDelay >= maxDelay { + return + } + interval *= 2 + if curDelay+interval > maxDelay { + interval = maxDelay - curDelay + } + case <-r.Context().Done(): + return + case <-h.s.Context().Done(): + return + } + } +} diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index b70c688993d..b32739b631e 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -605,6 +605,25 @@ func (suite *redirectorTestSuite) TestRedirect() { re.Equal(h, header) } } + // Test redirect during leader election. + leader = suite.cluster.GetLeaderServer() + re.NotNil(leader) + err := leader.ResignLeader() + re.NoError(err) + for _, svr := range suite.cluster.GetServers() { + request, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", svr.GetServer().GetAddr()), http.NoBody) + re.NoError(err) + testutil.Eventually(re, func() bool { + resp, err := dialClient.Do(request) + re.NoError(err) + defer resp.Body.Close() + _, err = io.ReadAll(resp.Body) + re.NoError(err) + // Should not meet 503 since the retry logic ensure the request is sent to the new leader eventually. + re.NotEqual(http.StatusServiceUnavailable, resp.StatusCode) + return resp.StatusCode == http.StatusOK + }) + } } func (suite *redirectorTestSuite) TestAllowFollowerHandle() {