diff --git a/README.md b/README.md index fec5c17..c6f7b22 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ This is all without timeouts, additional goroutines, allocations, and channels. ``` // General pattern is -// Queue.In -> Enter -> [if idx < 0 {return}] -> defer Exit -> Commit/Cancel/return/panic +// Queue.In -> Enter -> defer Exit -> Commit/Cancel/return/panic var sum int @@ -66,6 +66,8 @@ for j := 0; j < N; j++ { } ``` +See the all available options in the doc. + Batch is error and panic proof which means the user code can return error or panic in any place, but as soon as all the workers left the batch its state is reset. But not the external state, it's callers responsibility to keep it consistent. diff --git a/batch4.go b/batch4.go index a7ed65a..e9eea52 100644 --- a/batch4.go +++ b/batch4.go @@ -8,6 +8,66 @@ import ( "sync/atomic" ) +/* +Full guide. + + bc.Queue().In() // get in the queue + + // Get data ready for the commit. + // All the blocking operations should be done here. + data := 1 + + if leave() { + bc.Queue().Out() // get out of the queue + bc.Notify() // notify waiting goroutines we won't come + return + } + + idx := bc.Enter(true) // true to block and wait, false for non-blocking return if batch is in the process of commiting + // just like with the Mutex there are no guaranties how will enter the first + if idx < 0 { // in non-blocking mode we didn't enter the batch, it's always >= 0 in blocking mode + return // no need to do anything here + } + + defer bc.Exit() // if we entered we must exit + _ = 0 // calling it with defer ensures state consistency in case of panic + + // We are inside locked Mutex between Enter and Exit. + // So the shared state can be modified safely. + // That also means that all the long and heavy operations + // should be done before Enter. + + if idx == 0 { // we are first in the batch, reset the state + sum = 0 + } + + if full() { // if we didn't change the state we can just leave. + bc.Trigger() // Optionally we can trigger the batch. + // Common use case is to flush the data if we won't fit. + return // Then we may return and retry in the next batch. + } + + sum += data // adding our work to the batch + + if spoilt() { + _, err = bc.Cancel(ctx, causeErr) // cancel the batch, commit isn't done, all get causeErr + // if causeErr is nil it's set to Canceled + } + + if failed() { + panic("we can safely panic here") // panic will be propogated to the caller, + // other goroutines in the batch will get PanicError + } + + if urgent() { + bc.Trigger() // do not wait for the others + } + + res, err := bc.Commit(ctx) // get ready to commit. + // The last goroutine entered the batch calls the actual commit. + // All the others wait and get the same res and error. +*/ + type ( Queue int32 @@ -56,6 +116,10 @@ func (c *Coordinator[Res]) Queue() *Queue { return &c.locs.queue } +func (c *Coordinator[Res]) Notify() { + c.cond.Broadcast() +} + func (c *Coordinator[Res]) Enter(blocking bool) int { c.mu.Lock() diff --git a/batch4_test.go b/batch4_test.go index 3455665..71d6032 100644 --- a/batch4_test.go +++ b/batch4_test.go @@ -99,11 +99,11 @@ func TestCoordinatorAllCases(tb *testing.T) { go func() { defer wg.Done() - for i := 0; i < 7; i++ { + for i := 0; i <= 8; i++ { i := i func() { - if j == 1 && i == 3 { + if j == 1 && (i == 3 || i == 7) { defer func() { _ = recover() }() @@ -113,7 +113,13 @@ func TestCoordinatorAllCases(tb *testing.T) { runtime.Gosched() - idx := bc.Enter(j == 0) + if j == 1 && i == 6 { + bc.Queue().Out() + bc.Notify() + return + } + + idx := bc.Enter(j != 0) if idx < 0 { tb.Logf("worker %2d iter %2d didn't enter %2d", j, i, idx) return @@ -166,6 +172,11 @@ func TestCoordinatorAllCases(tb *testing.T) { } else { tb.Logf("worker %2d iter %2d res %2d %v", j, i, res, err) } + + if j == 1 && i == 7 { + tb.Logf("worker %2d iter %2d PANICS after commit", j, i) + panic("panIC") + } }() } }() diff --git a/examples_test.go b/coordinator_example_test.go similarity index 79% rename from examples_test.go rename to coordinator_example_test.go index 54e7e22..82d8681 100644 --- a/examples_test.go +++ b/coordinator_example_test.go @@ -44,10 +44,11 @@ func (s *Service) DoWork(ctx context.Context, data int) (int, error) { idx := s.bc.Enter(true) // true for blocking, false if we want to leave instead of waiting if idx < 0 { // we haven't entered the batch in non blocking mode - return 0, errors.New("not in this time") // we have to leave in that case + return 0, errors.New("not this time") // we have to leave in that case } - defer s.bc.Exit() // it's like Mutex.Unlock. It's a pair to successful Enter. Must be called with defer to outlive panics + defer s.bc.Exit() // it's like Mutex.Unlock. It's a pair to successful Enter. + _ = 0 // Must be called with defer to outlive panics if idx == 0 { // we are first in the batch, reset the state s.sum = 0 @@ -56,15 +57,9 @@ func (s *Service) DoWork(ctx context.Context, data int) (int, error) { log.Printf("worker %2d got in with index %2d", ctx.Value(contextKey{}), idx) - // if isFull() { s.bc.Trigger() } // trigger commit. we can leave or we can stay in the batch - - // if notThisTime() { return } // safely leave the batch if we changed our mind. Keep the state (s.sum) unchanged. - s.sum += data // add our work to the batch - // if spoiltState() { return s.bc.Cancel(ctx, err) } // cancel the whole batch if we spoilt it - - // only one of leave(return)/Cancel/Commit must be called and only once + // only one of return/Cancel/Commit must be called and only once res, err := s.bc.Commit(ctx) if err != nil { // batch failed, each worker in it will get the same error return 0, err diff --git a/multi.go b/multi.go index 5dc2d14..9bc560c 100644 --- a/multi.go +++ b/multi.go @@ -34,6 +34,10 @@ func (c *Multi[Res]) Queue() *Queue { return &c.locs.queue } +func (c *Multi[Res]) Notify() { + c.cond.Broadcast() +} + func (c *Multi[Res]) Enter(blocking bool) (coach, idx int) { c.mu.Lock() diff --git a/multi_test.go b/multi_test.go index dadcf41..50be2b2 100644 --- a/multi_test.go +++ b/multi_test.go @@ -37,11 +37,11 @@ func TestMulti(tb *testing.T) { go func() { defer wg.Done() - for i := 0; i < 7; i++ { + for i := 0; i <= 8; i++ { i := i func() { - if j == 1 && i == 3 { + if j == 1 && (i == 3 || i == 7) { defer func() { _ = recover() }() @@ -51,7 +51,13 @@ func TestMulti(tb *testing.T) { runtime.Gosched() - coach, idx := bc.Enter(j == 0) + if j == 1 && i == 6 { + bc.Queue().Out() + bc.Notify() + return + } + + coach, idx := bc.Enter(j != 0) if idx < 0 { tb.Logf("worker %2d iter %2d didn't enter %2d/%2d", j, i, coach, idx) return @@ -104,6 +110,11 @@ func TestMulti(tb *testing.T) { } else { tb.Logf("coach %2d worker %2d iter %2d res %2d %v", coach, j, i, res, err) } + + if j == 1 && i == 7 { + tb.Logf("coach %2d worker %2d iter %2d PANICS after commit", coach, j, i) + panic("panIC") + } }() } }() @@ -165,7 +176,7 @@ func BenchmarkMulti(tb *testing.B) { tb.Run("Balancer_8", func(tb *testing.B) { tb.ReportAllocs() bc.Balancer = func(x []uint64) int { - return bits.Len64(x[0]) - 1 // choose the highest number + return bits.Len64(x[0]) - 1 - 1 // highest-1 or -1 } tb.RunParallel(run) diff --git a/safe.go b/safe.go new file mode 100644 index 0000000..b0fde5c --- /dev/null +++ b/safe.go @@ -0,0 +1,103 @@ +package batch + +import "context" + +type ( + Batch[Res any] struct { + c *Coordinator[Res] + + noCopy noCopy + state byte + } + + noCopy struct{} +) + +const ( + stateNew = iota + stateQueued + stateEntered + stateCommitted + stateExited = stateNew + + usage = "BatchIn -> defer Exit -> [QueueIn] -> Enter -> Cancel/Commit/return" +) + +func By[Res any](c *Coordinator[Res]) Batch[Res] { + return Batch[Res]{ + c: c, + } +} + +func (b *Batch[Res]) QueueIn() int { + if b.state != stateNew { + panic(usage) + } + + b.state = stateQueued + + return b.c.queue.In() +} + +func (b *Batch[Res]) Enter(blocking bool) int { + switch b.state { + case stateNew: + b.QueueIn() + case stateQueued: + default: + panic(usage) + } + + idx := b.c.Enter(blocking) + if idx >= 0 { + b.state = stateEntered + } else { + b.state = stateNew + } + + return idx +} + +func (b *Batch[Res]) Trigger() { + b.c.Trigger() +} + +func (b *Batch[Res]) Cancel(ctx context.Context, err error) (Res, error) { + if b.state != stateEntered { + panic(usage) + } + + b.state = stateCommitted + + return b.c.Cancel(ctx, err) +} + +func (b *Batch[Res]) Commit(ctx context.Context) (Res, error) { + if b.state != stateEntered { + panic(usage) + } + + b.state = stateCommitted + + return b.c.Commit(ctx) +} + +func (b *Batch[Res]) Exit() int { + idx := -1 + + switch b.state { + case stateNew: + case stateQueued: + b.c.queue.Out() + b.c.Notify() + case stateEntered, + stateCommitted: + idx = b.c.Exit() + default: + panic(usage) + } + + b.state = stateExited + + return idx +} diff --git a/safe_multi.go b/safe_multi.go new file mode 100644 index 0000000..000743d --- /dev/null +++ b/safe_multi.go @@ -0,0 +1,95 @@ +package batch + +import "context" + +type ( + MultiBatch[Res any] struct { + c *Multi[Res] + + coach int + + noCopy noCopy + state byte + } +) + +func ByMulti[Res any](c *Multi[Res]) MultiBatch[Res] { + return MultiBatch[Res]{ + c: c, + } +} + +func (b *MultiBatch[Res]) QueueIn() int { + if b.state != stateNew { + panic(usage) + } + + b.state = stateQueued + + return b.c.queue.In() +} + +func (b *MultiBatch[Res]) Enter(blocking bool) (coach, idx int) { + switch b.state { + case stateNew: + b.QueueIn() + case stateQueued: + default: + panic(usage) + } + + coach, idx = b.c.Enter(blocking) + if idx >= 0 { + b.state = stateEntered + } else { + b.state = stateNew + } + + b.coach = coach + + return coach, idx +} + +func (b *MultiBatch[Res]) Trigger() { + b.c.Trigger(b.coach) +} + +func (b *MultiBatch[Res]) Cancel(ctx context.Context, err error) (Res, error) { + if b.state != stateEntered { + panic(usage) + } + + b.state = stateCommitted + + return b.c.Cancel(ctx, b.coach, err) +} + +func (b *MultiBatch[Res]) Commit(ctx context.Context) (Res, error) { + if b.state != stateEntered { + panic(usage) + } + + b.state = stateCommitted + + return b.c.Commit(ctx, b.coach) +} + +func (b *MultiBatch[Res]) Exit() int { + idx := -1 + + switch b.state { + case stateNew: + case stateQueued: + b.c.queue.Out() + b.c.Notify() + case stateEntered, + stateCommitted: + idx = b.c.Exit(b.coach) + default: + panic(usage) + } + + b.state = stateExited + + return idx +}