From fe8a393e5cc898ab65c8d683b2f7aaa33252dfc1 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 10 Nov 2023 16:45:42 +0800 Subject: [PATCH] mcs: tso service should not forward again (#7348) ref tikv/pd#5836 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/tso/server/grpc_service.go | 34 ------------------------------ pkg/mcs/tso/server/server.go | 3 --- 2 files changed, 37 deletions(-) diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index 40a308c72f8b..9006faf49da0 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -28,8 +28,6 @@ import ( bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" - "github.com/tikv/pd/pkg/utils/grpcutil" - "github.com/tikv/pd/pkg/utils/tsoutil" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -88,21 +86,9 @@ func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler // Tso returns a stream of timestamps func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { - var ( - doneCh chan struct{} - errCh chan error - ) ctx, cancel := context.WithCancel(stream.Context()) defer cancel() for { - // Prevent unnecessary performance overhead of the channel. - if errCh != nil { - select { - case err := <-errCh: - return errors.WithStack(err) - default: - } - } request, err := stream.Recv() if err == io.EOF { return nil @@ -111,26 +97,6 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { return errors.WithStack(err) } - streamCtx := stream.Context() - forwardedHost := grpcutil.GetForwardedHost(streamCtx) - if !s.IsLocalRequest(forwardedHost) { - clientConn, err := s.GetDelegateClient(s.Context(), s.GetTLSConfig(), forwardedHost) - if err != nil { - return errors.WithStack(err) - } - - if errCh == nil { - doneCh = make(chan struct{}) - defer close(doneCh) - errCh = make(chan error) - } - - tsoProtoFactory := s.tsoProtoFactory - tsoRequest := tsoutil.NewTSOProtoRequest(forwardedHost, clientConn, request, stream) - s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh) - continue - } - start := time.Now() // TSO uses leader lease to determine validity. No need to check leader here. if s.IsClosed() { diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 16ef3216c629..1a2430477d89 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -78,9 +78,6 @@ type Server struct { service *Service keyspaceGroupManager *tso.KeyspaceGroupManager - // tsoDispatcher is used to dispatch the TSO requests to - // the corresponding forwarding TSO channels. - tsoDispatcher *tsoutil.TSODispatcher // tsoProtoFactory is the abstract factory for creating tso // related data structures defined in the tso grpc protocol tsoProtoFactory *tsoutil.TSOProtoFactory