Skip to content

Commit

Permalink
init HTTP handler
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Aug 16, 2023
1 parent 99024a6 commit 11ee189
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 413 deletions.
5 changes: 5 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ import (
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/join"
"go.uber.org/zap"

// register microservice HTTP API
_ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/apis/v1"
_ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
_ "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
)

func main() {
Expand Down
207 changes: 105 additions & 102 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,29 @@ import (
"os/signal"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/types"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -76,9 +75,15 @@ type Server struct {
etcdClient *clientv3.Client
httpClient *http.Client

secure bool
muxListener net.Listener
grpcServer *grpc.Server
httpServer *http.Server
service *Service

// Store as map[string]*grpc.ClientConn
clientConns sync.Map

// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
Expand All @@ -105,16 +110,19 @@ func (s *Server) GetAddr() string {

// Run runs the Resource Manager server.
func (s *Server) Run() (err error) {
if err = s.initClient(); err != nil {
return err
skipWaitAPIServiceReady := false
failpoint.Inject("skipWaitAPIServiceReady", func() {
skipWaitAPIServiceReady = true
})
if !skipWaitAPIServiceReady {
if err := utils.WaitAPIServiceReady(s); err != nil {
return err
}
}
if err = s.startServer(); err != nil {
if err := utils.InitClient(s); err != nil {
return err
}

s.startServerLoop()

return nil
return s.startServer()
}

func (s *Server) startServerLoop() {
Expand Down Expand Up @@ -211,6 +219,8 @@ func (s *Server) Close() {

log.Info("closing resource manager server ...")
s.serviceRegister.Deregister()
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()
Expand Down Expand Up @@ -258,103 +268,97 @@ func (s *Server) IsClosed() bool {
return s != nil && atomic.LoadInt64(&s.isRunning) == 0
}

// IsSecure checks if the server enable TLS.
func (s *Server) IsSecure() bool {
return s.secure
}

// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

func (s *Server) initClient() error {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
}
u, err := types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ","))
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
return err
// GetBackendEndpoints returns the backend endpoints.
func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

func (s *Server) startGRPCServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetClientConns returns the client connections.
func (s *Server) GetClientConns() *sync.Map {
return &s.clientConns
}

gs := grpc.NewServer()
s.service.RegisterGRPCService(gs)
err := gs.Serve(l)
log.Info("gRPC server stop serving")
// ServerLoopWgDone decreases the server loop wait group.
func (s *Server) ServerLoopWgDone() {
s.serverLoopWg.Done()
}

// Attempt graceful stop (waits for pending RPCs), but force a stop if
// it doesn't happen in a reasonable amount of time.
done := make(chan struct{})
go func() {
defer logutil.LogPanic()
log.Info("try to gracefully stop the server now")
gs.GracefulStop()
close(done)
}()
timer := time.NewTimer(utils.DefaultGRPCGracefulStopTimeout)
defer timer.Stop()
select {
case <-done:
case <-timer.C:
log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout))
gs.Stop()
}
if s.IsClosed() {
log.Info("grpc server stopped")
} else {
log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err))
}
// ServerLoopWgAdd increases the server loop wait group.
func (s *Server) ServerLoopWgAdd(n int) {
s.serverLoopWg.Add(n)
}

func (s *Server) startHTTPServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetHTTPServer returns the http server.
func (s *Server) GetHTTPServer() *http.Server {
return s.httpServer
}

handler, _ := SetUpRestHandler(s.service)
hs := &http.Server{
Handler: handler,
ReadTimeout: 5 * time.Minute,
ReadHeaderTimeout: 5 * time.Second,
}
err := hs.Serve(l)
log.Info("http server stop serving")
// SetHTTPServer sets the http server.
func (s *Server) SetHTTPServer(httpServer *http.Server) {
s.httpServer = httpServer
}

ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout)
defer cancel()
if err := hs.Shutdown(ctx); err != nil {
log.Error("http server shutdown encountered problem", errs.ZapError(err))
} else {
log.Info("all http(s) requests finished")
}
if s.IsClosed() {
log.Info("http server stopped")
} else {
log.Fatal("http server stopped unexpectedly", errs.ZapError(err))
}
// SetUpRestHandler sets up the REST handler.
func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) {
return SetUpRestHandler(s.service)
}

func (s *Server) startGRPCAndHTTPServers(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetGRPCServer returns the grpc server.
func (s *Server) GetGRPCServer() *grpc.Server {
return s.grpcServer
}

mux := cmux.New(l)
grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpL := mux.Match(cmux.Any())
// SetGRPCServer sets the grpc server.
func (s *Server) SetGRPCServer(grpcServer *grpc.Server) {
s.grpcServer = grpcServer
}

s.serverLoopWg.Add(2)
go s.startGRPCServer(grpcL)
go s.startHTTPServer(httpL)
// RegisterGRPCService registers the grpc service.
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
s.service.RegisterGRPCService(grpcServer)
}

if err := mux.Serve(); err != nil {
if s.IsClosed() {
log.Info("mux stop serving", errs.ZapError(err))
} else {
log.Fatal("mux stop serving unexpectedly", errs.ZapError(err))
// SetETCDClient sets the etcd client.
func (s *Server) SetETCDClient(etcdClient *clientv3.Client) {
s.etcdClient = etcdClient
}

// SetHTTPClient sets the http client.
func (s *Server) SetHTTPClient(httpClient *http.Client) {
s.httpClient = httpClient
}

// GetTLSConfig gets the security config.
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetDelegateClient returns grpc client connection talking to the forwarded host
func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
client, ok := s.clientConns.Load(forwardedHost)
if !ok {
tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
if err != nil {
return nil, err
}
cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig)
if err != nil {
return nil, err
}
client = cc
s.clientConns.Store(forwardedHost, cc)
}
return client.(*grpc.ClientConn), nil
}

// GetLeaderListenUrls gets service endpoints from the leader in election group.
Expand All @@ -378,7 +382,10 @@ func (s *Server) startServer() (err error) {
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)

s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}
s.service = &Service{
ctx: s.ctx,
manager: NewManager[*Server](s),
Expand All @@ -388,11 +395,8 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}
if tlsConfig != nil {
s.secure = true
s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig)
} else {
s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host)
Expand All @@ -401,8 +405,12 @@ func (s *Server) startServer() (err error) {
return err
}

serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
go s.startGRPCAndHTTPServers(s.muxListener)
go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener)
<-serverReadyChan
s.startServerLoop()

// Run callbacks
log.Info("triggering the start callback functions")
Expand Down Expand Up @@ -455,7 +463,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
return
} else if printVersion {
versioninfo.Print()
exit(0)
utils.Exit(0)
}

// New zap logger
Expand Down Expand Up @@ -500,13 +508,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
utils.Exit(0)
default:
exit(1)
utils.Exit(1)
}
}

func exit(code int) {
log.Sync()
os.Exit(code)
}
Loading

0 comments on commit 11ee189

Please sign in to comment.