Skip to content

Commit

Permalink
mcs: forward current http request to mcs
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Sep 11, 2023
1 parent 5e1e5f5 commit dca91b5
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 23 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewService(srv *rmserver.Service) *Service {
manager := srv.GetManager()
apiHandlerEngine.Use(func(c *gin.Context) {
// manager implements the interface of basicserver.Service.
c.Set("service", manager.GetBasicServer())
c.Set(multiservicesapi.ServiceContextKey, manager.GetBasicServer())
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/scheduling/api/v1/"
const APIPathPrefix = "/scheduling/api/v1"

var (
once sync.Once
Expand Down Expand Up @@ -101,19 +101,19 @@ func NewService(srv *scheserver.Service) *Service {

// RegisterSchedulersRouter registers the router of the schedulers handler.
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router := s.root.Group("schedulers", gzip.Gzip(gzip.DefaultCompression))
router.GET("", getSchedulers)
}

// RegisterCheckersRouter registers the router of the checkers handler.
func (s *Service) RegisterCheckersRouter() {
router := s.root.Group("checkers")
router := s.root.Group("checkers", gzip.Gzip(gzip.DefaultCompression))
router.GET("/:name", getCheckerByName)
}

// RegisterOperatorsRouter registers the router of the operators handler.
func (s *Service) RegisterOperatorsRouter() {
router := s.root.Group("operators")
router := s.root.Group("operators", gzip.Gzip(gzip.DefaultCompression))
router.GET("", getOperators)
router.GET("/:id", getOperatorByID)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewService(srv *tsoserver.Service) *Service {

// RegisterAdminRouter registers the router of the TSO admin handler.
func (s *Service) RegisterAdminRouter() {
router := s.root.Group("admin")
router := s.root.Group("admin", gzip.Gzip(gzip.DefaultCompression))
router.POST("/reset-ts", ResetTS)
}

Expand Down
17 changes: 12 additions & 5 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package serverapi
import (
"net/http"
"net/url"
"strings"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -108,24 +109,30 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
return false, ""
}
for _, rule := range h.microserviceRedirectRules {
if rule.matchPath == r.URL.Path {
if strings.HasPrefix(r.URL.Path, rule.matchPath) {
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when try match redirect rules",
zap.String("path", r.URL.Path))
}
r.URL.Path = rule.targetPath
// Extract parameters from the URL path
pathParams := strings.TrimPrefix(r.URL.Path, rule.matchPath)
if len(pathParams) > 0 && pathParams[0] == '/' {
pathParams = pathParams[1:] // Remove leading '/'
}
r.URL.Path = rule.targetPath + "/" + pathParams
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
return true, addr
}
}
return false, ""
}

func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
matchedFlag, targetAddr := h.matchMicroServiceRedirectRules(r)
needRedirectToMicroService, targetAddr := h.matchMicroServiceRedirectRules(r)
allowFollowerHandle := len(r.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0
isLeader := h.s.GetMember().IsLeader()
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !matchedFlag {
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !needRedirectToMicroService {
next(w, r)
return
}
Expand All @@ -150,7 +157,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
}

var clientUrls []string
if matchedFlag {
if needRedirectToMicroService {
if len(targetAddr) == 0 {
http.Error(w, apiutil.ErrRedirectFailed, http.StatusInternalServerError)
return
Expand Down
26 changes: 21 additions & 5 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"

"github.com/gorilla/mux"
scheapi "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand All @@ -35,14 +36,29 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
Name: "core",
IsCore: true,
}
router := mux.NewRouter()
prefix := apiPrefix + "/api/v1"
r := createRouter(apiPrefix, svr)
router := mux.NewRouter()
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule(
apiPrefix+"/api/v1"+"/admin/reset-ts",
tsoapi.APIPathPrefix+"/admin/reset-ts",
mcs.TSOServiceName)),
serverapi.NewRedirector(svr,
serverapi.MicroserviceRedirectRule(
prefix+"/admin/reset-ts",
tsoapi.APIPathPrefix+"/admin/reset-ts",
mcs.TSOServiceName),
serverapi.MicroserviceRedirectRule(
prefix+"/operators",
scheapi.APIPathPrefix+"/operators",
mcs.SchedulingServiceName),
serverapi.MicroserviceRedirectRule(
prefix+"/checker", // Note: this is a typo in the original code
scheapi.APIPathPrefix+"/checkers",
mcs.SchedulingServiceName),
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers",
scheapi.APIPathPrefix+"/schedulers",
mcs.SchedulingServiceName),
),
negroni.Wrap(r)),
)

Expand Down
57 changes: 57 additions & 0 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ package scheduling

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/testutil"
Expand Down Expand Up @@ -126,3 +131,55 @@ func (suite *serverTestSuite) TestPrimaryChange() {
return ok && newPrimaryAddr == watchedAddr
})
}

func (suite *serverTestSuite) TestAPIForward() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)

