Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: forward current http request to mcs #7078

Merged
merged 13 commits into from
Sep 18, 2023
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewService(srv *rmserver.Service) *Service {
manager := srv.GetManager()
apiHandlerEngine.Use(func(c *gin.Context) {
// manager implements the interface of basicserver.Service.
c.Set("service", manager.GetBasicServer())
c.Set(multiservicesapi.ServiceContextKey, manager.GetBasicServer())
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/scheduling/api/v1/"
const APIPathPrefix = "/scheduling/api/v1"

var (
once sync.Once
Expand Down
15 changes: 15 additions & 0 deletions pkg/utils/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,17 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)
reader = resp.Body
}

// We need to copy the response headers before we write the header.
// Otherwise, we cannot set the header after w.WriteHeader() is called.
// And we need to write the header before we copy the response body.
// Otherwise, we cannot set the status code after w.Write() is called.
// In other words, we must perform the following steps strictly in order:
// 1. Set the response headers.
// 2. Write the response header.
// 3. Write the response body.
copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)

for {
if _, err = io.CopyN(w, reader, chunkSize); err != nil {
if err == io.EOF {
Expand All @@ -455,8 +464,14 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)
http.Error(w, ErrRedirectFailed, http.StatusInternalServerError)
}

// copyHeader duplicates the HTTP headers from the source `src` to the destination `dst`.
// It skips the "Content-Encoding" and "Content-Length" headers because they should be set by `http.ResponseWriter`.
// These headers may be modified after a redirect when gzip compression is enabled.
func copyHeader(dst, src http.Header) {
for k, vv := range src {
if k == "Content-Encoding" || k == "Content-Length" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a concern about it and am not sure if only two keys will affect it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible, but for now gzip only affects these two, and we'll do more testing as we add interfaces.

continue
}
values := dst[k]
for _, v := range vv {
if !slice.Contains(values, v) {
Expand Down
29 changes: 22 additions & 7 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import (
"net/http"
"net/url"
"strings"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/server"
"github.com/urfave/negroni"
Expand Down Expand Up @@ -75,6 +77,7 @@
matchPath string
targetPath string
targetServiceName string
matchMethods []string
}

// NewRedirector redirects request to the leader if needs to be handled in the leader.
Expand All @@ -90,12 +93,13 @@
type RedirectorOption func(*redirector)

// MicroserviceRedirectRule new a microservice redirect rule option
func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string) RedirectorOption {
func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, methods []string) RedirectorOption {
return func(s *redirector) {
s.microserviceRedirectRules = append(s.microserviceRedirectRules, &microserviceRedirectRule{
matchPath,
targetPath,
targetServiceName,
methods,
})
}
}
Expand All @@ -108,24 +112,35 @@
return false, ""
}
for _, rule := range h.microserviceRedirectRules {
if rule.matchPath == r.URL.Path {
if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) {
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when try match redirect rules",
log.Warn("failed to get the service primary addr when trying to match redirect rules",

Check warning on line 118 in pkg/utils/apiutil/serverapi/middleware.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/apiutil/serverapi/middleware.go#L118

Added line #L118 was not covered by tests
zap.String("path", r.URL.Path))
}
r.URL.Path = rule.targetPath
// Extract parameters from the URL path
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// e.g. r.URL.Path = /pd/api/v1/operators/1 (before redirect)
// matchPath = /pd/api/v1/operators
// targetPath = /scheduling/api/v1/operators
// r.URL.Path = /scheduling/api/v1/operator/1 (after redirect)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using the custom way to do the transfer? Because we might change the previous path parameters to query parameters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add TODO when meet other interfaces, which not support restful

pathParams := strings.TrimPrefix(r.URL.Path, rule.matchPath)
pathParams = strings.Trim(pathParams, "/") // Remove leading and trailing '/'
if len(pathParams) > 0 {
r.URL.Path = rule.targetPath + "/" + pathParams
} else {
r.URL.Path = rule.targetPath
}
return true, addr
}
}
return false, ""
}

func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
matchedFlag, targetAddr := h.matchMicroServiceRedirectRules(r)
redirectToMicroService, targetAddr := h.matchMicroServiceRedirectRules(r)
allowFollowerHandle := len(r.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0
isLeader := h.s.GetMember().IsLeader()
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !matchedFlag {
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !redirectToMicroService {
next(w, r)
return
}
Expand All @@ -150,7 +165,7 @@
}

var clientUrls []string
if matchedFlag {
if redirectToMicroService {
if len(targetAddr) == 0 {
http.Error(w, apiutil.ErrRedirectFailed, http.StatusInternalServerError)
return
Expand Down
34 changes: 29 additions & 5 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"

"github.com/gorilla/mux"
scheapi "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand All @@ -35,14 +36,37 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
Name: "core",
IsCore: true,
}
router := mux.NewRouter()
prefix := apiPrefix + "/api/v1"
r := createRouter(apiPrefix, svr)
router := mux.NewRouter()
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule(
apiPrefix+"/api/v1"+"/admin/reset-ts",
tsoapi.APIPathPrefix+"/admin/reset-ts",
mcs.TSOServiceName)),
serverapi.NewRedirector(svr,
serverapi.MicroserviceRedirectRule(
prefix+"/admin/reset-ts",
tsoapi.APIPathPrefix+"/admin/reset-ts",
mcs.TSOServiceName,
[]string{http.MethodPost}),
serverapi.MicroserviceRedirectRule(
prefix+"/operators",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about the config or other paths?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will resolve them in other PRs, this PR only forwards the current HTTP method in scheduling server.

scheapi.APIPathPrefix+"/operators",
mcs.SchedulingServiceName,
[]string{http.MethodPost, http.MethodGet, http.MethodDelete}),
// because the writing of all the meta information of the scheduling service is in the API server,
// we only forward read-only requests about checkers and schedulers to the scheduling service.
serverapi.MicroserviceRedirectRule(
prefix+"/checker", // Note: this is a typo in the original code
scheapi.APIPathPrefix+"/checkers",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers",
scheapi.APIPathPrefix+"/schedulers",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
// TODO: we need to consider the case that v1 api not support restful api.
// we might change the previous path parameters to query parameters.
),
negroni.Wrap(r)),
)

Expand Down
31 changes: 31 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,34 @@ func (suite *apiTestSuite) TestGetCheckerByName() {
suite.False(resp["paused"].(bool))
}
}

func (suite *apiTestSuite) TestAPIForward() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)

