Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: filtered records holding up pipeline with destination batching #1987

Merged
merged 12 commits into from
Nov 26, 2024
2 changes: 1 addition & 1 deletion pkg/conduit/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestRuntime(t *testing.T) {
go func() {
errC <- r.Run(ctx)
}()
err, got, recvErr := cchan.ChanOut[error](errC).RecvTimeout(context.Background(), 100*time.Second)
err, got, recvErr := cchan.ChanOut[error](errC).RecvTimeout(context.Background(), time.Second)
is.NoErr(recvErr)
is.True(got)
if !cerrors.Is(err, context.Canceled) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/lifecycle/stream/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ func (n *DestinationNode) Run(ctx context.Context) (err error) {
if err != nil || msg == nil {
return err
}
if msg.filtered {
n.logger.Debug(ctx).Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return msg.Nack(err, n.ID())
}
continue
}

n.logger.Trace(msg.Ctx).Msg("writing record to destination connector")

Expand Down
29 changes: 20 additions & 9 deletions pkg/lifecycle/stream/destination_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ type DestinationAckerNode struct {
// queue is used to store messages
queue deque.Deque[*Message]

// m guards access to queue
m sync.Mutex
queueMutex sync.Mutex

// mctx guards access to the contextCtxCancel function
mctx sync.Mutex
Expand Down Expand Up @@ -96,9 +95,9 @@ func (n *DestinationAckerNode) Run(ctx context.Context) (err error) {
return err
}

n.m.Lock()
n.queueMutex.Lock()
n.queue.PushBack(msg)
n.m.Unlock()
n.queueMutex.Unlock()
select {
case signalChan <- struct{}{}:
// triggered the start of listening to acks in worker goroutine
Expand All @@ -116,9 +115,9 @@ func (n *DestinationAckerNode) worker(
) {
handleError := func(msg *Message, err error) {
// push message back to the front of the queue and return error
n.m.Lock()
n.queueMutex.Lock()
n.queue.PushFront(msg)
n.m.Unlock()
n.queueMutex.Unlock()

errChan <- err
}
Expand All @@ -131,13 +130,25 @@ func (n *DestinationAckerNode) worker(
// let's start fetching acks for messages in the queue
for {
// check if there are more messages waiting in the queue
n.m.Lock()
n.queueMutex.Lock()
if n.queue.Len() == 0 {
n.m.Unlock()
n.queueMutex.Unlock()
break
}
msg := n.queue.PopFront()
n.m.Unlock()
n.queueMutex.Unlock()

if msg.filtered {
n.logger.Trace(ctx).
Str(log.MessageIDField, msg.ID()).
Msg("acking filtered message")
err := n.handleAck(msg, nil)
if err != nil {
errChan <- err
return
}
continue
}

if len(acks) == 0 {
// Ack can return multiple acks, store them and check the position
Expand Down
54 changes: 54 additions & 0 deletions pkg/lifecycle/stream/destination_acker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,60 @@ func TestDestinationAckerNode_Cache(t *testing.T) {
ackHandlerWg.Wait() // all ack handler should be called by now
}

// TestDestinationAckerNode_AckFilteredRecords verifies that a message marked
// as filtered is acked by the DestinationAckerNode and that the node stops
// cleanly once its incoming channel is closed.
func TestDestinationAckerNode_AckFilteredRecords(t *testing.T) {
	is := is.New(t)
	ctx := context.Background()
	ctrl := gomock.NewController(t)
	// No expectations are registered on the mock: the test expects the node to
	// ack a filtered message without making any call to the destination.
	dest := mock.NewDestination(ctrl)

	node := &DestinationAckerNode{
		Name:        "destination-acker-node",
		Destination: dest,
	}

	in := make(chan *Message)
	node.Sub(in)

	// The result of Run is handed back through a buffered channel so that it
	// is asserted on the test goroutine; is.New(t) fails via t.FailNow, which
	// must not be called from other goroutines.
	runErr := make(chan error, 1)
	go func() {
		runErr <- node.Run(ctx)
	}()

	msg := &Message{
		filtered: true,
		Record:   opencdc.Record{Position: opencdc.Position("test-position")},
	}
	// The ack handler runs on one of the node's goroutines, so it only forwards
	// the message it was called with; the comparison happens below.
	acked := make(chan *Message, 1)
	msg.RegisterAckHandler(func(got *Message) error {
		acked <- got
		return nil
	})
	in <- msg // send the filtered message to the incoming channel

	select {
	case <-time.After(time.Second):
		is.Fail() // expected ack handler to be called
	case got := <-acked:
		is.Equal(msg, got) // ack handler should receive the filtered message
	}

	// closing the incoming channel signals the node to stop running
	close(in)

	select {
	case <-time.After(time.Second):
		is.Fail() // expected node to stop running
	case err := <-runErr:
		is.NoErr(err)
	}
}

func TestDestinationAckerNode_ForwardAck(t *testing.T) {
is := is.New(t)
ctx := context.Background()
Expand Down
83 changes: 70 additions & 13 deletions pkg/lifecycle/stream/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package stream
import (
"context"
"io"
"sync"
"testing"
"time"

"github.com/conduitio/conduit-commons/cchan"
"github.com/conduitio/conduit-commons/csync"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/metrics/noop"
Expand All @@ -40,39 +42,39 @@ func TestDestinationNode_ForceStop(t *testing.T) {
}{{
name: "Destination.Open blocks",
mockDestination: func(onStuck chan struct{}) *mock.Destination {
src := mock.NewDestination(ctrl)
src.EXPECT().ID().Return("destination-connector").AnyTimes()
src.EXPECT().Errors().Return(make(chan error))
src.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
dest := mock.NewDestination(ctrl)
dest.EXPECT().ID().Return("destination-connector").AnyTimes()
dest.EXPECT().Errors().Return(make(chan error))
dest.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
close(onStuck)
<-ctx.Done() // block until context is done
return ctx.Err()
})
return src
return dest
},
wantMsg: false,
wantErr: context.Canceled,
}, {
name: "Destination.Write blocks",
mockDestination: func(onStuck chan struct{}) *mock.Destination {
var connectorCtx context.Context
src := mock.NewDestination(ctrl)
src.EXPECT().ID().Return("destination-connector").AnyTimes()
src.EXPECT().Errors().Return(make(chan error))
src.EXPECT().Teardown(gomock.Any()).Return(nil)
src.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
dest := mock.NewDestination(ctrl)
dest.EXPECT().ID().Return("destination-connector").AnyTimes()
dest.EXPECT().Errors().Return(make(chan error))
dest.EXPECT().Teardown(gomock.Any()).Return(nil)
dest.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
// the connector opens the stream in Open and keeps it open
// until the context is canceled
connectorCtx = ctx
return nil
})
src.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r []opencdc.Record) error {
dest.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r []opencdc.Record) error {
close(onStuck)
<-connectorCtx.Done() // block until connector stream is closed
return io.EOF // io.EOF is returned when the stream is closed
})
src.EXPECT().Stop(gomock.Any(), gomock.Any()).Return(nil)
return src
dest.EXPECT().Stop(gomock.Any(), gomock.Any()).Return(nil)
return dest
},
wantMsg: true,
wantErr: io.EOF,
Expand Down Expand Up @@ -133,3 +135,58 @@ func TestDestinationNode_ForceStop(t *testing.T) {
})
}
}

// TestDestinationNode_HandleFilteredMessage verifies that a message marked as
// filtered passes through the DestinationNode to the outgoing channel without
// being written to the destination, and that the node stops cleanly once its
// incoming channel is closed.
func TestDestinationNode_HandleFilteredMessage(t *testing.T) {
	is := is.New(t)
	ctx := context.Background()
	ctrl := gomock.NewController(t)
	dest := mock.NewDestination(ctrl)
	// A filtered message is passing through this node without being sent
	// to the destination, hence no Destination.Write() call here.
	dest.EXPECT().Errors().Return(make(chan error))
	dest.EXPECT().Open(gomock.Any()).Return(nil)
	dest.EXPECT().Stop(gomock.Any(), gomock.Any()).Return(nil)
	dest.EXPECT().Teardown(gomock.Any()).Return(nil)

	node := &DestinationNode{
		Name:        "destination-node",
		Destination: dest,
	}

	in := make(chan *Message)
	node.Sub(in)
	out := node.Pub()

	var nodeStopped sync.WaitGroup
	nodeStopped.Add(1)

	// Errors produced in helper goroutines are handed back through buffered
	// channels and asserted on the test goroutine; is.New(t) fails via
	// t.FailNow, which must not be called from other goroutines.
	runErr := make(chan error, 1)
	go func() {
		defer nodeStopped.Done()
		runErr <- node.Run(ctx)
	}()

	msg := &Message{
		filtered: true,
		Record:   opencdc.Record{Position: opencdc.Position("test-position")},
	}
	sendErr := make(chan error, 1)
	go func() {
		// send message to incoming channel
		sendErr <- cchan.ChanIn[*Message](in).SendTimeout(ctx, msg, 100*time.Millisecond)

		// close the incoming channel even if the send failed, so that the
		// node is always signaled to stop
		close(in)
	}()

	// SendTimeout returns after at most its timeout, so this wait is bounded.
	is.NoErr(<-sendErr) // expected message to be sent to the destination node's Sub channel

	gotMsg, gotMsgBool, err := cchan.ChanOut[*Message](out).RecvTimeout(ctx, 100*time.Millisecond)
	is.NoErr(err)         // timed out waiting for a message on the outgoing channel
	is.True(gotMsgBool)   // expected a message, not a closed outgoing channel
	is.Equal(msg, gotMsg) // the filtered message should be forwarded as is

	err = (*csync.WaitGroup)(&nodeStopped).WaitTimeout(ctx, 100*time.Millisecond)
	is.NoErr(err) // timed out waiting for node to be done running

	// The node goroutine stores its result before marking the wait group as
	// done, so the value is available without blocking at this point.
	is.NoErr(<-runErr) // expected node to stop without an error
}
9 changes: 8 additions & 1 deletion pkg/lifecycle/stream/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,21 @@ type Message struct {
acked chan struct{}
nacked chan struct{}

// filtered indicates whether the message holds a filtered record.
// Such a message needs to be acknowledged.
// This is done by letting the message pass through all the nodes
// (without being processed by them), until the destination acker
// node acknowledges it.
filtered bool
hariso marked this conversation as resolved.
Show resolved Hide resolved

// handler is executed when Ack or Nack is called.
handler StatusChangeHandler

// hasNackHandler is true if at least one nack handler was registered.
hasNackHandler bool

// ackNackReturnValue is cached the first time Ack or Nack is executed.
ackNackReturnValue error

// initOnce is guarding the initialization logic of a message.
initOnce sync.Once
// ackNackOnce is guarding the acking/nacking logic of a message.
Expand Down
10 changes: 10 additions & 0 deletions pkg/lifecycle/stream/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func (n *MetricsNode) Run(ctx context.Context) error {
return err
}

if msg.filtered {
n.logger.Trace(ctx).Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return msg.Nack(err, n.ID())
}
continue
}

msg.RegisterAckHandler(func(msg *Message) error {
n.Histogram.Observe(msg.Record)
return nil
Expand Down
19 changes: 16 additions & 3 deletions pkg/lifecycle/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
return err
}

if msg.filtered {
n.logger.Trace(ctx).Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return msg.Nack(err, n.ID())
}
continue
}

executeTime := time.Now()
recsIn := []opencdc.Record{msg.Record}
recsOut := n.Processor.Process(msg.Ctx, recsIn)
Expand All @@ -104,10 +114,13 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
return err
}
case sdk.FilterRecord:
// NB: Ack skipped messages since they've been correctly handled
err := msg.Ack()
msg.filtered = true
n.logger.Trace(ctx).
Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return cerrors.Errorf("failed to ack skipped message: %w", err)
return msg.Nack(err, n.ID())
}
case sdk.ErrorRecord:
err = msg.Nack(v.Error, n.ID())
Expand Down
Loading