Skip to content

Commit

Permalink
adds a mutex to ctx cancel (#1909)
Browse files Browse the repository at this point in the history
Co-authored-by: Haris Osmanagić <[email protected]>
  • Loading branch information
raulb and hariso authored Oct 11, 2024
1 parent 3df0511 commit dba04d5
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pkg/lifecycle/stream/destination_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ type DestinationAckerNode struct {

// queue is used to store messages
queue deque.Deque[*Message]

// m guards access to queue
m sync.Mutex

// mctx guards access to the contextCtxCancel function
mctx sync.Mutex

base subNodeBase
logger log.CtxLogger

Expand All @@ -49,7 +53,9 @@ func (n *DestinationAckerNode) Run(ctx context.Context) (err error) {
// start a fresh connector context to make sure the connector is running
// until this method returns
var connectorCtx context.Context
n.mctx.Lock()
connectorCtx, n.connectorCtxCancel = context.WithCancel(context.Background())
n.mctx.Unlock()
defer n.connectorCtxCancel()

// signalChan is buffered to ensure signals don't get lost if worker is busy
Expand Down Expand Up @@ -226,5 +232,7 @@ func (n *DestinationAckerNode) SetLogger(logger log.CtxLogger) {

func (n *DestinationAckerNode) ForceStop(ctx context.Context) {
n.logger.Warn(ctx).Msg("force stopping destination acker node")
n.mctx.Lock()
n.connectorCtxCancel()
n.mctx.Unlock()
}
8 changes: 8 additions & 0 deletions pkg/lifecycle/stream/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type SourceNode struct {
state csync.ValueWatcher[nodeState]
connectorCtxCancel context.CancelFunc

// mctx guards access to the connector context
mctx sync.Mutex

stop struct {
sync.Mutex
position opencdc.Position
Expand Down Expand Up @@ -102,7 +105,10 @@ func (n *SourceNode) Run(ctx context.Context) (err error) {
// start a fresh connector context to make sure the connector is running
// until this method returns
var connectorCtx context.Context

n.mctx.Lock()
connectorCtx, n.connectorCtxCancel = context.WithCancel(context.Background())
n.mctx.Unlock()
defer n.connectorCtxCancel()

// openMsgTracker tracks open messages until they are acked or nacked
Expand Down Expand Up @@ -241,7 +247,9 @@ func (n *SourceNode) stopGraceful(ctx context.Context, reason error) (err error)

func (n *SourceNode) ForceStop(ctx context.Context) {
n.logger.Warn(ctx).Msg("force stopping source connector")
n.mctx.Lock()
n.connectorCtxCancel()
n.mctx.Unlock()
}

func (n *SourceNode) Pub() <-chan *Message {
Expand Down

0 comments on commit dba04d5

Please sign in to comment.