diff --git a/pkg/lifecycle/stream/destination_acker.go b/pkg/lifecycle/stream/destination_acker.go index 3ea6891a0..ac3bd76cd 100644 --- a/pkg/lifecycle/stream/destination_acker.go +++ b/pkg/lifecycle/stream/destination_acker.go @@ -32,9 +32,13 @@ type DestinationAckerNode struct { // queue is used to store messages queue deque.Deque[*Message] + // m guards access to queue m sync.Mutex + // mctx guards access to the contextCtxCancel function + mctx sync.Mutex + base subNodeBase logger log.CtxLogger @@ -49,7 +53,9 @@ func (n *DestinationAckerNode) Run(ctx context.Context) (err error) { // start a fresh connector context to make sure the connector is running // until this method returns var connectorCtx context.Context + n.mctx.Lock() connectorCtx, n.connectorCtxCancel = context.WithCancel(context.Background()) + n.mctx.Unlock() defer n.connectorCtxCancel() // signalChan is buffered to ensure signals don't get lost if worker is busy @@ -226,5 +232,7 @@ func (n *DestinationAckerNode) SetLogger(logger log.CtxLogger) { func (n *DestinationAckerNode) ForceStop(ctx context.Context) { n.logger.Warn(ctx).Msg("force stopping destination acker node") + n.mctx.Lock() n.connectorCtxCancel() + n.mctx.Unlock() } diff --git a/pkg/lifecycle/stream/source.go b/pkg/lifecycle/stream/source.go index 8e8675afd..d6ab2eab3 100644 --- a/pkg/lifecycle/stream/source.go +++ b/pkg/lifecycle/stream/source.go @@ -47,6 +47,9 @@ type SourceNode struct { state csync.ValueWatcher[nodeState] connectorCtxCancel context.CancelFunc + // mctx guards access to the connector context + mctx sync.Mutex + stop struct { sync.Mutex position opencdc.Position @@ -102,7 +105,10 @@ func (n *SourceNode) Run(ctx context.Context) (err error) { // start a fresh connector context to make sure the connector is running // until this method returns var connectorCtx context.Context + + n.mctx.Lock() connectorCtx, n.connectorCtxCancel = context.WithCancel(context.Background()) + n.mctx.Unlock() defer n.connectorCtxCancel() // openMsgTracker tracks open messages until they are acked or nacked @@ -241,7 +247,9 @@ func (n *SourceNode) stopGraceful(ctx context.Context, reason error) (err error) func (n *SourceNode) ForceStop(ctx context.Context) { n.logger.Warn(ctx).Msg("force stopping source connector") + n.mctx.Lock() n.connectorCtxCancel() + n.mctx.Unlock() } func (n *SourceNode) Pub() <-chan *Message {