urlPrefix := fmt.Sprintf("%s/pd/api/v1", suite.backendEndpoints)
cli := &http.Client{}
defer cli.CloseIdleConnections()
var list []string
var res map[string]interface{}

// opeartor
resp := checkAPIForward(re, cli, http.MethodGet, urlPrefix+"/operators")
defer resp.Body.Close()
data, _ := io.ReadAll(resp.Body)
re.NoError(json.Unmarshal(data, &list))
re.Len(list, 0)

resp = checkAPIForward(re, cli, http.MethodGet, urlPrefix+"/operators/2")
defer resp.Body.Close()
data, _ = io.ReadAll(resp.Body)
re.NoError(json.Unmarshal(data, &res))
re.Nil(res)

// checker
resp = checkAPIForward(re, cli, http.MethodGet, urlPrefix+"/checker/merge")
defer resp.Body.Close()
data, _ = io.ReadAll(resp.Body)
re.NoError(json.Unmarshal(data, &res))
re.False(res["paused"].(bool))

// scheduler
resp = checkAPIForward(re, cli, http.MethodGet, urlPrefix+"/schedulers")
defer resp.Body.Close()
data, _ = io.ReadAll(resp.Body)
re.NoError(json.Unmarshal(data, &list))
re.Contains(list, "balance-leader-scheduler")
}

func checkAPIForward(re *require.Assertions, cli *http.Client, method string, urlPrefix string) (resp *http.Response) {
var err error
switch method {
case http.MethodGet:
resp, err = cli.Get(urlPrefix)
}
re.NoError(err)
re.Equal(http.StatusOK, resp.StatusCode)
return resp
}
38 changes: 33 additions & 5 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"io"
"net/http"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -47,10 +48,11 @@ var dialClient = &http.Client{

type tsoAPITestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
pdCluster *tests.TestCluster
tsoCluster *tests.TestTSOCluster
ctx context.Context
cancel context.CancelFunc
pdCluster *tests.TestCluster
tsoCluster *tests.TestTSOCluster
backendEndpoints string
}

func TestTSOAPI(t *testing.T) {
Expand All @@ -69,7 +71,8 @@ func (suite *tsoAPITestSuite) SetupTest() {
leaderName := suite.pdCluster.WaitLeader()
pdLeaderServer := suite.pdCluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr())
suite.backendEndpoints = pdLeaderServer.GetAddr()
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
}

Expand All @@ -95,6 +98,31 @@ func (suite *tsoAPITestSuite) TestGetKeyspaceGroupMembers() {
re.Equal(primaryMember.GetLeaderID(), defaultGroupMember.PrimaryID)
}

func (suite *tsoAPITestSuite) TestResetTS() {
re := suite.Require()
primary := suite.tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(primary)
url := suite.backendEndpoints + "/pd/api/v1/admin/reset-ts"

// reset ts
resetJSON := `{"tso":"121312", "force-use-larger":true}`
resp, err := http.Post(url, "application/json", strings.NewReader(resetJSON))
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusOK, resp.StatusCode)
data, _ := io.ReadAll(resp.Body)
re.Equal("Reset ts successfully.", string(data))

// reset ts with invalid tso
resetJSON = `{}`
resp, err = http.Post(url, "application/json", strings.NewReader(resetJSON))
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusBadRequest, resp.StatusCode)
data, _ = io.ReadAll(resp.Body)
re.Equal("invalid tso value", string(data))
}

func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember {
httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+tsoKeyspaceGroupsPrefix+"/members", nil)
re.NoError(err)
Expand Down
2 changes: 0 additions & 2 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,13 @@ func (suite *tsoServerTestSuite) TestTSOServerStartAndStopNormally() {
url := s.GetAddr() + tsoapi.APIPathPrefix
{
resetJSON := `{"tso":"121312", "force-use-larger":true}`
re.NoError(err)
resp, err := http.Post(url+"/admin/reset-ts", "application/json", strings.NewReader(resetJSON))
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusOK, resp.StatusCode)
}
{
resetJSON := `{}`
re.NoError(err)
resp, err := http.Post(url+"/admin/reset-ts", "application/json", strings.NewReader(resetJSON))
re.NoError(err)
defer resp.Body.Close()
Expand Down

0 comments on commit dca91b5

Please sign in to comment.