Skip to content

Commit

Permalink
mcs: forward current http request to mcs
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Sep 11, 2023
1 parent 31d5fb4 commit eb64cd5
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewService(srv *rmserver.Service) *Service {
manager := srv.GetManager()
apiHandlerEngine.Use(func(c *gin.Context) {
// manager implements the interface of basicserver.Service.
c.Set("service", manager.GetBasicServer())
c.Set(multiservicesapi.ServiceContextKey, manager.GetBasicServer())
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/scheduling/api/v1/"
const APIPathPrefix = "/scheduling/api/v1"

var (
once sync.Once
Expand Down
2 changes: 2 additions & 0 deletions pkg/utils/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,14 @@ func PostJSON(client *http.Client, url string, data []byte) (*http.Response, err
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Add("Accept-Encoding", "identity")
return client.Do(req)
}

// GetJSON is used to send GET request to specific url
func GetJSON(client *http.Client, url string, data []byte) (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, url, bytes.NewBuffer(data))
req.Header.Add("Accept-Encoding", "identity")
if err != nil {
return nil, err
}
Expand Down
17 changes: 12 additions & 5 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package serverapi
import (
"net/http"
"net/url"
"strings"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -108,24 +109,30 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
return false, ""
}
for _, rule := range h.microserviceRedirectRules {
if rule.matchPath == r.URL.Path {
if strings.HasPrefix(r.URL.Path, rule.matchPath) {
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when try match redirect rules",
zap.String("path", r.URL.Path))
}
r.URL.Path = rule.targetPath
// Extract parameters from the URL path
pathParams := strings.TrimPrefix(r.URL.Path, rule.matchPath)
if len(pathParams) > 0 && pathParams[0] == '/' {
pathParams = pathParams[1:] // Remove leading '/'
}
r.URL.Path = rule.targetPath + "/" + pathParams
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
return true, addr
}
}
return false, ""
}

func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
matchedFlag, targetAddr := h.matchMicroServiceRedirectRules(r)
needRedirectToMicroService, targetAddr := h.matchMicroServiceRedirectRules(r)
allowFollowerHandle := len(r.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0
isLeader := h.s.GetMember().IsLeader()
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !matchedFlag {
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !needRedirectToMicroService {
next(w, r)
return
}
Expand All @@ -150,7 +157,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
}

var clientUrls []string
if matchedFlag {
if needRedirectToMicroService {
if len(targetAddr) == 0 {
http.Error(w, apiutil.ErrRedirectFailed, http.StatusInternalServerError)
return
Expand Down
26 changes: 21 additions & 5 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"

"github.com/gorilla/mux"
scheapi "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand All @@ -35,14 +36,29 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
Name: "core",
IsCore: true,
}
router := mux.NewRouter()
prefix := apiPrefix + "/api/v1"
r := createRouter(apiPrefix, svr)
router := mux.NewRouter()
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule(
apiPrefix+"/api/v1"+"/admin/reset-ts",
tsoapi.APIPathPrefix+"/admin/reset-ts",
mcs.TSOServiceName)),
serverapi.NewRedirector(svr,
serverapi.MicroserviceRedirectRule(
prefix+"/admin/reset-ts",
tsoapi.APIPathPrefix+"/admin/reset-ts",
mcs.TSOServiceName),
serverapi.MicroserviceRedirectRule(
prefix+"/operators",
scheapi.APIPathPrefix+"/operators",
mcs.SchedulingServiceName),
serverapi.MicroserviceRedirectRule(
prefix+"/checker", // Note: this is a typo in the original code
scheapi.APIPathPrefix+"/checkers",
mcs.SchedulingServiceName),
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers",
scheapi.APIPathPrefix+"/schedulers",
mcs.SchedulingServiceName),
),
negroni.Wrap(r)),
)

Expand Down
31 changes: 31 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,34 @@ func (suite *apiTestSuite) TestGetCheckerByName() {
suite.False(resp["paused"].(bool))
}
}

func (suite *apiTestSuite) TestAPIForward() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)

urlPrefix := fmt.Sprintf("%s/pd/api/v1", suite.backendEndpoints)
var slice []string
var resp map[string]interface{}

// Test opeartor
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators"), &slice)
re.NoError(err)
re.Len(slice, 0)

err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators/2"), &resp)
re.NoError(err)
re.Nil(resp)

// Test checker
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "checker/merge"), &resp)
re.NoError(err)
suite.False(resp["paused"].(bool))

// Test scheduler
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), &slice)
re.NoError(err)
re.Contains(slice, "balance-leader-scheduler")
}
32 changes: 27 additions & 5 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
apis "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
)
Expand All @@ -47,10 +48,11 @@ var dialClient = &http.Client{

type tsoAPITestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
pdCluster *tests.TestCluster
tsoCluster *tests.TestTSOCluster
ctx context.Context
cancel context.CancelFunc
pdCluster *tests.TestCluster
tsoCluster *tests.TestTSOCluster
backendEndpoints string
}

func TestTSOAPI(t *testing.T) {
Expand All @@ -69,7 +71,8 @@ func (suite *tsoAPITestSuite) SetupTest() {
leaderName := suite.pdCluster.WaitLeader()
pdLeaderServer := suite.pdCluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr())
suite.backendEndpoints = pdLeaderServer.GetAddr()
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
}

