-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit d1b7fe2
Showing
11 changed files
with
750 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
version: 2 | ||
jobs: | ||
go1.21: &base | ||
docker: | ||
- image: cimg/go:1.21 | ||
steps: | ||
- run: go version | ||
- checkout | ||
- run: go test -race -v ./... | ||
|
||
go1.20: | ||
<<: *base | ||
docker: | ||
- image: cimg/go:1.20 | ||
|
||
go1.19: | ||
<<: *base | ||
docker: | ||
- image: cimg/go:1.19 | ||
|
||
go1.18: | ||
<<: *base | ||
docker: | ||
- image: cimg/go:1.18 | ||
|
||
workflows: | ||
version: 2 | ||
build: | ||
jobs: | ||
- go1.21 | ||
- go1.20 | ||
- go1.19 | ||
- go1.18 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# This workflow will build a golang project | ||
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go | ||
|
||
name: Go | ||
|
||
on: | ||
push: | ||
branches: [ "master" ] | ||
pull_request: | ||
branches: [ "master" ] | ||
|
||
jobs: | ||
test: | ||
strategy: | ||
matrix: | ||
os: ["ubuntu-latest", "macos-latest", "windows-latest"] | ||
go-ver: ["1.21", "1.20", "1.19"] | ||
include: | ||
- os: "ubuntu-latest" | ||
go-ver: "1.21" | ||
cover: true | ||
|
||
runs-on: ${{ matrix.os }} | ||
steps: | ||
- uses: actions/checkout@v3 | ||
|
||
- name: Set up Go | ||
uses: actions/setup-go@v4 | ||
with: | ||
go-version: ${{ matrix.go-ver }} | ||
|
||
- name: Build | ||
run: go build -v ./... | ||
|
||
- name: Test with Cover | ||
run: go test -v -coverprofile=coverage.txt -covermode=atomic ./... | ||
if: ${{ matrix.cover }} | ||
|
||
- name: Test without Cover | ||
run: go test -v ./... | ||
if: ${{ !matrix.cover }} | ||
|
||
- name: Test Race | ||
run: go test -race -v ./... | ||
|
||
- name: Upload coverage reports to Codecov | ||
uses: codecov/codecov-action@v3 | ||
if: ${{ matrix.cover }} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
name: golangci-lint | ||
on: | ||
push: | ||
pull_request: | ||
jobs: | ||
golangci: | ||
name: lint | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v2 | ||
- name: golangci-lint | ||
uses: golangci/golangci-lint-action@v2 | ||
with: | ||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. | ||
version: v1.55 | ||
|
||
# Optional: working directory, useful for monorepos | ||
# working-directory: somedir | ||
|
||
# Optional: golangci-lint command line arguments. | ||
# args: --issues-exit-code=0 | ||
|
||
# Optional: show only new issues if it's a pull request. The default value is `false`. | ||
# only-new-issues: true |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
MIT License | ||
|
||
Copyright (c) 2019 Nikifor Seriakov | ||
|
||
Permission is hereby granted, free of charge, to any person obtaining a copy | ||
of this software and associated documentation files (the "Software"), to deal | ||
in the Software without restriction, including without limitation the rights | ||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
copies of the Software, and to permit persons to whom the Software is | ||
furnished to do so, subject to the following conditions: | ||
|
||
The above copyright notice and this permission notice shall be included in all | ||
copies or substantial portions of the Software. | ||
|
||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
SOFTWARE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
[![Documentation](https://pkg.go.dev/badge/nikand.dev/go/batch)](https://pkg.go.dev/nikand.dev/go/batch?tab=doc) | ||
[![Go workflow](https://github.com/nikandfor/batch/actions/workflows/go.yml/badge.svg)](https://github.com/nikandfor/batch/actions/workflows/go.yml) | ||
[![CircleCI](https://circleci.com/gh/nikandfor/batch.svg?style=svg)](https://circleci.com/gh/nikandfor/batch) | ||
[![codecov](https://codecov.io/gh/nikandfor/batch/tags/latest/graph/badge.svg)](https://codecov.io/gh/nikandfor/batch) | ||
[![Go Report Card](https://goreportcard.com/badge/nikand.dev/go/batch)](https://goreportcard.com/report/nikand.dev/go/batch) | ||
![GitHub tag (latest SemVer)](https://img.shields.io/github/v/tag/nikandfor/batch?sort=semver) | ||
|
||
# batch | ||
|
||
`batch` is a library to make concurrent work batcheable and reliable. | ||
Each worker either has its work committed or gets an error. | ||
|
||
> Hope is not a strategy. ([from Google SRE book](https://sre.google/sre-book/introduction/)) | ||
No more batch operations that add its data to a batch and go away hoping it would be committed. | ||
|
||
This is all without timeouts, additional goroutines, allocations, and channels. | ||
|
||
## How it works | ||
|
||
* Each worker adds its work to a shared batch. | ||
* If there are no more workers ready to commit the last one runs commit, the others wait. | ||
* Every worker in the batch gets the same result and error. | ||
|
||
## Usage | ||
|
||
```go | ||
var tx int | ||
|
||
b := batch.New(func(ctx context.Context) (interface{}, error) { | ||
// commit tx | ||
return res, err | ||
}) | ||
|
||
// Optional hooks | ||
b.Prepare = func(ctx context.Context) error { tx = 0; return nil } // called in the beginning on a new batch | ||
b.Rollback = func(ctx context.Context, err error) error { return err } // if any worker returned error | ||
b.Panic = func(ctx context.Context, p interface{}) error { // any worker panicked | ||
return batch.PanicError{Panic: p} // returned to other workes | ||
// panicked worker gets the panic back | ||
} | ||
|
||
// only one of Panic, Rollback, and Commit is called (in respective priority order; panic wins, then error, commit is last) | ||
|
||
for j := 0; j < N; j++ { | ||
go func(j int) { | ||
ctx := context.WithValue(ctx, workerID{}, j) // can be accessed in Commit and other hooks | ||
|
||
res, err := b.Do(ctx, func(ctx context.Context) error { | ||
tx += j // add work to the batch | ||
|
||
return nil // commit | ||
}) | ||
if err != nil { // works the same as we had independent commit in each goroutine | ||
_ = err | ||
} | ||
|
||
// batching is transparent for worker | ||
_ = res | ||
}(j) | ||
} | ||
``` | ||
|
||
Batch is error and panic proof which means any callback (Do, Commit, and friends) may return error or panic, | ||
but as soon as all workers left the batch its state is restored. | ||
But not the external state, it's callers responsibility to keep it consistent. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package batch | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"sync/atomic" | ||
) | ||
|
||
type ( | ||
Batch struct { | ||
queue atomic.Int32 | ||
|
||
Prepare func(ctx context.Context) error | ||
Commit func(ctx context.Context) (interface{}, error) | ||
Rollback func(ctx context.Context, err error) error | ||
Panic func(ctx context.Context, p interface{}) error | ||
|
||
Limit *Semaphore | ||
|
||
sync.Mutex | ||
sync.Cond | ||
|
||
cnt int | ||
|
||
res interface{} | ||
err error | ||
panic interface{} | ||
} | ||
|
||
PanicError struct { | ||
Panic interface{} | ||
} | ||
) | ||
|
||
func New(commit func(ctx context.Context) (interface{}, error)) *Batch { | ||
b := &Batch{} | ||
|
||
b.Init(commit) | ||
|
||
return b | ||
} | ||
|
||
func (b *Batch) Init(commit func(ctx context.Context) (interface{}, error)) { | ||
b.Cond.L = &b.Mutex | ||
b.Commit = commit | ||
} | ||
|
||
func (b *Batch) Do(ctx context.Context, f func(ctx context.Context) error) (res interface{}, err error) { | ||
defer b.Limit.Exit() | ||
b.Limit.Enter() | ||
|
||
b.queue.Add(1) | ||
|
||
defer b.Unlock() | ||
b.Lock() | ||
|
||
// wait for all goroutines from the previous batch to exit | ||
for b.cnt < 0 { | ||
b.Cond.Wait() | ||
} | ||
|
||
var p, p2 interface{} | ||
|
||
if b.cnt == 0 && b.Prepare != nil { // the first prepares the batch | ||
p = b.catchPanic(func() { | ||
b.err = b.Prepare(ctx) | ||
}) | ||
} | ||
|
||
// add state to the batch if no errors happened so far | ||
if p == nil && b.err == nil { | ||
p = b.catchPanic(func() { | ||
b.err = f(ctx) | ||
}) | ||
} | ||
|
||
if p != nil && b.panic == nil { // any goroutine sets panic if it happened | ||
b.panic = p | ||
b.err = PanicError{Panic: p} // panic overwrites error | ||
} | ||
|
||
x := b.queue.Add(-1) // will only be 0 if we are the last exiting the batch | ||
b.cnt++ // count entered | ||
|
||
if x != 0 { // we are not the last exiting the batch, wait for others | ||
b.Cond.Wait() // so wait for the last one to finish the job | ||
} else { | ||
b.cnt = -b.cnt // set committing mode, no new goroutines allowed to enter | ||
|
||
p2 = b.catchPanic(func() { | ||
switch { | ||
case b.panic != nil: | ||
if b.Panic != nil { | ||
b.err = b.Panic(ctx, b.panic) | ||
} | ||
case b.err == nil: | ||
b.res, b.err = b.Commit(ctx) | ||
case b.Rollback != nil: | ||
b.err = b.Rollback(ctx, b.err) | ||
} | ||
}) | ||
} | ||
|
||
b.cnt++ // reset committing mode when everybody left | ||
b.Cond.Broadcast() | ||
|
||
res, err = b.res, b.err // return the same result to all the entered | ||
|
||
if b.cnt == 0 { // the last turns the lights off | ||
b.res, b.err, b.panic = nil, nil, nil | ||
} | ||
|
||
if p2 != nil { | ||
panic(p2) | ||
} | ||
|
||
if p != nil { | ||
panic(p) | ||
} | ||
|
||
return | ||
} | ||
|
||
func (b *Batch) catchPanic(f func()) (p interface{}) { | ||
defer func() { | ||
p = recover() | ||
}() | ||
|
||
f() | ||
|
||
return | ||
} | ||
|
||
func (e PanicError) Error() string { | ||
return fmt.Sprintf("panic: %v", e.Panic) | ||
} |
Oops, something went wrong.