From c8d08d2e7d88a56d5eb026b85f90a5b2f50c7afa Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 27 May 2024 12:10:49 +0800 Subject: [PATCH 1/2] Add retry logic for obtaining PD leader in redirector Signed-off-by: JmPotato --- pkg/utils/apiutil/serverapi/middleware.go | 52 +++++++++++++++++++---- tests/server/api/api_test.go | 18 ++++++++ 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 2432e15c967..18dd2f52155 100755 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -18,7 +18,9 @@ import ( "net/http" "net/url" "strings" + "time" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" @@ -204,20 +206,19 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http clientUrls = append(clientUrls, targetAddr) // Add a header to the response, it is used to mark whether the request has been forwarded to the micro service. w.Header().Add(apiutil.XForwardedToMicroServiceHeader, "true") - } else { - leader := h.s.GetMember().GetLeader() + } else if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) == 0 { + leader := h.waitForLeader(r) if leader == nil { http.Error(w, "no leader", http.StatusServiceUnavailable) return } clientUrls = leader.GetClientUrls() - // Prevent more than one redirection among PD/API servers. - if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 { - log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect)) - http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError) - return - } r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name()) + } else { + // Prevent more than one redirection among PD/API servers. 
+ log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect)) + http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError) + return } urls := make([]url.URL, 0, len(clientUrls)) @@ -233,3 +234,38 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http client := h.s.GetHTTPClient() apiutil.NewCustomReverseProxies(client, urls).ServeHTTP(w, r) } + +const ( + backoffMaxDelay = 3 * time.Second + backoffInterval = 100 * time.Millisecond +) + +// If current server does not have a leader, backoff to increase the chance of success. +func (h *redirector) waitForLeader(r *http.Request) (leader *pdpb.Member) { + var ( + interval = backoffInterval + maxDelay = backoffMaxDelay + curDelay = time.Duration(0) + ) + for { + leader = h.s.GetMember().GetLeader() + if leader != nil { + return + } + select { + case <-time.After(interval): + curDelay += interval + if curDelay >= maxDelay { + return + } + interval *= 2 + if curDelay+interval > maxDelay { + interval = maxDelay - curDelay + } + case <-r.Context().Done(): + return + case <-h.s.LoopContext().Done(): + return + } + } +} diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index b70c688993d..f874e69aa1b 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -605,6 +605,24 @@ func (suite *redirectorTestSuite) TestRedirect() { re.Equal(h, header) } } + // Test redirect during leader election. 
+ leader = suite.cluster.GetLeaderServer() + re.NotNil(leader) + err := leader.ResignLeader() + re.NoError(err) + for _, svr := range suite.cluster.GetServers() { + url := fmt.Sprintf("%s/pd/api/v1/version", svr.GetServer().GetAddr()) + testutil.Eventually(re, func() bool { + resp, err := tests.TestDialClient.Get(url) + re.NoError(err) + defer resp.Body.Close() + _, err = io.ReadAll(resp.Body) + re.NoError(err) + // Should not meet 503 since the retry logic ensures the request is sent to the new leader eventually. + re.NotEqual(http.StatusServiceUnavailable, resp.StatusCode) + return resp.StatusCode == http.StatusOK + }) + } } func (suite *redirectorTestSuite) TestAllowFollowerHandle() { From 97da4ea23ba0fb0e0fe0757b775d519bbb708970 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 1 Aug 2024 11:11:29 +0800 Subject: [PATCH 2/2] Resolve the conflicts Signed-off-by: JmPotato --- pkg/utils/apiutil/serverapi/middleware.go | 2 +- tests/server/api/api_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 18dd2f52155..ce3617453d2 100755 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -264,7 +264,7 @@ func (h *redirector) waitForLeader(r *http.Request) (leader *pdpb.Member) { } case <-r.Context().Done(): return - case <-h.s.LoopContext().Done(): + case <-h.s.Context().Done(): return } } diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index f874e69aa1b..b32739b631e 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -611,9 +611,10 @@ func (suite *redirectorTestSuite) TestRedirect() { err := leader.ResignLeader() re.NoError(err) for _, svr := range suite.cluster.GetServers() { - url := fmt.Sprintf("%s/pd/api/v1/version", svr.GetServer().GetAddr()) + request, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", 
svr.GetServer().GetAddr()), http.NoBody) + re.NoError(err) testutil.Eventually(re, func() bool { - resp, err := tests.TestDialClient.Get(url) + resp, err := dialClient.Do(request) re.NoError(err) defer resp.Body.Close() _, err = io.ReadAll(resp.Body)