Skip to content

Commit

Permalink
fix race condition with context close and confirm at the same time on…
Browse files Browse the repository at this point in the history
… DeferredConfirmation. (#101)

* add failling concurrency test

* add mutex to DeferredConfirmation to prevent race condition
  • Loading branch information
sapk authored Jul 15, 2022
1 parent 42c5149 commit 66ddd64
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 6 deletions.
21 changes: 15 additions & 6 deletions confirms.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ func (d *deferredConfirmations) Confirm(confirmation Confirmation) {
// we should never receive a confirmation for a tag that hasn't been published, but a test causes this to happen
return
}
dc.confirmation = confirmation
dc.cancel()
dc.Confirm(confirmation.Ack)
delete(d.confirmations, confirmation.DeliveryTag)
}

Expand All @@ -155,8 +154,7 @@ func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) {

for k, v := range d.confirmations {
if k <= confirmation.DeliveryTag {
v.confirmation = Confirmation{DeliveryTag: k, Ack: confirmation.Ack}
v.cancel()
v.Confirm(confirmation.Ack)
delete(d.confirmations, k)
}
}
Expand All @@ -168,14 +166,25 @@ func (d *deferredConfirmations) Close() {
defer d.m.Unlock()

for k, v := range d.confirmations {
v.confirmation = Confirmation{DeliveryTag: k, Ack: false}
v.cancel()
v.Confirm(false)
delete(d.confirmations, k)
}
}

// Confirm ack confirmation.
func (d *DeferredConfirmation) Confirm(ack bool) {
d.m.Lock()
defer d.m.Unlock()

d.confirmation.Ack = ack
d.cancel()
}

// Waits for publisher confirmation. Returns true if server successfully received the publishing.
func (d *DeferredConfirmation) Wait() bool {
<-d.ctx.Done()

d.m.Lock()
defer d.m.Unlock()
return d.confirmation.Ack
}
29 changes: 29 additions & 0 deletions confirms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,32 @@ func TestDeferredConfirmationsContextTimeout(t *testing.T) {
t.Fatal("expected to receive false for timeout confirmations, received true")
}
}

func TestDeferredConfirmationsContextConcurrency(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

dcs := newDeferredConfirmations()
var wg sync.WaitGroup
var result bool
dc1 := dcs.Add(ctx, 1)
dc2 := dcs.Add(ctx, 2)
dc3 := dcs.Add(ctx, 3)
wg.Add(1)

// wait confirmation
go func() {
result = !dc1.Wait() && !dc2.Wait() && !dc3.Wait()
t.Logf("result: %v", result)
wg.Done()
}()

// confirm
go func() {
dcs.ConfirmMultiple(Confirmation{4, true})
}()

// and cancel in //
go cancel()

wg.Wait()
}
2 changes: 2 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"io"
"sync"
"time"
)

Expand Down Expand Up @@ -184,6 +185,7 @@ type Blocking struct {
// allows users to directly correlate a publishing to a confirmation. These are
// returned from PublishWithDeferredConfirm on Channels.
type DeferredConfirmation struct {
m sync.Mutex
ctx context.Context
cancel context.CancelFunc
DeliveryTag uint64
Expand Down

0 comments on commit 66ddd64

Please sign in to comment.