Skip to content
This repository has been archived by the owner on Jul 27, 2021. It is now read-only.

Commit

Permalink
Pooled runtime (#9)
Browse files Browse the repository at this point in the history
* implement pooled runtime

* depricate Run, add RunWithCustomTarget()

Co-authored-by: Jonathan Sudibya <[email protected]>
  • Loading branch information
JonathanSudibya and Jonathan Sudibya authored Nov 25, 2020
1 parent f2262e0 commit 0aca58d
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 49 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
.glide/

vendor
.vscode
.vscode
.DS_Store
113 changes: 74 additions & 39 deletions engine.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package fished

import (
"errors"
"runtime"
"sync"
"time"

"github.com/hooqtv/fished/pool"
"github.com/knetic/govaluate"
"github.com/patrickmn/go-cache"
)
Expand All @@ -15,6 +17,9 @@ var (

// DefaultWorker is the default worker for Engine
DefaultWorker = 0

// DefaultRuleLength ...
DefaultRuleLength = 100
)

type (
Expand All @@ -25,6 +30,7 @@ type (
RuleFunctions map[string]govaluate.ExpressionFunction
RuleCache *cache.Cache
RunLock sync.RWMutex
RuntimePool *pool.ReferenceCountedPool
}

// Rule is struct for rule in fished
Expand All @@ -39,12 +45,12 @@ type (

// Runtime is an struct for each time Engine.Run() is called
Runtime struct {
pool.ReferenceCounter
Facts map[string]interface{}
JobCh chan *Job
ResultCh chan *EvalResult
UsedRule map[int]struct{}
FactsMutex sync.RWMutex
WorkerWg sync.WaitGroup
}

// Job struct
Expand All @@ -63,10 +69,56 @@ type (

// New will create new engine
func New() *Engine {
return NewWithCustomWorkerSize(0)
}

// NewWithCustomWorkerSize ...
func NewWithCustomWorkerSize(worker int) *Engine {
var workerSize int

c := cache.New(24*time.Hour, 1*time.Hour)

if worker == DefaultWorker {
numCPU := runtime.NumCPU()
if numCPU <= 2 {
workerSize = 1
} else {
workerSize = runtime.NumCPU() - 1
}
} else {
workerSize = worker
}

if workerSize <= 0 {
workerSize = 1
}

return &Engine{
RuleCache: c,
RuntimePool: pool.NewReferenceCountedPool(
func(counter pool.ReferenceCounter) pool.ReferenceCountable {
br := new(Runtime)
br.JobCh = make(chan *Job, DefaultRuleLength)
br.ResultCh = make(chan *EvalResult, DefaultRuleLength)
br.UsedRule = make(map[int]struct{})
br.ReferenceCounter = counter

for i := 0; i < workerSize; i++ {
go func() {
for job := range br.JobCh {
br.Evaluate(job, br.ResultCh)
}
}()
}
return br
}, func(i interface{}) error {
obj, ok := i.(*Runtime)
if !ok {
return errors.New("Illegal object passed")
}
obj.Reset()
return nil
}),
}
}

Expand Down Expand Up @@ -131,9 +183,14 @@ func (e *Engine) RunDefault() (interface{}, []error) {
return e.Run(DefaultTarget, DefaultWorker)
}

// RunWithCustomTarget will execute run using customizable end target
func (e *Engine) RunWithCustomTarget(target string) (interface{}, []error) {
return e.Run(target, 0)
}

// Run will execute rule and facts to get the result
// DEPRICATION NOTICE : worker param is depricated since it has been moved to engine struct
func (e *Engine) Run(target string, worker int) (interface{}, []error) {
var workerSize int
var endTarget string
var errs []error

Expand All @@ -146,42 +203,13 @@ func (e *Engine) Run(target string, worker int) (interface{}, []error) {
endTarget = target
}

if worker == DefaultWorker {
numCPU := runtime.NumCPU()
if numCPU <= 2 {
workerSize = 1
} else {
workerSize = runtime.NumCPU() - 1
}
} else {
workerSize = worker
}

if workerSize <= 0 {
workerSize = 1
}

facts := make(map[string]interface{})
for key, value := range e.InitialFacts {
facts[key] = value
}

r := &Runtime{
Facts: facts,
JobCh: make(chan *Job, len(e.Rules)),
ResultCh: make(chan *EvalResult, len(e.Rules)),
UsedRule: make(map[int]struct{}),
}

r.WorkerWg.Add(workerSize)
for i := 0; i < workerSize; i++ {
go func() {
defer r.WorkerWg.Done()
for job := range r.JobCh {
r.Evaluate(job, r.ResultCh)
}
}()
}
r := e.NewRuntime(facts)
defer r.DecrementReferenceCount()

for {
var jobLength int
Expand Down Expand Up @@ -246,7 +274,6 @@ func (e *Engine) Run(target string, worker int) (interface{}, []error) {
}

if jobLength == 0 || parseRuleError {
r.PrepareExit()
break
}

Expand All @@ -267,10 +294,16 @@ func (e *Engine) Run(target string, worker int) (interface{}, []error) {
}
}

r.WorkerWg.Wait()
return r.Facts[endTarget], errs
}

// NewRuntime ...
func (e *Engine) NewRuntime(facts map[string]interface{}) *Runtime {
r := e.RuntimePool.Get().(*Runtime)
r.Facts = facts
return r
}

// Evaluate will evaluate each job in runtime
func (r *Runtime) Evaluate(job *Job, result chan<- *EvalResult) {
evalResult := &EvalResult{
Expand All @@ -287,8 +320,10 @@ func (r *Runtime) Evaluate(job *Job, result chan<- *EvalResult) {
result <- evalResult
}

// PrepareExit the runtime and close
func (r *Runtime) PrepareExit() {
close(r.JobCh)
close(r.ResultCh)
// Reset Current Runtime
func (r *Runtime) Reset() error {
for i := range r.UsedRule {
delete(r.UsedRule, i)
}
return nil
}
18 changes: 9 additions & 9 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"runtime"
"testing"

"github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -104,9 +104,9 @@ func TestRun(t *testing.T) {
return
}

e := New()
e := NewWithCustomWorkerSize(test.Worker)
e.Set(test.Facts, ruleMap.Data, test.RuleFunction)
res, errs := e.Run(test.Target, test.Worker)
res, errs := e.RunWithCustomTarget(test.Target)
if test.IsError {
assert.NotNil(t, errs)
} else {
Expand Down Expand Up @@ -208,9 +208,9 @@ func TestDoubleRun(t *testing.T) {
return
}

e := New()
e := NewWithCustomWorkerSize(test.Worker)
e.Set(test.Facts, ruleMap.Data, test.RuleFunction)
res, errs := e.Run(test.Target, test.Worker)
res, errs := e.RunWithCustomTarget(test.Target)
if test.IsError {
assert.NotNil(t, errs)
} else {
Expand Down Expand Up @@ -326,10 +326,10 @@ func BenchmarkRun(b *testing.B) {

json.Unmarshal(byteValue, &ruleMap)

e := New()
e := NewWithCustomWorkerSize(test.Worker)
e.Set(test.Facts, ruleMap.Data, test.RuleFunction)
for i := 0; i < b.N; i++ {
e.Run(test.Target, test.Worker)
e.RunWithCustomTarget(test.Target)
}
})
}
Expand Down Expand Up @@ -435,9 +435,9 @@ func BenchmarkRunFullRemakeEngine(b *testing.B) {
json.Unmarshal(byteValue, &ruleMap)

for i := 0; i < b.N; i++ {
e := New()
e := NewWithCustomWorkerSize(test.Worker)
e.Set(test.Facts, ruleMap.Data, test.RuleFunction)
e.Run(test.Target, test.Worker)
e.RunWithCustomTarget(test.Target)
}
})
}
Expand Down
92 changes: 92 additions & 0 deletions pool/reference.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package pool

import (
"reflect"
"sync"
"sync/atomic"
)

// ReferenceCountable ...
type ReferenceCountable interface {
SetInstance(i interface{})
IncrementReferenceCount()
DecrementReferenceCount()
}

// ReferenceCounter ...
type ReferenceCounter struct {
count *uint32
destination *sync.Pool
released *uint32
instance interface{}
reset func(interface{}) error
id uint32
}

// IncrementReferenceCount Method to increment a reference
func (r ReferenceCounter) IncrementReferenceCount() {
atomic.AddUint32(r.count, 1)
}

// DecrementReferenceCount Method to decrement a reference
// If the reference count goes to zero, the object is put back inside the pool
func (r ReferenceCounter) DecrementReferenceCount() {
if atomic.LoadUint32(r.count) == 0 {
panic("this should not happen =>" + reflect.TypeOf(r.instance).String())
}
if atomic.AddUint32(r.count, ^uint32(0)) == 0 {
atomic.AddUint32(r.released, 1)
if err := r.reset(r.instance); err != nil {
panic("error while resetting an instance => " + err.Error())
}
r.destination.Put(r.instance)
r.instance = nil
}
}

// SetInstance Method to set the current instance
func (r *ReferenceCounter) SetInstance(i interface{}) {
r.instance = i
}

// ReferenceCountedPool Struct representing the pool
type ReferenceCountedPool struct {
pool *sync.Pool
factory func() ReferenceCountable
returned uint32
allocated uint32
referenced uint32
}

// NewReferenceCountedPool Method to create a new pool
func NewReferenceCountedPool(factory func(referenceCounter ReferenceCounter) ReferenceCountable, reset func(interface{}) error) *ReferenceCountedPool {
p := new(ReferenceCountedPool)
p.pool = new(sync.Pool)
p.pool.New = func() interface{} {
// Incrementing allocated count
atomic.AddUint32(&p.allocated, 1)
c := factory(ReferenceCounter{
count: new(uint32),
destination: p.pool,
released: &p.returned,
reset: reset,
id: p.allocated,
})
return c
}
return p
}

// Get Method to get new object
func (p *ReferenceCountedPool) Get() ReferenceCountable {
c := p.pool.Get().(ReferenceCountable)
c.SetInstance(c)
atomic.AddUint32(&p.referenced, 1)
c.IncrementReferenceCount()
return c
}

// Stats Method to return reference counted pool stats
func (p *ReferenceCountedPool) Stats() map[string]interface{} {
return map[string]interface{}{"allocated": p.allocated, "referenced": p.referenced, "returned": p.returned}
}

0 comments on commit 0aca58d

Please sign in to comment.