Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: forward current http request to mcs #7078

Merged
merged 13 commits into from
Sep 18, 2023
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewService(srv *rmserver.Service) *Service {
manager := srv.GetManager()
apiHandlerEngine.Use(func(c *gin.Context) {
// manager implements the interface of basicserver.Service.
c.Set("service", manager.GetBasicServer())
c.Set(multiservicesapi.ServiceContextKey, manager.GetBasicServer())
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/scheduling/api/v1/"
const APIPathPrefix = "/scheduling/api/v1"

var (
once sync.Once
Expand Down
33 changes: 15 additions & 18 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,7 @@ func (m *Participant) setLeader(member participant) {

// unsetLeader unsets the member's leader.
func (m *Participant) unsetLeader() {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
leader := NewParticipantByService(m.serviceName)
m.leader.Store(leader)
m.lastLeaderUpdatedTime.Store(time.Now())
}
Expand Down Expand Up @@ -225,15 +217,7 @@ func (m *Participant) PreCheckLeader() error {

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) getPersistentLeader() (participant, int64, error) {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
leader := NewParticipantByService(m.serviceName)
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -399,3 +383,16 @@ func (m *Participant) campaignCheck() bool {
func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) {
m.campaignChecker.Store(checker)
}

// NewParticipantByService creates a new participant by service name.
func NewParticipantByService(serviceName string) (p participant) {
switch serviceName {
case utils.TSOServiceName:
p = &tsopb.Participant{}
case utils.SchedulingServiceName:
p = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
p = &resource_manager.Participant{}
}
return p
}
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string {
return path.Join(electionPath, utils.PrimaryKey)
}

// SchedulingPrimaryPath returns the path of scheduling primary.
// Path: /ms/{cluster_id}/scheduling/primary
func SchedulingPrimaryPath(clusterID uint64) string {
return path.Join(SchedulingSvcRootPath(clusterID), utils.PrimaryKey)
}

// KeyspaceGroupsElectionPath returns the path of keyspace groups election.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
Expand Down
10 changes: 10 additions & 0 deletions pkg/utils/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,13 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)
reader = resp.Body
}

// We need to copy the response headers before we write the header.
// Otherwise, we cannot set the header.
// And we need to write the header before we copy the response body.
// Otherwise, we cannot set the status code.
copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)

for {
if _, err = io.CopyN(w, reader, chunkSize); err != nil {
if err == io.EOF {
Expand All @@ -457,6 +462,11 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)

func copyHeader(dst, src http.Header) {
for k, vv := range src {
// skip Content-Encoding and Content-Length header
// because they need to be set by http.ResponseWriter when gzip is enabled
if k == "Content-Encoding" || k == "Content-Length" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a concern about it and am not sure if only two keys will affect it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible, but for now gzip only affects these two, and we'll do more testing as we add interfaces.

continue
}
values := dst[k]
for _, v := range vv {
if !slice.Contains(values, v) {
Expand Down
17 changes: 12 additions & 5 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package serverapi
import (
"net/http"
"net/url"
"strings"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -108,24 +109,30 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
return false, ""
}
for _, rule := range h.microserviceRedirectRules {
if rule.matchPath == r.URL.Path {
if strings.HasPrefix(r.URL.Path, rule.matchPath) {
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when try match redirect rules",
zap.String("path", r.URL.Path))
}
r.URL.Path = rule.targetPath
// Extract parameters from the URL path
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
pathParams := strings.TrimPrefix(r.URL.Path, rule.matchPath)
if len(pathParams) > 0 && pathParams[0] == '/' {
pathParams = pathParams[1:] // Remove leading '/'
}
r.URL.Path = rule.targetPath + "/" + pathParams
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
return true, addr
}
}
return false, ""
}

func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
matchedFlag, targetAddr := h.matchMicroServiceRedirectRules(r)
needRedirectToMicroService, targetAddr := h.matchMicroServiceRedirectRules(r)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
allowFollowerHandle := len(r.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0
isLeader := h.s.GetMember().IsLeader()
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !matchedFlag {
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !needRedirectToMicroService {
next(w, r)
return
}
Expand All @@ -150,7 +157,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
}

var clientUrls []string
if matchedFlag {
if needRedirectToMicroService {
if len(targetAddr) == 0 {
http.Error(w, apiutil.ErrRedirectFailed, http.StatusInternalServerError)
return
Expand Down
25 changes: 22 additions & 3 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"net/http"

"github.com/gorilla/mux"
scheapi "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/serverapi"
"github.com/tikv/pd/server"
Expand All @@ -34,12 +36,29 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
Name: "core",
IsCore: true,
}
router := mux.NewRouter()
prefix := apiPrefix + "/api/v1"
r := createRouter(apiPrefix, svr)
router := mux.NewRouter()
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule(
apiPrefix+"/api/v1"+"/admin/reset-ts", tsoapi.APIPathPrefix+"/admin/reset-ts", "tso")),
serverapi.NewRedirector(svr,
serverapi.MicroserviceRedirectRule(
prefix+"/admin/reset-ts",
tsoapi.APIPathPrefix+"/admin/reset-ts",
mcs.TSOServiceName),
serverapi.MicroserviceRedirectRule(
prefix+"/operators",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about the config or other paths?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will resolve them in other PRs, this PR only forwards the current HTTP method in scheduling server.

scheapi.APIPathPrefix+"/operators",
mcs.SchedulingServiceName),
serverapi.MicroserviceRedirectRule(
prefix+"/checker", // Note: this is a typo in the original code
scheapi.APIPathPrefix+"/checkers",
mcs.SchedulingServiceName),
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers",
scheapi.APIPathPrefix+"/schedulers",
mcs.SchedulingServiceName),
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
),
negroni.Wrap(r)),
)

Expand Down
36 changes: 25 additions & 11 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,11 @@ type Server struct {

auditBackends []audit.Backend

registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
schedulingPrimaryWatcher *etcdutil.LoopWatcher
}

// HandlerBuilder builds a server HTTP handler.
Expand Down Expand Up @@ -617,7 +618,7 @@ func (s *Server) startServerLoop(ctx context.Context) {
go s.encryptionKeyManagerLoop()
if s.IsAPIServiceMode() {
s.initTSOPrimaryWatcher()
s.tsoPrimaryWatcher.StartWatchLoop()
s.initSchedulingPrimaryWatcher()
}
}

Expand Down Expand Up @@ -1962,16 +1963,28 @@ func (s *Server) initTSOPrimaryWatcher() {
serviceName := mcs.TSOServiceName
tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID)
tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID)
s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey)
s.tsoPrimaryWatcher.StartWatchLoop()
}

func (s *Server) initSchedulingPrimaryWatcher() {
serviceName := mcs.SchedulingServiceName
primaryKey := endpoint.SchedulingPrimaryPath(s.clusterID)
s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey)
s.schedulingPrimaryWatcher.StartWatchLoop()
}

func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string) *etcdutil.LoopWatcher {
putFn := func(kv *mvccpb.KeyValue) error {
primary := &tsopb.Participant{} // TODO: use Generics
primary := member.NewParticipantByService(serviceName)
if err := proto.Unmarshal(kv.Value, primary); err != nil {
return err
}
listenUrls := primary.GetListenUrls()
if len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
log.Info("update tso primary", zap.String("primary", listenUrls[0]))
log.Info("update service primary", zap.String("service-name", serviceName), zap.String("primary", listenUrls[0]))
}
return nil
}
Expand All @@ -1981,16 +1994,17 @@ func (s *Server) initTSOPrimaryWatcher() {
if ok {
oldPrimary = v.(string)
}
log.Info("delete tso primary", zap.String("old-primary", oldPrimary))
log.Info("delete service primary", zap.String("service-name", serviceName), zap.String("old-primary", oldPrimary))
s.servicePrimaryMap.Delete(serviceName)
return nil
}
s.tsoPrimaryWatcher = etcdutil.NewLoopWatcher(
name := fmt.Sprintf("%s-primary-watcher", serviceName)
return etcdutil.NewLoopWatcher(
s.serverLoopCtx,
&s.serverLoopWg,
s.client,
"tso-primary-watcher",
tsoServicePrimaryKey,
name,
primaryKey,
putFn,
deleteFn,
func() error { return nil },
Expand Down
5 changes: 5 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,11 @@ func (s *TestServer) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.server.GetTSOAllocatorManager()
}

// GetServicePrimaryAddr returns the primary address of the service.
func (s *TestServer) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) {
return s.server.GetServicePrimaryAddr(ctx, serviceName)
}

