pipeline.go

package phonelab
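
// sync is needed only by the illustrative countingCollector sketch below.
import "sync"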

const (
	// A MaxConcurrency of 0 means there is no limit on how many pipelines
	// run concurrently; see Run below.
	DEFAULT_MAX_CONCURRENCY = 0
)

// PipelineSourceInfo describes a single data source: what kind of source it
// is and the context it was collected in.
type PipelineSourceInfo interface {
	Type() string
	Context() string
}

// A PipelineSourceInstance pairs one source's Processor with metadata about
// that source.
type PipelineSourceInstance struct {
	Processor Processor
	Info      PipelineSourceInfo
}

// A PipelineSourceGenerator emits one PipelineSourceInstance per data source
// on the returned channel, closing the channel once all sources have been
// emitted.
type PipelineSourceGenerator interface {
	Process() <-chan *PipelineSourceInstance
}
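
// What follows is a minimal illustrative generator, not part of the original
// API: it replays a fixed slice of pre-built instances, which is enough to
// show the contract (one instance per source, channel closed when done).
// exampleSourceInfo is likewise hypothetical.
type exampleSourceInfo struct {
	typ, ctx string
}

func (i *exampleSourceInfo) Type() string    { return i.typ }
func (i *exampleSourceInfo) Context() string { return i.ctx }

type sliceGenerator struct {
	instances []*PipelineSourceInstance
}

func (g *sliceGenerator) Process() <-chan *PipelineSourceInstance {
	out := make(chan *PipelineSourceInstance)
	go func() {
		defer close(out)
		for _, inst := range g.instances {
			out <- inst
		}
	}()
	return out
}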

// Conceptually, a pipeline is a network graph with a single source and a
// single sink. Pipelines aren't necessarily a flat list of processing nodes;
// the input may fork into multiple streams that are recombined by timestamp
// or in some other way. Since multiple paths can always be muxed back into a
// single processing node, having a single sink is not limiting; the same
// holds for the single input and demuxing.
//
// Our representation of a pipeline is sink-centric: the pipeline is described
// by its last-hop Processor, whose Process() method is invoked to kick off
// processing.
//
// The last hop can either be the sink itself, or the DataCollector can play
// that role. Letting the DataCollector act as the sink should, at least in
// theory, make processors more reusable.
type Pipeline struct {
	LastHop Processor
}
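
// An illustrative last-hop Processor sketch, not part of the original API. It
// assumes Processor is satisfied by a single Process() <-chan interface{}
// method, which is how runOne below consumes it; if the interface carries
// more methods, a real implementation would need those as well.
type passThroughHop struct {
	in <-chan interface{}
}

func (p *passThroughHop) Process() <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range p.in {
			out <- v // a real hop would transform or filter here
		}
	}()
	return out
}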

// A PipelineBuilder builds a Pipeline configured to get its input from a
// given source.
type PipelineBuilder interface {
	// Instantiate the pipeline using the given source instance.
	BuildPipeline(source *PipelineSourceInstance) (*Pipeline, error)
}
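
// An illustrative PipelineBuilder, not part of the original API: the simplest
// possible build uses the source's own Processor as the last hop, with no
// intermediate stages.
type identityBuilder struct{}

func (b *identityBuilder) BuildPipeline(source *PipelineSourceInstance) (*Pipeline, error) {
	return &Pipeline{LastHop: source.Processor}, nil
}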

// DataCollectors generally don't know anything about the source of their
// data, but can assume that it arrives at the right granularity. For example,
// the same collector logic should work whether the data is read from a
// single local logcat file, from gzipped log files, from a boot_id processor,
// or streamed over a network.
//
// Because the Runner may drive several pipelines at once, OnData can be
// called from multiple goroutines concurrently, so implementations should
// synchronize any shared state.
type DataCollector interface {
	OnData(interface{}, PipelineSourceInfo)
	Finish()
}
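
// An illustrative DataCollector, not part of the original API: it counts
// results and ignores which source they came from. The mutex matters because,
// as noted above, OnData may be called from several pipeline goroutines at
// once.
type countingCollector struct {
	mu    sync.Mutex
	count int
}

func (c *countingCollector) OnData(data interface{}, info PipelineSourceInfo) {
	c.mu.Lock()
	c.count++
	c.mu.Unlock()
}

func (c *countingCollector) Finish() {}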

// A Runner manages running the processors. Its job is to build a pipeline
// for each source, kick off the processing, and pass the results to the
// DataCollector.
type Runner struct {
	Source         PipelineSourceGenerator
	Collector      DataCollector
	Builder        PipelineBuilder
	MaxConcurrency int
}

// NewRunner creates a Runner with the default (unlimited) concurrency.
func NewRunner(gen PipelineSourceGenerator, dc DataCollector, plb PipelineBuilder) *Runner {
	return &Runner{
		Source:         gen,
		Collector:      dc,
		Builder:        plb,
		MaxConcurrency: DEFAULT_MAX_CONCURRENCY,
	}
}

func (r *Runner) runOne(source *PipelineSourceInstance, done chan error) {
	// Build it
	pipeline, err := r.Builder.BuildPipeline(source)
	if err != nil {
		done <- err
		return
	}

	// Start the processing
	resChan := pipeline.LastHop.Process()

	// Drain the results and forward them to the DataCollector.
	for res := range resChan {
		r.Collector.OnData(res, source.Info)
	}
	done <- nil
}

// Run synchronously processes all data sources, returning any errors the
// individual pipelines produced.
func (runner *Runner) Run() []error {
	running := 0
	sourceChan := runner.Source.Process()
	done := make(chan error)
	allErrors := make([]error, 0)

	for source := range sourceChan {
		// Do we have a spot?
		if runner.MaxConcurrency > 0 && running == runner.MaxConcurrency {
			// No. Wait for something to finish, keeping its error if it
			// failed.
			if err := <-done; err != nil {
				allErrors = append(allErrors, err)
			}
			running -= 1
		}
		running += 1
		go runner.runOne(source, done)
	}

	// Wait for the remaining pipelines to drain.
	for running > 0 {
		if err := <-done; err != nil {
			allErrors = append(allErrors, err)
		}
		running -= 1
	}

	runner.Collector.Finish()
	return allErrors
}
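
// An end-to-end sketch tying the illustrative types above together. It leans
// on the same assumed Processor signature as passThroughHop; a real caller
// would supply its own generator, builder, and collector.
func exampleRun() int {
	in := make(chan interface{})
	close(in) // no data here; a real source would feed this channel

	gen := &sliceGenerator{
		instances: []*PipelineSourceInstance{{
			Processor: &passThroughHop{in: in},
			Info:      &exampleSourceInfo{typ: "example", ctx: "demo"},
		}},
	}

	collector := &countingCollector{}
	runner := NewRunner(gen, collector, &identityBuilder{})
	runner.MaxConcurrency = 4 // at most four pipelines in flight

	for _, err := range runner.Run() {
		_ = err // a real caller would log or aggregate per-source failures
	}
	return collector.count
}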