Skip to content

Commit

Permalink
change API
Browse files Browse the repository at this point in the history
  • Loading branch information
nikandfor committed Dec 17, 2023
1 parent 1f6ff3c commit 2ad13d6
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 62 deletions.
25 changes: 14 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ This is all without timeouts, additional goroutines, allocations, and channels.
```go
var tx int

b := batch.New(func(ctx context.Context) (interface{}, error) {
// commit tx
return res, err
})

// Optional hooks
b.Prepare = func(ctx context.Context) error { tx = 0; return nil } // called in the beginning on a new batch
b.Rollback = func(ctx context.Context, err error) error { return err } // if any worker returned error
b.Panic = func(ctx context.Context, p interface{}) error { // any worker panicked
return batch.PanicError{Panic: p} // returned to other workes
// panicked worker gets the panic back
b := batch.Batch{
// Required
Commit: func(ctx context.Context) (interface{}, error) {
// commit tx
return res, err
},

// Optional hooks
Prepare: func(ctx context.Context) error { tx = 0; return nil }, // called in the beginning on a new batch
Rollback: func(ctx context.Context, err error) error { return err }, // if any worker returned error
Panic: func(ctx context.Context, p interface{}) error { // any worker panicked
return batch.PanicError{Panic: p} // returned to other workes
// panicked worker gets the panic back
},
}

// only one of Panic, Rollback, and Commit is called (in respective priority order; panic wins, then error, commit is last)
Expand Down
43 changes: 17 additions & 26 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import (

type (
Batch struct {
queue int32

Prepare func(ctx context.Context) error
Commit func(ctx context.Context) (interface{}, error)
Commit func(ctx context.Context) (interface{}, error) // required
Rollback func(ctx context.Context, err error) error
Panic func(ctx context.Context, p interface{}) error

Limit *Semaphore

sync.Mutex
sync.Cond
queue int32

mu sync.Mutex
cond sync.Cond

cnt int

Expand All @@ -33,31 +33,22 @@ type (
}
)

func New(commit func(ctx context.Context) (interface{}, error)) *Batch {
b := &Batch{}

b.Init(commit)

return b
}

func (b *Batch) Init(commit func(ctx context.Context) (interface{}, error)) {
b.Cond.L = &b.Mutex
b.Commit = commit
}

func (b *Batch) Do(ctx context.Context, f func(ctx context.Context) error) (res interface{}, err error) {
func (b *Batch) Do(ctx context.Context, f func(ctx context.Context) error) (interface{}, error) {
defer b.Limit.Exit()
b.Limit.Enter()

atomic.AddInt32(&b.queue, 1)

defer b.Unlock()
b.Lock()
defer b.mu.Unlock()
b.mu.Lock()

if b.cond.L == nil {
b.cond.L = &b.mu
}

// wait for all goroutines from the previous batch to exit
for b.cnt < 0 {
b.Cond.Wait()
b.cond.Wait()
}

var p, p2 interface{}
Expand All @@ -84,7 +75,7 @@ func (b *Batch) Do(ctx context.Context, f func(ctx context.Context) error) (res
b.cnt++ // count entered

if x != 0 { // we are not the last exiting the batch, wait for others
b.Cond.Wait() // so wait for the last one to finish the job
b.cond.Wait() // so wait for the last one to finish the job
} else {
b.cnt = -b.cnt // set committing mode, no new goroutines allowed to enter

Expand All @@ -103,9 +94,9 @@ func (b *Batch) Do(ctx context.Context, f func(ctx context.Context) error) (res
}

b.cnt++ // reset committing mode when everybody left
b.Cond.Broadcast()
b.cond.Broadcast()

res, err = b.res, b.err // return the same result to all the entered
res, err := b.res, b.err // return the same result to all the entered

if b.cnt == 0 { // the last turns the lights off
b.res, b.err, b.panic = nil, nil, nil
Expand All @@ -119,7 +110,7 @@ func (b *Batch) Do(ctx context.Context, f func(ctx context.Context) error) (res
panic(p)
}

return
return res, err
}

func (b *Batch) catchPanic(f func()) (p interface{}) {
Expand Down
47 changes: 23 additions & 24 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,27 @@ func TestBatch(tb *testing.T) {
var commits, rollbacks, panics int
var bucket string

b := batch.New(func(ctx context.Context) (interface{}, error) {
commits++

return bucket, nil
})

b.Prepare = func(ctx context.Context) error {
bucket = ""

return nil
}

b.Rollback = func(ctx context.Context, err error) error {
rollbacks++

return err
}

b.Panic = func(ctx context.Context, p interface{}) error {
panics++

return batch.PanicError{Panic: p}
b := &batch.Batch{
Commit: func(ctx context.Context) (interface{}, error) {
commits++

return bucket, nil
},
Prepare: func(ctx context.Context) error {
bucket = ""

return nil
},
Rollback: func(ctx context.Context, err error) error {
rollbacks++

return err
},
Panic: func(ctx context.Context, p interface{}) error {
panics++

return batch.PanicError{Panic: p}
},
}

var fail func() error
Expand Down Expand Up @@ -154,12 +153,12 @@ func BenchmarkBatch(tb *testing.B) {

var commits, sum int

b := batch.New(func(ctx context.Context) (interface{}, error) {
b := batch.Batch{Commit: func(ctx context.Context) (interface{}, error) {
commits++
sum = 0

return nil, nil
})
}}

ctx := context.Background()

Expand Down
24 changes: 23 additions & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type (
func New() *DB {
d := &DB{}

d.b.Init(d.commit)
d.b.Prepare = d.prepare
d.b.Commit = d.commit

return d
}
Expand Down Expand Up @@ -98,3 +98,25 @@ func (d *DB) SaveBatched(ctx context.Context, data int) error {
func (tx *Tx) Reset() {
tx.updates = tx.updates[:0]
}

func ExampleBatch() {
ctx := context.Background()
svc := New()

const M = 3

var wg sync.WaitGroup

wg.Add(M)

for j := 0; j < M; j++ {
go func() {
defer wg.Done()

err := svc.SaveBatched(ctx, 2)
_ = err
}()
}

wg.Wait()
}

0 comments on commit 2ad13d6

Please sign in to comment.