-
Notifications
You must be signed in to change notification settings - Fork 54
/
Copy pathstage.go
126 lines (105 loc) · 2.95 KB
/
stage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package pipeline
import (
"context"
"fmt"
"github.com/fatih/color"
)
// Stage is a collection of steps executed concurrently or sequentially
// concurrent: run the steps concurrently
//
// disableStrictMode: In strict mode if a single step fails, all the other concurrent steps are cancelled.
// Step.Cancel will be invoked for cancellation of the step. Set disableStrictMode to true to disable strict mode
type Stage struct {
Name string `json:"name"`
Steps []Step `json:"steps"`
Concurrent bool `json:"concurrent"`
DisableStrictMode bool `json:"disableStrictMode"`
index int
pipelineKey string
}
// NewStage returns a new stage
// name of the stage
// concurrent flag sets whether the steps will be executed concurrently
func NewStage(name string, concurrent bool, disableStrictMode bool) *Stage {
st := &Stage{Name: name, Concurrent: concurrent,DisableStrictMode: disableStrictMode}
return st
}
// AddStep adds a new step to the stage
func (st *Stage) AddStep(step ...Step) {
st.Steps = append(st.Steps, step...)
}
// Run the stage execution sequentially
func (st *Stage) run(request *Request) *Result {
if len(st.Steps) == 0 {
return &Result{Error: fmt.Errorf("No steps to be executed")}
}
st.status("begin")
defer st.status("end")
if st.Concurrent {
st.status("is concurrent")
g, ctx := withContext(context.Background())
for _, step := range st.Steps {
step.Status("begin")
g.run(func() *Result {
defer step.Status("end")
//disables strict mode. g.run will wait for all steps to finish
if st.DisableStrictMode {
return step.Exec(request)
}
resultChan := make(chan *Result, 1)
go func() {
result := step.Exec(request)
if result == nil {
result = &Result{}
}
resultChan <- result
}()
select {
case <-ctx.Done():
if err := step.Cancel(); err != nil {
st.status("Error Cancelling Step " + step.getCtx().name)
}
<-resultChan
return &Result{Error: ctx.Err()}
case result := <-resultChan:
if result == nil {
result = &Result{}
}
return result
}
})
}
if result := g.wait(); result != nil && result.Error != nil {
st.status(" >>>failed !!! ")
return result
}
} else {
st.status("is not concurrent")
res := &Result{}
for _, step := range st.Steps {
step.Status("begin")
res = step.Exec(request)
if res != nil && res.Error != nil {
step.Status(">>>failed !!!")
return res
}
if res == nil {
res = &Result{}
step.Status("end")
continue
}
request.Data = res.Data
request.KeyVal = res.KeyVal
step.Status("end")
}
return res
}
return &Result{}
}
// status writes a line to the out channel
func (st *Stage) status(line string) {
stageText := fmt.Sprintf("[stage-%d]", st.index)
yellow := color.New(color.FgYellow).SprintFunc()
line = yellow(stageText) + "[" + st.Name + "]: " + line
send(st.pipelineKey, line)
}