-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mcs: forward current http request to mcs #7078
Changes from 7 commits
4e6f3b3
780736b
4012981
e8a88b9
0f733dc
32a81e3
7eafe59
40f129f
98a573b
590d971
2193aff
f070884
338058d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
import ( | ||
"net/http" | ||
"net/url" | ||
"strings" | ||
|
||
"github.com/pingcap/log" | ||
"github.com/tikv/pd/pkg/errs" | ||
|
@@ -108,24 +109,35 @@ | |
return false, "" | ||
} | ||
for _, rule := range h.microserviceRedirectRules { | ||
if rule.matchPath == r.URL.Path { | ||
if strings.HasPrefix(r.URL.Path, rule.matchPath) { | ||
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName) | ||
if !ok || addr == "" { | ||
log.Warn("failed to get the service primary addr when try match redirect rules", | ||
log.Warn("failed to get the service primary addr when trying to match redirect rules", | ||
zap.String("path", r.URL.Path)) | ||
} | ||
r.URL.Path = rule.targetPath | ||
// Extract parameters from the URL path | ||
lhy1024 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// e.g. r.URL.Path = /pd/api/v1/operators/1 (before redirect) | ||
// matchPath = /pd/api/v1/operators | ||
// targetPath = /scheduling/api/v1/operators | ||
// r.URL.Path = /scheduling/api/v1/operator/1 (after redirect) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about using the custom way to do the transfer? Because we might change the previous path parameters to query parameters. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will add TODO when meet other interfaces, which not support restful |
||
pathParams := strings.TrimPrefix(r.URL.Path, rule.matchPath) | ||
pathParams = strings.Trim(pathParams, "/") // Remove leading and trailing '/' | ||
if len(pathParams) > 0 { | ||
r.URL.Path = rule.targetPath + "/" + pathParams | ||
} else { | ||
r.URL.Path = rule.targetPath | ||
} | ||
return true, addr | ||
} | ||
} | ||
return false, "" | ||
} | ||
|
||
func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { | ||
matchedFlag, targetAddr := h.matchMicroServiceRedirectRules(r) | ||
redirectToMicroService, targetAddr := h.matchMicroServiceRedirectRules(r) | ||
allowFollowerHandle := len(r.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0 | ||
isLeader := h.s.GetMember().IsLeader() | ||
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !matchedFlag { | ||
if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !redirectToMicroService { | ||
next(w, r) | ||
return | ||
} | ||
|
@@ -150,7 +162,7 @@ | |
} | ||
|
||
var clientUrls []string | ||
if matchedFlag { | ||
if redirectToMicroService { | ||
if len(targetAddr) == 0 { | ||
http.Error(w, apiutil.ErrRedirectFailed, http.StatusInternalServerError) | ||
return | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ import ( | |
"net/http" | ||
|
||
"github.com/gorilla/mux" | ||
scheapi "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" | ||
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" | ||
mcs "github.com/tikv/pd/pkg/mcs/utils" | ||
"github.com/tikv/pd/pkg/utils/apiutil" | ||
|
@@ -35,14 +36,29 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP | |
Name: "core", | ||
IsCore: true, | ||
} | ||
router := mux.NewRouter() | ||
prefix := apiPrefix + "/api/v1" | ||
r := createRouter(apiPrefix, svr) | ||
router := mux.NewRouter() | ||
router.PathPrefix(apiPrefix).Handler(negroni.New( | ||
serverapi.NewRuntimeServiceValidator(svr, group), | ||
serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule( | ||
apiPrefix+"/api/v1"+"/admin/reset-ts", | ||
tsoapi.APIPathPrefix+"/admin/reset-ts", | ||
mcs.TSOServiceName)), | ||
serverapi.NewRedirector(svr, | ||
serverapi.MicroserviceRedirectRule( | ||
prefix+"/admin/reset-ts", | ||
tsoapi.APIPathPrefix+"/admin/reset-ts", | ||
mcs.TSOServiceName), | ||
serverapi.MicroserviceRedirectRule( | ||
prefix+"/operators", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about the config or other paths? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will resolve them in other PRs, this PR only forwards the current HTTP method in scheduling server. |
||
scheapi.APIPathPrefix+"/operators", | ||
mcs.SchedulingServiceName), | ||
serverapi.MicroserviceRedirectRule( | ||
prefix+"/checker", // Note: this is a typo in the original code | ||
scheapi.APIPathPrefix+"/checkers", | ||
mcs.SchedulingServiceName), | ||
serverapi.MicroserviceRedirectRule( | ||
prefix+"/schedulers", | ||
scheapi.APIPathPrefix+"/schedulers", | ||
mcs.SchedulingServiceName), | ||
lhy1024 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
), | ||
negroni.Wrap(r)), | ||
) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ package operator_test | |
import ( | ||
"context" | ||
"encoding/hex" | ||
"encoding/json" | ||
"strconv" | ||
"strings" | ||
"testing" | ||
|
@@ -251,3 +252,32 @@ func TestOperator(t *testing.T) { | |
return strings.Contains(string(output1), "Success!") || strings.Contains(string(output2), "Success!") | ||
}) | ||
} | ||
|
||
func TestMicroservice(t *testing.T) { | ||
re := require.New(t) | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
cluster, err := tests.NewTestAPICluster(ctx, 1) | ||
re.NoError(err) | ||
re.NoError(cluster.RunInitialServers()) | ||
re.NotEmpty(cluster.WaitLeader()) | ||
server := cluster.GetServer(cluster.GetLeader()) | ||
re.NoError(server.BootstrapCluster()) | ||
backendEndpoints := server.GetAddr() | ||
tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints) | ||
re.NoError(err) | ||
defer tc.Destroy() | ||
tc.WaitForPrimaryServing(re) | ||
|
||
cmd := pdctlCmd.GetRootCmd() | ||
args := []string{"-u", backendEndpoints, "operator", "show"} | ||
var slice []string | ||
output, err := pdctl.ExecuteCommand(cmd, args...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How should we make sure this request is forwarded rather than being handled directly in this test? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
re.NoError(err) | ||
re.NoError(json.Unmarshal(output, &slice)) | ||
re.Len(slice, 0) | ||
args = []string{"-u", backendEndpoints, "operator", "check", "2"} | ||
output, err = pdctl.ExecuteCommand(cmd, args...) | ||
re.NoError(err) | ||
re.Contains(string(output), "null") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -530,3 +530,28 @@ func mightExec(re *require.Assertions, cmd *cobra.Command, args []string, v inte | |
} | ||
json.Unmarshal(output, v) | ||
} | ||
|
||
func TestMicroservice(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How to know if the output comes from the scheduling server? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because we have already disabled PD scheduling in api mode? How about adding info in the response header for further confirmation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about using a more concrete name? |
||
re := require.New(t) | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
cluster, err := tests.NewTestAPICluster(ctx, 1) | ||
re.NoError(err) | ||
re.NoError(cluster.RunInitialServers()) | ||
re.NotEmpty(cluster.WaitLeader()) | ||
server := cluster.GetServer(cluster.GetLeader()) | ||
re.NoError(server.BootstrapCluster()) | ||
backendEndpoints := server.GetAddr() | ||
tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints) | ||
re.NoError(err) | ||
defer tc.Destroy() | ||
tc.WaitForPrimaryServing(re) | ||
|
||
cmd := pdctlCmd.GetRootCmd() | ||
args := []string{"-u", backendEndpoints, "scheduler", "show"} | ||
var slice []string | ||
output, err := pdctl.ExecuteCommand(cmd, args...) | ||
re.NoError(err) | ||
re.NoError(json.Unmarshal(output, &slice)) | ||
re.Contains(slice, "balance-leader-scheduler") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a concern about it and am not sure if only two keys will affect it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's possible, but for now gzip only affects these two, and we'll do more testing as we add interfaces.