urlPrefix := fmt.Sprintf("%s/pd/api/v1", suite.backendEndpoints)
var slice []string
var resp map[string]interface{}

// Test opeartor
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators"), &slice)
re.NoError(err)
re.Len(slice, 0)

err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators/2"), &resp)
re.NoError(err)
re.Nil(resp)

// Test checker
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "checker/merge"), &resp)
re.NoError(err)
suite.False(resp["paused"].(bool))

// Test scheduler
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), &slice)
re.NoError(err)
re.Contains(slice, "balance-leader-scheduler")
}
32 changes: 27 additions & 5 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
apis "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
)
Expand All @@ -47,10 +48,11 @@ var dialClient = &http.Client{

type tsoAPITestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
pdCluster *tests.TestCluster
tsoCluster *tests.TestTSOCluster
ctx context.Context
cancel context.CancelFunc
pdCluster *tests.TestCluster
tsoCluster *tests.TestTSOCluster
backendEndpoints string
}

func TestTSOAPI(t *testing.T) {
Expand All @@ -69,7 +71,8 @@ func (suite *tsoAPITestSuite) SetupTest() {
leaderName := suite.pdCluster.WaitLeader()
pdLeaderServer := suite.pdCluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr())
suite.backendEndpoints = pdLeaderServer.GetAddr()
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
}

Expand All @@ -95,6 +98,25 @@ func (suite *tsoAPITestSuite) TestGetKeyspaceGroupMembers() {
re.Equal(primaryMember.GetLeaderID(), defaultGroupMember.PrimaryID)
}

