From ae1e8bb48a5c559f89a7976e8221184fac7037c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Thu, 2 May 2024 14:10:32 +0200 Subject: [PATCH] feat: use runners to startup the services --- ocis-pkg/runner/factory.go | 134 +++++++++++++++++++ ocis-pkg/runner/types.go | 6 + services/collaboration/pkg/command/server.go | 58 ++++---- services/thumbnails/pkg/command/server.go | 59 +++----- services/webfinger/pkg/command/server.go | 52 +++---- 5 files changed, 209 insertions(+), 100 deletions(-) create mode 100644 ocis-pkg/runner/factory.go diff --git a/ocis-pkg/runner/factory.go b/ocis-pkg/runner/factory.go new file mode 100644 index 00000000000..3dcd0a05547 --- /dev/null +++ b/ocis-pkg/runner/factory.go @@ -0,0 +1,134 @@ +package runner + +import ( + "context" + "errors" + "net" + "net/http" + "time" + + ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" + ohttp "github.com/owncloud/ocis/v2/ocis-pkg/service/http" + "google.golang.org/grpc" +) + +// NewGoMicroGrpcServerRunner creates a new runner based on the provided go-micro's +// GRPC service. The service is expected to be created via +// "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc".NewService(...) function +// +// The runner will behave as described: +// * The task is to start a server and listen for connections. If the server +// can't start, the task will finish with that error. +// * The stopper will call the server's stop method and send the result to +// the task. +// * The stopper will run asynchronously because the stop method could take a +// while and we don't want to block +func NewGoMicroGrpcServerRunner(name string, server ogrpc.Service, opts ...Option) *Runner { + httpCh := make(chan error, 1) + r := New(name, func() error { + // start the server and return if it fails + if err := server.Server().Start(); err != nil { + return err + } + return <-httpCh // wait for the result + }, func() { + // stop implies deregistering and waiting for request to finish, + // so don't block + go func() { + httpCh <- server.Server().Stop() // stop and send result through channel + close(httpCh) + }() + }, opts...) + return r +} + +// NewGoMicroHttpServerRunner creates a new runner based on the provided go-micro's +// HTTP service. The service is expected to be created via +// "github.com/owncloud/ocis/v2/ocis-pkg/service/http".NewService(...) function +// +// The runner will behave as described: +// * The task is to start a server and listen for connections. If the server +// can't start, the task will finish with that error. +// * The stopper will call the server's stop method and send the result to +// the task. +// * The stopper will run asynchronously because the stop method could take a +// while and we don't want to block +func NewGoMicroHttpServerRunner(name string, server ohttp.Service, opts ...Option) *Runner { + httpCh := make(chan error, 1) + r := New(name, func() error { + // start the server and return if it fails + if err := server.Server().Start(); err != nil { + return err + } + return <-httpCh // wait for the result + }, func() { + // stop implies deregistering and waiting for request to finish, + // so don't block + go func() { + httpCh <- server.Server().Stop() // stop and send result through channel + close(httpCh) + }() + }, opts...) + return r +} + +// NewGolangHttpServerRunner creates a new runner based on the provided HTTP server. +// The HTTP server is expected to be created via +// "github.com/owncloud/ocis/v2/ocis-pkg/service/debug".NewService(...) function +// and it's expected to be a regular golang HTTP server +// +// The runner will behave as described: +// * The task starts a server and listen for connections. If the server +// can't start, the task will finish with that error. If the server is shutdown +// the task will wait for the shutdown to return that result (task won't finish +// immediately, but wait until shutdown returns) +// * The stopper will call the server's shutdown method and send the result to +// the task. The stopper will wait up to 5 secs for the shutdown. +// * The stopper will run asynchronously because the shutdown could take a +// while and we don't want to block +func NewGolangHttpServerRunner(name string, server *http.Server, opts ...Option) *Runner { + debugCh := make(chan error, 1) + r := New(name, func() error { + // start listening and return if the error is NOT ErrServerClosed. + // ListenAndServe will always return a non-nil error. + // We need to wait and get the result of the Shutdown call. + // App shouldn't exit until Shutdown has returned. + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + // wait for the shutdown and return the result + return <-debugCh + }, func() { + // Since Shutdown might take some time, don't block + go func() { + // give 5 secs for the shutdown to finish + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + debugCh <- server.Shutdown(shutdownCtx) + close(debugCh) + }() + }, opts...) + + return r +} + +// NewGolangGrpcServerRunner creates a new runner based on the provided GRPC +// server. The GRPC server is expected to be a regular golang GRPC server, +// created via "google.golang.org/grpc".NewServer(...) +// A listener also needs to be provided for the server to listen there. +// +// The runner will just start the GRPC server in the listener, and the server +// will be gracefully stopped when interrupted +func NewGolangGrpcServerRunner(name string, server *grpc.Server, listener net.Listener, opts ...Option) *Runner { + r := New(name, func() error { + return server.Serve(listener) + }, func() { + // Since GracefulStop might take some time, don't block + go func() { + server.GracefulStop() + }() + }, opts...) + + return r +} diff --git a/ocis-pkg/runner/types.go b/ocis-pkg/runner/types.go index c4914e32634..220117df011 100644 --- a/ocis-pkg/runner/types.go +++ b/ocis-pkg/runner/types.go @@ -1,10 +1,16 @@ package runner import ( + "os" "strings" + "syscall" "time" ) +var ( + StopSignals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL} +) + // Runable represent a task that can be executed by the Runner. // It expected to be a long running task with an indefinite execution time, // so it's suitable for servers or services. diff --git a/services/collaboration/pkg/command/server.go b/services/collaboration/pkg/command/server.go index c18b6414866..bd3643c8360 100644 --- a/services/collaboration/pkg/command/server.go +++ b/services/collaboration/pkg/command/server.go @@ -4,9 +4,10 @@ import ( "context" "fmt" "net" + "os/signal" - "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/services/collaboration/pkg/config" "github.com/owncloud/ocis/v2/services/collaboration/pkg/config/parser" @@ -35,14 +36,12 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr := run.Group{} - ctx, cancel := func() (context.Context, context.CancelFunc) { - if cfg.Context == nil { - return context.WithCancel(context.Background()) - } - return context.WithCancel(cfg.Context) - }() - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } // prepare components if err := helpers.RegisterOcisService(ctx, cfg, logger); err != nil { @@ -63,6 +62,8 @@ func Server(cfg *config.Config) *cli.Command { return err } + gr := runner.NewGroup() + // start GRPC server grpcServer, teardown, err := grpc.Server( grpc.AppURLs(appUrls), @@ -78,20 +79,11 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - l, err := net.Listen("tcp", cfg.GRPC.Addr) - if err != nil { - return err - } - return grpcServer.Serve(l) - }, - func(_ error) { - logger.Error(). - Err(err). - Str("server", "grpc"). - Msg("shutting down server") - cancel() - }) + l, err := net.Listen("tcp", cfg.GRPC.Addr) + if err != nil { + return err + } + gr.Add(runner.NewGolangGrpcServerRunner("collaboration_grpc", grpcServer, l)) // start debug server debugServer, err := debug.Server( @@ -103,11 +95,7 @@ func Server(cfg *config.Config) *cli.Command { logger.Info().Err(err).Str("transport", "debug").Msg("Failed to initialize server") return err } - - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("collaboration_debug", debugServer)) // start HTTP server httpServer, err := http.Server( @@ -117,11 +105,17 @@ func Server(cfg *config.Config) *cli.Command { http.Context(ctx), http.TracerProvider(traceProvider), ) - gr.Add(httpServer.Run, func(_ error) { - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("collaboration_http", httpServer)) + + grResults := gr.Run(ctx) - return gr.Run() + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/thumbnails/pkg/command/server.go b/services/thumbnails/pkg/command/server.go index d62095dd28e..ea2a9076e59 100644 --- a/services/thumbnails/pkg/command/server.go +++ b/services/thumbnails/pkg/command/server.go @@ -3,10 +3,10 @@ package command import ( "context" "fmt" - "os" + "os/signal" - "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" @@ -41,20 +41,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - var ( - gr = run.Group{} - ctx, cancel = func() (context.Context, context.CancelFunc) { - if cfg.Context == nil { - return context.WithCancel(context.Background()) - } - return context.WithCancel(cfg.Context) - }() - m = metrics.New() - ) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - defer cancel() + metrics := metrics.New() + metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1) - m.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() service := grpc.NewService( grpc.Logger(logger), @@ -66,16 +63,7 @@ func Server(cfg *config.Config) *cli.Command { grpc.Metrics(m), grpc.TraceProvider(traceProvider), ) - - gr.Add(service.Run, func(_ error) { - logger.Error(). - Err(err). - Str("server", "grpc"). - Msg("Shutting down server") - - cancel() - os.Exit(1) - }) + gr.Add(runner.NewGoMicroGrpcServerRunner("thumbnails_grpc", service)) server, err := debug.Server( debug.Logger(logger), @@ -85,11 +73,7 @@ func Server(cfg *config.Config) *cli.Command { logger.Info().Err(err).Str("transport", "debug").Msg("Failed to initialize server") return err } - - gr.Add(server.ListenAndServe, func(_ error) { - _ = server.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("thumbnails_debug", server)) httpServer, err := http.Server( http.Logger(logger), @@ -107,16 +91,17 @@ func Server(cfg *config.Config) *cli.Command { return err } + gr.Add(runner.NewGoMicroHttpServerRunner("thumbnails_http", httpServer)) - gr.Add(httpServer.Run, func(_ error) { - logger.Error(). - Err(err). - Str("server", "http"). - Msg("Shutting down server") - cancel() - }) + grResults := gr.Run(ctx) - return gr.Run() + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/webfinger/pkg/command/server.go b/services/webfinger/pkg/command/server.go index 2d8c4b8e45c..fe543e5caa4 100644 --- a/services/webfinger/pkg/command/server.go +++ b/services/webfinger/pkg/command/server.go @@ -3,10 +3,10 @@ package command import ( "context" "fmt" - "os" + "os/signal" - "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" "github.com/owncloud/ocis/v2/services/webfinger/pkg/config" @@ -36,21 +36,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - var ( - gr = run.Group{} - ctx, cancel = func() (context.Context, context.CancelFunc) { - if cfg.Context == nil { - return context.WithCancel(context.Background()) - } - return context.WithCancel(cfg.Context) - }() - m = metrics.New(metrics.Logger(logger)) - ) - - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + m := metrics.New(metrics.Logger(logger)) m.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() { relationProviders, err := getRelationProviders(cfg) if err != nil { @@ -88,17 +84,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - return server.Run() - }, func(err error) { - logger.Error(). - Err(err). - Str("server", "http"). - Msg("Shutting down server") - - cancel() - os.Exit(1) - }) + gr.Add(runner.NewGoMicroHttpServerRunner("webfinger_http", server)) } { @@ -113,14 +99,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(server.ListenAndServe, func(err error) { - logger.Error().Err(err) - _ = server.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("webfinger_debug", server)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } }