Skip to content

Commit

Permalink
Merge pull request #300 from batchcorp/blinktag/fix_cli_shutdown_cleanup
Browse files Browse the repository at this point in the history
Fixing NATS cleanup on shutdown
  • Loading branch information
blinktag committed Jul 28, 2022
2 parents 6bc46e0 + 537b143 commit 1836f7c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 23 deletions.
50 changes: 32 additions & 18 deletions backends/nats-jetstream/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"strings"
"time"

"github.com/batchcorp/plumber-schemas/build/go/protos/args"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"

"github.com/batchcorp/plumber/validate"

"github.com/batchcorp/plumber-schemas/build/go/protos/args"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
)
Expand All @@ -22,7 +22,7 @@ func (n *NatsJetstream) Read(ctx context.Context, readOpts *opts.ReadOptions, re
return errors.Wrap(err, "invalid read options")
}

jsCtx, err := n.client.JetStream()
jsCtx, err := n.client.JetStream(nats.Context(ctx))
if err != nil {
return errors.Wrap(err, "failed to get jetstream context")
}
Expand All @@ -32,6 +32,14 @@ func (n *NatsJetstream) Read(ctx context.Context, readOpts *opts.ReadOptions, re
// nats.Subscribe is async, use channel to wait to exit
doneCh := make(chan struct{})

go func() {
// NATS library does not appear to be respecting contexts
// sub.Fetch() will block indefinitely regardless of the context
// passed to nats.Conn.JetStream(), so force an exit.
<-ctx.Done()
doneCh <- struct{}{}
}()

var count int64

var consumerInfo *nats.ConsumerInfo
Expand Down Expand Up @@ -101,35 +109,41 @@ func (n *NatsJetstream) Read(ctx context.Context, readOpts *opts.ReadOptions, re
n.log.Errorf("unable to subscribe: %s", err)
}

defer sub.Unsubscribe()
defer n.cleanupConsumer(jsCtx, consumerInfo, readOpts.NatsJetstream.Args)

if sub.Type() == nats.PullSubscription {
TOP:
for {
msgs, err := sub.Fetch(1, nats.MaxWait(60*time.Second))
if err != nil {
if strings.Contains(err.Error(), "timeout") {
continue
go func() {
TOP:
for {
msgs, err := sub.Fetch(1, nats.MaxWait(60*time.Second))
if err != nil {
if strings.Contains(err.Error(), "timeout") {
continue
}
}
}

for _, m := range msgs {
handler(m)
for _, m := range msgs {
handler(m)

if !readOpts.Continuous {
break TOP
if !readOpts.Continuous {
break TOP
}
}
}
}
}()
}

<-doneCh

// Don't use defer for these since Read() is run in a goroutine and there is no guarantee of execution of defers.
// Let these block in order to ensure proper cleanup and shutdown
sub.Unsubscribe()
n.cleanupConsumer(jsCtx, consumerInfo, readOpts.NatsJetstream.Args)

return nil
}

func (n *NatsJetstream) cleanupConsumer(jsCtx nats.JetStreamContext, ci *nats.ConsumerInfo, readArgs *args.NatsJetstreamReadArgs) {
defer n.log.Debug("Exited cleanup consumer")

// Nothing to do if no consumer info or read args
if ci == nil || readArgs == nil {
return
Expand All @@ -140,7 +154,7 @@ func (n *NatsJetstream) cleanupConsumer(jsCtx nats.JetStreamContext, ci *nats.Co
return
}

// Nothing to do if no jctx
// Nothing to do if no jsCtx
if jsCtx == nil {
return
}
Expand Down
5 changes: 2 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
defer persistentConfig.Save()

// We only want to intercept interrupt signals in relay, server, or read mode
if cliOpts.Global.XAction == "relay" || cliOpts.Global.XAction == "server" {
if cliOpts.Global.XAction == "relay" || cliOpts.Global.XAction == "server" || cliOpts.Global.XAction == "read" {
logrus.Debug("Intercepting signals")

c := make(chan os.Signal, 1)
Expand All @@ -74,9 +74,8 @@ func main() {

go func() {
sig := <-c
logrus.Info("Shutting down plumber server...")
logrus.Debugf("Received system call: %+v", sig)

logrus.Info("Shutting down plumber...")
serviceShutdownFunc()
}()

Expand Down
9 changes: 7 additions & 2 deletions plumber/cli_read.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package plumber

import (
"context"
"os"

"github.com/pkg/errors"
Expand Down Expand Up @@ -29,10 +28,13 @@ func (p *Plumber) HandleReadCmd() error {

// backend.Read() blocks
go func() {
if err := backend.Read(context.Background(), p.CLIOptions.Read, resultCh, errorCh); err != nil {
if err := backend.Read(p.ServiceShutdownCtx, p.CLIOptions.Read, resultCh, errorCh); err != nil {
p.log.Errorf("unable to complete read for backend '%s': %s", backend.Name(), err)
os.Exit(0) // Exit out of plumber, since we can't continue
}

p.log.Debug("Read() exited, calling MainShutdownFunc()")
p.MainShutdownFunc()
}()

MAIN:
Expand All @@ -59,13 +61,16 @@ MAIN:
err = backend.DisplayMessage(p.CLIOptions, msg)
case errorMsg := <-errorCh:
err = backend.DisplayError(errorMsg)
case <-p.MainShutdownCtx.Done():
break MAIN
}

if err != nil {
printer.Errorf("unable to display message with '%s' backend: %s", backend.Name(), err)
}

if !p.CLIOptions.Read.Continuous {
<-p.MainShutdownCtx.Done()
break MAIN
}
}
Expand Down

0 comments on commit 1836f7c

Please sign in to comment.