Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
  • Loading branch information
nikandfor committed Dec 16, 2023
0 parents commit 5eebc15
Show file tree
Hide file tree
Showing 11 changed files with 750 additions and 0 deletions.
33 changes: 33 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
version: 2
jobs:
go1.21: &base
docker:
- image: cimg/go:1.21
steps:
- run: go version
- checkout
- run: go test -race -v ./...

go1.20:
<<: *base
docker:
- image: cimg/go:1.20

go1.19:
<<: *base
docker:
- image: cimg/go:1.19

go1.18:
<<: *base
docker:
- image: cimg/go:1.18

workflows:
version: 2
build:
jobs:
- go1.21
- go1.20
- go1.19
- go1.18
48 changes: 48 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Go

on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]

jobs:
test:
strategy:
matrix:
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
go-ver: ["1.21", "1.20", "1.19"]
include:
- os: "ubuntu-latest"
go-ver: "1.21"
cover: true

runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-ver }}

- name: Build
run: go build -v ./...

- name: Test with Cover
run: go test -v -coverprofile=coverage.txt -covermode=atomic ./...
if: ${{ matrix.cover }}

- name: Test without Cover
run: go test -v ./...
if: ${{ !matrix.cover }}

- name: Test Race
run: go test -race -v ./...

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
if: ${{ matrix.cover }}
24 changes: 24 additions & 0 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: golangci-lint
on:
push:
pull_request:
jobs:
golangci:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.55

# Optional: working directory, useful for monorepos
# working-directory: somedir

# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0

# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2019 Nikifor Seriakov

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
66 changes: 66 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
[![Documentation](https://pkg.go.dev/badge/nikand.dev/go/batch)](https://pkg.go.dev/nikand.dev/go/batch?tab=doc)
[![Go workflow](https://github.com/nikandfor/batch/actions/workflows/go.yml/badge.svg)](https://github.com/nikandfor/batch/actions/workflows/go.yml)
[![CircleCI](https://circleci.com/gh/nikandfor/batch.svg?style=svg)](https://circleci.com/gh/nikandfor/batch)
[![codecov](https://codecov.io/gh/nikandfor/batch/tags/latest/graph/badge.svg)](https://codecov.io/gh/nikandfor/batch)
[![Go Report Card](https://goreportcard.com/badge/nikand.dev/go/batch)](https://goreportcard.com/report/nikand.dev/go/batch)
![GitHub tag (latest SemVer)](https://img.shields.io/github/v/tag/nikandfor/batch?sort=semver)

# batch

`batch` is a library to make concurrent work batcheable and reliable.
Each worker either has its work committed or gets an error.

> Hope is not a strategy. ([from Google SRE book](https://sre.google/sre-book/introduction/))
No more batch operations that add its data to a batch and go away hoping it would be committed.

This is all without timeouts, additional goroutines, allocations, and channels.

## How it works

* Each worker adds its work to a shared batch.
* If there are no more workers ready to commit the last one runs commit, the others wait.
* Every worker in the batch gets the same result and error.

## Usage

```
var tx int
b := batch.New(func(ctx context.Context) (interface{}, error) {
// commit data in tx
return res, err
})
// Optional hooks
b.Prepare = func(ctx context.Context) error { tx = 0; return nil } // called in the beginning on a new batch
b.Rollback = func(ctx context.Context, err error) error { return err } // if any worker returned error
b.Panic = func(ctx context.Context, p interface{}) error { // any worker panicked
return batch.PanicError{Panic: p} // returned to other workes
// panicked worker gets the panic back
}
// only one of Panic, Rollback, and Commit is called (in respective priority order; panic wins, than error, commit is last)
for j := 0; j < N; j++ {
go func(j int) {
ctx := context.WithValue(ctx, workerID{}, j) // can be accessed in Commit and other hooks
res, err := b.Do(ctx, func(ctx context.Context) error {
tx += j // add work to the batch
return nil // commit
})
if err != nil { // works the same as we had independent commit in each goroutine
_ = err
}
// batching is transparent for worker
_ = res
}(j)
}
```

Batch is error and panic proof which means any callback (Do, Commit, and friends) may return error or panic,
but as soon as all workers left the batch its state is restored.
But not the external state, it's callers responsibility to keep it consistent.
137 changes: 137 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package batch

import (
"context"
"fmt"
"sync"
"sync/atomic"
)

type (
Batch struct {
queue atomic.Int32

Prepare func(ctx context.Context) error
Commit func(ctx context.Context) (interface{}, error)
Rollback func(ctx context.Context, err error) error
Panic func(ctx context.Context, p interface{}) error

Limit *Semaphore

sync.Mutex
sync.Cond

cnt int

res interface{}
err error
panic interface{}
}

PanicError struct {
Panic interface{}
}
)

func New(commit func(ctx context.Context) (interface{}, error)) *Batch {
b := &Batch{}

b.Init(commit)

return b
}

func (b *Batch) Init(commit func(ctx context.Context) (interface{}, error)) {
b.Cond.L = &b.Mutex
b.Commit = commit
}

func (b *Batch) Do(ctx context.Context, f func(ctx context.Context) error) (res interface{}, err error) {
defer b.Limit.Exit()
b.Limit.Enter()

b.queue.Add(1)

defer b.Unlock()
b.Lock()

// wait for all goroutines from the previous batch to exit
for b.cnt < 0 {
b.Cond.Wait()
}

var p, p2 interface{}

if b.cnt == 0 && b.Prepare != nil { // the first prepares the batch
p = b.catchPanic(func() {
b.err = b.Prepare(ctx)
})
}

// add state to the batch if no errors happened so far
if p == nil && b.err == nil {
p = b.catchPanic(func() {
b.err = f(ctx)
})
}

if p != nil && b.panic == nil { // any goroutine sets panic if it happened
b.panic = p
b.err = PanicError{Panic: p} // panic overwrites error
}

x := b.queue.Add(-1) // will only be 0 if we are the last exiting the batch
b.cnt++ // count entered

if x != 0 { // we are not the last exiting the batch, wait for others
b.Cond.Wait() // so wait for the last one to finish the job
} else {
b.cnt = -b.cnt // set committing mode, no new goroutines allowed to enter

p2 = b.catchPanic(func() {
switch {
case b.panic != nil:
if b.Panic != nil {
b.err = b.Panic(ctx, b.panic)
}
case b.err == nil:
b.res, b.err = b.Commit(ctx)
case b.Rollback != nil:
b.err = b.Rollback(ctx, b.err)
}
})
}

b.cnt++ // reset committing mode when everybody left
b.Cond.Broadcast()

res, err = b.res, b.err // return the same result to all the entered

if b.cnt == 0 { // the last turns the lights off
b.res, b.err, b.panic = nil, nil, nil
}

if p2 != nil {
panic(p2)
}

if p != nil {
panic(p)
}

return
}

func (b *Batch) catchPanic(f func()) (p interface{}) {
defer func() {
p = recover()
}()

f()

return
}

func (e PanicError) Error() string {
return fmt.Sprintf("panic: %v", e.Panic)
}
Loading

0 comments on commit 5eebc15

Please sign in to comment.