Skip to content

Commit

Permalink
documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
nikandfor committed Jun 27, 2024
1 parent cd8ff9f commit 2af6a16
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 66 deletions.
102 changes: 42 additions & 60 deletions batch4.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,70 +8,16 @@ import (
"sync/atomic"
)

/*
Full guide.
bc.Queue().In() // get in the queue
// Get data ready for the commit.
// All the blocking operations should be done here.
data := 1
if leave() {
bc.Queue().Out() // get out of the queue
bc.Notify() // notify waiting goroutines we won't come
return
}
idx := bc.Enter(true) // true to block and wait, false for non-blocking return if batch is in the process of commiting
// just like with the Mutex there are no guaranties how will enter the first
if idx < 0 { // in non-blocking mode we didn't enter the batch, it's always >= 0 in blocking mode
return // no need to do anything here
}
defer bc.Exit() // if we entered we must exit
_ = 0 // calling it with defer ensures state consistency in case of panic
// We are inside locked Mutex between Enter and Exit.
// So the shared state can be modified safely.
// That also means that all the long and heavy operations
// should be done before Enter.
if idx == 0 { // we are first in the batch, reset the state
sum = 0
}
if full() { // if we didn't change the state we can just leave.
bc.Trigger() // Optionally we can trigger the batch.
// Common use case is to flush the data if we won't fit.
return // Then we may return and retry in the next batch.
}
sum += data // adding our work to the batch
if spoilt() {
_, err = bc.Cancel(ctx, causeErr) // cancel the batch, commit isn't done, all get causeErr
// if causeErr is nil it's set to Canceled
}
if failed() {
panic("we can safely panic here") // panic will be propogated to the caller,
// other goroutines in the batch will get PanicError
}
if urgent() {
bc.Trigger() // do not wait for the others
}
res, err := bc.Commit(ctx) // get ready to commit.
// The last goroutine entered the batch calls the actual commit.
// All the others wait and get the same res and error.
*/

