Fixing issues with agent shutdown process
clarkmcc committed Nov 24, 2023
1 parent 2d66a05 commit 983c663
Showing 3 changed files with 89 additions and 21 deletions.
20 changes: 8 additions & 12 deletions cmd/cloudcored/main.go
@@ -1,25 +1,26 @@
package main

import (
"context"
"github.com/clarkmcc/cloudcore/internal/agent"
"github.com/clarkmcc/cloudcore/internal/logger"
"github.com/clarkmcc/cloudcore/internal/sysinfo"
"github.com/clarkmcc/cloudcore/internal/tasks"
_ "github.com/clarkmcc/cloudcore/internal/tasks/registered"
"github.com/spf13/cobra"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
"go.uber.org/zap"
"gopkg.in/tomb.v2"
"os"
"os/signal"
)

var cmd = &cobra.Command{
Use: "cloudcored",
RunE: func(cmd *cobra.Command, args []string) error {
t := tomb.Tomb{}
app := fx.New(
fx.Provide(literal(cmd)),
fx.Provide(signaller(&t)),
fx.Invoke(shutdowner),
fx.Provide(agent.NewConfig),
fx.Provide(agent.NewDatabase),
fx.Provide(agent.NewServer),
@@ -29,30 +30,25 @@ var cmd = &cobra.Command{
sysinfo.NewSystemMetadataProvider,
fx.As(new(agent.SystemMetadataProvider)))),
fx.Invoke(agent.NewLifecycleNotifications),
fx.Provide(func() (*tomb.Tomb, context.Context) {
tomb := tomb.Tomb{}
ctx, _ := signal.NotifyContext(tomb.Context(context.Background()), os.Interrupt)
return &tomb, ctx
}),
fx.Decorate(func(config *agent.Config) *agent.Logging {
return &config.Logging
}),
fx.Provide(func(config *agent.Config) *zap.Logger {
return logger.New(config.Logging.Level, config.Logging.Debug)
}),
fx.WithLogger(func(logger *zap.Logger) fxevent.Logger {
return &fxevent.ZapLogger{Logger: logger.Named("fx")}
}),
fx.Invoke(func(e *tasks.Executor) {
e.Initialize()
}),
fx.Invoke(func(s fx.Shutdowner, tomb *tomb.Tomb) error {
<-tomb.Dead()
return s.Shutdown(fx.ExitCode(0))
}),
)
err := app.Err()
if err != nil {
return err
}
app.Run()
<-t.Dead()
return nil
},
}
42 changes: 42 additions & 0 deletions cmd/cloudcored/providers.go
@@ -0,0 +1,42 @@
package main

import (
"context"
"go.uber.org/fx"
"go.uber.org/zap"
"gopkg.in/tomb.v2"
"os"
"os/signal"
)

// signaller accepts a global tomb that doesn't need to be provided via
// the fx framework and returns an fx.Provide-compatible function that takes
// control of the tomb and connects it to a signal handler. When the signal
// is received, the tomb is killed.
//
// Why a global tomb rather than a tomb scoped to the fx app, you may ask?
// Because the final shutdown step of the application needs to wait for the
// tomb to die, and that wait has to happen outside the fx app.
func signaller(t *tomb.Tomb) func(logger *zap.Logger) (*tomb.Tomb, context.Context) {
return func(logger *zap.Logger) (*tomb.Tomb, context.Context) {
ctx, _ := signal.NotifyContext(t.Context(context.Background()), os.Interrupt)
go func() {
<-ctx.Done()
logger.Info("received shutdown signal")
t.Kill(ctx.Err())
}()
return t, ctx
}
}

// shutdowner is a fx.Invoke-compatible function that triggers an fx shutdown
// when we see that the tomb is dying.
func shutdowner(s fx.Shutdowner, tomb *tomb.Tomb, logger *zap.Logger) {
go func() {
<-tomb.Dying()
err := s.Shutdown(fx.ExitCode(0))
if err != nil {
logger.Error("shutting down", zap.Error(err))
}
}()
}
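
Taken together with the main.go changes above, here is a condensed, self-contained sketch (not part of this commit) of the shutdown flow these two providers implement: a global tomb that an OS signal kills, an fx shutdown that fires once the tomb starts dying, and a final wait on the tomb's Dead channel in main. The worker goroutine and the error handling are illustrative only.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"

	"go.uber.org/fx"
	"gopkg.in/tomb.v2"
)

func main() {
	t := tomb.Tomb{} // global tomb, created outside the fx app

	app := fx.New(
		// Provide the tomb plus a context that is cancelled on SIGINT
		// and that kills the tomb when the signal arrives (signaller).
		fx.Provide(func() (*tomb.Tomb, context.Context) {
			ctx, _ := signal.NotifyContext(t.Context(context.Background()), os.Interrupt)
			go func() {
				<-ctx.Done()
				t.Kill(ctx.Err())
			}()
			return &t, ctx
		}),
		// Illustrative worker owned by the tomb; it exits once the tomb is dying.
		fx.Invoke(func(tm *tomb.Tomb) {
			tm.Go(func() error {
				<-tm.Dying()
				return nil
			})
		}),
		// Trigger an fx shutdown as soon as the tomb starts dying (shutdowner).
		fx.Invoke(func(s fx.Shutdowner, tm *tomb.Tomb) {
			go func() {
				<-tm.Dying()
				_ = s.Shutdown(fx.ExitCode(0))
			}()
		}),
	)
	if err := app.Err(); err != nil {
		log.Fatal(err)
	}

	app.Run()  // blocks until the Shutdowner is invoked (or fx sees its own signal)
	<-t.Dead() // last step: wait for every tomb-managed goroutine to finish
}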
48 changes: 39 additions & 9 deletions internal/agent/client.go
@@ -40,13 +40,15 @@ import (
"gopkg.in/tomb.v2"
"io"
"sync"
"time"
)

type Client struct {
dialer func(ctx context.Context) (*brpc.ClientConn, error)
service rpc.AgentServer
logger *zap.Logger
tomb *tomb.Tomb
dialer func(ctx context.Context) (*brpc.ClientConn, error)
service rpc.AgentServer
logger *zap.Logger
tomb *tomb.Tomb
shutdown <-chan struct{}

tokenManager *tokenManager

@@ -165,14 +167,15 @@ func (c *Client) connectStreamsLocked(ctx context.Context) (err error) {
return nil
}
}
c.notify, err = c.agent.Notifications(ctx)
c.notify, err = c.agent.Notifications(c.shutdownCtx(ctx))
if err != nil {
return err
}
return nil
}

func (c *Client) setupClientsLocked(ctx context.Context) (err error) {
ctx = c.shutdownCtx(ctx)
c.conn, err = c.dialer(ctx)
if err != nil {
return err
@@ -181,7 +184,7 @@ func (c *Client) setupClientsLocked(ctx context.Context) (err error) {
c.agent = rpc.NewAgentManagerClient(c.conn)

c.tomb.Go(func() error {
err = brpc.ServeClientService[rpc.AgentServer](c.tomb.Dying(), c.conn, func(registrar grpc.ServiceRegistrar) {
err = brpc.ServeClientService[rpc.AgentServer](ctx.Done(), c.conn, func(registrar grpc.ServiceRegistrar) {
rpc.RegisterAgentServer(registrar, c.service)
})
if err != nil {
@@ -198,7 +201,33 @@
return nil
}

// shutdownCtx returns a special context that cancels once the tomb is fully
// dead. The idea here is that the client should be the last thing to shut
// down because some shutdown procedures need a working client in order to
// work (such as sending shutdown notifications).
//
// If the provided context dies, we wait for 5 seconds for the tomb to fully
// die before cancelling the returned context. The idea is we:
// 1. Don't want to block forever if the provided context is cancelled but
// the tomb doesn't die.
// 2. Don't want to cancel the returned context at the same time we get the
// shutdown signal.
func (c *Client) shutdownCtx(ctx context.Context) context.Context {
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
go func() {
select {
case <-ctx.Done():
case <-c.shutdown:
}
time.Sleep(5 * time.Second)
c.logger.Error("shutting down client context")
cancel()
}()
return ctx
}

func NewClient(
ctx context.Context,
config *Config,
tomb *tomb.Tomb,
cmd *cobra.Command,
@@ -208,9 +237,10 @@
metadataProvider SystemMetadataProvider,
) *Client {
c := &Client{
tomb: tomb,
service: service,
logger: logger.Named("client"),
tomb: tomb,
service: service,
shutdown: ctx.Done(),
logger: logger.Named("client"),
dialer: func(ctx context.Context) (*brpc.ClientConn, error) {
return brpc.DialContext(ctx, config.Server.Endpoint, &tls.Config{
InsecureSkipVerify: cast.ToBool(cmd.Flag("insecure-skip-verify").Value.String()),
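
The shutdownCtx helper above detaches client RPCs from the caller's cancellation so the client can outlive everything else during shutdown. Below is a minimal sketch of the same pattern in isolation (not part of this commit; delayedShutdownCtx is a hypothetical name, and context.WithoutCancel requires Go 1.21+).

package main

import (
	"context"
	"fmt"
	"time"
)

// delayedShutdownCtx returns a context that ignores the parent's cancellation
// and is instead cancelled five seconds after either the parent finishes or a
// shutdown signal fires, giving in-flight shutdown work a grace period.
func delayedShutdownCtx(parent context.Context, shutdown <-chan struct{}) context.Context {
	ctx, cancel := context.WithCancel(context.WithoutCancel(parent))
	go func() {
		select {
		case <-parent.Done():
		case <-shutdown:
		}
		time.Sleep(5 * time.Second) // grace period for shutdown notifications, etc.
		cancel()
	}()
	return ctx
}

func main() {
	shutdown := make(chan struct{})
	parent, cancelParent := context.WithCancel(context.Background())

	ctx := delayedShutdownCtx(parent, shutdown)
	cancelParent() // the parent dies immediately...

	<-ctx.Done() // ...but ctx only ends after the five-second grace period
	fmt.Println("client context cancelled:", ctx.Err())
}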
