-
Notifications
You must be signed in to change notification settings - Fork 11
/
execute.go
142 lines (111 loc) · 3.05 KB
/
execute.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package roulette
import "time"
// SimpleExecute interface provides methods to retrieve a parser and a method which executes on the incoming values.
type SimpleExecute interface {
Execute(vals ...interface{})
}
// QueueExecute interface provides methods to retrieve a parser and a method which executes on the incoming values on the input channel.
type QueueExecute interface {
Execute(in <-chan interface{}, out chan<- interface{}) // in channel to write, out channel to read.
CloseResult()
}
// SimpleExecutor implements the SimpleExecute interface
type SimpleExecutor struct {
Parser Parser
}
// Execute executes rules in order of priority.
// one(true): executes in order of priority until a high priority rule is successful, after which execution stops
func (s *SimpleExecutor) Execute(vals ...interface{}) {
s.Parser.Execute(vals)
}
// QueueExecutor implements the QueueExecute
type QueueExecutor struct {
Parser Parser
}
// Execute ...
func (q *QueueExecutor) Execute(in <-chan interface{}, out chan<- interface{}) {
go q.drainQueue(out)
go q.fillQueue(in)
}
func (q *QueueExecutor) processWorker(vals interface{}) {
q.process(vals)
}
func (q *QueueExecutor) process(vals interface{}) error {
q.Parser.Execute(vals)
return nil
}
func (q *QueueExecutor) fillQueue(in <-chan interface{}) {
fill:
for {
select {
case v, ok := <-in:
if !ok {
break fill
}
go q.processWorker(v)
//TODO: quit the loop clean
//TODO: Pool of process workers
}
}
}
// adapter from github.com/kylelemons/iq
func (q *QueueExecutor) drainQueue(out chan<- interface{}) {
defer close(out)
// pending events (this is the "infinite" part)
var pending []interface{}
recv:
for {
// Ensure that pending always has values so the select can
// multiplex between the receiver and sender properly
if len(pending) == 0 {
v, ok := <-q.Parser.GetResult().Get().(chan interface{})
if !ok {
// in is closed, flush values
break recv
}
switch v.(type) {
case empty:
continue
}
pending = append(pending, v)
}
select {
// Queue incoming values
case v, ok := <-q.Parser.GetResult().Get().(chan interface{}):
if !ok {
// in is closed, flush values
break recv
}
switch v.(type) {
case empty:
continue
}
pending = append(pending, v)
// Send queued values
case out <- pending[0]:
pending = pending[1:]
}
}
// After in is closed, we may still have events to send
for _, v := range pending {
out <- v
}
}
// CloseResult closes the result channel
func (q *QueueExecutor) CloseResult() {
q.Parser.GetResult().Get().(chan interface{}) <- quit{}
select {
case <-q.Parser.GetResult().Get().(chan interface{}):
case <-time.After(time.Millisecond * 10):
close(q.Parser.GetResult().Get().(chan interface{}))
break
}
}
// NewSimpleExecutor returns a new SimpleExecutor
func NewSimpleExecutor(parser Parser) SimpleExecute {
return &SimpleExecutor{Parser: parser}
}
// NewQueueExecutor returns a new QueueExecutor
func NewQueueExecutor(parser Parser) QueueExecute {
return &QueueExecutor{Parser: parser}
}