// TestCluster is only for test.
type TestCluster struct {
config *clusterConfig
Expand Down
31 changes: 31 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,34 @@ func (suite *apiTestSuite) TestGetCheckerByName() {
suite.False(resp["paused"].(bool))
}
}

func (suite *apiTestSuite) TestAPIForward() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)

urlPrefix := fmt.Sprintf("%s/pd/api/v1", suite.backendEndpoints)
var slice []string
var resp map[string]interface{}

// Test opeartor
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators"), &slice)
re.NoError(err)
re.Len(slice, 0)

err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators/2"), &resp)
re.NoError(err)
re.Nil(resp)

// Test checker
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "checker/merge"), &resp)
re.NoError(err)
suite.False(resp["paused"].(bool))

// Test scheduler
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), &slice)
re.NoError(err)
re.Contains(slice, "balance-leader-scheduler")
}
15 changes: 13 additions & 2 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/suite"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
"go.uber.org/goleak"
Expand Down Expand Up @@ -107,11 +108,21 @@ func (suite *serverTestSuite) TestPrimaryChange() {
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
primary := tc.GetPrimaryServer()
addr := primary.GetAddr()
oldPrimaryAddr := primary.GetAddr()
re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5)
testutil.Eventually(re, func() bool {
watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName)
return ok && oldPrimaryAddr == watchedAddr
})
// transfer leader
primary.Close()
tc.WaitForPrimaryServing(re)
primary = tc.GetPrimaryServer()
re.NotEqual(addr, primary.GetAddr())
newPrimaryAddr := primary.GetAddr()
re.NotEqual(oldPrimaryAddr, newPrimaryAddr)
re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5)
testutil.Eventually(re, func() bool {
watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName)
return ok && newPrimaryAddr == watchedAddr
})
}
Loading
Loading