
Commit: safe wrappers

nikandfor committed Jan 10, 2024
1 parent 33b4495 commit ca32a02
Showing 8 changed files with 302 additions and 17 deletions.
4 changes: 3 additions & 1 deletion README.md
@@ -26,7 +26,7 @@ This is all without timeouts, additional goroutines, allocations, and channels.

```
// General pattern is
// Queue.In -> Enter -> [if idx < 0 {return}] -> defer Exit -> Commit/Cancel/return/panic
// Queue.In -> Enter -> defer Exit -> Commit/Cancel/return/panic
var sum int
@@ -66,6 +66,8 @@ for j := 0; j < N; j++ {
}
```

See all the available options in the doc.

Batch is error- and panic-proof, which means the user code can return an error or panic at any point;
as soon as all the workers have left the batch, its state is reset.
The external state is not reset, though; it's the caller's responsibility to keep it consistent.
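
To make the pattern above concrete, here is a minimal sketch of a worker built around it. Only the `Queue().In` / `Enter` / `Exit` / `Commit` calls come from this repository; the `batch.New` constructor, its commit-callback signature, the import path, and the `DoWork` helper are assumptions for illustration.

```go
package example

import (
	"context"

	"nikandfor.dev/batch" // import path assumed; check the module's go.mod
)

var (
	sum int // shared state, only touched between Enter and Exit

	// Constructor and callback signature are assumed, not taken from this commit.
	bc = batch.New(func(ctx context.Context) (int, error) {
		return sum, nil // flush the accumulated batch here
	})
)

// DoWork follows the pattern: Queue.In -> Enter -> defer Exit -> Commit.
func DoWork(ctx context.Context, data int) (int, error) {
	bc.Queue().In() // get into the queue

	idx := bc.Enter(true) // blocking mode: idx is always >= 0
	defer bc.Exit()       // pair to a successful Enter; defer survives panics

	if idx == 0 { // first worker in the batch resets the shared state
		sum = 0
	}

	sum += data // add our work to the batch

	return bc.Commit(ctx) // exactly one of return/Cancel/Commit per worker
}
```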
64 changes: 64 additions & 0 deletions batch4.go
@@ -8,6 +8,66 @@ import (
"sync/atomic"
)

/*
Full guide.

bc.Queue().In() // get in the queue

// Get the data ready for the commit.
// All the blocking operations should be done here.
data := 1

if leave() {
	bc.Queue().Out() // get out of the queue
	bc.Notify()      // notify waiting goroutines we won't come
	return
}

idx := bc.Enter(true) // true to block and wait, false for a non-blocking return if the batch is in the process of committing
// just like with a Mutex, there are no guarantees about who enters first

if idx < 0 { // in non-blocking mode we didn't enter the batch; it's always >= 0 in blocking mode
	return // no need to do anything here
}

defer bc.Exit() // if we entered we must exit
_ = 0           // calling it with defer ensures state consistency in case of panic

// We are inside a locked Mutex between Enter and Exit,
// so the shared state can be modified safely.
// That also means that all the long and heavy operations
// should be done before Enter.

if idx == 0 { // we are first in the batch, reset the state
	sum = 0
}

if full() { // if we didn't change the state yet, we can just leave.
	bc.Trigger() // Optionally we can trigger the batch.
	// A common use case is to flush the data if our work won't fit.
	return // Then we may return and retry in the next batch.
}

sum += data // add our work to the batch

if spoilt() {
	_, err = bc.Cancel(ctx, causeErr) // cancel the batch; the commit isn't done, all workers get causeErr
	// if causeErr is nil it's set to Canceled
}

if failed() {
	panic("we can safely panic here") // the panic is propagated to the caller,
	// other goroutines in the batch get PanicError
}

if urgent() {
	bc.Trigger() // do not wait for the others
}

res, err := bc.Commit(ctx) // get ready to commit.
// The last goroutine that entered the batch performs the actual commit.
// All the others wait and get the same res and error.
*/

type (
Queue int32

@@ -56,6 +116,10 @@ func (c *Coordinator[Res]) Queue() *Queue {
return &c.locs.queue
}

func (c *Coordinator[Res]) Notify() {
	c.cond.Broadcast()
}

func (c *Coordinator[Res]) Enter(blocking bool) int {
c.mu.Lock()

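Continuing the sketch from the README section above (same assumed package and imports), here is the early-leave branch that the guide and the new Notify method describe: a worker that queued up but then decides not to enter must leave the queue and wake any goroutines waiting for it. The helper name `tryWork` and the `leave` callback are illustrative.

```go
// Early-leave branch from the guide: the worker promised to come (Queue().In),
// then changed its mind before Enter, so it takes the promise back and
// notifies possible waiters.
func tryWork(ctx context.Context, bc *batch.Coordinator[int], data int, leave func() bool) (int, error) {
	bc.Queue().In() // promise to come

	if leave() { // blocking preparation failed, or we changed our mind
		bc.Queue().Out() // get out of the queue
		bc.Notify()      // wake waiters: we are not coming
		return 0, nil    // illustrative: real code would likely retry or report an error
	}

	idx := bc.Enter(true) // blocking enter, idx >= 0
	defer bc.Exit()

	if idx == 0 {
		// first in the batch: reset the shared state here
	}

	_ = data // add data to the shared state here

	return bc.Commit(ctx)
}
```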
17 changes: 14 additions & 3 deletions batch4_test.go
@@ -99,11 +99,11 @@ func TestCoordinatorAllCases(tb *testing.T) {
go func() {
defer wg.Done()

for i := 0; i < 7; i++ {
for i := 0; i <= 8; i++ {
i := i

func() {
if j == 1 && i == 3 {
if j == 1 && (i == 3 || i == 7) {
defer func() {
_ = recover()
}()
@@ -113,7 +113,13 @@

runtime.Gosched()

idx := bc.Enter(j == 0)
if j == 1 && i == 6 {
bc.Queue().Out()
bc.Notify()
return
}

idx := bc.Enter(j != 0)
if idx < 0 {
tb.Logf("worker %2d iter %2d didn't enter %2d", j, i, idx)
return
@@ -166,6 +172,11 @@ func TestCoordinatorAllCases(tb *testing.T) {
} else {
tb.Logf("worker %2d iter %2d res %2d %v", j, i, res, err)
}

if j == 1 && i == 7 {
tb.Logf("worker %2d iter %2d PANICS after commit", j, i)
panic("panIC")
}
}()
}
}()
13 changes: 4 additions & 9 deletions examples_test.go → coordinator_example_test.go
@@ -44,10 +44,11 @@ func (s *Service) DoWork(ctx context.Context, data int) (int, error) {

idx := s.bc.Enter(true) // true for blocking, false if we want to leave instead of waiting
if idx < 0 { // we haven't entered the batch in non blocking mode
return 0, errors.New("not in this time") // we have to leave in that case
return 0, errors.New("not this time") // we have to leave in that case
}

defer s.bc.Exit() // it's like Mutex.Unlock. It's a pair to successful Enter. Must be called with defer to outlive panics
defer s.bc.Exit() // it's like Mutex.Unlock. It's a pair to successful Enter.
_ = 0 // Must be called with defer to outlive panics

if idx == 0 { // we are first in the batch, reset the state
s.sum = 0
@@ -56,15 +57,9 @@ func (s *Service) DoWork(ctx context.Context, data int) (int, error) {

log.Printf("worker %2d got in with index %2d", ctx.Value(contextKey{}), idx)

// if isFull() { s.bc.Trigger() } // trigger commit. we can leave or we can stay in the batch

// if notThisTime() { return } // safely leave the batch if we changed our mind. Keep the state (s.sum) unchanged.

s.sum += data // add our work to the batch

// if spoiltState() { return s.bc.Cancel(ctx, err) } // cancel the whole batch if we spoilt it

// only one of leave(return)/Cancel/Commit must be called and only once
// only one of return/Cancel/Commit must be called and only once
res, err := s.bc.Commit(ctx)
if err != nil { // batch failed, each worker in it will get the same error
return 0, err
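The inline comments removed above point at the cancel path: cancelling aborts the whole batch and every worker in it gets the same cause error (a nil cause is reported as Canceled, per the guide in batch4.go). A sketch of that branch, continuing the earlier sketches; `doWorkOrCancel`, `spoilt`, and the `cause` parameter are illustrative names.

```go
// Cancel branch: if this worker spoilt the shared state, abort the whole
// batch; every worker already in it receives the same cause error.
func doWorkOrCancel(ctx context.Context, bc *batch.Coordinator[int], data int, spoilt func() bool, cause error) (int, error) {
	bc.Queue().In()

	idx := bc.Enter(true)
	defer bc.Exit()

	if idx == 0 {
		// first in the batch: reset the shared state here
	}

	if spoilt() {
		return bc.Cancel(ctx, cause) // per the guide, a nil cause is reported as Canceled
	}

	_ = data // add data to the shared state here

	return bc.Commit(ctx) // only one of return/Cancel/Commit, and only once
}
```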
4 changes: 4 additions & 0 deletions multi.go
@@ -34,6 +34,10 @@ func (c *Multi[Res]) Queue() *Queue {
return &c.locs.queue
}

func (c *Multi[Res]) Notify() {
	c.cond.Broadcast()
}

func (c *Multi[Res]) Enter(blocking bool) (coach, idx int) {
c.mu.Lock()

Expand Down
19 changes: 15 additions & 4 deletions multi_test.go
@@ -37,11 +37,11 @@ func TestMulti(tb *testing.T) {
go func() {
defer wg.Done()

for i := 0; i < 7; i++ {
for i := 0; i <= 8; i++ {
i := i

func() {
if j == 1 && i == 3 {
if j == 1 && (i == 3 || i == 7) {
defer func() {
_ = recover()
}()
@@ -51,7 +51,13 @@

runtime.Gosched()

coach, idx := bc.Enter(j == 0)
if j == 1 && i == 6 {
bc.Queue().Out()
bc.Notify()
return
}

coach, idx := bc.Enter(j != 0)
if idx < 0 {
tb.Logf("worker %2d iter %2d didn't enter %2d/%2d", j, i, coach, idx)
return
@@ -104,6 +110,11 @@ func TestMulti(tb *testing.T) {
} else {
tb.Logf("coach %2d worker %2d iter %2d res %2d %v", coach, j, i, res, err)
}

if j == 1 && i == 7 {
tb.Logf("coach %2d worker %2d iter %2d PANICS after commit", coach, j, i)
panic("panIC")
}
}()
}
}()
@@ -165,7 +176,7 @@ func BenchmarkMulti(tb *testing.B) {
tb.Run("Balancer_8", func(tb *testing.B) {
tb.ReportAllocs()
bc.Balancer = func(x []uint64) int {
return bits.Len64(x[0]) - 1 // choose the highest number
return bits.Len64(x[0]) - 1 - 1 // highest-1 or -1
}

tb.RunParallel(run)
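A note on the Balancer tweak in the benchmark above: `bits.Len64(m)` is the number of bits needed to represent m, so `bits.Len64(m) - 1` is the index of the highest set bit (and -1 for an empty mask), and the extra `- 1` in the new closure shifts the pick one coach lower, or to -1. Reading the mask as a per-coach bitmask of candidates follows the original "choose the highest number" comment and is my interpretation, not stated elsewhere in this diff. A self-contained illustration of the arithmetic:

```go
package main

import (
	"fmt"
	"math/bits"
)

// highestCoach mirrors the original balancer: index of the highest set bit.
func highestCoach(mask uint64) int {
	return bits.Len64(mask) - 1 // -1 when the mask is empty
}

// highestCoachMinusOne mirrors the new balancer: one coach lower, or -1.
func highestCoachMinusOne(mask uint64) int {
	return bits.Len64(mask) - 1 - 1
}

func main() {
	for _, mask := range []uint64{0b0001, 0b0110, 0b1011} {
		fmt.Printf("mask %04b -> highest %d, highest-1 %d\n",
			mask, highestCoach(mask), highestCoachMinusOne(mask))
	}
}
```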
103 changes: 103 additions & 0 deletions safe.go
@@ -0,0 +1,103 @@
package batch

import "context"

type (
	Batch[Res any] struct {
		c *Coordinator[Res]

		noCopy noCopy // lint failure (GitHub Actions): field `noCopy` is unused (unused)

		state byte
	}

	noCopy struct{} // lint failure (GitHub Actions): type `noCopy` is unused (unused)
)

const (
	stateNew = iota
	stateQueued
	stateEntered
	stateCommitted
	stateExited = stateNew

	usage = "BatchIn -> defer Exit -> [QueueIn] -> Enter -> Cancel/Commit/return"
)

func By[Res any](c *Coordinator[Res]) Batch[Res] {
	return Batch[Res]{
		c: c,
	}
}

func (b *Batch[Res]) QueueIn() int {
	if b.state != stateNew {
		panic(usage)
	}

	b.state = stateQueued

	return b.c.queue.In()
}

func (b *Batch[Res]) Enter(blocking bool) int {
	switch b.state {
	case stateNew:
		b.QueueIn()
	case stateQueued:
	default:
		panic(usage)
	}

	idx := b.c.Enter(blocking)
	if idx >= 0 {
		b.state = stateEntered
	} else {
		b.state = stateNew
	}

	return idx
}

func (b *Batch[Res]) Trigger() {
	b.c.Trigger()
}

func (b *Batch[Res]) Cancel(ctx context.Context, err error) (Res, error) {
	if b.state != stateEntered {
		panic(usage)
	}

	b.state = stateCommitted

	return b.c.Cancel(ctx, err)
}

func (b *Batch[Res]) Commit(ctx context.Context) (Res, error) {
	if b.state != stateEntered {
		panic(usage)
	}

	b.state = stateCommitted

	return b.c.Commit(ctx)
}

func (b *Batch[Res]) Exit() int {
	idx := -1

	switch b.state {
	case stateNew:
	case stateQueued:
		b.c.queue.Out()
		b.c.Notify()
	case stateEntered,
		stateCommitted:
		idx = b.c.Exit()
	default:
		panic(usage)
	}

	b.state = stateExited

	return idx
}
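
Finally, a sketch of how a caller might drive the new wrapper end to end, following the usage string above; `doWorkSafe` is an illustrative helper, and the Coordinator construction is assumed (same package and imports as the earlier sketches). Only By, QueueIn, Enter, Exit, Commit, and Cancel come from safe.go.

```go
// Sketch only: bc is a *batch.Coordinator[int] created elsewhere.
func doWorkSafe(ctx context.Context, bc *batch.Coordinator[int], data int) (int, error) {
	b := batch.By(bc) // wrap the coordinator; b tracks its own state
	defer b.Exit()    // single cleanup: leaves the queue or exits the batch, whatever state we are in

	b.QueueIn() // optional: Enter queues in automatically from the initial state

	if idx := b.Enter(true); idx == 0 {
		// first worker in the batch: reset the shared state here
	}

	_ = data // add data to the shared state here

	return b.Commit(ctx) // or b.Cancel(ctx, err); exactly one of Commit/Cancel/return
}
```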