Skip to content

Commit

Permalink
docs update
Browse files Browse the repository at this point in the history
  • Loading branch information
nikandfor committed Jun 27, 2024
1 parent c660ace commit 2b48636
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 55 deletions.
33 changes: 20 additions & 13 deletions batch4.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
)

type (
// Queue is a queue of waiting workers.
// Queue of workers waiting to Enter the batch.
Queue int32

// Coordinator coordinates workers to update shared state,
// commit it and deliver result to all participated workers.
// commit it, and deliver result to all participated workers.
Coordinator[Res any] struct {
// CommitFunc is called to commit shared shate.
//
// It's already called owning critical section. Enter-Exit cycle must not be called from it.
//
// Required.
CommitFunc func(ctx context.Context) (Res, error)

Expand All @@ -42,7 +44,7 @@ type (
}

// PanicError is returned to all the workers in the batch if one panicked.
// The panicked worker recieves panic, not an error.
// The panicked worker gets panic, not an error.
PanicError struct {
Panic interface{}
}
Expand All @@ -64,15 +66,17 @@ func (c *Coordinator[Res]) Init(f func(ctx context.Context) (Res, error)) {
c.CommitFunc = f
}

// Gets the queue where workers get it to be waited for.
// Gets the queue of waitng workers.
//
// Worker can leave the Queue, but it must call Notify to wake up waiting workers.
// Worker can leave the Queue before Enter,
// but we must call Notify to wake up waiting workers.
func (c *Coordinator[Res]) Queue() *Queue {
return &c.lock.queue
}

// Notify wakes up waiting workers.
// Must be called if the worker left the queue.
//
// Must be called if the worker left the Queue before Enter.
func (c *Coordinator[Res]) Notify() {
c.cond.Broadcast()
}
Expand All @@ -83,9 +87,9 @@ func (c *Coordinator[Res]) Notify() {
// It's similar to Mutex.Lock.
// Pair method Exit must be called if Enter was successful (returned value >= 0).
// It returns index of entered worker.
// 0 means we are the first and we should reset the shared state.
// 0 means we are the first in the batch and we should reset shared state.
// If blocking == false and batch is not available negative value returned.
// Enter also removes worker from the queue.
// Enter also removes the worker from the queue.
func (c *Coordinator[Res]) Enter(blocking bool) int {
c.mu.Lock()

Expand Down Expand Up @@ -114,9 +118,11 @@ func (c *Coordinator[Res]) Enter(blocking bool) int {
}

// Exit exits the critical section.
// It should be called with defer just after we successfuly Enter the batch.
// It should be called with defer just after we successfuly Entered the batch.
// It's similar to Mutex.Unlock.
// Retururns number of workers still in the batch (blocked in Commit/Cancel call).
// Returns number of workers have not Exited yet.
// 0 means we are the last exiting the batch, state can be reset here.
// But remember the case when worker have panicked.
func (c *Coordinator[Res]) Exit() int {
defer func() {
c.mu.Unlock()
Expand Down Expand Up @@ -148,14 +154,15 @@ func (c *Coordinator[Res]) Exit() int {
return idx
}

// Trigger triggers the batch to Commit.
// We can call Commit or return with the error.
// Trigger batch to Commit.
// We can call both Commit or Exit after that.
// If we added our data to the batch or if we didn't respectively.
// So we will be part of the batch or not.
func (c *Coordinator[Res]) Trigger() {
c.trigger = true
}

// Commit waits for the waiting workes to add its data to the batch,
// Commit waits for the waiting workes to add their data to the batch,
// calls Coordinator.Commit only once for the batch,
// and returns the same shared result to all workers.
func (c *Coordinator[Res]) Commit(ctx context.Context) (Res, error) {
Expand Down
50 changes: 26 additions & 24 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ Workers are distributed among multiple coaches and then each coach works the sam
bc.Queue().In() // get in the queue
// Get data ready for the commit.
// All the blocking operations should be done here.
// All the independent from other workers operations should be done here.
data := 1
if leave() {
bc.Queue().Out() // get out of the queue
bc.Notify() // notify waiting goroutines we won't come
return
bc.Queue().Out() // Get out of the queue.
bc.Notify() // Notify waiting goroutines we won't come.
return errNoNeed
}
idx := bc.Enter(true) // true to block and wait, false for non-blocking return if batch is in the process of commiting
// just like with the Mutex there are no guaranties how will enter the first
if idx < 0 { // in non-blocking mode we didn't enter the batch, it's always >= 0 in blocking mode
return // no need to do anything here
idx := bc.Enter(true) // true to block and wait, false for non-blocking return if batch is in the process of commiting.
// Just like with the Mutex there are no guaranties who will enter first.
if idx < 0 { // In non-blocking mode we didn't Enter the batch, it's always >= 0 in blocking mode.
return errBusy // No need to do anything here.
}
defer bc.Exit() // if we entered we must exit
Expand All @@ -61,39 +61,41 @@ Workers are distributed among multiple coaches and then each coach works the sam
sum = 0
}
if full() { // if we didn't change the state we can just leave.
bc.Trigger() // Optionally we can trigger the batch.
// Common use case is to flush the data if we won't fit.
return // Then we may return and retry in the next batch.
if full() { // if we didn't change the state we can just leave.
bc.Trigger() // Optionally we can trigger the batch.
// Common use case is to flush the data if we won't fit.
return errRetry // Then we may return and retry in the next batch.
}
sum += data // adding our work to the batch
if spoilt() {
_, err = bc.Cancel(ctx, causeErr) // cancel the batch, commit isn't done, all get causeErr
// if causeErr is nil it's set to Canceled
if spoilt() { // If we spoilt the satate and want to abort commit
_, err = bc.Cancel(ctx, causeErr) // cancel the batch. Commit isn't done, all workers get causeErr.
// If causeErr is nil it's set to Canceled.
}
if failed() {
panic("we can safely panic here") // panic will be propogated to the caller,
// other goroutines in the batch will get PanicError
if failed() { // Suppose some library panicked here.
panic("we can safely panic here") // Panic will be propogated to the caller,
// other goroutines in the batch will get PanicError.
}
if urgent() {
bc.Trigger() // do not wait for the others
if urgent() { // If we want to commit immideately
bc.Trigger() // trigger it.
// Workers already added their job will be committed too.
// Workers haven't entered the batch will go to the next one.
}
res, err := bc.Commit(ctx) // get ready to commit.
// The last goroutine entered the batch calls the actual commit.
res, err := bc.Commit(ctx) // Call Commit.
// The last goroutine entered the batch calls the actual Coordinator.Commit.
// All the others wait and get the same res and error.
Batch is a safe wrapper around Coordinator.
It will call missed methods if that makes sense or it will panic otherwise.
It will call missed methods if that makes sense or panic otherwise.
Multi is a coordinator with N available parallel batches.
Suppose you have 3 db replicas so you can distribute load across them.
Multi.Enter will enter the first free coach (replica in our example) and return its index.
The rest is similar. Custom logic for choosing coach can be used by setting Multi.Balancer.
The rest is similar to Coordinator. Custom logic for choosing coach can be used by setting Multi.Balancer.
MultiBatch is a safe wrapper around Multi just like Batch wraps Coordinator.
*/
Expand Down
42 changes: 24 additions & 18 deletions multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ import (
)

type (
// Multi is a coordinator to choose and access multiple coaches.
// Multi is a coordinator for multiple parallel batches.
// Multi can't be created as a literal,
// it must be initialized either by NewMulti or Init.
Multi[Res any] struct {
// CommitFunc is called to commit result of coach number coach.
// CommitFunc is called to commit result of batch number coach.
//
// It's already called owning critical section. Enter-Exit cycle must not be called from it.
CommitFunc func(ctx context.Context, coach int) (Res, error)

// Balancer chooses replica among available or it can choose to wait for more.
// bitset is a set of available coaches.
// Coach n is available <=> bitset[n/64] & (1<<(n%64)) != 0.
// If n is >= 0 and the coach is available it proceeds with it.
// If it's not Enter blocks or returns -1, -1 according to Enter argument.
// If returned value >= 0 and that coach is available it proceeds with it.
// If returned value < 0 or that coach is not available
// worker acts as there were no available coaches.
Balancer func(bitset []uint64) int
available []uint64

Expand All @@ -24,7 +29,7 @@ type (
}
)

// NewMulti create new Multi coordinator with n parallel coaches.
// NewMulti create new Multi coordinator with n parallel batches.
func NewMulti[Res any](n int, f func(ctx context.Context, coach int) (Res, error)) *Multi[Res] {
return &Multi[Res]{
CommitFunc: f,
Expand All @@ -40,22 +45,23 @@ func (c *Multi[Res]) Init(n int, f func(ctx context.Context, coach int) (Res, er
c.cs = make([]coach[Res], n)
}

// Queue returns common queue for entering coaches.
// Queue returns common queue for all coaches.
// Getting into it means already started batches will wait for it not committing yet.
//
// Worker can leave the Queue, but it must call Notify to wake up waiting workers.
// Worker can leave the Queue before Enter, but we must call Notify to wake up waiting workers.
func (c *Multi[Res]) Queue() *Queue {
return &c.lock.queue
}

// Notify wakes up waiting workers. Can be used if you were waited,
// but now situation has changed.
// Notify wakes up waiting workers.
//
// Must be called if the worker left the Queue before Enter.
func (c *Multi[Res]) Notify() {
c.cond.Broadcast()
}

// Enter enters available batch.
// It will return -1, -1 if no coaches available and blocking == false.
// Enter available batch.
// It will return -1, -1 if blocking == false and no batches available.
// Enter also removes worker from the queue.
//
// See also documentation for Coordinator.Enter.
Expand Down Expand Up @@ -127,7 +133,7 @@ func (c *Multi[Res]) enterBalancer() (coach, idx int) {
return coach, c.cs[coach].cnt - 1
}

// Exit exits the batch. Should be called with defer.
// Exit the batch. Should be called with defer.
// It works similar to Mutex.Unlock in the sense it unlocks shared resources.
func (c *Multi[Res]) Exit(coach int) int {
defer func() {
Expand Down Expand Up @@ -162,26 +168,26 @@ func (c *Multi[Res]) Exit(coach int) int {
return idx
}

// Trigger triggers the batch to commit.
// Trigger the batch to commit.
// Must be called inside Enter-Exit section.
//
// Can be used to flush current batch and retry.
// Commit only happens when current worker Exits from the section.
// Can be used to flush current batch. With our data or without and then we can retry.
// Commit happens when current worker Exits from critical section.
// So you need to Exit and then get into the Queue and Enter again to retry.
func (c *Multi[Res]) Trigger(coach int) {
c.cs[coach].trigger = true
}

// Commit indicates the current worker is done with the shared data and ready to Commit it.
// Commit blocks until batch is committed. The same Res is returned to all the workers in the batch.
// Commit indicates the current worker is done with the shared state and ready to Commit it.
// Commit blocks until batch is committed. The same Res and error is returned to all the workers in the batch.
// (Res, error) is what the Multi.Commit returns.
func (c *Multi[Res]) Commit(ctx context.Context, coach int) (Res, error) {
return commit(ctx, &c.lock, &c.cs[coach], nil, func(ctx context.Context) (Res, error) {
return c.CommitFunc(ctx, coach)
})
}

// Cancel indicates the current worked is done with shared data but it can't be committed.
// Cancel indicates the current worker is done with shared data but it mustn't be committed.
// All the workers from the same batch receive zero Res and the same error.
//
// It can be used if batch shared state have been spoilt as a result of error or something.
Expand Down

0 comments on commit 2b48636

Please sign in to comment.