-
Notifications
You must be signed in to change notification settings - Fork 8
/
pipeline_test.go
100 lines (87 loc) · 1.97 KB
/
pipeline_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package warppipe
import (
"context"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestPipeline(t *testing.T) {
p := NewPipeline()
p.AddStage("remove_pii", func(change *Changeset) (*Changeset, error) {
var filtered []*ChangesetColumn
for _, val := range change.NewValues {
if val.Column == "first_name" {
continue
}
filtered = append(filtered, val)
}
change.NewValues = filtered
return change, nil
})
p.AddStage("uppercase_tablename", func(change *Changeset) (*Changeset, error) {
change.Table = strings.ToUpper(change.Table)
return change, nil
})
p.AddStage("filter_test_users", func(change *Changeset) (*Changeset, error) {
for _, val := range change.NewValues {
if val.Column == "is_test" && val.Value == "TRUE" {
return nil, nil
}
}
return change, nil
})
sourceCh := make(chan *Changeset)
ctx, cancel := context.WithCancel(context.Background())
outCh, errCh := p.Start(ctx, sourceCh)
changesetWithPii := &Changeset{
Table: "users",
NewValues: []*ChangesetColumn{
{
Column: "first_name",
Type: "string",
Value: "Bob",
},
{
Column: "is_test",
Type: "boolean",
Value: "FALSE",
},
},
}
changesetForTestUser := &Changeset{
Table: "users",
NewValues: []*ChangesetColumn{
{
Column: "first_name",
Type: "string",
Value: "Alice",
},
{
Column: "is_test",
Type: "boolean",
Value: "TRUE",
},
},
}
// NOTE: only add 1 to the waitgroup since the test user will be dropped
for _, change := range []*Changeset{changesetWithPii, changesetForTestUser} {
sourceCh <- change
}
var results []*Changeset
go func() {
for {
select {
case change := <-outCh:
results = append(results, change)
case err := <-errCh:
cancel()
t.Error(err)
}
}
}()
_ = <-time.After(300 * time.Millisecond)
assert.Equal(t, 1, len(results))
assert.Equal(t, 1, len(results[0].NewValues))
assert.Equal(t, "USERS", results[0].Table)
}