Skip to content

Commit

Permalink
Optimize slang.control.Reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
jm9e committed Aug 13, 2018
1 parent 4a47448 commit 7f25b9b
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 213 deletions.
154 changes: 64 additions & 90 deletions pkg/elem/control_reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package elem

import (
"github.com/Bitspark/slang/pkg/core"
"sync"
)

var controlReduceCfg = &builtinConfig{
Expand All @@ -22,127 +23,100 @@ var controlReduceCfg = &builtinConfig{
},
},
DelegateDefs: map[string]*core.DelegateDef{
"selection": {
"reducer": {
In: core.TypeDef{
Type: "stream",
Stream: &core.TypeDef{
Type: "generic",
Generic: "itemType",
},
Type: "generic",
Generic: "itemType",
},
Out: core.TypeDef{
Type: "stream",
Stream: &core.TypeDef{
Type: "map",
Map: map[string]*core.TypeDef{
"a": {
Type: "generic",
Generic: "itemType",
},
"b": {
Type: "generic",
Generic: "itemType",
},
Type: "map",
Map: map[string]*core.TypeDef{
"a": {
Type: "generic",
Generic: "itemType",
},
"b": {
Type: "generic",
Generic: "itemType",
},
},
},
},
},
PropertyDefs: map[string]*core.TypeDef{
"emptyValue": {
Type: "generic",
Type: "generic",
Generic: "itemType",
},
},
},
opFunc: func(op *core.Operator) {
in := op.Main().In()
out := op.Main().Out()
sIn := op.Delegate("selection").In()
sOut := op.Delegate("selection").Out()
sIn := op.Delegate("reducer").In()
sOut := op.Delegate("reducer").Out()
nullValue := op.Property("emptyValue")
for !op.CheckStop() {
i := in.Pull()

if core.IsMarker(i) {
sOut.Push(i)
sel := sIn.Pull()

if sel != i {
panic("expected different marker")
}
i := in.Stream().Pull()

if !in.OwnBOS(i) {
out.Push(i)

continue
}

items, ok := i.([]interface{})
if !ok {
panic("expected stream")
}

if len(items) == 0 {
out.Push(nullValue)
continue
}

if len(items) == 1 {
out.Push(items[0])
continue
}

sOut.PushBOS()
j := 0
for j+1 < len(items) {
sOut.Stream().Map("a").Push(items[j])
sOut.Stream().Map("b").Push(items[j+1])
j += 2
}
sOut.PushEOS()

var leftover interface{}
if j != len(items) {
leftover = items[len(items)-1]
}

// POOL

for {
p := sIn.Pull()

items, ok := p.([]interface{})
if !ok {
panic("expected stream")
mutex := &sync.Mutex{}
pool := []interface{}{}
done := false
doneChan := make(chan bool)

// Reducer
go func() {
for {
mutex.Lock()
if done && len(pool) < 2 {
doneChan <- true
break
}
mutex.Unlock()

mutex.Lock()
if len(pool) > 1 {
sOut.Push(map[string]interface{}{"a": pool[0], "b": pool[1]})
pool = pool[2:]
} else {
mutex.Unlock()
continue
}
mutex.Unlock()

i := sIn.Pull()

mutex.Lock()
pool = append(pool, i)
mutex.Unlock()
}
}()

if leftover != nil {
items = append([]interface{}{leftover}, items...)
}

if len(items) == 0 {
panic("empty pool")
}
for {
// Stream items

if len(items) == 1 {
out.Push(items[0])
i = in.Stream().Pull()
if in.OwnEOS(i) {
done = true
break
}

sOut.PushBOS()
j := 0
for j+1 < len(items) {
sOut.Stream().Map("a").Push(items[j])
sOut.Stream().Map("b").Push(items[j+1])
j += 2
}
sOut.PushEOS()
mutex.Lock()
pool = append(pool, i)
mutex.Unlock()
}

if j != len(items) {
leftover = items[len(items)-1]
} else {
leftover = nil
}
<-doneChan

if len(pool) == 1 {
out.Push(pool[0])
} else {
out.Push(nullValue)
}
}
},
Expand Down
Loading

0 comments on commit 7f25b9b

Please sign in to comment.