diff --git a/cmd/kwild/root/root.go b/cmd/kwild/root/root.go index b6032b4d1..5610f4ad4 100644 --- a/cmd/kwild/root/root.go +++ b/cmd/kwild/root/root.go @@ -2,6 +2,7 @@ package root import ( "context" + "errors" "fmt" "net/http" _ "net/http/pprof" @@ -75,6 +76,9 @@ func RootCmd() *cobra.Command { svr, err := server.New(ctx, kwildCfg, genesisConfig, nodeKey, autoGen) if err != nil { + if errors.Is(err, context.Canceled) { + return nil // clean shutdown + } return err } diff --git a/cmd/kwild/server/build.go b/cmd/kwild/server/build.go index d3a4e3204..68bbe4b31 100644 --- a/cmd/kwild/server/build.go +++ b/cmd/kwild/server/build.go @@ -289,10 +289,15 @@ type coreDependencies struct { // it is used to close all resources on shutdown type closeFuncs struct { closers []func() error + logger log.Logger } -func (c *closeFuncs) addCloser(f func() error) { - c.closers = append([]func() error{f}, c.closers...) // slices.Insert(c.closers, 0, f) +func (c *closeFuncs) addCloser(f func() error, msg string) { + // push to top of stack + c.closers = slices.Insert(c.closers, 0, func() error { + c.logger.Info(msg) + return f() + }) } // closeAll closes all closers @@ -355,7 +360,7 @@ func buildEventStore(d *coreDependencies, closers *closeFuncs) *events.EventStor if err != nil { failBuild(err, "failed to build event store") } - closers.addCloser(db.Close) + closers.addCloser(db.Close, "closing event store") e, err := events.NewEventStore(d.ctx, db) if err != nil { @@ -391,7 +396,7 @@ func buildDB(d *coreDependencies, closer *closeFuncs) *pg.DB { if err != nil { failBuild(err, "kwild database open failed") } - closer.addCloser(db.Close) + closer.addCloser(db.Close, "closing main DB") return db } @@ -560,7 +565,7 @@ func buildAdminService(d *coreDependencies, closer *closeFuncs, admsvc admpb.Adm if err != nil { failBuild(err, "failed to build grpc server") } - closer.addCloser(lis.Close) + closer.addCloser(lis.Close, "stopping admin service") // client certs caCertPool := 
x509.NewCertPool() @@ -632,7 +637,7 @@ func buildAdminService(d *coreDependencies, closer *closeFuncs, admsvc admpb.Adm failBuild(err, "failed to listen to unix socket") } - closer.addCloser(lis.Close) + closer.addCloser(lis.Close, "stopping admin service") err = os.Chmod(u.Target, 0777) // TODO: probably want this to be more restrictive if err != nil { @@ -679,7 +684,7 @@ func buildCometNode(d *coreDependencies, closer *closeFuncs, abciApp abciTypes.A if err != nil { failBuild(err, "failed to build comet node") } - closer.addCloser(db.Close) + closer.addCloser(db.Close, "closing signing store") readWriter := &atomicReadWriter{ kv: db, @@ -700,7 +705,7 @@ func buildCometNode(d *coreDependencies, closer *closeFuncs, abciApp abciTypes.A } } - node, err := cometbft.NewCometBftNode(abciApp, nodeCfg, genDoc, d.privKey, + node, err := cometbft.NewCometBftNode(d.ctx, abciApp, nodeCfg, genDoc, d.privKey, readWriter, &d.log) if err != nil { failBuild(err, "failed to build comet node") @@ -709,9 +714,31 @@ func buildCometNode(d *coreDependencies, closer *closeFuncs, abciApp abciTypes.A return node } +// panicErr is the type given to panic from failBuild so that the wrapped error +// may be type-inspected. +type panicErr struct { + err error + msg string +} + +func (pe panicErr) String() string { + return pe.msg +} + +func (pe panicErr) Error() string { // error interface + return pe.msg +} + +func (pe panicErr) Unwrap() error { + return pe.err +} + func failBuild(err error, msg string) { if err != nil { - panic(fmt.Sprintf("%s: %s", msg, err.Error())) + panic(panicErr{ + err: err, + msg: fmt.Sprintf("%s: %s", msg, err), + }) } } diff --git a/cmd/kwild/server/server.go b/cmd/kwild/server/server.go index 98200b248..d34f97104 100644 --- a/cmd/kwild/server/server.go +++ b/cmd/kwild/server/server.go @@ -54,26 +54,32 @@ const ( // New builds the kwild server. 
func New(ctx context.Context, cfg *config.KwildConfig, genesisCfg *config.GenesisConfig, nodeKey *crypto.Ed25519PrivateKey, autogen bool) (svr *Server, err error) { + logger, err := log.NewChecked(*cfg.LogConfig()) + if err != nil { + return nil, fmt.Errorf("invalid logger config: %w", err) + } + logger = *logger.Named("kwild") + closers := &closeFuncs{ closers: make([]func() error, 0), + logger: logger, } defer func() { if r := recover(); r != nil { svr = nil - stack := make([]byte, 8192) - length := runtime.Stack(stack, false) - err = fmt.Errorf("panic while building kwild: %v\n\nstack:\n\n%v", r, string(stack[:length])) + if pe, ok := r.(panicErr); ok && errors.Is(pe.err, context.Canceled) { + logger.Warnf("Shutdown signaled: %v", pe.msg) + err = pe // interrupt request (shutdown) during bringup, not a crash + } else { + stack := make([]byte, 8192) + length := runtime.Stack(stack, false) + err = fmt.Errorf("panic while building kwild: %v\n\nstack:\n\n%v", r, string(stack[:length])) + } closers.closeAll() } }() - logger, err := log.NewChecked(*cfg.LogConfig()) - if err != nil { - return nil, fmt.Errorf("invalid logger config: %w", err) - } - logger = *logger.Named("kwild") - if cfg.AppCfg.TLSKeyFile == "" || cfg.AppCfg.TLSCertFile == "" { return nil, errors.New("unspecified TLS key and/or certificate") } diff --git a/internal/abci/cometbft/node.go b/internal/abci/cometbft/node.go index eb5bdef81..a8898a82c 100644 --- a/internal/abci/cometbft/node.go +++ b/internal/abci/cometbft/node.go @@ -1,6 +1,7 @@ package cometbft import ( + "context" "errors" "fmt" "os" @@ -163,7 +164,7 @@ WARNING: These files are overwritten on kwild startup.` } // NewCometBftNode creates a new CometBFT node. 
-func NewCometBftNode(app abciTypes.Application, conf *cometConfig.Config, genDoc *types.GenesisDoc, privateKey cometEd25519.PrivKey, atomicStore privval.AtomicReadWriter, log *log.Logger) (*CometBftNode, error) { +func NewCometBftNode(ctx context.Context, app abciTypes.Application, conf *cometConfig.Config, genDoc *types.GenesisDoc, privateKey cometEd25519.PrivKey, atomicStore privval.AtomicReadWriter, log *log.Logger) (*CometBftNode, error) { if err := writeCometBFTConfigs(conf, genDoc); err != nil { return nil, fmt.Errorf("failed to write the effective cometbft config files: %w", err) } @@ -180,7 +181,8 @@ func NewCometBftNode(app abciTypes.Application, conf *cometConfig.Config, genDoc return nil, fmt.Errorf("failed to create private validator: %v", err) } - node, err := cometNodes.NewNode( + node, err := cometNodes.NewNodeWithContext( + ctx, conf, privateValidator, &p2p.NodeKey{ @@ -193,7 +195,10 @@ func NewCometBftNode(app abciTypes.Application, conf *cometConfig.Config, genDoc logger, ) if err != nil { - return nil, fmt.Errorf("failed to create CometBFT node: %v", err) + if errors.Is(ctx.Err(), context.Canceled) { + err = context.Canceled // canceled and comet forgot to use %w in doHandshake and elsewhere + } + return nil, fmt.Errorf("failed to create CometBFT node: %w", err) } return &CometBftNode{ diff --git a/internal/sql/pg/db.go b/internal/sql/pg/db.go index 6b51a58f5..6ed6d2a31 100644 --- a/internal/sql/pg/db.go +++ b/internal/sql/pg/db.go @@ -320,7 +320,10 @@ func (db *DB) precommit(ctx context.Context) ([]byte, error) { // Wait for the "commit id" from the replication monitor. select { - case commitID := <-resChan: + case commitID, ok := <-resChan: + if !ok { + return nil, errors.New("resChan unexpectedly closed") + } logger.Debugf("received commit ID %x", commitID) // The transaction is ready to commit, stored in a file with postgres in // the pg_twophase folder of the pg cluster data_directory. 
diff --git a/internal/sql/pg/repl.go b/internal/sql/pg/repl.go index ebf16792c..1b67f250d 100644 --- a/internal/sql/pg/repl.go +++ b/internal/sql/pg/repl.go @@ -40,12 +40,15 @@ func replConn(ctx context.Context, host, port, user, pass, dbName string) (*pgco return pgconn.Connect(ctx, connStr) } +// startRepl creates a replication slot and begins receiving data. Cancelling +// the context only cancels creation of the connection. Use the quit function to +// terminate the monitoring goroutine. func startRepl(ctx context.Context, conn *pgconn.PgConn, publicationName, slotName string, - schemaFilter func(string) bool) (chan []byte, chan error, error) { + schemaFilter func(string) bool) (chan []byte, chan error, context.CancelFunc, error) { // Create the replication slot and start postgres sending WAL data. startLSN, err := createRepl(ctx, conn, publicationName, slotName) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // Launch the receiver goroutine, which will send commit digests and an @@ -60,12 +63,14 @@ func startRepl(ctx context.Context, conn *pgconn.PgConn, publicationName, slotNa // to the sentry table that would cause a send with no receiver. commitHash := make(chan []byte, 1) + // Tie captureRepl goroutine to a new context now that connections are established. 
+ ctx2, cancel := context.WithCancel(context.Background()) go func() { defer close(commitHash) - done <- captureRepl(ctx, conn, uint64(startLSN), commitHash, schemaFilter) + done <- captureRepl(ctx2, conn, uint64(startLSN), commitHash, schemaFilter) }() - return commitHash, done, nil + return commitHash, done, cancel, nil } func createRepl(ctx context.Context, conn *pgconn.PgConn, publicationName, slotName string) (pglogrepl.LSN, error) { diff --git a/internal/sql/pg/repl_test.go b/internal/sql/pg/repl_test.go index 06f209212..7e52fe91b 100644 --- a/internal/sql/pg/repl_test.go +++ b/internal/sql/pg/repl_test.go @@ -57,7 +57,7 @@ func Test_repl(t *testing.T) { const publicationName = "kwild_repl" var slotName = publicationName + random.String(8) - commitChan, errChan, err := startRepl(ctx, conn, publicationName, slotName, schemaFilter) + commitChan, errChan, quit, err := startRepl(ctx, conn, publicationName, slotName, schemaFilter) if err != nil { t.Fatal(err) } @@ -92,7 +92,7 @@ func Test_repl(t *testing.T) { if !bytes.Equal(commitHash, wantCommitHash) { t.Errorf("commit hash mismatch, got %x, wanted %x", commitHash, wantCommitHash) } - cancel() + quit() case err := <-errChan: if errors.Is(err, context.Canceled) { return @@ -103,7 +103,7 @@ func Test_repl(t *testing.T) { } if err != nil { t.Error(err) - cancel() + quit() } return } @@ -132,4 +132,5 @@ func Test_repl(t *testing.T) { } wg.Wait() + connQ.Close(ctx) } diff --git a/internal/sql/pg/replmon.go b/internal/sql/pg/replmon.go index 1d37a7b07..2ddc9c45e 100644 --- a/internal/sql/pg/replmon.go +++ b/internal/sql/pg/replmon.go @@ -55,15 +55,13 @@ type replMon struct { // request a commit ID promise using the recvID method prior to committing a // transaction. 
func newReplMon(ctx context.Context, host, port, user, pass, dbName string, schemaFilter func(string) bool) (*replMon, error) { - ctx, quit := context.WithCancel(ctx) conn, err := replConn(ctx, host, port, user, pass, dbName) if err != nil { - quit() return nil, err } var slotName = publicationName + random.String(8) // arbitrary, so just avoid collisions - commitChan, errChan, err := startRepl(ctx, conn, publicationName, slotName, schemaFilter) + commitChan, errChan, quit, err := startRepl(ctx, conn, publicationName, slotName, schemaFilter) if err != nil { - quit() + // quit is nil when startRepl returns an error; calling it would panic, and no goroutine was started. conn.Close(ctx)