Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
raulb committed Aug 23, 2024
1 parent 7d86818 commit 9f297cd
Show file tree
Hide file tree
Showing 32 changed files with 82 additions and 129 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ linters:
# inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint
disable-all: true
enable:
# We plan to enable all of the linters which are commented out.
# We plan to enable all the linters which are commented out.
# However, we want to enable them one by one (so we don't have to fix many issues at once).
- bodyclose
- depguard
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type DLQ struct {
Plugin string
Settings map[string]string

WindowSize int
WindowNackThreshold int
WindowSize uint64
WindowNackThreshold uint64
}

var DefaultDLQ = DLQ{
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,9 @@ func (s *Service) buildParallelProcessorNode(
) *stream.ParallelNode {
return &stream.ParallelNode{
Name: proc.ID + "-parallel",
NewNode: func(i int) stream.PubSubNode {
NewNode: func(i uint64) stream.PubSubNode {
n := s.buildProcessorNode(pl, proc)
n.Name = n.Name + "-" + strconv.Itoa(i) // add suffix to name
n.Name = n.Name + "-" + strconv.FormatUint(i, 10) // add suffix to name
return n
},
Workers: proc.Config.Workers,
Expand Down
6 changes: 0 additions & 6 deletions pkg/pipeline/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,6 @@ func (s *Service) UpdateDLQ(ctx context.Context, pipelineID string, cfg DLQ) (*I
if cfg.Plugin == "" {
return nil, cerrors.New("DLQ plugin must be provided")
}
if cfg.WindowSize < 0 {
return nil, cerrors.New("DLQ window size must be non-negative")
}
if cfg.WindowNackThreshold < 0 {
return nil, cerrors.New("DLQ window nack threshold must be non-negative")
}
if cfg.WindowSize > 0 && cfg.WindowSize <= cfg.WindowNackThreshold {
return nil, cerrors.New("DLQ window nack threshold must be lower than window size")
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/pipeline/stream/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type DLQHandlerNode struct {
Name string
Handler DLQHandler

WindowSize int
WindowNackThreshold int
WindowSize uint64
WindowNackThreshold uint64

Timer metrics.Timer
Histogram metrics.RecordBytesHistogram
Expand Down Expand Up @@ -212,13 +212,13 @@ type dlqWindow struct {
// nackThreshold represents the number of tolerated nacks, if the threshold
// is exceeded the window is frozen and returns an error for all further
// nacks.
nackThreshold int
nackThreshold uint64

ackCount int
nackCount int
ackCount uint64
nackCount uint64
}

func newDLQWindow(size, threshold int) *dlqWindow {
func newDLQWindow(size, threshold uint64) *dlqWindow {
if size > 0 && threshold == 0 {
// optimization - if threshold is 0 the window size does not matter,
// setting it to 1 ensures we don't use more memory than needed
Expand Down
14 changes: 7 additions & 7 deletions pkg/pipeline/stream/dlq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestDLQHandlerNode_Nack_ForwardToDLQ_Success(t *testing.T) {
is.NoErr(err)
}()

for i := 0; i < n.WindowNackThreshold; i++ {
for i := uint64(0); i < n.WindowNackThreshold; i++ {
msg := &Message{
Ctx: ctx,
Record: opencdc.Record{Position: []byte(uuid.NewString())},
Expand Down Expand Up @@ -326,8 +326,8 @@ func TestDLQWindow_WindowDisabled(t *testing.T) {

func TestDLQWindow_NackThresholdExceeded(t *testing.T) {
testCases := []struct {
windowSize int
nackThreshold int
windowSize uint64
nackThreshold uint64
}{
{1, 0},
{2, 0},
Expand All @@ -344,19 +344,19 @@ func TestDLQWindow_NackThresholdExceeded(t *testing.T) {
w := newDLQWindow(tc.windowSize, tc.nackThreshold)

// fill up window with nacks up to the threshold
for i := 0; i < tc.nackThreshold; i++ {
for i := uint64(0); i < tc.nackThreshold; i++ {
ok := w.Nack()
is.True(ok)
}

// fill up window again with acks
for i := 0; i < tc.windowSize; i++ {
for i := uint64(0); i < tc.windowSize; i++ {
w.Ack()
}

// since window is full of acks we should be able to fill up
// the window with nacks again
for i := 0; i < tc.nackThreshold; i++ {
for i := uint64(0); i < tc.nackThreshold; i++ {
ok := w.Nack()
is.True(ok)
}
Expand All @@ -367,7 +367,7 @@ func TestDLQWindow_NackThresholdExceeded(t *testing.T) {

// adding acks after that should make no difference, all nacks
// need to fail after the threshold is reached
for i := 0; i < tc.windowSize; i++ {
for i := uint64(0); i < tc.windowSize; i++ {
w.Ack()
}
ok = w.Nack()
Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/stream/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type ParallelNode struct {
Name string
// NewNode is the constructor of the wrapped PubSubNode, it should create
// the i-th node (useful for distinguishing nodes in logs).
NewNode func(i int) PubSubNode
Workers int
NewNode func(i uint64) PubSubNode
Workers uint64

base pubSubNodeBase
logger log.CtxLogger
Expand Down Expand Up @@ -73,7 +73,7 @@ func (n *ParallelNode) Run(ctx context.Context) error {
// buffered, so it blocks when all workers are busy
workerJobs := make(chan parallelNodeJob)
var workerWg sync.WaitGroup
for i := 0; i < n.Workers; i++ {
for i := uint64(0); i < n.Workers; i++ {
node := n.NewNode(i)
worker := newParallelNodeWorker(node, workerJobs, n.logger)
workerWg.Add(1)
Expand Down
8 changes: 4 additions & 4 deletions pkg/pipeline/stream/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestParallelNode_Success(t *testing.T) {

const workerCount = 10

newPubSubNode := func(i int) PubSubNode {
newPubSubNode := func(i uint64) PubSubNode {
return &parallelTestNode{
Name: fmt.Sprintf("test-node-%d", i),
F: func(ctx context.Context, sub <-chan *Message, pub chan<- *Message) error {
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestParallelNode_ErrorAll(t *testing.T) {

const workerCount = 10

newPubSubNode := func(i int) PubSubNode {
newPubSubNode := func(i uint64) PubSubNode {
name := fmt.Sprintf("test-node-%d", i)
return &parallelTestNode{
Name: name,
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestParallelNode_ErrorSingle(t *testing.T) {

const workerCount = 10

newPubSubNode := func(i int) PubSubNode {
newPubSubNode := func(i uint64) PubSubNode {
name := fmt.Sprintf("test-node-%d", i)
return &parallelTestNode{
Name: name,
Expand Down Expand Up @@ -585,7 +585,7 @@ func TestParallelNode_Processor(t *testing.T) {
Teardown(gomock.Any()).
Times(workerCount)

newProcNode := func(i int) PubSubNode {
newProcNode := func(i uint64) PubSubNode {
return &ProcessorNode{
Name: fmt.Sprintf("test-%d", i),
Processor: proc,
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugin/processor/standalone/host_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (*hostModuleInstance) Close(context.Context) error { return nil }
// message. If the buffer is too small, it returns the size of the command
// request message and parks the command request. The next call to this function
// will return the same command request.
func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint32 {
func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint64 {
m.logger.Trace(ctx).Msg("executing command_request")

if m.parkedCommandRequest == nil {
Expand All @@ -126,7 +126,7 @@ func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes
Int("command_bytes", size).
Int("allocated_bytes", len(buf)).
Msgf("insufficient memory, command will be parked until next call to command_request")
return types.Uint32(size)
return types.Uint64(size)
}

// If the buffer is large enough, we marshal the command into the buffer and
Expand All @@ -140,7 +140,7 @@ func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes
m.parkedCommandRequest = nil

m.logger.Trace(ctx).Msg("returning next command")
return types.Uint32(len(out))
return types.Uint64(len(out))
}

// commandResponse is the exported function that is called by the WASM module to
Expand Down
2 changes: 1 addition & 1 deletion pkg/processor/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,5 @@ type Parent struct {
// Config holds configuration data for building a processor.
type Config struct {
Settings map[string]string
Workers int
Workers uint64
}
3 changes: 0 additions & 3 deletions pkg/processor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ func (s *Service) Create(
pt ProvisionType,
cond string,
) (*Instance, error) {
if cfg.Workers < 0 {
return nil, cerrors.New("processor workers can't be negative")
}
if cfg.Workers == 0 {
cfg.Workers = 1
}
Expand Down
21 changes: 0 additions & 21 deletions pkg/processor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,27 +197,6 @@ func TestService_Create_BuilderFail(t *testing.T) {
is.Equal(i, nil)
}

func TestService_Create_WorkersNegative(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}

service := NewService(log.Nop(), db, &proc_plugin.PluginService{})

got, err := service.Create(
ctx,
uuid.NewString(),
"processor-type",
Parent{},
Config{Workers: -1},
ProvisionTypeAPI,
"{{true}}",
)
is.True(err != nil) // expected workers error
is.Equal("processor workers can't be negative", err.Error())
is.Equal(got, nil)
}

func TestService_Delete_Success(t *testing.T) {
is := is.New(t)
ctx := context.Background()
Expand Down
6 changes: 3 additions & 3 deletions pkg/provisioning/config/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ type Processor struct {
ID string
Plugin string
Settings map[string]string
Workers int
Workers uint64
Condition string
}

type DLQ struct {
Plugin string
Settings map[string]string
WindowSize *int
WindowNackThreshold *int
WindowSize *uint64
WindowNackThreshold *uint64
}

// Classify fields as immutable, mutable or ignored. This is used by the
Expand Down
3 changes: 0 additions & 3 deletions pkg/provisioning/config/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ func validateProcessors(mp []Processor) []error {
if cfg.Plugin == "" {
errs = append(errs, cerrors.Errorf("processor %q: \"plugin\" is mandatory: %w", cfg.ID, ErrMandatoryField))
}
if cfg.Workers < 0 {
errs = append(errs, cerrors.Errorf("processor %q: \"workers\" can't be negative: %w", cfg.ID, ErrInvalidField))
}
if ids[cfg.ID] {
errs = append(errs, cerrors.Errorf("processor %q: \"id\" must be unique: %w", cfg.ID, ErrDuplicateID))
}
Expand Down
16 changes: 0 additions & 16 deletions pkg/provisioning/config/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,22 +209,6 @@ func TestValidator_InvalidFields(t *testing.T) {
}},
},
wantErr: ErrInvalidField,
}, {
name: "processor workers is negative",
config: Pipeline{
ID: "pipeline1",
Status: "running",
Name: "pipeline1",
Description: "desc1",
Processors: []Processor{{
ID: "proc1",
Plugin: "js",
Settings: map[string]string{},
// invalid field
Workers: -1,
}},
},
wantErr: ErrInvalidField,
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
14 changes: 8 additions & 6 deletions pkg/provisioning/config/yaml/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func TestParser_V1_Success(t *testing.T) {
is := is.New(t)
parser := NewParser(log.Nop())
filepath := "./v1/testdata/pipelines1-success.yml"
intPtr := func(i int) *int { return &i }

uint64Ptr := func(i uint64) *uint64 { return &i }
want := Configurations{
v1.Configuration{
Version: "1.0",
Expand Down Expand Up @@ -78,8 +79,8 @@ func TestParser_V1_Success(t *testing.T) {
Settings: map[string]string{
"foo": "bar",
},
WindowSize: intPtr(4),
WindowNackThreshold: intPtr(2),
WindowSize: uint64Ptr(4),
WindowNackThreshold: uint64Ptr(2),
},
},
},
Expand Down Expand Up @@ -274,7 +275,8 @@ func TestParser_V2_Success(t *testing.T) {
is := is.New(t)
parser := NewParser(log.Nop())
filepath := "./v2/testdata/pipelines1-success.yml"
intPtr := func(i int) *int { return &i }

uint64Ptr := func(i uint64) *uint64 { return &i }
want := Configurations{
v2.Configuration{
Version: "2.2",
Expand Down Expand Up @@ -321,8 +323,8 @@ func TestParser_V2_Success(t *testing.T) {
Settings: map[string]string{
"foo": "bar",
},
WindowSize: intPtr(4),
WindowNackThreshold: intPtr(2),
WindowSize: uint64Ptr(4),
WindowNackThreshold: uint64Ptr(2),
},
},
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/provisioning/config/yaml/v1/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ type Connector struct {
type Processor struct {
Type string `yaml:"type"`
Settings map[string]string `yaml:"settings"`
Workers int `yaml:"workers"`
Workers uint64 `yaml:"workers"`
}

type DLQ struct {
Plugin string `yaml:"plugin"`
Settings map[string]string `yaml:"settings"`
WindowSize *int `yaml:"window-size"`
WindowNackThreshold *int `yaml:"window-nack-threshold"`
WindowSize *uint64
WindowNackThreshold *uint64 `yaml:"window-nack-threshold"`
}

func (c Configuration) ToConfig() []config.Pipeline {
Expand Down
Loading

0 comments on commit 9f297cd

Please sign in to comment.