Skip to content

Commit

Permalink
Optional concurrent hash map for error tracking 🔥
Browse files Browse the repository at this point in the history
  • Loading branch information
bharat-rajani committed Jun 12, 2021
1 parent 8bfefd2 commit a4a2620
Show file tree
Hide file tree
Showing 6 changed files with 730 additions and 9 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@

# Dependency directories (remove the comment below to include it)
# vendor/

.idea/
120 changes: 118 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,118 @@
# run
Goroutines lifecycle manager
# rungroup
[![GoDoc](https://pkg.go.dev/badge/github.com/bharat-rajani/rungroup)](https://godoc.org/github.com/bharat-rajani/rungroup)
[![GoVersion](https://img.shields.io/github/go-mod/go-version/bharat-rajani/rungroup)](https://github.com/bharat-rajani/rungroup/blob/main/go.mod)
[![Go Report Card](https://img.shields.io/badge/go%20report-A+-brightgreen.svg)](https://goreportcard.com/report/github.com/bharat-rajani/rungroup)
[![MIT licensed](https://img.shields.io/github/license/bharat-rajani/rungroup)](https://github.com/bharat-rajani/rungroup/blob/main/LICENSE)

### Goroutines lifecycle manager

Rungroup was created to manage multiple goroutines which may or may not interrupt other goroutines on error.

Rationale:
Whilst hacking with golang, some or the other day you will encounter a situation where you have to manage multiple goroutines which can interrupt other goroutines if an error occurs.

A rungroup is essentially a composition of:
- goroutines
- optional concurrent(thread safe) ErrorMap to track errors from different goroutines
- context cancel func, when cancelled can stop other goroutines

A goroutine in rungroup is essentially composition of:
- a user(programmer) defined function which returns error
- an identifier (string) which may help you to track goroutine error.

> Rungroup is inspired by errorgroup.
### Installation:
```shell
go get -u github.com/bharat-rajani/rungroup
```

### Example:

#### A quick and simple example, where we need to call 3 REST Endpoints concurrently.

Three gorutines:
- F_API, interrupter
- S_API
- T_API, interrupter

Let's say we don't care about the response from second REST API Endpoint, hence that routine cannot interrupt other routines (F_API, T_API).
Now as soon as there is an error in F_API (or in T_API) then all other goroutines will be stopped.

```go
package main

import (
"fmt"
"context"
"net/http"
"github.com/bharat-rajani/rungroup"
"github.com/bharat-rajani/rungroup/pkg/concurrent"
)

func main() {
g, ctx := rungroup.WithContextErrorMap(context.Background(),concurrent.NewRWMutexMap())
// g, ctx := rungroup.WithContextErrorMap(context.Background(),new(sync)) //refer Benchmarks for performance difference

// error placeholder
var tErr error
var fResp, sResp, tResp *http.Response
g.GoWithFunc(func(ctx context.Context) error {

fResp, tErr = http.Get("F_API_URL")
if tErr != nil {
return tErr
}
return nil
}, ctx, true, "F_API")

g.GoWithFunc(func(ctx context.Context) error {

sResp, tErr = http.Get("S_API_URL")
if tErr != nil {
return tErr
}
return nil

}, ctx, false, "S_API")

g.GoWithFunc(func(ctx context.Context) error {
tResp, tErr = http.Get("T_API_URL")
if tErr != nil {
return tErr
}
return nil
}, ctx, true, "T_API")

// returns first error from interrupter routine
err := g.Wait()
if err != nil {
fmt.Println(err)
}
}
```


#### What if error occurs in "S_API" routine ? How can I retrieve its error?

Since "S_API" is a non interrupter goroutine hence the only way to track its error is by:

```golang
err, ok := g.GetErrorByID("S_API")
if ok && err!=nil{
fmt.Println(err)
}
```

#### I don't want to concurrently Read or Write errors.

Ok, I heard you, using concurrent maps comes with performance tradeoff.
If you don't want to track errors of all gorutines and you are happy with first occurring error, then just use rungroup WithContext:

```golang
g, ctx := rungroup.WithContext(context.Background(),concurrent.NewRWMutexMap())
...
...
```

> Note: When you use rungroup.WithContext (no error tracking) then calling g.GetErrorByID() will yield you a nice uninitialized map error and ok = false.
104 changes: 104 additions & 0 deletions pkg/concurrent/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package concurrent

import "sync"

// ConcurrentMap is an interface representing the functionality of hashmap
// which can be used for concurrent reads and writes.
//
// A ConcurrentMap must be safe for concurrent use by multiple
// goroutines.
type ConcurrentMap interface {
Load(interface{}) (interface{}, bool)
Store(key, value interface{})
LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
LoadAndDelete(key interface{}) (value interface{}, loaded bool)
Delete(interface{})
Range(func(key, value interface{}) (shouldContinue bool))
}

// RWMutexMap is like a Go map[interface{}]interface{} but is safe for concurrent use
// by multiple goroutines .
// Loads, stores, and deletes run in amortized constant time.
//
// It is essentially a Go map paired with a separate RWMutex.
//
// You may look out for sync.Map from go library which provides better performance
// under certain cases like "less write many reads".
type RWMutexMap struct {
mu sync.RWMutex
internal map[interface{}]interface{}
}

func NewRWMutexMap() *RWMutexMap {
return &RWMutexMap{
mu: sync.RWMutex{},
internal: make(map[interface{}]interface{}),
}
}

func (m *RWMutexMap) Load(key interface{}) (value interface{}, ok bool) {
m.mu.RLock()
value, ok = m.internal[key]
m.mu.RUnlock()
return
}

func (m *RWMutexMap) Store(key, value interface{}) {
m.mu.Lock()
if m.internal == nil {
m.internal = make(map[interface{}]interface{})
}
m.internal[key] = value
m.mu.Unlock()
}

func (m *RWMutexMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
m.mu.Lock()
actual, loaded = m.internal[key]
if !loaded {
actual = value
if m.internal == nil {
m.internal = make(map[interface{}]interface{})
}
m.internal[key] = value
}
m.mu.Unlock()
return actual, loaded
}

func (m *RWMutexMap) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
m.mu.Lock()
value, loaded = m.internal[key]
if !loaded {
m.mu.Unlock()
return nil, false
}
delete(m.internal, key)
m.mu.Unlock()
return value, loaded
}

func (m *RWMutexMap) Delete(key interface{}) {
m.mu.Lock()
delete(m.internal, key)
m.mu.Unlock()
}

func (m *RWMutexMap) Range(f func(key, value interface{}) (shouldContinue bool)) {
m.mu.RLock()
keys := make([]interface{}, 0, len(m.internal))
for k := range m.internal {
keys = append(keys, k)
}
m.mu.RUnlock()

for _, k := range keys {
v, ok := m.Load(k)
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
161 changes: 161 additions & 0 deletions pkg/concurrent/map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package concurrent

import (
"math/rand"
"reflect"
"runtime"
"sync"
"testing"
"testing/quick"
)

type mapOp string

const (
opLoad = mapOp("Load")
opStore = mapOp("Store")
opLoadOrStore = mapOp("LoadOrStore")
opLoadAndDelete = mapOp("LoadAndDelete")
opDelete = mapOp("Delete")
)

var mapOps = [...]mapOp{opLoad, opStore, opLoadOrStore, opLoadAndDelete, opDelete}

// mapCall is a quick.Generator for calls on mapInterface.
type mapCall struct {
op mapOp
k, v interface{}
}

func (c mapCall) apply(m ConcurrentMap) (interface{}, bool) {
switch c.op {
case opLoad:
return m.Load(c.k)
case opStore:
m.Store(c.k, c.v)
return nil, false
case opLoadOrStore:
return m.LoadOrStore(c.k, c.v)
case opLoadAndDelete:
return m.LoadAndDelete(c.k)
case opDelete:
m.Delete(c.k)
return nil, false
default:
panic("invalid mapOp")
}
}

type mapResult struct {
value interface{}
ok bool
}

func randValue(r *rand.Rand) interface{} {
b := make([]byte, r.Intn(4))
for i := range b {
b[i] = 'a' + byte(rand.Intn(26))
}
return string(b)
}

func (mapCall) Generate(r *rand.Rand, size int) reflect.Value {
c := mapCall{op: mapOps[rand.Intn(len(mapOps))], k: randValue(r)}
switch c.op {
case opStore, opLoadOrStore:
c.v = randValue(r)
}
return reflect.ValueOf(c)
}

func applyCalls(m ConcurrentMap, calls []mapCall) (results []mapResult, final map[interface{}]interface{}) {
for _, c := range calls {
v, ok := c.apply(m)
results = append(results, mapResult{v, ok})
}

final = make(map[interface{}]interface{})
m.Range(func(k, v interface{}) bool {
final[k] = v
return true
})

return results, final
}

func applyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
return applyCalls(new(sync.Map), calls)
}

func applyRWMutexMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
return applyCalls(new(RWMutexMap), calls)
}


func TestMapMatchesRWMutex(t *testing.T) {
if err := quick.CheckEqual(applyMap, applyRWMutexMap, nil); err != nil {
t.Error(err)
}
}


func TestConcurrentRange(t *testing.T) {
const mapSize = 1 << 10

m := new(sync.Map)
for n := int64(1); n <= mapSize; n++ {
m.Store(n, int64(n))
}

done := make(chan struct{})
var wg sync.WaitGroup
defer func() {
close(done)
wg.Wait()
}()
for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- {
r := rand.New(rand.NewSource(g))
wg.Add(1)
go func(g int64) {
defer wg.Done()
for i := int64(0); ; i++ {
select {
case <-done:
return
default:
}
for n := int64(1); n < mapSize; n++ {
if r.Int63n(mapSize) == 0 {
m.Store(n, n*i*g)
} else {
m.Load(n)
}
}
}
}(g)
}

iters := 1 << 10
if testing.Short() {
iters = 16
}
for n := iters; n > 0; n-- {
seen := make(map[int64]bool, mapSize)

m.Range(func(ki, vi interface{}) bool {
k, v := ki.(int64), vi.(int64)
if v%k != 0 {
t.Fatalf("while Storing multiples of %v, Range saw value %v", k, v)
}
if seen[k] {
t.Fatalf("Range visited key %v twice", k)
}
seen[k] = true
return true
})

if len(seen) != mapSize {
t.Fatalf("Range visited %v elements of %v-element Map", len(seen), mapSize)
}
}
}
Loading

0 comments on commit a4a2620

Please sign in to comment.