diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..d4c7e0f --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,33 @@ +version: 2 +jobs: + go1.21: &base + docker: + - image: cimg/go:1.21 + steps: + - run: go version + - checkout + - run: go test -race -v ./... + + go1.20: + <<: *base + docker: + - image: cimg/go:1.20 + + go1.19: + <<: *base + docker: + - image: cimg/go:1.19 + + go1.18: + <<: *base + docker: + - image: cimg/go:1.18 + +workflows: + version: 2 + build: + jobs: + - go1.21 + - go1.20 + - go1.19 + - go1.18 diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..ee7fa68 --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,48 @@ +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: Go + +on: + push: + branches: [ "master" ] + pull_request: + branches: [ "master" ] + +jobs: + test: + strategy: + matrix: + os: ["ubuntu-latest", "macos-latest", "windows-latest"] + go-ver: ["1.21", "1.20", "1.19"] + include: + - os: "ubuntu-latest" + go-ver: "1.21" + cover: true + + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: ${{ matrix.go-ver }} + + - name: Build + run: go build -v ./... + + - name: Test with Cover + run: go test -v -coverprofile=coverage.txt -covermode=atomic ./... + if: ${{ matrix.cover }} + + - name: Test without Cover + run: go test -v ./... + if: ${{ !matrix.cover }} + + - name: Test Race + run: go test -race -v ./... + + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v3 + if: ${{ matrix.cover }} diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml new file mode 100644 index 0000000..cd2389d --- /dev/null +++ b/.github/workflows/golangci-lint.yml @@ -0,0 +1,24 @@ +name: golangci-lint +on: + push: + pull_request: +jobs: + golangci: + name: lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: golangci-lint + uses: golangci/golangci-lint-action@v2 + with: + # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. + version: v1.55 + + # Optional: working directory, useful for monorepos + # working-directory: somedir + + # Optional: golangci-lint command line arguments. + # args: --issues-exit-code=0 + + # Optional: show only new issues if it's a pull request. The default value is `false`. + # only-new-issues: true diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..29411cd --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 Nikifor Seriakov + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..7f37679 --- /dev/null +++ b/README.md @@ -0,0 +1,66 @@ +[![Documentation](https://pkg.go.dev/badge/nikand.dev/go/batch)](https://pkg.go.dev/nikand.dev/go/batch?tab=doc) +[![Go workflow](https://github.com/nikandfor/batch/actions/workflows/go.yml/badge.svg)](https://github.com/nikandfor/batch/actions/workflows/go.yml) +[![CircleCI](https://circleci.com/gh/nikandfor/batch.svg?style=svg)](https://circleci.com/gh/nikandfor/batch) +[![codecov](https://codecov.io/gh/nikandfor/batch/tags/latest/graph/badge.svg)](https://codecov.io/gh/nikandfor/batch) +[![Go Report Card](https://goreportcard.com/badge/nikand.dev/go/batch)](https://goreportcard.com/report/nikand.dev/go/batch) +![GitHub tag (latest SemVer)](https://img.shields.io/github/v/tag/nikandfor/batch?sort=semver) + +# batch + +`batch` is a library to make concurrent work batcheable and reliable. +Each worker either has its work committed or gets an error. + +> Hope is not a strategy. ([from Google SRE book](https://sre.google/sre-book/introduction/)) + +No more batch operations that add its data to a batch and go away hoping it would be committed. + +This is all without timeouts, additional goroutines, allocations, and channels. + +## How it works + +* Each worker adds its work to a shared batch. +* If there are no more workers ready to commit the last one runs commit, the others wait. +* Every worker in the batch gets the same result and error. + +## Usage + +```go +var tx int + +b := batch.New(func(ctx context.Context) (interface{}, error) { + // commit tx + return res, err +}) + +// Optional hooks +b.Prepare = func(ctx context.Context) error { tx = 0; return nil } // called in the beginning on a new batch +b.Rollback = func(ctx context.Context, err error) error { return err } // if any worker returned error +b.Panic = func(ctx context.Context, p interface{}) error { // any worker panicked + return batch.PanicError{Panic: p} // returned to other workes + // panicked worker gets the panic back +} + +// only one of Panic, Rollback, and Commit is called (in respective priority order; panic wins, then error, commit is last) + +for j := 0; j < N; j++ { + go func(j int) { + ctx := context.WithValue(ctx, workerID{}, j) // can be accessed in Commit and other hooks + + res, err := b.Do(ctx, func(ctx context.Context) error { + tx += j // add work to the batch + + return nil // commit + }) + if err != nil { // works the same as we had independent commit in each goroutine + _ = err + } + + // batching is transparent for worker + _ = res + }(j) +} +``` + +Batch is error and panic proof which means any callback (Do, Commit, and friends) may return error or panic, +but as soon as all workers left the batch its state is restored. +But not the external state, it's callers responsibility to keep it consistent. diff --git a/batch.go b/batch.go new file mode 100644 index 0000000..db48b42 --- /dev/null +++ b/batch.go @@ -0,0 +1,137 @@ +package batch + +import ( + "context" + "fmt" + "sync" + "sync/atomic" +) + +type ( + Batch struct { + queue atomic.Int32 + + Prepare func(ctx context.Context) error + Commit func(ctx context.Context) (interface{}, error) + Rollback func(ctx context.Context, err error) error + Panic func(ctx context.Context, p interface{}) error + + Limit *Semaphore + + sync.Mutex + sync.Cond + + cnt int + + res interface{} + err error + panic interface{} + } + + PanicError struct { + Panic interface{} + } +) + +func New(commit func(ctx context.Context) (interface{}, error)) *Batch { + b := &Batch{} + + b.Init(commit) + + return b +} + +func (b *Batch) Init(commit func(ctx context.Context) (interface{}, error)) { + b.Cond.L = &b.Mutex + b.Commit = commit +} + +func (b *Batch) Do(ctx context.Context, f func(ctx context.Context) error) (res interface{}, err error) { + defer b.Limit.Exit() + b.Limit.Enter() + + b.queue.Add(1) + + defer b.Unlock() + b.Lock() + + // wait for all goroutines from the previous batch to exit + for b.cnt < 0 { + b.Cond.Wait() + } + + var p, p2 interface{} + + if b.cnt == 0 && b.Prepare != nil { // the first prepares the batch + p = b.catchPanic(func() { + b.err = b.Prepare(ctx) + }) + } + + // add state to the batch if no errors happened so far + if p == nil && b.err == nil { + p = b.catchPanic(func() { + b.err = f(ctx) + }) + } + + if p != nil && b.panic == nil { // any goroutine sets panic if it happened + b.panic = p + b.err = PanicError{Panic: p} // panic overwrites error + } + + x := b.queue.Add(-1) // will only be 0 if we are the last exiting the batch + b.cnt++ // count entered + + if x != 0 { // we are not the last exiting the batch, wait for others + b.Cond.Wait() // so wait for the last one to finish the job + } else { + b.cnt = -b.cnt // set committing mode, no new goroutines allowed to enter + + p2 = b.catchPanic(func() { + switch { + case b.panic != nil: + if b.Panic != nil { + b.err = b.Panic(ctx, b.panic) + } + case b.err == nil: + b.res, b.err = b.Commit(ctx) + case b.Rollback != nil: + b.err = b.Rollback(ctx, b.err) + } + }) + } + + b.cnt++ // reset committing mode when everybody left + b.Cond.Broadcast() + + res, err = b.res, b.err // return the same result to all the entered + + if b.cnt == 0 { // the last turns the lights off + b.res, b.err, b.panic = nil, nil, nil + } + + if p2 != nil { + panic(p2) + } + + if p != nil { + panic(p) + } + + return +} + +func (b *Batch) catchPanic(f func()) (p interface{}) { + defer func() { + p = recover() + }() + + f() + + return +} + +func (e PanicError) Error() string { + return fmt.Sprintf("panic: %v", e.Panic) +} diff --git a/batch_test.go b/batch_test.go new file mode 100644 index 0000000..b136232 --- /dev/null +++ b/batch_test.go @@ -0,0 +1,174 @@ +package batch_test + +import ( + "context" + "errors" + "flag" + "fmt" + "sync" + "testing" + + "nikand.dev/go/batch" +) + +var jobs = flag.Int("jobs", 10, "parallel workers") + +func TestBatch(tb *testing.T) { + ctx := context.Background() + + var commits, rollbacks, panics int + var bucket string + + b := batch.New(func(ctx context.Context) (interface{}, error) { + commits++ + + return bucket, nil + }) + + b.Prepare = func(ctx context.Context) error { + bucket = "" + + return nil + } + + b.Rollback = func(ctx context.Context, err error) error { + rollbacks++ + + return err + } + + b.Panic = func(ctx context.Context, p interface{}) error { + panics++ + + return batch.PanicError{Panic: p} + } + + var fail func() error + var expErr error + var expPanic, prepPanic, panicPanic interface{} + + body := func(tb *testing.T) { + commits, rollbacks, panics = 0, 0, 0 + bucket = "" + + var wg sync.WaitGroup + wg.Add(*jobs) + + for j := 0; j < *jobs; j++ { + j := j + + go func() { + defer wg.Done() + defer func() { + p := recover() + + if p != nil { + tb.Logf("worker %2x got panic %v", j, p) + } + + if j == 1 && p != expPanic && p != prepPanic && p != panicPanic { + tb.Errorf("worker %2x panicked %v, expected %v", j, p, expPanic) + } + }() + + //ctx := context.WithValue(ctx, "j", j) + + res, err := b.Do(ctx, func(ctx context.Context) error { + if j == 1 && fail != nil { + return fail() + } + + bucket += fmt.Sprintf(" %x", j) + + return nil + }) + + if err != nil { + tb.Logf("worker %2x got common error %v", j, err) + } else { + tb.Logf("worker %2x got common result %v", j, res) + } + + switch { + case expPanic != nil && err == batch.PanicError{Panic: expPanic}: + case prepPanic != nil && err == batch.PanicError{Panic: prepPanic}: + case panicPanic != nil && err == batch.PanicError{Panic: panicPanic}: + case j == 1 && err != expErr: + tb.Errorf("got error %v, expected %v", err, expErr) + } + }() + } + + wg.Wait() + + tb.Logf("commits %v rollbacks %v panics %v", commits, rollbacks, panics) + } + + tb.Run("ok", body) + + expErr = errors.New("test fail") + + fail = func() error { + return expErr + } + + tb.Run("fail", body) + + expPanic = "test panic" + expErr = nil + + fail = func() error { + panic(expPanic) + } + + tb.Run("panic", body) + + expPanic = nil + prepPanic = "prepare panic" + + b.Prepare = func(ctx context.Context) error { + panic(prepPanic) + } + + tb.Run("PreparePanic", body) + + expPanic = "before panic panic" + panicPanic = "panic panic" + + b.Prepare = nil + b.Panic = func(ctx context.Context, p interface{}) error { + panics++ + panic(panicPanic) + } + + tb.Run("PanicPanic", body) + + fail = nil + expPanic = nil + + tb.Run("okAfter", body) +} + +func BenchmarkBatch(tb *testing.B) { + tb.ReportAllocs() + + var commits, sum int + + b := batch.New(func(ctx context.Context) (interface{}, error) { + commits++ + sum = 0 + + return nil, nil + }) + + ctx := context.Background() + + tb.RunParallel(func(tb *testing.PB) { + for tb.Next() { + _, _ = b.Do(ctx, func(context.Context) error { + sum++ + return nil + }) + } + }) +} diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..fe24b02 --- /dev/null +++ b/example_test.go @@ -0,0 +1,100 @@ +package batch_test + +import ( + "context" + "fmt" + "sync" + + "nikand.dev/go/batch" +) + +type ( + DB struct { + b batch.Batch + tx *Tx + + mu sync.Mutex // for unbatched operations + } + + Tx struct { + updates []int + } +) + +func New() *DB { + d := &DB{} + + d.b.Init(d.commit) + d.b.Prepare = d.prepare + + return d +} + +func (d *DB) prepare(ctx context.Context) error { + if d.tx == nil { + d.tx = &Tx{} + } + + d.tx.Reset() + + return nil +} + +func (d *DB) commit(ctx context.Context) (interface{}, error) { + // commit changes + response := fmt.Sprintf("%v", d.tx.updates) + + return response, nil // each goroutine in the batch get this response and error +} + +func (d *DB) SaveUnbatchedParallel(ctx context.Context, data int) error { + tx := &Tx{} // new tx/connection + + tx.updates = append(tx.updates, data) // add one value + + // commit: make heavy work for each portion of data + // discard allocated resources + + return nil +} + +func (d *DB) SaveUnbatchedSyncronized(ctx context.Context, data int) error { + defer d.mu.Unlock() + d.mu.Lock() + + err := d.prepare(ctx) + if err != nil { + return err + } + + d.tx.updates = append(d.tx.updates, data) // add one value + + res, err := d.commit(ctx) // commit: make heavy work for each portion of data + if err != nil { + return err + } + + _ = res + + return nil +} + +func (d *DB) SaveBatched(ctx context.Context, data int) error { + // the same result, but all goroutines committed their data in a single shared batch + + response, err := d.b.Do(ctx, func(ctx context.Context) error { + // access to common resources is syncronized + d.tx.updates = append(d.tx.updates, data) + + return nil + }) + // each goroutine only returns after commit (or rollback) is finished + + _ = response // only one commit is done: the same result is shared + + return err // shared error +} + +func (tx *Tx) Reset() { + tx.updates = tx.updates[:0] +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ba99bfc --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module nikand.dev/go/batch + +go 1.15 diff --git a/semaphore.go b/semaphore.go new file mode 100644 index 0000000..31f2395 --- /dev/null +++ b/semaphore.go @@ -0,0 +1,86 @@ +package batch + +import "sync" + +type ( + Semaphore struct { + mu sync.Mutex + cond sync.Cond + + n, lim int + } +) + +func NewSemaphore(n int) *Semaphore { + b := &Semaphore{} + + b.Reset(n) + + return b +} + +func (b *Semaphore) Reset(n int) { + if b == nil { + return + } + + defer b.mu.Unlock() + b.mu.Lock() + + if b.cond.L == nil { + b.cond.L = &b.mu + } + + b.lim = n +} + +func (b *Semaphore) Enter() int { + if b == nil { + return 0 + } + + defer b.mu.Unlock() + b.mu.Lock() + + for b.n >= b.lim { + b.cond.Wait() + } + + b.n++ + + return b.n +} + +func (b *Semaphore) Exit() { + if b == nil { + return + } + + defer b.mu.Unlock() + b.mu.Lock() + + b.n-- + b.cond.Signal() +} + +func (b *Semaphore) Len() int { + if b == nil { + return 0 + } + + defer b.mu.Unlock() + b.mu.Lock() + + return b.n +} + +func (b *Semaphore) Cap() int { + if b == nil { + return 0 + } + + defer b.mu.Unlock() + b.mu.Lock() + + return b.lim +} diff --git a/semaphore_test.go b/semaphore_test.go new file mode 100644 index 0000000..000f6fa --- /dev/null +++ b/semaphore_test.go @@ -0,0 +1,58 @@ +package batch_test + +import ( + "sync" + "testing" + + "nikand.dev/go/batch" +) + +func TestSemaphore(tb *testing.T) { + s := batch.NewSemaphore(4) + + var wg sync.WaitGroup + wg.Add(*jobs) + + for j := 0; j < *jobs; j++ { + go func() { + defer wg.Done() + + defer s.Exit() + n := s.Enter() + + tb.Logf("%d routines in a zone", n) + }() + } + + wg.Add(1) + + go func() { + defer wg.Done() + + s.Reset(2) + }() + + wg.Add(1) + + go func() { + defer wg.Done() + + s.Reset(1) + }() + + tb.Logf("%d of %d in zone", s.Len(), s.Cap()) + + wg.Wait() +} + +func TestSemaphoreNil(tb *testing.T) { + var s *batch.Semaphore + + s.Reset(2) + + s.Enter() + s.Exit() + + s.Len() + s.Cap() +}