From 6a66e089461fb456a13be90cdd42e451d666944b Mon Sep 17 00:00:00 2001 From: Tejzpr Date: Wed, 10 Mar 2021 21:32:23 -0800 Subject: [PATCH] Added support for Out Channel Buffer --- README.md | 4 +++- main.go | 5 +++-- main_test.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e8fc322..4811b20 100644 --- a/README.md +++ b/README.md @@ -28,9 +28,10 @@ func workFn(val interface{}) interface{} { ```go func main() { max := 10 + // Can be a non blocking channel as well inputChan := make(chan *concurrently.OrderedInput) doneChan := make(chan bool) - outChan := concurrently.Process(inputChan, workFn, &concurrently.Options{PoolSize: 10}) + outChan := concurrently.Process(inputChan, workFn, &concurrently.Options{PoolSize: 10, OutChannelBufferSize: 2}) go func() { for { select { @@ -58,6 +59,7 @@ func main() { ```go func main() { max := 100 + // Can be a non blocking channel as well inputChan := make(chan *concurrently.OrderedInput) wg := &sync.WaitGroup{} diff --git a/main.go b/main.go index d7671fe..e2b604f 100644 --- a/main.go +++ b/main.go @@ -16,7 +16,8 @@ type OrderedOutput struct { // Options options for Process type Options struct { - PoolSize int + PoolSize int + OutChannelBuffer int } // WorkFunction the function which performs work @@ -26,7 +27,7 @@ type WorkFunction func(interface{}) interface{} // It Accepts an OrderedInput read channel, work function and concurrent go routine pool size. // It Returns an OrderedOutput channel. func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options) <-chan *OrderedOutput { - outputChan := make(chan *OrderedOutput) + outputChan := make(chan *OrderedOutput, options.OutChannelBuffer) type processInput struct { value interface{} order uint64 diff --git a/main_test.go b/main_test.go index ad261ef..1bf2576 100644 --- a/main_test.go +++ b/main_test.go @@ -48,7 +48,7 @@ func Test(t *testing.T) { max := 10 inputChan := make(chan *OrderedInput) doneChan := make(chan bool) - outChan := Process(inputChan, workFn, &Options{}) + outChan := Process(inputChan, workFn, &Options{OutChannelBuffer: 2}) go func(t *testing.T) { counter := 0 for {