func (suite *tsoAPITestSuite) TestForwardResetTS() {
re := suite.Require()
primary := suite.tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(primary)
url := suite.backendEndpoints + "/pd/api/v1/admin/reset-ts"

// Test reset ts
input := []byte(`{"tso":"121312", "force-use-larger":true}`)
err := testutil.CheckPostJSON(dialClient, url, input,
testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully"))
suite.NoError(err)

// Test reset ts with invalid tso
input = []byte(`{}`)
err = testutil.CheckPostJSON(dialClient, url, input,
testutil.StatusNotOK(re), testutil.StringContain(re, "invalid tso value"))
re.NoError(err)
}

func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember {
httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+tsoKeyspaceGroupsPrefix+"/members", nil)
re.NoError(err)
Expand Down
30 changes: 13 additions & 17 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,19 @@ func (suite *tsoServerTestSuite) TestTSOServerStartAndStopNormally() {
cc, err := grpc.DialContext(suite.ctx, s.GetAddr(), grpc.WithInsecure())
re.NoError(err)
cc.Close()
url := s.GetAddr() + tsoapi.APIPathPrefix
{
resetJSON := `{"tso":"121312", "force-use-larger":true}`
re.NoError(err)
resp, err := http.Post(url+"/admin/reset-ts", "application/json", strings.NewReader(resetJSON))
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusOK, resp.StatusCode)
}
{
resetJSON := `{}`
re.NoError(err)
resp, err := http.Post(url+"/admin/reset-ts", "application/json", strings.NewReader(resetJSON))
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusBadRequest, resp.StatusCode)
}

url := s.GetAddr() + tsoapi.APIPathPrefix + "/admin/reset-ts"
// Test reset ts
input := []byte(`{"tso":"121312", "force-use-larger":true}`)
err = testutil.CheckPostJSON(dialClient, url, input,
testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully"))
suite.NoError(err)

// Test reset ts with invalid tso
input = []byte(`{}`)
err = testutil.CheckPostJSON(dialClient, suite.backendEndpoints+"/pd/api/v1/admin/reset-ts", input,
testutil.StatusNotOK(re), testutil.StringContain(re, "invalid tso value"))
re.NoError(err)
}

func (suite *tsoServerTestSuite) TestParticipantStartWithAdvertiseListenAddr() {
Expand Down
30 changes: 30 additions & 0 deletions tests/pdctl/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package operator_test
import (
"context"
"encoding/hex"
"encoding/json"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -251,3 +252,32 @@ func TestOperator(t *testing.T) {
return strings.Contains(string(output1), "Success!") || strings.Contains(string(output2), "Success!")
})
}

func TestForwardOperatorRequest(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
re.NoError(err)
re.NoError(cluster.RunInitialServers())
re.NotEmpty(cluster.WaitLeader())
server := cluster.GetServer(cluster.GetLeader())
re.NoError(server.BootstrapCluster())
backendEndpoints := server.GetAddr()
tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)

cmd := pdctlCmd.GetRootCmd()
args := []string{"-u", backendEndpoints, "operator", "show"}
var slice []string
output, err := pdctl.ExecuteCommand(cmd, args...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How should we make sure this request is forwarded rather than being handled directly in this test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re.NoError(err)
re.NoError(json.Unmarshal(output, &slice))
re.Len(slice, 0)
args = []string{"-u", backendEndpoints, "operator", "check", "2"}
output, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.Contains(string(output), "null")
}
25 changes: 25 additions & 0 deletions tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,3 +530,28 @@ func mightExec(re *require.Assertions, cmd *cobra.Command, args []string, v inte
}
json.Unmarshal(output, v)
}

func TestForwardSchedulerRequest(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
re.NoError(err)
re.NoError(cluster.RunInitialServers())
re.NotEmpty(cluster.WaitLeader())
server := cluster.GetServer(cluster.GetLeader())
re.NoError(server.BootstrapCluster())
backendEndpoints := server.GetAddr()
tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)

cmd := pdctlCmd.GetRootCmd()
args := []string{"-u", backendEndpoints, "scheduler", "show"}
var slice []string
output, err := pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.NoError(json.Unmarshal(output, &slice))
re.Contains(slice, "balance-leader-scheduler")
}
Loading