Skip to content

Commit

Permalink
server,pg: fix shutdown sequence (#611)
Browse files Browse the repository at this point in the history
* server,pg: fix shutdown sequence

* don't panic on clean shutdown

* i can spell
  • Loading branch information
jchappelow committed Mar 25, 2024
1 parent 5dc2d6b commit 8326adf
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 32 deletions.
4 changes: 4 additions & 0 deletions cmd/kwild/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package root

import (
"context"
"errors"
"fmt"
"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -75,6 +76,9 @@ func RootCmd() *cobra.Command {

svr, err := server.New(ctx, kwildCfg, genesisConfig, nodeKey, autoGen)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil // clean shutdown
}
return err
}

Expand Down
45 changes: 36 additions & 9 deletions cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,15 @@ type coreDependencies struct {
// it is used to close all resources on shutdown
type closeFuncs struct {
	// closers holds the registered close functions; addCloser inserts each
	// new one at index 0, so the most recently added closer is first.
	closers []func() error
	// logger receives the message passed to addCloser, emitted at info level
	// immediately before the corresponding close function runs.
	logger log.Logger
}

func (c *closeFuncs) addCloser(f func() error) {
c.closers = append([]func() error{f}, c.closers...) // slices.Insert(c.closers, 0, f)
func (c *closeFuncs) addCloser(f func() error, msg string) {
// push to top of stack
c.closers = slices.Insert(c.closers, 0, func() error {
c.logger.Info(msg)
return f()
})
}

// closeAll closes all closers
Expand Down Expand Up @@ -355,7 +360,7 @@ func buildEventStore(d *coreDependencies, closers *closeFuncs) *events.EventStor
if err != nil {
failBuild(err, "failed to build event store")
}
closers.addCloser(db.Close)
closers.addCloser(db.Close, "closing event store")

e, err := events.NewEventStore(d.ctx, db)
if err != nil {
Expand Down Expand Up @@ -391,7 +396,7 @@ func buildDB(d *coreDependencies, closer *closeFuncs) *pg.DB {
if err != nil {
failBuild(err, "kwild database open failed")
}
closer.addCloser(db.Close)
closer.addCloser(db.Close, "closing main DB")

return db
}
Expand Down Expand Up @@ -560,7 +565,7 @@ func buildAdminService(d *coreDependencies, closer *closeFuncs, admsvc admpb.Adm
if err != nil {
failBuild(err, "failed to build grpc server")
}
closer.addCloser(lis.Close)
closer.addCloser(lis.Close, "stopping admin service")

// client certs
caCertPool := x509.NewCertPool()
Expand Down Expand Up @@ -632,7 +637,7 @@ func buildAdminService(d *coreDependencies, closer *closeFuncs, admsvc admpb.Adm
failBuild(err, "failed to listen to unix socket")
}

closer.addCloser(lis.Close)
closer.addCloser(lis.Close, "stopping admin service")

err = os.Chmod(u.Target, 0777) // TODO: probably want this to be more restrictive
if err != nil {
Expand Down Expand Up @@ -679,7 +684,7 @@ func buildCometNode(d *coreDependencies, closer *closeFuncs, abciApp abciTypes.A
if err != nil {
failBuild(err, "failed to build comet node")
}
closer.addCloser(db.Close)
closer.addCloser(db.Close, "closing signing store")

readWriter := &atomicReadWriter{
kv: db,
Expand All @@ -700,7 +705,7 @@ func buildCometNode(d *coreDependencies, closer *closeFuncs, abciApp abciTypes.A
}
}

node, err := cometbft.NewCometBftNode(abciApp, nodeCfg, genDoc, d.privKey,
node, err := cometbft.NewCometBftNode(d.ctx, abciApp, nodeCfg, genDoc, d.privKey,
readWriter, &d.log)
if err != nil {
failBuild(err, "failed to build comet node")
Expand All @@ -709,9 +714,31 @@ func buildCometNode(d *coreDependencies, closer *closeFuncs, abciApp abciTypes.A
return node
}

// panicErr is the type given to panic from failBuild so that the wrapped error
// may be type-inspected.
type panicErr struct {
	err error  // underlying cause, exposed through Unwrap
	msg string // fully formatted message returned by String and Error
}

// String returns the formatted failure message.
func (pe panicErr) String() string {
	return pe.msg
}

// Error returns the formatted failure message, satisfying the error interface.
func (pe panicErr) Error() string { // error interface
	return pe.msg
}

// Unwrap returns the underlying cause so that errors.Is and errors.As can see
// through a panicErr.
func (pe panicErr) Unwrap() error {
	return pe.err
}

// failBuild aborts the build by panicking when err is non-nil; a nil err is a
// no-op. The panic value is always a panicErr carrying both the original error
// and the message "<msg>: <err>", so a recover handler can type-assert the
// value and inspect the cause (for example to tell a cancelled context apart
// from a genuine failure). It must not panic with a bare string, as that would
// hide the cause from such a handler.
func failBuild(err error, msg string) {
	if err == nil {
		return
	}
	panic(panicErr{
		err: err,
		msg: fmt.Sprintf("%s: %s", msg, err),
	})
}

Expand Down
24 changes: 15 additions & 9 deletions cmd/kwild/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,32 @@ const (

// New builds the kwild server.
func New(ctx context.Context, cfg *config.KwildConfig, genesisCfg *config.GenesisConfig, nodeKey *crypto.Ed25519PrivateKey, autogen bool) (svr *Server, err error) {
logger, err := log.NewChecked(*cfg.LogConfig())
if err != nil {
return nil, fmt.Errorf("invalid logger config: %w", err)
}
logger = *logger.Named("kwild")

closers := &closeFuncs{
closers: make([]func() error, 0),
logger: logger,
}

defer func() {
if r := recover(); r != nil {
svr = nil
stack := make([]byte, 8192)
length := runtime.Stack(stack, false)
err = fmt.Errorf("panic while building kwild: %v\n\nstack:\n\n%v", r, string(stack[:length]))
if pe, ok := r.(panicErr); ok && errors.Is(pe.err, context.Canceled) {
logger.Warnf("Shutdown signaled: %v", pe.msg)
err = pe // interrupt request (shutdown) during bringup, not a crash
} else {
stack := make([]byte, 8192)
length := runtime.Stack(stack, false)
err = fmt.Errorf("panic while building kwild: %v\n\nstack:\n\n%v", r, string(stack[:length]))
}
closers.closeAll()
}
}()

logger, err := log.NewChecked(*cfg.LogConfig())
if err != nil {
return nil, fmt.Errorf("invalid logger config: %w", err)
}
logger = *logger.Named("kwild")

if cfg.AppCfg.TLSKeyFile == "" || cfg.AppCfg.TLSCertFile == "" {
return nil, errors.New("unspecified TLS key and/or certificate")
}
Expand Down
11 changes: 8 additions & 3 deletions internal/abci/cometbft/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cometbft

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -163,7 +164,7 @@ WARNING: These files are overwritten on kwild startup.`
}

// NewCometBftNode creates a new CometBFT node.
func NewCometBftNode(app abciTypes.Application, conf *cometConfig.Config, genDoc *types.GenesisDoc, privateKey cometEd25519.PrivKey, atomicStore privval.AtomicReadWriter, log *log.Logger) (*CometBftNode, error) {
func NewCometBftNode(ctx context.Context, app abciTypes.Application, conf *cometConfig.Config, genDoc *types.GenesisDoc, privateKey cometEd25519.PrivKey, atomicStore privval.AtomicReadWriter, log *log.Logger) (*CometBftNode, error) {
if err := writeCometBFTConfigs(conf, genDoc); err != nil {
return nil, fmt.Errorf("failed to write the effective cometbft config files: %w", err)
}
Expand All @@ -180,7 +181,8 @@ func NewCometBftNode(app abciTypes.Application, conf *cometConfig.Config, genDoc
return nil, fmt.Errorf("failed to create private validator: %v", err)
}

node, err := cometNodes.NewNode(
node, err := cometNodes.NewNodeWithContext(
ctx,
conf,
privateValidator,
&p2p.NodeKey{
Expand All @@ -193,7 +195,10 @@ func NewCometBftNode(app abciTypes.Application, conf *cometConfig.Config, genDoc
logger,
)
if err != nil {
return nil, fmt.Errorf("failed to create CometBFT node: %v", err)
if errors.Is(ctx.Err(), context.Canceled) {
err = context.Canceled // canceled and comet forgot to use %w in doHandshake and elsewhere
}
return nil, fmt.Errorf("failed to create CometBFT node: %w", err)
}

return &CometBftNode{
Expand Down
5 changes: 4 additions & 1 deletion internal/sql/pg/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,10 @@ func (db *DB) precommit(ctx context.Context) ([]byte, error) {

// Wait for the "commit id" from the replication monitor.
select {
case commitID := <-resChan:
case commitID, ok := <-resChan:
if !ok {
return nil, errors.New("resChan unexpectedly closed")
}
logger.Debugf("received commit ID %x", commitID)
// The transaction is ready to commit, stored in a file with postgres in
// the pg_twophase folder of the pg cluster data_directory.
Expand Down
13 changes: 9 additions & 4 deletions internal/sql/pg/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ func replConn(ctx context.Context, host, port, user, pass, dbName string) (*pgco
return pgconn.Connect(ctx, connStr)
}

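// startRepl creates a replication slot and begins receiving data. Cancelling
// ctx only cancels the setup performed here (creating the replication slot); it
// does not stop the monitoring goroutine, which runs under its own context. Use
// the returned context.CancelFunc (the "quit" function) to terminate the
// monitoring goroutine. On error the returned CancelFunc is nil and must not be
// called.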
func startRepl(ctx context.Context, conn *pgconn.PgConn, publicationName, slotName string,
schemaFilter func(string) bool) (chan []byte, chan error, error) {
schemaFilter func(string) bool) (chan []byte, chan error, context.CancelFunc, error) {
// Create the replication slot and start postgres sending WAL data.
startLSN, err := createRepl(ctx, conn, publicationName, slotName)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

// Launch the receiver goroutine, which will send commit digests and an
Expand All @@ -60,12 +63,14 @@ func startRepl(ctx context.Context, conn *pgconn.PgConn, publicationName, slotNa
// to the sentry table that would cause a send with no receiver.
commitHash := make(chan []byte, 1)

// Tie captureRepl goroutine to a new context now that connections are established.
ctx2, cancel := context.WithCancel(context.Background())
go func() {
defer close(commitHash)
done <- captureRepl(ctx, conn, uint64(startLSN), commitHash, schemaFilter)
done <- captureRepl(ctx2, conn, uint64(startLSN), commitHash, schemaFilter)
}()

return commitHash, done, nil
return commitHash, done, cancel, nil
}

func createRepl(ctx context.Context, conn *pgconn.PgConn, publicationName, slotName string) (pglogrepl.LSN, error) {
Expand Down
7 changes: 4 additions & 3 deletions internal/sql/pg/repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func Test_repl(t *testing.T) {

const publicationName = "kwild_repl"
var slotName = publicationName + random.String(8)
commitChan, errChan, err := startRepl(ctx, conn, publicationName, slotName, schemaFilter)
commitChan, errChan, quit, err := startRepl(ctx, conn, publicationName, slotName, schemaFilter)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -92,7 +92,7 @@ func Test_repl(t *testing.T) {
if !bytes.Equal(commitHash, wantCommitHash) {
t.Errorf("commit hash mismatch, got %x, wanted %x", commitHash, wantCommitHash)
}
cancel()
quit()
case err := <-errChan:
if errors.Is(err, context.Canceled) {
return
Expand All @@ -103,7 +103,7 @@ func Test_repl(t *testing.T) {
}
if err != nil {
t.Error(err)
cancel()
quit()
}
return
}
Expand Down Expand Up @@ -132,4 +132,5 @@ func Test_repl(t *testing.T) {
}

wg.Wait()
connQ.Close(ctx)
}
4 changes: 1 addition & 3 deletions internal/sql/pg/replmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,13 @@ type replMon struct {
// request a commit ID promise using the recvID method prior to committing a
// transaction.
func newReplMon(ctx context.Context, host, port, user, pass, dbName string, schemaFilter func(string) bool) (*replMon, error) {
ctx, quit := context.WithCancel(ctx)
conn, err := replConn(ctx, host, port, user, pass, dbName)
if err != nil {
quit()
return nil, err
}

var slotName = publicationName + random.String(8) // arbitrary, so just avoid collisions
commitChan, errChan, err := startRepl(ctx, conn, publicationName, slotName, schemaFilter)
commitChan, errChan, quit, err := startRepl(ctx, conn, publicationName, slotName, schemaFilter)
if err != nil {
quit()
conn.Close(ctx)
Expand Down

0 comments on commit 8326adf

Please sign in to comment.