From 7f25b9b6ba350c98b2aa5c3a43d23ee9b764f81e Mon Sep 17 00:00:00 2001 From: Julian Matschinske Date: Mon, 13 Aug 2018 23:24:48 +0200 Subject: [PATCH] Optimize slang.control.Reduce --- pkg/elem/control_reduce.go | 154 ++++++++---------- pkg/elem/control_reduce_test.go | 142 +++------------- .../daemon/services/operator/reduce.yaml | 4 +- tests/test_data/slib/merge_sort.yaml | 6 +- tests/test_data/sum/reduce.yaml | 4 +- 5 files changed, 97 insertions(+), 213 deletions(-) diff --git a/pkg/elem/control_reduce.go b/pkg/elem/control_reduce.go index e2c829ca..ad72af8c 100644 --- a/pkg/elem/control_reduce.go +++ b/pkg/elem/control_reduce.go @@ -2,6 +2,7 @@ package elem import ( "github.com/Bitspark/slang/pkg/core" + "sync" ) var controlReduceCfg = &builtinConfig{ @@ -22,27 +23,21 @@ var controlReduceCfg = &builtinConfig{ }, }, DelegateDefs: map[string]*core.DelegateDef{ - "selection": { + "reducer": { In: core.TypeDef{ - Type: "stream", - Stream: &core.TypeDef{ - Type: "generic", - Generic: "itemType", - }, + Type: "generic", + Generic: "itemType", }, Out: core.TypeDef{ - Type: "stream", - Stream: &core.TypeDef{ - Type: "map", - Map: map[string]*core.TypeDef{ - "a": { - Type: "generic", - Generic: "itemType", - }, - "b": { - Type: "generic", - Generic: "itemType", - }, + Type: "map", + Map: map[string]*core.TypeDef{ + "a": { + Type: "generic", + Generic: "itemType", + }, + "b": { + Type: "generic", + Generic: "itemType", }, }, }, @@ -50,7 +45,7 @@ var controlReduceCfg = &builtinConfig{ }, PropertyDefs: map[string]*core.TypeDef{ "emptyValue": { - Type: "generic", + Type: "generic", Generic: "itemType", }, }, @@ -58,91 +53,70 @@ var controlReduceCfg = &builtinConfig{ opFunc: func(op *core.Operator) { in := op.Main().In() out := op.Main().Out() - sIn := op.Delegate("selection").In() - sOut := op.Delegate("selection").Out() + sIn := op.Delegate("reducer").In() + sOut := op.Delegate("reducer").Out() nullValue := op.Property("emptyValue") for !op.CheckStop() { - i := in.Pull() - - if core.IsMarker(i) { - sOut.Push(i) - sel := sIn.Pull() - - if sel != i { - panic("expected different marker") - } + i := in.Stream().Pull() + if !in.OwnBOS(i) { out.Push(i) - - continue - } - - items, ok := i.([]interface{}) - if !ok { - panic("expected stream") - } - - if len(items) == 0 { - out.Push(nullValue) continue } - if len(items) == 1 { - out.Push(items[0]) - continue - } - - sOut.PushBOS() - j := 0 - for j+1 < len(items) { - sOut.Stream().Map("a").Push(items[j]) - sOut.Stream().Map("b").Push(items[j+1]) - j += 2 - } - sOut.PushEOS() - - var leftover interface{} - if j != len(items) { - leftover = items[len(items)-1] - } - - // POOL - - for { - p := sIn.Pull() - - items, ok := p.([]interface{}) - if !ok { - panic("expected stream") + mutex := &sync.Mutex{} + pool := []interface{}{} + done := false + doneChan := make(chan bool) + + // Reducer + go func() { + for { + mutex.Lock() + if done && len(pool) < 2 { + doneChan <- true + break + } + mutex.Unlock() + + mutex.Lock() + if len(pool) > 1 { + sOut.Push(map[string]interface{}{"a": pool[0], "b": pool[1]}) + pool = pool[2:] + } else { + mutex.Unlock() + continue + } + mutex.Unlock() + + i := sIn.Pull() + + mutex.Lock() + pool = append(pool, i) + mutex.Unlock() } + }() - if leftover != nil { - items = append([]interface{}{leftover}, items...) - } - - if len(items) == 0 { - panic("empty pool") - } + for { + // Stream items - if len(items) == 1 { - out.Push(items[0]) + i = in.Stream().Pull() + if in.OwnEOS(i) { + done = true break } - sOut.PushBOS() - j := 0 - for j+1 < len(items) { - sOut.Stream().Map("a").Push(items[j]) - sOut.Stream().Map("b").Push(items[j+1]) - j += 2 - } - sOut.PushEOS() + mutex.Lock() + pool = append(pool, i) + mutex.Unlock() + } - if j != len(items) { - leftover = items[len(items)-1] - } else { - leftover = nil - } + <-doneChan + + if len(pool) == 1 { + out.Push(pool[0]) + } else { + out.Push(nullValue) } } }, diff --git a/pkg/elem/control_reduce_test.go b/pkg/elem/control_reduce_test.go index 9b12cc1a..26ff55d2 100644 --- a/pkg/elem/control_reduce_test.go +++ b/pkg/elem/control_reduce_test.go @@ -28,13 +28,10 @@ func Test_CtrlReduce__InPorts(t *testing.T) { o, err := buildOperator(core.InstanceDef{Operator: "slang.control.Reduce", Generics: map[string]*core.TypeDef{"itemType": {Type: "number"}}, Properties: map[string]interface{}{"emptyValue": -1}}) r.NoError(err) - a.Equal(core.TYPE_STREAM, o.Main().In().Type()) - a.Equal(core.TYPE_STREAM, o.Delegate("selection").In().Type()) - // Item type itemType := core.TYPE_NUMBER a.Equal(itemType, o.Main().In().Stream().Type()) - a.Equal(itemType, o.Delegate("selection").In().Stream().Type()) + a.Equal(itemType, o.Delegate("reducer").In().Type()) o, err = buildOperator(core.InstanceDef{Operator: "slang.control.Reduce", Generics: map[string]*core.TypeDef{"itemType": {Type: "string"}}, Properties: map[string]interface{}{"emptyValue": ""}}) r.NoError(err) @@ -42,7 +39,7 @@ func Test_CtrlReduce__InPorts(t *testing.T) { // Item type itemType = core.TYPE_STRING a.Equal(itemType, o.Main().In().Stream().Type()) - a.Equal(itemType, o.Delegate("selection").In().Stream().Type()) + a.Equal(itemType, o.Delegate("reducer").In().Type()) r.NoError(err) } @@ -54,21 +51,20 @@ func Test_CtrlReduce__OutPorts(t *testing.T) { r.NoError(err) a.Equal(core.TYPE_NUMBER, o.Main().Out().Type()) - a.Equal(core.TYPE_STREAM, o.Delegate("selection").Out().Type()) - a.Equal(core.TYPE_MAP, o.Delegate("selection").Out().Stream().Type()) + a.Equal(core.TYPE_MAP, o.Delegate("reducer").Out().Type()) // Item type itemType := core.TYPE_NUMBER - a.Equal(itemType, o.Delegate("selection").Out().Stream().Map("a").Type()) - a.Equal(itemType, o.Delegate("selection").Out().Stream().Map("b").Type()) + a.Equal(itemType, o.Delegate("reducer").Out().Map("a").Type()) + a.Equal(itemType, o.Delegate("reducer").Out().Map("b").Type()) o, err = buildOperator(core.InstanceDef{Operator: "slang.control.Reduce", Generics: map[string]*core.TypeDef{"itemType": {Type: "string"}}, Properties: map[string]interface{}{"emptyValue": ""}}) r.NoError(err) // Item type itemType = core.TYPE_STRING - a.Equal(itemType, o.Delegate("selection").Out().Stream().Map("a").Type()) - a.Equal(itemType, o.Delegate("selection").Out().Stream().Map("b").Type()) + a.Equal(itemType, o.Delegate("reducer").Out().Map("a").Type()) + a.Equal(itemType, o.Delegate("reducer").Out().Map("b").Type()) } func Test_CtrlReduce__PassMarkers(t *testing.T) { @@ -79,15 +75,13 @@ func Test_CtrlReduce__PassMarkers(t *testing.T) { r.NoError(err) o.Main().Out().Bufferize() - o.Delegate("selection").Out().Stream().Bufferize() + o.Delegate("reducer").Out().Bufferize() o.Start() bos := core.BOS{} eos := core.BOS{} o.Main().In().Stream().Push(bos) o.Main().In().Stream().Push(eos) - o.Delegate("selection").In().Stream().Push(bos) - o.Delegate("selection").In().Stream().Push(eos) a.PortPushesAll([]interface{}{bos, eos}, o.Main().Out()) } @@ -104,8 +98,7 @@ func Test_CtrlReduce__SelectionFromItemsEmpty(t *testing.T) { r.NoError(err) o.Main().Out().Bufferize() - o.Delegate("selection").Out().Stream().Map("a").Bufferize() - o.Delegate("selection").Out().Stream().Map("b").Bufferize() + o.Delegate("reducer").Out().Bufferize() o.Start() o.Main().In().Push([]interface{}{}) @@ -122,8 +115,7 @@ func Test_CtrlReduce__SelectionFromItemsSingle(t *testing.T) { r.NoError(err) o.Main().Out().Bufferize() - o.Delegate("selection").Out().Stream().Map("a").Bufferize() - o.Delegate("selection").Out().Stream().Map("b").Bufferize() + o.Delegate("reducer").Out().Bufferize() o.Start() o.Main().In().Push([]interface{}{123.0}) @@ -140,15 +132,14 @@ func Test_CtrlReduce__SelectionFromItemsMultiple(t *testing.T) { r.NoError(err) o.Main().Out().Bufferize() - o.Delegate("selection").Out().Stream().Map("a").Bufferize() - o.Delegate("selection").Out().Stream().Map("b").Bufferize() + o.Delegate("reducer").Out().Bufferize() o.Start() o.Main().In().Push([]interface{}{1.0, 2.0}) - o.Delegate("selection").In().Push([]interface{}{3.0}) + o.Delegate("reducer").In().Push(3.0) - i := o.Delegate("selection").Out().Pull() - a.Equal([]interface{}{map[string]interface{}{"a": 1.0, "b": 2.0}}, i) + i := o.Delegate("reducer").Out().Pull() + a.Equal(map[string]interface{}{"a": 1.0, "b": 2.0}, i) i = o.Main().Out().Pull() a.Equal(3.0, i) @@ -162,99 +153,18 @@ func Test_CtrlReduce__SelectionFromPool(t *testing.T) { r.NoError(err) o.Main().Out().Bufferize() - o.Delegate("selection").Out().Stream().Map("a").Bufferize() - o.Delegate("selection").Out().Stream().Map("b").Bufferize() - o.Start() - - o.Main().In().Push([]interface{}{1.0, 2.0}) - o.Delegate("selection").In().Push([]interface{}{3.0, 4.0, 5.0, 6.0}) - - i := o.Delegate("selection").Out().Pull() - a.Equal([]interface{}{map[string]interface{}{"a": 1.0, "b": 2.0}}, i) - - i = o.Delegate("selection").Out().Pull() - a.Equal([]interface{}{ - map[string]interface{}{"a": 3.0, "b": 4.0}, - map[string]interface{}{"a": 5.0, "b": 6.0}, - }, i) -} - -func Test_CtrlReduce__MixedSelection1(t *testing.T) { - a := assertions.New(t) - r := require.New(t) - - o, err := buildOperator(core.InstanceDef{Operator: "slang.control.Reduce", Generics: map[string]*core.TypeDef{"itemType": {Type: "number"}}, Properties: map[string]interface{}{"emptyValue": -1}}) - r.NoError(err) - - o.Main().Out().Bufferize() - o.Delegate("selection").Out().Stream().Map("a").Bufferize() - o.Delegate("selection").Out().Stream().Map("b").Bufferize() - o.Start() - - o.Main().In().Push([]interface{}{1.0, 2.0, 3.0}) - o.Delegate("selection").In().Push([]interface{}{4.0}) - - i := o.Delegate("selection").Out().Pull() - a.Equal([]interface{}{map[string]interface{}{"a": 1.0, "b": 2.0}}, i) - i = o.Delegate("selection").Out().Pull() - a.Equal([]interface{}{map[string]interface{}{"a": 3.0, "b": 4.0}}, i) -} - -func Test_CtrlReduce__MixedSelection2(t *testing.T) { - a := assertions.New(t) - r := require.New(t) - - o, err := buildOperator(core.InstanceDef{Operator: "slang.control.Reduce", Generics: map[string]*core.TypeDef{"itemType": {Type: "number"}}, Properties: map[string]interface{}{"emptyValue": -1}}) - r.NoError(err) - - o.Main().Out().Bufferize() - o.Delegate("selection").Out().Stream().Map("a").Bufferize() - o.Delegate("selection").Out().Stream().Map("b").Bufferize() + o.Delegate("reducer").Out().Bufferize() o.Start() - o.Main().In().Push([]interface{}{1.0, 2.0, 3.0}) - o.Delegate("selection").In().Push([]interface{}{4.0, 5.0, 6.0}) - o.Delegate("selection").In().Push([]interface{}{7.0, 8.0, 9.0}) - o.Delegate("selection").In().Push([]interface{}{10.0}) - o.Delegate("selection").In().Push([]interface{}{11.0}) - - i := o.Delegate("selection").Out().Pull() - a.Equal([]interface{}{map[string]interface{}{"a": 1.0, "b": 2.0}}, i) - - i = o.Delegate("selection").Out().Pull() - a.Equal([]interface{}{ - map[string]interface{}{"a": 3.0, "b": 4.0}, - map[string]interface{}{"a": 5.0, "b": 6.0}, - }, i) - - i = o.Delegate("selection").Out().Pull() - a.Equal([]interface{}{ - map[string]interface{}{"a": 7.0, "b": 8.0}, - }, i) - - i = o.Delegate("selection").Out().Pull() - a.Equal([]interface{}{ - map[string]interface{}{"a": 9.0, "b": 10.0}, - }, i) - - i = o.Main().Out().Pull() - a.Equal(11.0, i) -} - -func Test_CtrlReduce__MixedSelection3(t *testing.T) { - a := assertions.New(t) - r := require.New(t) - - o, err := buildOperator(core.InstanceDef{Operator: "slang.control.Reduce", Generics: map[string]*core.TypeDef{"itemType": {Type: "number"}}, Properties: map[string]interface{}{"emptyValue": -1}}) - r.NoError(err) - - o.Main().Out().Bufferize() - o.Delegate("selection").Out().Stream().Map("a").Bufferize() - o.Delegate("selection").Out().Stream().Map("b").Bufferize() - o.Start() - - o.Main().In().Push([]interface{}{1.0, 2.0}) - o.Delegate("selection").In().Push([]interface{}{3.0}) - - a.PortPushesAll([]interface{}{3.0}, o.Main().Out()) + o.Main().In().Push([]interface{}{1.0, 1.0, 1.0, 1.0}) + o.Delegate("reducer").In().Push(2.0) + o.Delegate("reducer").In().Push(2.0) + o.Delegate("reducer").In().Push(4.0) + + i := o.Delegate("reducer").Out().Pull() + a.Equal(map[string]interface{}{"a": 1.0, "b": 1.0}, i) + i = o.Delegate("reducer").Out().Pull() + a.Equal(map[string]interface{}{"a": 1.0, "b": 1.0}, i) + i = o.Delegate("reducer").Out().Pull() + a.Equal(map[string]interface{}{"a": 2.0, "b": 2.0}, i) } diff --git a/tests/test_data/daemon/services/operator/reduce.yaml b/tests/test_data/daemon/services/operator/reduce.yaml index 6ee26fbc..a91f48f2 100644 --- a/tests/test_data/daemon/services/operator/reduce.yaml +++ b/tests/test_data/daemon/services/operator/reduce.yaml @@ -26,7 +26,7 @@ connections: - (reducer reducer): - ) - reducer.selection)~: + reducer.reducer): - (adder adder): - - ~(reducer.selection + - (reducer.reducer diff --git a/tests/test_data/slib/merge_sort.yaml b/tests/test_data/slib/merge_sort.yaml index 788f59c3..191cd032 100644 --- a/tests/test_data/slib/merge_sort.yaml +++ b/tests/test_data/slib/merge_sort.yaml @@ -46,9 +46,9 @@ connections: - (packer packer): - ~(reducer - reducer.selection)~.a: + reducer.reducer)a: - true(sorter - reducer.selection)~.b: + reducer.reducer)b: - false(sorter sorter.compare)~.true: - a(comparator @@ -57,6 +57,6 @@ connections: comparator): - ~(sorter.compare sorter): - - ~(reducer.selection + - (reducer.reducer reducer): - ) diff --git a/tests/test_data/sum/reduce.yaml b/tests/test_data/sum/reduce.yaml index 6ee26fbc..a91f48f2 100644 --- a/tests/test_data/sum/reduce.yaml +++ b/tests/test_data/sum/reduce.yaml @@ -26,7 +26,7 @@ connections: - (reducer reducer): - ) - reducer.selection)~: + reducer.reducer): - (adder adder): - - ~(reducer.selection + - (reducer.reducer