type (
// Queue is a queue of waiting workers.
Queue int32

// Coordinator coordinates workers to update shared state,
// commit it and deliver result to all participated workers.
Coordinator[Res any] struct {
// CommitFunc is called to commit shared shate.
//
// Required.
CommitFunc func(ctx context.Context) (Res, error)

lock
Expand All @@ -95,31 +41,50 @@ type (
trigger bool
}

// PanicError is returned to all the workers in the batch if one panicked.
// The panicked worker recieves panic, not an error.
PanicError struct {
Panic interface{}
}
)

// Canceled is a default error returned to workers if Cancel was called with nil err.
var Canceled = errors.New("batch canceled")

// New creates new Coordinator.
func New[Res any](f func(ctx context.Context) (Res, error)) *Coordinator[Res] {
return &Coordinator[Res]{
CommitFunc: f,
}
}

// Init initiates zero Coordinator.
// It can also be used as Reset but not in parallel with its usage.
func (c *Coordinator[Res]) Init(f func(ctx context.Context) (Res, error)) {
c.CommitFunc = f
}

// Gets the queue where workers get it to be waited for.
//
// Worker can leave the Queue, but it must call Notify to wake up waiting workers.
func (c *Coordinator[Res]) Queue() *Queue {
return &c.lock.queue
}

// Notify wakes up waiting workers.
// Must be called if the worker left the queue.
func (c *Coordinator[Res]) Notify() {
c.cond.Broadcast()
}

// Enter enters the batch.
// When the call returns we are in the critical section.
// Shared resources can be used safely.
// It's similar to Mutex.Lock.
// Pair method Exit must be called if Enter was successful (returned value >= 0).
// It returns index of entered worker.
// 0 means we are the first and we should reset the shared state.
// If blocking == false and batch is not available negative value returned.
func (c *Coordinator[Res]) Enter(blocking bool) int {
c.mu.Lock()

Expand Down Expand Up @@ -147,6 +112,10 @@ func (c *Coordinator[Res]) Enter(blocking bool) int {
return c.cnt - 1
}

// Exit exits the critical section.
// It should be called with defer just after we successfuly Enter the batch.
// It's similar to Mutex.Unlock.
// Retururns number of workers still in the batch (blocked in Commit/Cancel call).
func (c *Coordinator[Res]) Exit() int {
defer func() {
c.mu.Unlock()
Expand Down Expand Up @@ -178,14 +147,23 @@ func (c *Coordinator[Res]) Exit() int {
return idx
}

// Trigger triggers the batch to Commit.
// We can call Commit or return with the error.
// If we added our data to the batch or if we didn't respectively.
func (c *Coordinator[Res]) Trigger() {
c.trigger = true
}

// Commit waits for the waiting workes to add its data to the batch,
// calls Coordinator.Commit only once for the batch,
// and returns the same shared result to all workers.
func (c *Coordinator[Res]) Commit(ctx context.Context) (Res, error) {
return commit[Res](ctx, &c.lock, &c.coach, nil, c.CommitFunc)
}

// Cancel aborts current batch and returns the same error to all workers already added their data to the batch.
// Coordinator.Commit is not called.
// Waiting workers but not Entered the critical section are not affected.
func (c *Coordinator[Res]) Cancel(ctx context.Context, err error) (Res, error) {
if err == nil {
err = Canceled
Expand Down Expand Up @@ -235,18 +213,22 @@ again:
return cc.res, cc.err
}

// In gets into the queue.
func (q *Queue) In() int {
return int(atomic.AddInt32((*int32)(q), 1))
}

// In gets out of the queue.
func (q *Queue) Out() int {
return int(atomic.AddInt32((*int32)(q), -1))
}

// Len is the number of workers in the queue.
func (q *Queue) Len() int {
return int(atomic.LoadInt32((*int32)(q)))
}

// AsPanicError unwrapes PanicError.
func AsPanicError(err error) (PanicError, bool) {
var pe PanicError

Expand Down
13 changes: 7 additions & 6 deletions coordinator_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (

var jobs = flag.Int("jobs", 5, "parallel jobs")

type Service struct {
sum int // state we collect to commit together
type (
Service struct {
sum int // state we collect to commit together

bc *batch.Coordinator[int] // [int] is the result value type, set to struct{} if don't need it
}
bc *batch.Coordinator[int] // [int] is the result value type, set to struct{} if don't need it
}

type contextKey struct{}
contextKey struct{}
)

func NewService() *Service {
s := &Service{}
Expand Down Expand Up @@ -94,6 +96,5 @@ func ExampleCoordinator() {
}

wg.Wait()

// Output:
}
100 changes: 100 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
batch made easy.
batch is a safe and efficient way to combine results of multiple parallel tasks
into one heavy "commit" (save) operation.
batch ensures each task is either receive successful result if its results
have been committed (possibly as a parth of a batch) or an error in other case.
Logic for Coordinator
-------------------------
-> worker0 ------> | common cumulative state | /--> worker0
-> worker1 ------> | collected from multiple | -> result ---> worker1
-> worker2 ------> | workers and committed | \--> worker2
-> worker3 ------> | all at once | \-> worker3
-------------------------
# Logic for Multi
Workers are distributed among multiple coaches and then each coach works the same as in Coordinator case.
-------------------------
| -> | state is combined in a | /--> worker0
-> worker0 -> | -> | few independent coaches | -> result ---> worker1
-> worker1 -> | -------------------------
-> worker2 -> |
-> worker3 -> | -------------------------
| -> | each is committed as an | /--> worker2
| -> | independent part | -> result ---> worker3
-------------------------
Full Guide
bc.Queue().In() // get in the queue
// Get data ready for the commit.
// All the blocking operations should be done here.
data := 1
if leave() {
bc.Queue().Out() // get out of the queue
bc.Notify() // notify waiting goroutines we won't come
return
}
idx := bc.Enter(true) // true to block and wait, false for non-blocking return if batch is in the process of commiting
// just like with the Mutex there are no guaranties how will enter the first
if idx < 0 { // in non-blocking mode we didn't enter the batch, it's always >= 0 in blocking mode
return // no need to do anything here
}
defer bc.Exit() // if we entered we must exit
_ = 0 // calling it with defer ensures state consistency in case of panic
// We are inside locked Mutex between Enter and Exit.
// So the shared state can be modified safely.
// That also means that all the long and heavy operations
// should be done before Enter.
if idx == 0 { // we are first in the batch, reset the state
sum = 0
}
if full() { // if we didn't change the state we can just leave.
bc.Trigger() // Optionally we can trigger the batch.
// Common use case is to flush the data if we won't fit.
return // Then we may return and retry in the next batch.
}
sum += data // adding our work to the batch
if spoilt() {
_, err = bc.Cancel(ctx, causeErr) // cancel the batch, commit isn't done, all get causeErr
// if causeErr is nil it's set to Canceled
}
if failed() {
panic("we can safely panic here") // panic will be propogated to the caller,
// other goroutines in the batch will get PanicError
}
if urgent() {
bc.Trigger() // do not wait for the others
}
res, err := bc.Commit(ctx) // get ready to commit.
// The last goroutine entered the batch calls the actual commit.
// All the others wait and get the same res and error.
Batch is a safe wrapper around Coordinator.
It will call missed methods if that makes sense or it will panic otherwise.
Multi is a coordinator with N available parallel batches.
Suppose you have 3 db replicas so you can distribute load across them.
Multi.Enter will enter the first free coach (replica in our example) and return its index.
The rest is similar. Custom logic for choosing coach can be used by setting Multi.Balancer.
MultiBatch is a safe wrapper around Multi just like Batch wraps Coordinator.
*/
package batch
Loading

0 comments on commit 2af6a16

Please sign in to comment.