Skip to content

Commit

Permalink
Graceful stop panic fix (#877)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Feb 14, 2023
1 parent 68ab12b commit 9b973f0
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 5 deletions.
8 changes: 4 additions & 4 deletions pkg/pipeline/stream/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ func (n *SourceNode) Stop(ctx context.Context, reason error) error {
// only execute stop once, more calls won't make a difference
err = n.stop(ctx, reason)
stopExecuted = true
if err != nil {
// an error happened, allow stop to be executed again
n.stopOnce = sync.Once{}
}
})
if err != nil {
// an error happened, allow stop to be executed again
n.stopOnce = sync.Once{}
}
if !stopExecuted {
n.logger.Warn(ctx).Msg("source connector stop already triggered, " +
"ignoring second stop request (if the pipeline is stuck, please " +
Expand Down
60 changes: 60 additions & 0 deletions pkg/pipeline/stream/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/conduitio/conduit/pkg/foundation/cchan"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/metrics/noop"
"github.com/conduitio/conduit/pkg/pipeline/stream/mock"
"github.com/conduitio/conduit/pkg/plugin"
Expand Down Expand Up @@ -74,6 +75,65 @@ func TestSourceNode_Run(t *testing.T) {
is.True(!ok) // expected nodeDone to be closed
}

func TestSourceNode_Stop_Fail(t *testing.T) {
is := is.New(t)
ctx := context.Background()
ctrl := gomock.NewController(t)

src := mock.NewSource(ctrl)

wantErr := cerrors.New("test error")
startRead := make(chan struct{})
stopRead := make(chan struct{})
src.EXPECT().ID().Return("source-connector").AnyTimes()
src.EXPECT().Open(gomock.Any()).Return(nil).Times(1)
src.EXPECT().Errors().Return(make(chan error)).Times(1)
src.EXPECT().Read(gomock.Any()).DoAndReturn(func(ctx context.Context) (record.Record, error) {
close(startRead)
<-stopRead
return record.Record{}, plugin.ErrStreamNotOpen
}).Times(1)
src.EXPECT().Stop(gomock.Any()).Return(nil, wantErr).Times(2)
src.EXPECT().Teardown(gomock.Any()).Return(nil).Times(1)

node := &SourceNode{
Name: "source-node",
Source: src,
PipelineTimer: noop.Timer{},
}
out := node.Pub()

nodeDone := make(chan struct{})
go func() {
defer close(nodeDone)
err := node.Run(ctx)
is.True(cerrors.Is(err, plugin.ErrStreamNotOpen))
}()

_, ok, err := cchan.ChanOut[struct{}](startRead).RecvTimeout(ctx, time.Second)
is.NoErr(err) // expected read to start running
is.True(!ok) // expected read to start running

// we stop the node now, the mock will simulate a failure
err = node.Stop(ctx, nil)
is.True(cerrors.Is(err, wantErr))

// we should be able to try stopping the node again
err = node.Stop(ctx, nil)
is.True(cerrors.Is(err, wantErr))

// simulate that read stops running
close(stopRead)

_, ok, err = cchan.ChanOut[*Message](out).RecvTimeout(ctx, time.Second)
is.NoErr(err) // expected node to close outgoing channel
is.True(!ok) // expected node to close outgoing channel

_, ok, err = cchan.ChanOut[struct{}](nodeDone).RecvTimeout(ctx, time.Second)
is.NoErr(err) // expected node to stop running
is.True(!ok) // expected nodeDone to be closed
}

func TestSourceNode_StopWhileNextNodeIsStuck(t *testing.T) {
// A pipeline can't be stopped gracefully if the next node after a source
// node blocks forever (or for a long time), because we can't inject the
Expand Down
4 changes: 3 additions & 1 deletion pkg/processor/procbuiltin/unwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ func (d *debeziumUnwrapper) unwrapMetadata(rec record.Record) (record.Metadata,
readAt := time.UnixMilli(int64(tsMs))
rec.Metadata.SetReadAt(readAt)
case debeziumFieldSource:
// don't add prefix for source fields
// don't add prefix for source fields to be consistent with the
// behavior of the debezium converter in the SDK - it puts all
// metadata fields into the `source` field
source = d.flatten("", val)
default:
flattened := d.flatten("debezium."+field, val)
Expand Down

0 comments on commit 9b973f0

Please sign in to comment.