diff --git a/.gitignore b/.gitignore index 6abac7b..f7157a4 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,5 @@ .glide/ vendor -.vscode \ No newline at end of file +.vscode +.DS_Store \ No newline at end of file diff --git a/engine.go b/engine.go index b6d4e8b..e7cd155 100644 --- a/engine.go +++ b/engine.go @@ -1,10 +1,12 @@ package fished import ( + "errors" "runtime" "sync" "time" + "github.com/hooqtv/fished/pool" "github.com/knetic/govaluate" "github.com/patrickmn/go-cache" ) @@ -15,6 +17,9 @@ var ( // DefaultWorker is the default worker for Engine DefaultWorker = 0 + + // DefaultRuleLength ... + DefaultRuleLength = 100 ) type ( @@ -25,6 +30,7 @@ type ( RuleFunctions map[string]govaluate.ExpressionFunction RuleCache *cache.Cache RunLock sync.RWMutex + RuntimePool *pool.ReferenceCountedPool } // Rule is struct for rule in fished @@ -39,12 +45,12 @@ type ( // Runtime is an struct for each time Engine.Run() is called Runtime struct { + pool.ReferenceCounter Facts map[string]interface{} JobCh chan *Job ResultCh chan *EvalResult UsedRule map[int]struct{} FactsMutex sync.RWMutex - WorkerWg sync.WaitGroup } // Job struct @@ -63,10 +69,56 @@ type ( // New will create new engine func New() *Engine { + return NewWithCustomWorkerSize(0) +} + +// NewWithCustomWorkerSize ... +func NewWithCustomWorkerSize(worker int) *Engine { + var workerSize int + c := cache.New(24*time.Hour, 1*time.Hour) + if worker == DefaultWorker { + numCPU := runtime.NumCPU() + if numCPU <= 2 { + workerSize = 1 + } else { + workerSize = runtime.NumCPU() - 1 + } + } else { + workerSize = worker + } + + if workerSize <= 0 { + workerSize = 1 + } + return &Engine{ RuleCache: c, + RuntimePool: pool.NewReferenceCountedPool( + func(counter pool.ReferenceCounter) pool.ReferenceCountable { + br := new(Runtime) + br.JobCh = make(chan *Job, DefaultRuleLength) + br.ResultCh = make(chan *EvalResult, DefaultRuleLength) + br.UsedRule = make(map[int]struct{}) + br.ReferenceCounter = counter + + for i := 0; i < workerSize; i++ { + go func() { + for job := range br.JobCh { + br.Evaluate(job, br.ResultCh) + } + }() + } + return br + }, func(i interface{}) error { + obj, ok := i.(*Runtime) + if !ok { + return errors.New("Illegal object passed") + } + obj.Reset() + return nil + }), } } @@ -131,9 +183,14 @@ func (e *Engine) RunDefault() (interface{}, []error) { return e.Run(DefaultTarget, DefaultWorker) } +// RunWithCustomTarget will execute run using customizable end target +func (e *Engine) RunWithCustomTarget(target string) (interface{}, []error) { + return e.Run(target, 0) +} + // Run will execute rule and facts to get the result +// DEPRICATION NOTICE : worker param is depricated since it has been moved to engine struct func (e *Engine) Run(target string, worker int) (interface{}, []error) { - var workerSize int var endTarget string var errs []error @@ -146,42 +203,13 @@ func (e *Engine) Run(target string, worker int) (interface{}, []error) { endTarget = target } - if worker == DefaultWorker { - numCPU := runtime.NumCPU() - if numCPU <= 2 { - workerSize = 1 - } else { - workerSize = runtime.NumCPU() - 1 - } - } else { - workerSize = worker - } - - if workerSize <= 0 { - workerSize = 1 - } - facts := make(map[string]interface{}) for key, value := range e.InitialFacts { facts[key] = value } - r := &Runtime{ - Facts: facts, - JobCh: make(chan *Job, len(e.Rules)), - ResultCh: make(chan *EvalResult, len(e.Rules)), - UsedRule: make(map[int]struct{}), - } - - r.WorkerWg.Add(workerSize) - for i := 0; i < workerSize; i++ { - go func() { - defer r.WorkerWg.Done() - for job := range r.JobCh { - r.Evaluate(job, r.ResultCh) - } - }() - } + r := e.NewRuntime(facts) + defer r.DecrementReferenceCount() for { var jobLength int @@ -246,7 +274,6 @@ func (e *Engine) Run(target string, worker int) (interface{}, []error) { } if jobLength == 0 || parseRuleError { - r.PrepareExit() break } @@ -267,10 +294,16 @@ func (e *Engine) Run(target string, worker int) (interface{}, []error) { } } - r.WorkerWg.Wait() return r.Facts[endTarget], errs } +// NewRuntime ... +func (e *Engine) NewRuntime(facts map[string]interface{}) *Runtime { + r := e.RuntimePool.Get().(*Runtime) + r.Facts = facts + return r +} + // Evaluate will evaluate each job in runtime func (r *Runtime) Evaluate(job *Job, result chan<- *EvalResult) { evalResult := &EvalResult{ @@ -287,8 +320,10 @@ func (r *Runtime) Evaluate(job *Job, result chan<- *EvalResult) { result <- evalResult } -// PrepareExit the runtime and close -func (r *Runtime) PrepareExit() { - close(r.JobCh) - close(r.ResultCh) +// Reset Current Runtime +func (r *Runtime) Reset() error { + for i := range r.UsedRule { + delete(r.UsedRule, i) + } + return nil } diff --git a/engine_test.go b/engine_test.go index 4d51443..e677382 100644 --- a/engine_test.go +++ b/engine_test.go @@ -6,7 +6,7 @@ import ( "runtime" "testing" - "github.com/json-iterator/go" + jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" ) @@ -104,9 +104,9 @@ func TestRun(t *testing.T) { return } - e := New() + e := NewWithCustomWorkerSize(test.Worker) e.Set(test.Facts, ruleMap.Data, test.RuleFunction) - res, errs := e.Run(test.Target, test.Worker) + res, errs := e.RunWithCustomTarget(test.Target) if test.IsError { assert.NotNil(t, errs) } else { @@ -208,9 +208,9 @@ func TestDoubleRun(t *testing.T) { return } - e := New() + e := NewWithCustomWorkerSize(test.Worker) e.Set(test.Facts, ruleMap.Data, test.RuleFunction) - res, errs := e.Run(test.Target, test.Worker) + res, errs := e.RunWithCustomTarget(test.Target) if test.IsError { assert.NotNil(t, errs) } else { @@ -326,10 +326,10 @@ func BenchmarkRun(b *testing.B) { json.Unmarshal(byteValue, &ruleMap) - e := New() + e := NewWithCustomWorkerSize(test.Worker) e.Set(test.Facts, ruleMap.Data, test.RuleFunction) for i := 0; i < b.N; i++ { - e.Run(test.Target, test.Worker) + e.RunWithCustomTarget(test.Target) } }) } @@ -435,9 +435,9 @@ func BenchmarkRunFullRemakeEngine(b *testing.B) { json.Unmarshal(byteValue, &ruleMap) for i := 0; i < b.N; i++ { - e := New() + e := NewWithCustomWorkerSize(test.Worker) e.Set(test.Facts, ruleMap.Data, test.RuleFunction) - e.Run(test.Target, test.Worker) + e.RunWithCustomTarget(test.Target) } }) } diff --git a/pool/reference.go b/pool/reference.go new file mode 100644 index 0000000..a5cc53e --- /dev/null +++ b/pool/reference.go @@ -0,0 +1,92 @@ +package pool + +import ( + "reflect" + "sync" + "sync/atomic" +) + +// ReferenceCountable ... +type ReferenceCountable interface { + SetInstance(i interface{}) + IncrementReferenceCount() + DecrementReferenceCount() +} + +// ReferenceCounter ... +type ReferenceCounter struct { + count *uint32 + destination *sync.Pool + released *uint32 + instance interface{} + reset func(interface{}) error + id uint32 +} + +// IncrementReferenceCount Method to increment a reference +func (r ReferenceCounter) IncrementReferenceCount() { + atomic.AddUint32(r.count, 1) +} + +// DecrementReferenceCount Method to decrement a reference +// If the reference count goes to zero, the object is put back inside the pool +func (r ReferenceCounter) DecrementReferenceCount() { + if atomic.LoadUint32(r.count) == 0 { + panic("this should not happen =>" + reflect.TypeOf(r.instance).String()) + } + if atomic.AddUint32(r.count, ^uint32(0)) == 0 { + atomic.AddUint32(r.released, 1) + if err := r.reset(r.instance); err != nil { + panic("error while resetting an instance => " + err.Error()) + } + r.destination.Put(r.instance) + r.instance = nil + } +} + +// SetInstance Method to set the current instance +func (r *ReferenceCounter) SetInstance(i interface{}) { + r.instance = i +} + +// ReferenceCountedPool Struct representing the pool +type ReferenceCountedPool struct { + pool *sync.Pool + factory func() ReferenceCountable + returned uint32 + allocated uint32 + referenced uint32 +} + +// NewReferenceCountedPool Method to create a new pool +func NewReferenceCountedPool(factory func(referenceCounter ReferenceCounter) ReferenceCountable, reset func(interface{}) error) *ReferenceCountedPool { + p := new(ReferenceCountedPool) + p.pool = new(sync.Pool) + p.pool.New = func() interface{} { + // Incrementing allocated count + atomic.AddUint32(&p.allocated, 1) + c := factory(ReferenceCounter{ + count: new(uint32), + destination: p.pool, + released: &p.returned, + reset: reset, + id: p.allocated, + }) + return c + } + return p +} + +// Get Method to get new object +func (p *ReferenceCountedPool) Get() ReferenceCountable { + c := p.pool.Get().(ReferenceCountable) + c.SetInstance(c) + atomic.AddUint32(&p.referenced, 1) + c.IncrementReferenceCount() + return c +} + +// Stats Method to return reference counted pool stats +func (p *ReferenceCountedPool) Stats() map[string]interface{} { + return map[string]interface{}{"allocated": p.allocated, "referenced": p.referenced, "returned": p.returned} +}