diff --git a/pkg/conduit/runtime_test.go b/pkg/conduit/runtime_test.go index 28226c369..03735d88f 100644 --- a/pkg/conduit/runtime_test.go +++ b/pkg/conduit/runtime_test.go @@ -71,7 +71,7 @@ func TestRuntime(t *testing.T) { go func() { errC <- r.Run(ctx) }() - err, got, recvErr := cchan.ChanOut[error](errC).RecvTimeout(context.Background(), 100*time.Second) + err, got, recvErr := cchan.ChanOut[error](errC).RecvTimeout(context.Background(), time.Second) is.NoErr(recvErr) is.True(got) if !cerrors.Is(err, context.Canceled) { diff --git a/pkg/lifecycle/stream/destination.go b/pkg/lifecycle/stream/destination.go index 4eada1df3..6bb8ec5b3 100644 --- a/pkg/lifecycle/stream/destination.go +++ b/pkg/lifecycle/stream/destination.go @@ -101,6 +101,15 @@ func (n *DestinationNode) Run(ctx context.Context) (err error) { if err != nil || msg == nil { return err } + if msg.filtered { + n.logger.Debug(ctx).Str(log.MessageIDField, msg.ID()). + Msg("message marked as filtered, sending directly to next node") + err = n.base.Send(ctx, n.logger, msg) + if err != nil { + return msg.Nack(err, n.ID()) + } + continue + } n.logger.Trace(msg.Ctx).Msg("writing record to destination connector") diff --git a/pkg/lifecycle/stream/destination_acker.go b/pkg/lifecycle/stream/destination_acker.go index 8ea02faa4..662bedea8 100644 --- a/pkg/lifecycle/stream/destination_acker.go +++ b/pkg/lifecycle/stream/destination_acker.go @@ -34,8 +34,7 @@ type DestinationAckerNode struct { // queue is used to store messages queue deque.Deque[*Message] - // m guards access to queue - m sync.Mutex + queueMutex sync.Mutex // mctx guards access to the contextCtxCancel function mctx sync.Mutex @@ -96,9 +95,9 @@ func (n *DestinationAckerNode) Run(ctx context.Context) (err error) { return err } - n.m.Lock() + n.queueMutex.Lock() n.queue.PushBack(msg) - n.m.Unlock() + n.queueMutex.Unlock() select { case signalChan <- struct{}{}: // triggered the start of listening to acks in worker goroutine @@ -116,9 +115,9 @@ func (n *DestinationAckerNode) worker( ) { handleError := func(msg *Message, err error) { // push message back to the front of the queue and return error - n.m.Lock() + n.queueMutex.Lock() n.queue.PushFront(msg) - n.m.Unlock() + n.queueMutex.Unlock() errChan <- err } @@ -131,13 +130,25 @@ func (n *DestinationAckerNode) worker( // let's start fetching acks for messages in the queue for { // check if there are more messages waiting in the queue - n.m.Lock() + n.queueMutex.Lock() if n.queue.Len() == 0 { - n.m.Unlock() + n.queueMutex.Unlock() break } msg := n.queue.PopFront() - n.m.Unlock() + n.queueMutex.Unlock() + + if msg.filtered { + n.logger.Trace(ctx). + Str(log.MessageIDField, msg.ID()). + Msg("acking filtered message") + err := n.handleAck(msg, nil) + if err != nil { + errChan <- err + return + } + continue + } if len(acks) == 0 { // Ack can return multiple acks, store them and check the position diff --git a/pkg/lifecycle/stream/destination_acker_test.go b/pkg/lifecycle/stream/destination_acker_test.go index 6b7c7188f..08f4b09ca 100644 --- a/pkg/lifecycle/stream/destination_acker_test.go +++ b/pkg/lifecycle/stream/destination_acker_test.go @@ -96,6 +96,60 @@ func TestDestinationAckerNode_Cache(t *testing.T) { ackHandlerWg.Wait() // all ack handler should be called by now } +func TestDestinationAckerNode_AckFilteredRecords(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + dest := mock.NewDestination(ctrl) + + node := &DestinationAckerNode{ + Name: "destination-acker-node", + Destination: dest, + } + + in := make(chan *Message) + node.Sub(in) + + nodeDone := make(chan struct{}) + go func() { + defer close(nodeDone) + err := node.Run(ctx) + is.NoErr(err) + }() + + // up to this point there should have been no calls to the destination + // only after a received message should the node try to fetch the ack + msg := &Message{ + filtered: true, + Record: opencdc.Record{Position: opencdc.Position("test-position")}, + } + ackHandlerDone := make(chan struct{}) + msg.RegisterAckHandler(func(got *Message) error { + defer close(ackHandlerDone) + is.Equal(msg, got) + return nil + }) + in <- msg // send message to incoming channel + + select { + case <-time.After(time.Second): + is.Fail() // expected ack handler to be called + case <-ackHandlerDone: + // all good + } + + // note that there should be no calls to the destination at all if the node + // didn't receive any messages + close(in) + + select { + case <-time.After(time.Second): + is.Fail() // expected node to stop running + case <-nodeDone: + // all good + } +} + func TestDestinationAckerNode_ForwardAck(t *testing.T) { is := is.New(t) ctx := context.Background() diff --git a/pkg/lifecycle/stream/destination_test.go b/pkg/lifecycle/stream/destination_test.go index e4b17126a..5fd744383 100644 --- a/pkg/lifecycle/stream/destination_test.go +++ b/pkg/lifecycle/stream/destination_test.go @@ -17,10 +17,12 @@ package stream import ( "context" "io" + "sync" "testing" "time" "github.com/conduitio/conduit-commons/cchan" + "github.com/conduitio/conduit-commons/csync" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/metrics/noop" @@ -40,15 +42,15 @@ func TestDestinationNode_ForceStop(t *testing.T) { }{{ name: "Destination.Open blocks", mockDestination: func(onStuck chan struct{}) *mock.Destination { - src := mock.NewDestination(ctrl) - src.EXPECT().ID().Return("destination-connector").AnyTimes() - src.EXPECT().Errors().Return(make(chan error)) - src.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error { + dest := mock.NewDestination(ctrl) + dest.EXPECT().ID().Return("destination-connector").AnyTimes() + dest.EXPECT().Errors().Return(make(chan error)) + dest.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error { close(onStuck) <-ctx.Done() // block until context is done return ctx.Err() }) - return src + return dest }, wantMsg: false, wantErr: context.Canceled, @@ -56,23 +58,23 @@ func TestDestinationNode_ForceStop(t *testing.T) { name: "Destination.Write blocks", mockDestination: func(onStuck chan struct{}) *mock.Destination { var connectorCtx context.Context - src := mock.NewDestination(ctrl) - src.EXPECT().ID().Return("destination-connector").AnyTimes() - src.EXPECT().Errors().Return(make(chan error)) - src.EXPECT().Teardown(gomock.Any()).Return(nil) - src.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error { + dest := mock.NewDestination(ctrl) + dest.EXPECT().ID().Return("destination-connector").AnyTimes() + dest.EXPECT().Errors().Return(make(chan error)) + dest.EXPECT().Teardown(gomock.Any()).Return(nil) + dest.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error { // the connector opens the stream in open and keeps it open // until the context is open connectorCtx = ctx return nil }) - src.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r []opencdc.Record) error { + dest.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r []opencdc.Record) error { close(onStuck) <-connectorCtx.Done() // block until connector stream is closed return io.EOF // io.EOF is returned when the stream is closed }) - src.EXPECT().Stop(gomock.Any(), gomock.Any()).Return(nil) - return src + dest.EXPECT().Stop(gomock.Any(), gomock.Any()).Return(nil) + return dest }, wantMsg: true, wantErr: io.EOF, @@ -133,3 +135,58 @@ func TestDestinationNode_ForceStop(t *testing.T) { }) } } + +func TestDestinationNode_HandleFilteredMessage(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + dest := mock.NewDestination(ctrl) + // A filtered message is passing through this node without being sent + // to the destination, hence no Destination.Write() call here. + dest.EXPECT().Errors().Return(make(chan error)) + dest.EXPECT().Open(gomock.Any()).Return(nil) + dest.EXPECT().Stop(gomock.Any(), gomock.Any()).Return(nil) + dest.EXPECT().Teardown(gomock.Any()).Return(nil) + + node := &DestinationNode{ + Name: "destination-acker-node", + Destination: dest, + } + + in := make(chan *Message) + node.Sub(in) + out := node.Pub() + + var nodeStopped sync.WaitGroup + nodeStopped.Add(1) + + go func() { + defer nodeStopped.Done() + err := node.Run(ctx) + is.NoErr(err) + }() + + // up to this point there should have been no calls to the destination + // only after a received message should the node try to fetch the ack + msg := &Message{ + filtered: true, + Record: opencdc.Record{Position: opencdc.Position("test-position")}, + } + go func() { + // send message to incoming channel + err := cchan.ChanIn[*Message](in).SendTimeout(ctx, msg, 100*time.Millisecond) + is.NoErr(err) // expected message to be sent to the destination node's Sub channel + + // note that there should be no calls to the destination at all if the node + // didn't receive any messages + close(in) + }() + + gotMsg, gotMsgBool, err := cchan.ChanOut[*Message](out).RecvTimeout(ctx, 100*time.Millisecond) + is.NoErr(err) // expected node to close outgoing channel + is.True(gotMsgBool) // expected node to close outgoing channel + is.Equal(msg, gotMsg) + + err = (*csync.WaitGroup)(&nodeStopped).WaitTimeout(ctx, 100*time.Millisecond) + is.NoErr(err) // timed out waiting for node to be done running +} diff --git a/pkg/lifecycle/stream/message.go b/pkg/lifecycle/stream/message.go index 76b5454c5..465b40603 100644 --- a/pkg/lifecycle/stream/message.go +++ b/pkg/lifecycle/stream/message.go @@ -65,14 +65,21 @@ type Message struct { acked chan struct{} nacked chan struct{} + // filtered indicates whether the message holds a filtered record. + // Such a message needs to be acknowledged. + // This is done by letting the message pass through all the nodes + // (without being processed by them), until the destination acker + // node acknowledges it. + filtered bool + // handler is executed when Ack or Nack is called. handler StatusChangeHandler + // hasNackHandler is true if at least one nack handler was registered. hasNackHandler bool // ackNackReturnValue is cached the first time Ack or Nack is executed. ackNackReturnValue error - // initOnce is guarding the initialization logic of a message. initOnce sync.Once // ackNackOnce is guarding the acking/nacking logic of a message. diff --git a/pkg/lifecycle/stream/metrics.go b/pkg/lifecycle/stream/metrics.go index 6b4aee1cb..4165c766f 100644 --- a/pkg/lifecycle/stream/metrics.go +++ b/pkg/lifecycle/stream/metrics.go @@ -46,6 +46,16 @@ func (n *MetricsNode) Run(ctx context.Context) error { return err } + if msg.filtered { + n.logger.Trace(ctx).Str(log.MessageIDField, msg.ID()). + Msg("message marked as filtered, sending directly to next node") + err = n.base.Send(ctx, n.logger, msg) + if err != nil { + return msg.Nack(err, n.ID()) + } + continue + } + msg.RegisterAckHandler(func(msg *Message) error { n.Histogram.Observe(msg.Record) return nil diff --git a/pkg/lifecycle/stream/processor.go b/pkg/lifecycle/stream/processor.go index ec76e5cce..e71440546 100644 --- a/pkg/lifecycle/stream/processor.go +++ b/pkg/lifecycle/stream/processor.go @@ -79,6 +79,16 @@ func (n *ProcessorNode) Run(ctx context.Context) error { return err } + if msg.filtered { + n.logger.Trace(ctx).Str(log.MessageIDField, msg.ID()). + Msg("message marked as filtered, sending directly to next node") + err = n.base.Send(ctx, n.logger, msg) + if err != nil { + return msg.Nack(err, n.ID()) + } + continue + } + executeTime := time.Now() recsIn := []opencdc.Record{msg.Record} recsOut := n.Processor.Process(msg.Ctx, recsIn) @@ -104,10 +114,13 @@ func (n *ProcessorNode) Run(ctx context.Context) error { return err } case sdk.FilterRecord: - // NB: Ack skipped messages since they've been correctly handled - err := msg.Ack() + msg.filtered = true + n.logger.Trace(ctx). + Str(log.MessageIDField, msg.ID()). + Msg("message marked as filtered, sending directly to next node") + err = n.base.Send(ctx, n.logger, msg) if err != nil { - return cerrors.Errorf("failed to ack skipped message: %w", err) + return msg.Nack(err, n.ID()) } case sdk.ErrorRecord: err = msg.Nack(v.Error, n.ID()) diff --git a/pkg/lifecycle/stream/processor_test.go b/pkg/lifecycle/stream/processor_test.go index 66233e5e6..08930cf93 100644 --- a/pkg/lifecycle/stream/processor_test.go +++ b/pkg/lifecycle/stream/processor_test.go @@ -18,7 +18,10 @@ import ( "context" "sync" "testing" + "time" + "github.com/conduitio/conduit-commons/cchan" + "github.com/conduitio/conduit-commons/csync" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -251,7 +254,11 @@ func testNodeWithError(is *is.I, processor *mock.Processor, nackHandler NackHand is.Equal(false, ok) } -func TestProcessorNode_Skip(t *testing.T) { +// TestProcessorNode_HandleFilteredRecord tests the following case: +// If a processor returns a filtered record, then the message should +// be passed to the next node (published to the outgoing channel) +// and also marked with `filtered`. +func TestProcessorNode_HandleFilteredRecord(t *testing.T) { is := is.New(t) ctx := context.Background() ctrl := gomock.NewController(t) @@ -275,6 +282,9 @@ func TestProcessorNode_Skip(t *testing.T) { n.Sub(in) out := n.Pub() + var nodeStoppedWG sync.WaitGroup + nodeStoppedWG.Add(1) + // send a message on the pipeline that will be skipped msg := &Message{Ctx: ctx, Record: opencdc.Record{}} @@ -297,12 +307,98 @@ func TestProcessorNode_Skip(t *testing.T) { close(in) }() - // run the pipeline and assert that there are no underlying pipeline errors - err := n.Run(ctx) - is.Equal(err, nil) - is.Equal(counter, 1) + go func() { + defer nodeStoppedWG.Done() + err := n.Run(ctx) + is.NoErr(err) + }() + + gotMsg, msgReceived, err := cchan.ChanOut[*Message](out).RecvTimeout(ctx, 100*time.Millisecond) + is.True(msgReceived) + is.NoErr(err) + is.Equal(msg, gotMsg) + is.True(msg.filtered) + + err = (*csync.WaitGroup)(&nodeStoppedWG).WaitTimeout(ctx, 100*time.Millisecond) + is.NoErr(err) // timed out waiting for node to be done running // after the node stops the out channel should be closed - _, ok := <-out - is.Equal(false, ok) + msg, msgReceived, err = cchan.ChanOut[*Message](out).RecvTimeout(ctx, 100*time.Millisecond) + is.NoErr(err) + is.True(!msgReceived) + is.Equal(msg, nil) +} + +// TestProcessorNode_HandleFilteredRecord tests the following case: +// If a processor node receives a filtered message, it should be +// passed to the next node and the processor should NOT be called. +func TestProcessorNode_ReceivedFilteredMessage(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + + // create a dummy processor + proc := mock.NewProcessor(ctrl) + proc.EXPECT().Open(gomock.Any()) + proc.EXPECT().Teardown(gomock.Any()) + + n := ProcessorNode{ + Name: "test", + Processor: proc, + ProcessorTimer: noop.Timer{}, + } + + // setup the test pipeline + in := make(chan *Message) + n.Sub(in) + out := n.Pub() + + var nodeStopped sync.WaitGroup + nodeStopped.Add(1) + + // send a message on the pipeline that will be skipped + msg := &Message{ + Ctx: ctx, + Record: opencdc.Record{}, + filtered: true, + } + + // register a dummy AckHandler and NackHandler for tests. + msg.RegisterAckHandler(func(msg *Message) error { + is.Fail() // message should not be ack-ed + return nil + }) + msg.RegisterNackHandler(func(msg *Message, nm NackMetadata) error { + // Our NackHandler shouldn't ever be hit if we're correctly skipping + // so fail the test if we get here at all. + is.Fail() // message should not be nack-ed + return nil + }) + + go func() { + // publisher + in <- msg + close(in) + }() + + go func() { + defer nodeStopped.Done() + err := n.Run(ctx) + is.NoErr(err) + }() + + gotMsg, msgReceived, err := cchan.ChanOut[*Message](out).RecvTimeout(ctx, 100*time.Millisecond) + is.True(msgReceived) + is.NoErr(err) + is.Equal(msg, gotMsg) + is.True(msg.filtered) + + err = (*csync.WaitGroup)(&nodeStopped).WaitTimeout(ctx, 100*time.Millisecond) + is.NoErr(err) // timed out waiting for node to be done running + + // after the node stops the out channel should be closed + msg, msgReceived, err = cchan.ChanOut[*Message](out).RecvTimeout(ctx, 100*time.Millisecond) + is.NoErr(err) + is.True(!msgReceived) + is.Equal(msg, nil) }