Skip to content

Commit

Permalink
improve example
Browse files Browse the repository at this point in the history
  • Loading branch information
nikandfor committed Jan 7, 2024
1 parent 87b254e commit 77e6738
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ This is all without timeouts, additional goroutines, allocations, and channels.

```
// General pattern is
// Queue.In -> Enter -> defer Exit -> Commit/Cancel/return/panic
// Queue.In -> Enter -> [if idx < 0 {return}] -> defer Exit -> Commit/Cancel/return/panic
var sum int
Expand Down Expand Up @@ -67,5 +67,5 @@ for j := 0; j < N; j++ {
```

Batch is error and panic proof which means the user code can return error or panic in any place,
but as soon as all workers left the batch its state is reset.
but as soon as all the workers left the batch its state is reset.
But not the external state, it's callers responsibility to keep it consistent.
29 changes: 21 additions & 8 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package batch_test

import (
"context"
"errors"
"flag"
"log"
"sync"

"nikand.dev/go/batch"
Expand All @@ -13,7 +15,7 @@ var jobs = flag.Int("jobs", 5, "parallel jobs")
type Service struct {
sum int // state we collect to commit together

bc *batch.Coordinator[int] // result value type, set to struct{} if don't need it
bc *batch.Coordinator[int] // [int] is the result value type, set to struct{} if don't need it
}

func NewService() *Service {
Expand All @@ -26,7 +28,9 @@ func NewService() *Service {

func (s *Service) commit(ctx context.Context) (int, error) {
// suppose some heavy operation here
// update the file or write to db
// update a file or write to db

log.Printf("* * * commit %2d * * *", s.sum)

return s.sum, nil
}
Expand All @@ -37,26 +41,33 @@ func (s *Service) DoWork(ctx context.Context, data int) (int, error) {
_ = data // prepare data

idx := s.bc.Enter(true) // true for blocking, false if we want to leave instead of waiting
// if idx < 0 // we haven't entered the batch in non blocking mode
defer s.bc.Exit() // it's like Mutex.Unlock. Must be called with defer to outlive panics
if idx < 0 { // we haven't entered the batch in non blocking mode
return 0, errors.New("not in this time") // we have to leave in that case
}

defer s.bc.Exit() // it's like Mutex.Unlock. It's a pair to successful Enter. Must be called with defer to outlive panics

if idx == 0 { // we are first in the batch, reset the state
s.sum = 0
log.Printf("* * * reset batch * * *")
}

// return // leave the batch if we changed our mind
log.Printf("worker %2d got in with index %2d", ctx.Value("worker"), idx)

// if notThisTime() { return } // safely leave the batch if we changed our mind. Keep the state (s.sum) unchanged.

s.sum += data // add our work to the batch

// s.bc.Cancel(ctx, err) // cancel the whole batch if we spoilt it
// return
// if spoiltState() { return s.bc.Cancel(ctx, err) } // cancel the whole batch if we spoilt it

// only one of Leave/Cancel/Commit must be called and only once
// only one of leave(return)/Cancel/Commit must be called and only once
res, err := s.bc.Commit(ctx, false) // true to force batch to commit now, false to wait for others
if err != nil { // batch failed, each worker in it will get the same error
return 0, err
}

log.Printf("worker %2d got result %v %v", ctx.Value("worker"), res, err)

// if we are here, all of the workers have their work committed

return res, nil
Expand All @@ -69,12 +80,14 @@ func ExampleCoordinator() {
var wg sync.WaitGroup

for j := 0; j < *jobs; j++ {
j := j
wg.Add(1)

go func() {
defer wg.Done()

ctx := context.Background() // passed to commit function
ctx = context.WithValue(ctx, "worker", j)

Check failure on line 90 in examples_test.go

View workflow job for this annotation

GitHub Actions / lint

SA1029: should not use built-in type string as key for value; define your own type to avoid collisions (staticcheck)

res, err := s.DoWork(ctx, 1)
_, _ = res, err
Expand Down

0 comments on commit 77e6738

Please sign in to comment.