diff --git a/pkg/pipeline/stream/source.go b/pkg/pipeline/stream/source.go index 09b5f0e31..285c50294 100644 --- a/pkg/pipeline/stream/source.go +++ b/pkg/pipeline/stream/source.go @@ -177,11 +177,11 @@ func (n *SourceNode) Stop(ctx context.Context, reason error) error { // only execute stop once, more calls won't make a difference err = n.stop(ctx, reason) stopExecuted = true - if err != nil { - // an error happened, allow stop to be executed again - n.stopOnce = sync.Once{} - } }) + if err != nil { + // an error happened, allow stop to be executed again + n.stopOnce = sync.Once{} + } if !stopExecuted { n.logger.Warn(ctx).Msg("source connector stop already triggered, " + "ignoring second stop request (if the pipeline is stuck, please " + diff --git a/pkg/pipeline/stream/source_test.go b/pkg/pipeline/stream/source_test.go index 03d0b3544..86037c832 100644 --- a/pkg/pipeline/stream/source_test.go +++ b/pkg/pipeline/stream/source_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/conduitio/conduit/pkg/foundation/cchan" + "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/metrics/noop" "github.com/conduitio/conduit/pkg/pipeline/stream/mock" "github.com/conduitio/conduit/pkg/plugin" @@ -74,6 +75,65 @@ func TestSourceNode_Run(t *testing.T) { is.True(!ok) // expected nodeDone to be closed } +func TestSourceNode_Stop_Fail(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + + src := mock.NewSource(ctrl) + + wantErr := cerrors.New("test error") + startRead := make(chan struct{}) + stopRead := make(chan struct{}) + src.EXPECT().ID().Return("source-connector").AnyTimes() + src.EXPECT().Open(gomock.Any()).Return(nil).Times(1) + src.EXPECT().Errors().Return(make(chan error)).Times(1) + src.EXPECT().Read(gomock.Any()).DoAndReturn(func(ctx context.Context) (record.Record, error) { + close(startRead) + <-stopRead + return record.Record{}, plugin.ErrStreamNotOpen + }).Times(1) + src.EXPECT().Stop(gomock.Any()).Return(nil, wantErr).Times(2) + src.EXPECT().Teardown(gomock.Any()).Return(nil).Times(1) + + node := &SourceNode{ + Name: "source-node", + Source: src, + PipelineTimer: noop.Timer{}, + } + out := node.Pub() + + nodeDone := make(chan struct{}) + go func() { + defer close(nodeDone) + err := node.Run(ctx) + is.True(cerrors.Is(err, plugin.ErrStreamNotOpen)) + }() + + _, ok, err := cchan.ChanOut[struct{}](startRead).RecvTimeout(ctx, time.Second) + is.NoErr(err) // expected read to start running + is.True(!ok) // expected read to start running + + // we stop the node now, the mock will simulate a failure + err = node.Stop(ctx, nil) + is.True(cerrors.Is(err, wantErr)) + + // we should be able to try stopping the node again + err = node.Stop(ctx, nil) + is.True(cerrors.Is(err, wantErr)) + + // simulate that read stops running + close(stopRead) + + _, ok, err = cchan.ChanOut[*Message](out).RecvTimeout(ctx, time.Second) + is.NoErr(err) // expected node to close outgoing channel + is.True(!ok) // expected node to close outgoing channel + + _, ok, err = cchan.ChanOut[struct{}](nodeDone).RecvTimeout(ctx, time.Second) + is.NoErr(err) // expected node to stop running + is.True(!ok) // expected nodeDone to be closed +} + func TestSourceNode_StopWhileNextNodeIsStuck(t *testing.T) { // A pipeline can't be stopped gracefully if the next node after a source // node blocks forever (or for a long time), because we can't inject the diff --git a/pkg/processor/procbuiltin/unwrap.go b/pkg/processor/procbuiltin/unwrap.go index 732c21450..275e34b9b 100644 --- a/pkg/processor/procbuiltin/unwrap.go +++ b/pkg/processor/procbuiltin/unwrap.go @@ -306,7 +306,9 @@ func (d *debeziumUnwrapper) unwrapMetadata(rec record.Record) (record.Metadata, readAt := time.UnixMilli(int64(tsMs)) rec.Metadata.SetReadAt(readAt) case debeziumFieldSource: - // don't add prefix for source fields + // don't add prefix for source fields to be consistent with the + // behavior of the debezium converter in the SDK - it puts all + // metadata fields into the `source` field source = d.flatten("", val) default: flattened := d.flatten("debezium."+field, val)