Expand All @@ -95,6 +98,25 @@ func (suite *tsoAPITestSuite) TestGetKeyspaceGroupMembers() {
re.Equal(primaryMember.GetLeaderID(), defaultGroupMember.PrimaryID)
}

func (suite *tsoAPITestSuite) TestResetTS() {
re := suite.Require()
primary := suite.tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(primary)
url := suite.backendEndpoints + "/pd/api/v1/admin/reset-ts"

// Test reset ts
input := []byte(`{"tso":"121312", "force-use-larger":true}`)
err := testutil.CheckPostJSON(dialClient, url, input,
testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully"))
suite.NoError(err)

// Test reset ts with invalid tso
input = []byte(`{}`)
err = testutil.CheckPostJSON(dialClient, url, input,
testutil.StatusNotOK(re), testutil.StringContain(re, "invalid tso value"))
re.NoError(err)
}

func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember {
httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+tsoKeyspaceGroupsPrefix+"/members", nil)
re.NoError(err)
Expand Down
30 changes: 13 additions & 17 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,19 @@ func (suite *tsoServerTestSuite) TestTSOServerStartAndStopNormally() {
cc, err := grpc.DialContext(suite.ctx, s.GetAddr(), grpc.WithInsecure())
re.NoError(err)
cc.Close()
url := s.GetAddr() + tsoapi.APIPathPrefix
{
resetJSON := `{"tso":"121312", "force-use-larger":true}`
re.NoError(err)
resp, err := http.Post(url+"/admin/reset-ts", "application/json", strings.NewReader(resetJSON))
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusOK, resp.StatusCode)
}
{
resetJSON := `{}`
re.NoError(err)
resp, err := http.Post(url+"/admin/reset-ts", "application/json", strings.NewReader(resetJSON))
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusBadRequest, resp.StatusCode)
}

url := s.GetAddr() + tsoapi.APIPathPrefix + "/admin/reset-ts"
// Test reset ts
input := []byte(`{"tso":"121312", "force-use-larger":true}`)
err = testutil.CheckPostJSON(dialClient, url, input,
testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully"))
suite.NoError(err)

// Test reset ts with invalid tso
input = []byte(`{}`)
err = testutil.CheckPostJSON(dialClient, suite.backendEndpoints+"/pd/api/v1/admin/reset-ts", input,
testutil.StatusNotOK(re), testutil.StringContain(re, "invalid tso value"))
re.NoError(err)
}

func (suite *tsoServerTestSuite) TestParticipantStartWithAdvertiseListenAddr() {
Expand Down
30 changes: 30 additions & 0 deletions tests/pdctl/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package operator_test
import (
"context"
"encoding/hex"
"encoding/json"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -251,3 +252,32 @@ func TestOperator(t *testing.T) {
return strings.Contains(string(output1), "Success!") || strings.Contains(string(output2), "Success!")
})
}

func TestMicroservice(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
re.NoError(err)
re.NoError(cluster.RunInitialServers())
re.NotEmpty(cluster.WaitLeader())
server := cluster.GetServer(cluster.GetLeader())
re.NoError(server.BootstrapCluster())
backendEndpoints := server.GetAddr()
tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)

cmd := pdctlCmd.GetRootCmd()
args := []string{"-u", backendEndpoints, "operator", "show"}
var slice []string
output, err := pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.NoError(json.Unmarshal(output, &slice))
re.Len(slice, 0)
args = []string{"-u", backendEndpoints, "operator", "check", "2"}
output, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.Contains(string(output), "null")
}
25 changes: 25 additions & 0 deletions tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,3 +486,28 @@ func TestScheduler(t *testing.T) {
re.NoError(err)
checkSchedulerWithStatusCommand(nil, "disabled", nil)
}

func TestMicroservice(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
re.NoError(err)
re.NoError(cluster.RunInitialServers())
re.NotEmpty(cluster.WaitLeader())
server := cluster.GetServer(cluster.GetLeader())
re.NoError(server.BootstrapCluster())
backendEndpoints := server.GetAddr()
tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)

cmd := pdctlCmd.GetRootCmd()
args := []string{"-u", backendEndpoints, "scheduler", "show"}
var slice []string
output, err := pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.NoError(json.Unmarshal(output, &slice))
re.Contains(slice, "balance-leader-scheduler")
}
2 changes: 2 additions & 0 deletions tools/pd-ctl/pdctl/command/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func requestJSON(cmd *cobra.Command, method, prefix string, input map[string]int
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Add("Accept-Encoding", "identity")
resp, err = dialClient.Do(req)
default:
err := errors.Errorf("method %s not supported", method)
Expand Down Expand Up @@ -228,6 +229,7 @@ func do(endpoint, prefix, method string, resp *string, customHeader http.Header,
var req *http.Request

req, err = http.NewRequest(method, url, b.body)
req.Header.Add("Accept-Encoding", "identity")
if err != nil {
return err
}
Expand Down

0 comments on commit eb64cd5

Please sign in to comment.