diff --git a/batch4.go b/batch4.go
index 709f2bf..d2c2795 100644
--- a/batch4.go
+++ b/batch4.go
@@ -8,70 +8,16 @@ import (
 	"sync/atomic"
 )
 
-/*
-Full guide.
-
-	bc.Queue().In() // get in the queue
-
-	// Get data ready for the commit.
-	// All the blocking operations should be done here.
-	data := 1
-
-	if leave() {
-		bc.Queue().Out() // get out of the queue
-		bc.Notify()      // notify waiting goroutines we won't come
-		return
-	}
-
-	idx := bc.Enter(true) // true to block and wait, false for non-blocking return if batch is in the process of commiting
-	// just like with the Mutex there are no guaranties how will enter the first
-	if idx < 0 { // in non-blocking mode we didn't enter the batch, it's always >= 0 in blocking mode
-		return // no need to do anything here
-	}
-
-	defer bc.Exit() // if we entered we must exit
-	_ = 0           // calling it with defer ensures state consistency in case of panic
-
-	// We are inside locked Mutex between Enter and Exit.
-	// So the shared state can be modified safely.
-	// That also means that all the long and heavy operations
-	// should be done before Enter.
-
-	if idx == 0 { // we are first in the batch, reset the state
-		sum = 0
-	}
-
-	if full() { // if we didn't change the state we can just leave.
-		bc.Trigger() // Optionally we can trigger the batch.
-		// Common use case is to flush the data if we won't fit.
-		return // Then we may return and retry in the next batch.
-	}
-
-	sum += data // adding our work to the batch
-
-	if spoilt() {
-		_, err = bc.Cancel(ctx, causeErr) // cancel the batch, commit isn't done, all get causeErr
-		// if causeErr is nil it's set to Canceled
-	}
-
-	if failed() {
-		panic("we can safely panic here") // panic will be propogated to the caller,
-		// other goroutines in the batch will get PanicError
-	}
-
-	if urgent() {
-		bc.Trigger() // do not wait for the others
-	}
-
-	res, err := bc.Commit(ctx) // get ready to commit.
-	// The last goroutine entered the batch calls the actual commit.
-	// All the others wait and get the same res and error.
-*/
-
 type (
+	// Queue is a queue of waiting workers.
 	Queue int32
 
+	// Coordinator coordinates workers to update shared state,
+	// commit it, and deliver the result to all participating workers.
 	Coordinator[Res any] struct {
+		// CommitFunc is called to commit the shared state.
+		//
+		// Required.
 		CommitFunc func(ctx context.Context) (Res, error)
 
 		lock
@@ -95,31 +41,50 @@ type (
 		trigger bool
 	}
 
+	// PanicError is returned to all the workers in the batch if one panicked.
+	// The panicked worker receives the panic itself, not an error.
 	PanicError struct {
 		Panic interface{}
 	}
 )
 
+// Canceled is the default error returned to workers if Cancel was called with a nil err.
 var Canceled = errors.New("batch canceled")
 
+// New creates a new Coordinator.
 func New[Res any](f func(ctx context.Context) (Res, error)) *Coordinator[Res] {
 	return &Coordinator[Res]{
 		CommitFunc: f,
 	}
 }
 
+// Init initializes a zero Coordinator.
+// It can also be used as a Reset, but not in parallel with other usage.
 func (c *Coordinator[Res]) Init(f func(ctx context.Context) (Res, error)) {
 	c.CommitFunc = f
}
 
+// Queue returns the queue workers get into to be waited for.
+//
+// A worker can leave the Queue, but then it must call Notify to wake up waiting workers.
 func (c *Coordinator[Res]) Queue() *Queue {
 	return &c.lock.queue
 }
 
+// Notify wakes up waiting workers.
+// It must be called if the worker left the queue.
 func (c *Coordinator[Res]) Notify() {
 	c.cond.Broadcast()
 }
 
+// Enter enters the batch.
+// When the call returns we are in the critical section.
+// Shared resources can be used safely.
+// It's similar to Mutex.Lock.
+// The paired Exit method must be called if Enter was successful (returned value >= 0).
+// It returns the index of the entered worker.
+// 0 means we are the first and we should reset the shared state.
+// If blocking == false and the batch is not available, a negative value is returned.
 func (c *Coordinator[Res]) Enter(blocking bool) int {
 	c.mu.Lock()
 
@@ -147,6 +112,10 @@ func (c *Coordinator[Res]) Enter(blocking bool) int {
 	return c.cnt - 1
 }
 
+// Exit exits the critical section.
+// It should be called with defer just after we successfully Enter the batch.
+// It's similar to Mutex.Unlock.
+// It returns the number of workers still in the batch (blocked in a Commit or Cancel call).
 func (c *Coordinator[Res]) Exit() int {
 	defer func() {
 		c.mu.Unlock()
@@ -178,14 +147,23 @@ func (c *Coordinator[Res]) Exit() int {
 	return idx
 }
 
+// Trigger triggers the batch to Commit.
+// After that we can either call Commit, if we added our data to the batch,
+// or return with an error if we didn't.
 func (c *Coordinator[Res]) Trigger() {
 	c.trigger = true
 }
 
+// Commit waits for the waiting workers to add their data to the batch,
+// calls Coordinator.CommitFunc only once for the whole batch,
+// and returns the same shared result to all workers.
 func (c *Coordinator[Res]) Commit(ctx context.Context) (Res, error) {
 	return commit[Res](ctx, &c.lock, &c.coach, nil, c.CommitFunc)
 }
 
+// Cancel aborts the current batch and returns the same error to all workers that already added their data to the batch.
+// Coordinator.CommitFunc is not called.
+// Workers waiting in the queue that have not Entered the critical section yet are not affected.
 func (c *Coordinator[Res]) Cancel(ctx context.Context, err error) (Res, error) {
 	if err == nil {
 		err = Canceled
@@ -235,18 +213,22 @@ again:
 	return cc.res, cc.err
 }
 
+// In gets into the queue.
 func (q *Queue) In() int {
 	return int(atomic.AddInt32((*int32)(q), 1))
 }
 
+// Out gets out of the queue.
 func (q *Queue) Out() int {
 	return int(atomic.AddInt32((*int32)(q), -1))
 }
 
+// Len is the number of workers in the queue.
 func (q *Queue) Len() int {
 	return int(atomic.LoadInt32((*int32)(q)))
 }
 
+// AsPanicError unwraps err as a PanicError.
 func AsPanicError(err error) (PanicError, bool) {
 	var pe PanicError
 
diff --git a/coordinator_example_test.go b/coordinator_example_test.go
index 82d8681..ff682ed 100644
--- a/coordinator_example_test.go
+++ b/coordinator_example_test.go
@@ -12,13 +12,15 @@ import (
 
 var jobs = flag.Int("jobs", 5, "parallel jobs")
 
-type Service struct {
-	sum int // state we collect to commit together
+type (
+	Service struct {
+		sum int // state we collect to commit together
 
-	bc *batch.Coordinator[int] // [int] is the result value type, set to struct{} if don't need it
-}
+		bc *batch.Coordinator[int] // [int] is the result value type, set to struct{} if you don't need it
+	}
 
-type contextKey struct{}
+	contextKey struct{}
+)
 
 func NewService() *Service {
 	s := &Service{}
@@ -94,6 +96,5 @@ func ExampleCoordinator() {
 	}
 
 	wg.Wait()
-
 	// Output:
 }
diff --git a/doc.go b/doc.go
new file mode 100644
index 0000000..7ae0deb
--- /dev/null
+++ b/doc.go
@@ -0,0 +1,100 @@
+/*
+batch made easy.
+
+batch is a safe and efficient way to combine results of multiple parallel tasks
+into one heavy "commit" (save) operation.
+batch ensures each task either receives a successful result if its results
+have been committed (possibly as a part of a batch) or an error otherwise.
+
+# Logic for Coordinator
+
+	                   -------------------------
+	-> worker0 ------> | common cumulative state |        /--> worker0
+	-> worker1 ------> | collected from multiple | -> result ---> worker1
+	-> worker2 ------> | workers and committed   |        \--> worker2
+	-> worker3 ------> | all at once             |         \-> worker3
+	                   -------------------------
+
+# Logic for Multi
+
+Workers are distributed among multiple coaches and then each coach works the same as in the Coordinator case.
+
+	                   -------------------------
+	              | -> | state is combined in a  |        /--> worker0
+	-> worker0 -> | -> | few independent coaches | -> result ---> worker1
+	-> worker1 -> |    -------------------------
+	-> worker2 -> |
+	-> worker3 -> |    -------------------------
+	              | -> | each is committed as an |        /--> worker2
+	              | -> | independent part        | -> result ---> worker3
+	                   -------------------------
+
+# Full Guide
+
+	bc.Queue().In() // get in the queue
+
+	// Get data ready for the commit.
+	// All the blocking operations should be done here.
+	data := 1
+
+	if leave() {
+		bc.Queue().Out() // get out of the queue
+		bc.Notify()      // notify waiting goroutines we won't come
+		return
+	}
+
+	idx := bc.Enter(true) // true to block and wait, false for non-blocking return if the batch is in the process of committing
+	// just like with the Mutex there are no guarantees who will enter first
+	if idx < 0 { // in non-blocking mode we didn't enter the batch; it's always >= 0 in blocking mode
+		return // no need to do anything here
+	}
+
+	defer bc.Exit() // if we entered we must exit
+	_ = 0           // calling it with defer ensures state consistency in case of panic
+
+	// We are inside the locked Mutex between Enter and Exit.
+	// So the shared state can be modified safely.
+	// That also means that all the long and heavy operations
+	// should be done before Enter.
+
+	if idx == 0 { // we are first in the batch, reset the state
+		sum = 0
+	}
+
+	if full() { // if we didn't change the state we can just leave.
+		bc.Trigger() // Optionally we can trigger the batch.
+		// A common use case is to flush the data if we won't fit.
+		return // Then we may return and retry in the next batch.
+	}
+
+	sum += data // adding our work to the batch
+
+	if spoilt() {
+		_, err = bc.Cancel(ctx, causeErr) // cancel the batch, the commit isn't done, all get causeErr
+		// if causeErr is nil it's set to Canceled
+	}
+
+	if failed() {
+		panic("we can safely panic here") // the panic will be propagated to the caller,
+		// other goroutines in the batch will get PanicError
+	}
+
+	if urgent() {
+		bc.Trigger() // do not wait for the others
+	}
+
+	res, err := bc.Commit(ctx) // get ready to commit.
+	// The last goroutine to enter the batch calls the actual commit.
+	// All the others wait and get the same res and error.
+
+Batch is a safe wrapper around Coordinator.
+It will call missed methods if that makes sense, or it will panic otherwise.
+
+Multi is a coordinator with N available parallel batches.
+Suppose you have 3 db replicas so you can distribute the load across them.
+Multi.Enter will enter the first free coach (a replica in our example) and return its index.
+The rest is similar. Custom logic for choosing a coach can be used by setting Multi.Balancer.
+
+MultiBatch is a safe wrapper around Multi just like Batch wraps Coordinator.
+*/
+package batch
diff --git a/multi.go b/multi.go
index 2a99ea6..dd3386a 100644
--- a/multi.go
+++ b/multi.go
@@ -5,9 +5,16 @@ import (
 )
 
 type (
+	// Multi is a coordinator for choosing and accessing multiple coaches.
 	Multi[Res any] struct {
+		// CommitFunc is called to commit the batch of coach number coach.
 		CommitFunc func(ctx context.Context, coach int) (Res, error)
 
+		// Balancer chooses a coach among the available ones, or it can choose to wait for more.
+		// bitset is the set of available coaches.
+		// Coach n is available <=> bitset[n/64] & (1<<(n%64)) != 0.
+		// If the returned n is >= 0 and the coach is available, Enter proceeds with it.
+		// If it's not, Enter blocks or returns -1, -1, according to the Enter argument.
 		Balancer func(bitset []uint64) int
 
 		available []uint64
@@ -17,6 +24,7 @@ type (
 	}
 )
 
+// NewMulti creates a new Multi coordinator with n parallel coaches.
 func NewMulti[Res any](n int, f func(ctx context.Context, coach int) (Res, error)) *Multi[Res] {
 	return &Multi[Res]{
 		CommitFunc: f,
@@ -25,19 +33,31 @@ func NewMulti[Res any](n int, f func(ctx context.Context, coach int) (Res, error
 	}
 }
 
+// Init initializes a zero Multi.
+// It can also be used as a Reset, but not in parallel with other usage.
 func (c *Multi[Res]) Init(n int, f func(ctx context.Context, coach int) (Res, error)) {
 	c.CommitFunc = f
 	c.cs = make([]coach[Res], n)
 }
 
+// Queue returns the common queue for entering coaches.
+// Getting into it means already started batches will wait for this worker instead of committing.
+//
+// A worker can leave the Queue, but then it must call Notify to wake up waiting workers.
 func (c *Multi[Res]) Queue() *Queue {
 	return &c.lock.queue
 }
 
+// Notify wakes up waiting workers. It can be used if you were being waited for,
+// but now the situation has changed and you won't come.
 func (c *Multi[Res]) Notify() {
 	c.cond.Broadcast()
 }
 
+// Enter enters an available batch.
+// It will return -1, -1 if no coach is available and blocking == false.
+//
+// The coach choice can be customized by setting Multi.Balancer.
 func (c *Multi[Res]) Enter(blocking bool) (coach, idx int) {
 	c.mu.Lock()
 
@@ -104,6 +124,8 @@ func (c *Multi[Res]) enterBalancer() (coach, idx int) {
 	return coach, c.cs[coach].cnt - 1
 }
 
+// Exit exits the batch. It should be called with defer.
+// It works similarly to Mutex.Unlock in the sense that it unlocks shared resources.
 func (c *Multi[Res]) Exit(coach int) int {
 	defer func() {
 		c.mu.Unlock()
@@ -137,16 +159,29 @@ func (c *Multi[Res]) Exit(coach int) int {
 	return idx
 }
 
+// Trigger triggers the batch to commit.
+// It must be called inside the Enter-Exit section.
+//
+// It can be used to flush the current batch and retry.
+// The commit only happens when the current worker Exits the section,
+// so to retry you need to Exit, get back into the Queue, and Enter again.
 func (c *Multi[Res]) Trigger(coach int) {
 	c.cs[coach].trigger = true
 }
 
+// Commit indicates the current worker is done with the shared data and is ready to commit it.
+// Commit blocks until the batch is committed. The same Res is returned to all the workers in the batch.
+// (Res, error) is what Multi.CommitFunc returned.
 func (c *Multi[Res]) Commit(ctx context.Context, coach int) (Res, error) {
 	return commit(ctx, &c.lock, &c.cs[coach], nil, func(ctx context.Context) (Res, error) {
 		return c.CommitFunc(ctx, coach)
 	})
 }
 
+// Cancel indicates the current worker is done with the shared data, but it can't be committed.
+// All the workers from the same batch receive zero Res and the same error.
+//
+// It can be used if the batch shared state has been spoilt as a result of an error.
 func (c *Multi[Res]) Cancel(ctx context.Context, coach int, err error) (Res, error) {
 	if err == nil {
 		err = Canceled
diff --git a/safe_example_test.go b/safe_example_test.go
new file mode 100644
index 0000000..f3f711d
--- /dev/null
+++ b/safe_example_test.go
@@ -0,0 +1,94 @@
+package batch_test
+
+import (
+	"context"
+	"errors"
+	"log"
+	"sync"
+
+	"nikand.dev/go/batch"
+)
+
+type SafeService struct {
+	sum int // state we collect to commit together
+
+	bc *batch.Coordinator[int] // [int] is the result value type, set to struct{} if you don't need it
+}
+
+func NewSafeService() *SafeService {
+	s := &SafeService{}
+
+	s.bc = batch.New(s.commit)
+
+	return s
+}
+
+func (s *SafeService) commit(ctx context.Context) (int, error) {
+	// suppose some heavy operation here
+	// update a file or write to db
+
+	log.Printf("* * * commit %2d * * *", s.sum)
+
+	return s.sum, nil
+}
+
+func (s *SafeService) DoWork(ctx context.Context, data int) (int, error) {
+	b := batch.By(s.bc)
+	defer b.Exit() // it's like Mutex.Unlock, but it safely works even if we didn't enter
+	_ = 0          // Must be called with defer to outlive panics
+
+	b.QueueIn() // let others know we are going to join
+
+	_ = data // prepare data
+
+	idx := b.Enter(true) // true for blocking, false if we want to leave instead of waiting
+	if idx < 0 { // we haven't entered the batch in non-blocking mode
+		return 0, errors.New("not this time") // we have to leave in that case
+	}
+
+	if idx == 0 { // we are first in the batch, reset the state
+		s.sum = 0
+		log.Printf("* * * reset batch * * *")
+	}
+
+	log.Printf("worker %2d got in with index %2d", ctx.Value(contextKey{}), idx)
+
+	s.sum += data // add our work to the batch
+
+	// only one of return/Cancel/Commit must be called and only once
+	res, err := b.Commit(ctx)
+	if err != nil { // the batch failed, each worker in it gets the same error
+		return 0, err
+	}
+
+	log.Printf("worker %2d got result %v %v", ctx.Value(contextKey{}), res, err)
+
+	// if we are here, all of the workers have their work committed
+
+	return res, nil
+}
+
+func ExampleBatch() {
+	s := NewSafeService()
+
+	// let's spin up some workers
+	var wg sync.WaitGroup
+
+	for j := 0; j < *jobs; j++ {
+		j := j
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			ctx := context.Background() // passed to the commit function
+			ctx = context.WithValue(ctx, contextKey{}, j)
+
+			res, err := s.DoWork(ctx, 1)
+			_, _ = res, err
+		}()
+	}
+
+	wg.Wait()
+	// Output:
+}
diff --git a/semaphore.go b/semaphore.go
index 31f2395..fc8f08a 100644
--- a/semaphore.go
+++ b/semaphore.go
@@ -3,6 +3,9 @@ package batch
 import "sync"
 
 type (
+	// Semaphore is a classical semaphore synchronization primitive.
+	// All methods can be safely called on a nil Semaphore;
+	// they will do nothing, as if you had an unlimited semaphore.
 	Semaphore struct {
 		mu   sync.Mutex
 		cond sync.Cond
@@ -11,6 +14,7 @@
 	}
 )
 
+// NewSemaphore creates a new semaphore with a capacity of n.
 func NewSemaphore(n int) *Semaphore {
 	b := &Semaphore{}
 
@@ -19,6 +23,10 @@
 	return b
 }
 
+// Reset resets the semaphore capacity,
+// but not the current value. That means it can be used
+// to update the limit on the fly, but it can't be used to reset
+// an inconsistent semaphore.
 func (b *Semaphore) Reset(n int) {
 	if b == nil {
 		return
@@ -34,6 +42,7 @@ func (b *Semaphore) Reset(n int) {
 	b.lim = n
 }
 
+// Enter enters the critical section.
 func (b *Semaphore) Enter() int {
 	if b == nil {
 		return 0
@@ -51,6 +60,7 @@ func (b *Semaphore) Enter() int {
 	return b.n
 }
 
+// Exit exits the critical section.
 func (b *Semaphore) Exit() {
 	if b == nil {
 		return
@@ -63,6 +73,7 @@ func (b *Semaphore) Exit() {
 	b.cond.Signal()
 }
 
+// Len is the number of tasks in the critical section.
 func (b *Semaphore) Len() int {
 	if b == nil {
 		return 0
@@ -74,6 +85,7 @@ func (b *Semaphore) Len() int {
 	return b.n
 }
 
+// Cap is the semaphore capacity.
 func (b *Semaphore) Cap() int {
 	if b == nil {
 		return 0
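
A minimal usage sketch of the Semaphore API documented above, assuming only what the diff shows (NewSemaphore, Enter, Exit, Len, Cap, and that all methods are safe no-ops on a nil Semaphore); the worker count and printed message are made up for illustration:

	package main

	import (
		"fmt"
		"sync"

		"nikand.dev/go/batch"
	)

	func main() {
		sem := batch.NewSemaphore(2) // at most 2 workers in the critical section at once

		var wg sync.WaitGroup

		for j := 0; j < 5; j++ {
			wg.Add(1)

			go func(j int) {
				defer wg.Done()

				sem.Enter() // blocks until a slot is free
				defer sem.Exit()

				fmt.Println("worker", j, "is in;", sem.Len(), "of", sem.Cap(), "slots used")
			}(j)
		}

		wg.Wait()

		var unlimited *batch.Semaphore // nil Semaphore: every method is a safe no-op
		unlimited.Enter()              // does not block
		unlimited.Exit()
	}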