diff --git a/.golangci.yml b/.golangci.yml index ab7ef0106..533ce6457 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -62,7 +62,7 @@ linters: # inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint disable-all: true enable: - # We plan to enable all of the linters which are commented out. + # We plan to enable all the linters which are commented out. # However, we want to enable them one by one (so we don't have to fix many issues at once). - bodyclose - depguard diff --git a/pkg/pipeline/instance.go b/pkg/pipeline/instance.go index 35654eb98..b2f486bfe 100644 --- a/pkg/pipeline/instance.go +++ b/pkg/pipeline/instance.go @@ -83,8 +83,8 @@ type DLQ struct { Plugin string Settings map[string]string - WindowSize int - WindowNackThreshold int + WindowSize uint64 + WindowNackThreshold uint64 } var DefaultDLQ = DLQ{ diff --git a/pkg/pipeline/lifecycle.go b/pkg/pipeline/lifecycle.go index 9eb8b24a3..aa0b7f491 100644 --- a/pkg/pipeline/lifecycle.go +++ b/pkg/pipeline/lifecycle.go @@ -324,9 +324,9 @@ func (s *Service) buildParallelProcessorNode( ) *stream.ParallelNode { return &stream.ParallelNode{ Name: proc.ID + "-parallel", - NewNode: func(i int) stream.PubSubNode { + NewNode: func(i uint64) stream.PubSubNode { n := s.buildProcessorNode(pl, proc) - n.Name = n.Name + "-" + strconv.Itoa(i) // add suffix to name + n.Name = n.Name + "-" + strconv.FormatUint(i, 10) // add suffix to name return n }, Workers: proc.Config.Workers, diff --git a/pkg/pipeline/service.go b/pkg/pipeline/service.go index 95f910765..290ce6b3e 100644 --- a/pkg/pipeline/service.go +++ b/pkg/pipeline/service.go @@ -187,12 +187,6 @@ func (s *Service) UpdateDLQ(ctx context.Context, pipelineID string, cfg DLQ) (*I if cfg.Plugin == "" { return nil, cerrors.New("DLQ plugin must be provided") } - if cfg.WindowSize < 0 { - return nil, cerrors.New("DLQ window size must be non-negative") - } - if cfg.WindowNackThreshold < 0 { - return nil, cerrors.New("DLQ window nack threshold must be non-negative") - } if cfg.WindowSize > 0 && cfg.WindowSize <= cfg.WindowNackThreshold { return nil, cerrors.New("DLQ window nack threshold must be lower than window size") } diff --git a/pkg/pipeline/stream/dlq.go b/pkg/pipeline/stream/dlq.go index f1aa4a350..44b5a8f85 100644 --- a/pkg/pipeline/stream/dlq.go +++ b/pkg/pipeline/stream/dlq.go @@ -42,8 +42,8 @@ type DLQHandlerNode struct { Name string Handler DLQHandler - WindowSize int - WindowNackThreshold int + WindowSize uint64 + WindowNackThreshold uint64 Timer metrics.Timer Histogram metrics.RecordBytesHistogram @@ -212,13 +212,13 @@ type dlqWindow struct { // nackThreshold represents the number of tolerated nacks, if the threshold // is exceeded the window is frozen and returns an error for all further // nacks. - nackThreshold int + nackThreshold uint64 - ackCount int - nackCount int + ackCount uint64 + nackCount uint64 } -func newDLQWindow(size, threshold int) *dlqWindow { +func newDLQWindow(size, threshold uint64) *dlqWindow { if size > 0 && threshold == 0 { // optimization - if threshold is 0 the window size does not matter, // setting it to 1 ensures we don't use more memory than needed diff --git a/pkg/pipeline/stream/dlq_test.go b/pkg/pipeline/stream/dlq_test.go index 2ee6fcbfc..2ba7bc957 100644 --- a/pkg/pipeline/stream/dlq_test.go +++ b/pkg/pipeline/stream/dlq_test.go @@ -236,7 +236,7 @@ func TestDLQHandlerNode_Nack_ForwardToDLQ_Success(t *testing.T) { is.NoErr(err) }() - for i := 0; i < n.WindowNackThreshold; i++ { + for i := uint64(0); i < n.WindowNackThreshold; i++ { msg := &Message{ Ctx: ctx, Record: opencdc.Record{Position: []byte(uuid.NewString())}, @@ -326,8 +326,8 @@ func TestDLQWindow_WindowDisabled(t *testing.T) { func TestDLQWindow_NackThresholdExceeded(t *testing.T) { testCases := []struct { - windowSize int - nackThreshold int + windowSize uint64 + nackThreshold uint64 }{ {1, 0}, {2, 0}, @@ -344,19 +344,19 @@ func TestDLQWindow_NackThresholdExceeded(t *testing.T) { w := newDLQWindow(tc.windowSize, tc.nackThreshold) // fill up window with nacks up to the threshold - for i := 0; i < tc.nackThreshold; i++ { + for i := uint64(0); i < tc.nackThreshold; i++ { ok := w.Nack() is.True(ok) } // fill up window again with acks - for i := 0; i < tc.windowSize; i++ { + for i := uint64(0); i < tc.windowSize; i++ { w.Ack() } // since window is full of acks we should be able to fill up // the window with nacks again - for i := 0; i < tc.nackThreshold; i++ { + for i := uint64(0); i < tc.nackThreshold; i++ { ok := w.Nack() is.True(ok) } @@ -367,7 +367,7 @@ func TestDLQWindow_NackThresholdExceeded(t *testing.T) { // adding acks after that should make no difference, all nacks // need to fail after the threshold is reached - for i := 0; i < tc.windowSize; i++ { + for i := uint64(0); i < tc.windowSize; i++ { w.Ack() } ok = w.Nack() diff --git a/pkg/pipeline/stream/parallel.go b/pkg/pipeline/stream/parallel.go index 965fd84d7..821b09789 100644 --- a/pkg/pipeline/stream/parallel.go +++ b/pkg/pipeline/stream/parallel.go @@ -32,8 +32,8 @@ type ParallelNode struct { Name string // NewNode is the constructor of the wrapped PubSubNode, it should create // the i-th node (useful for distinguishing nodes in logs). - NewNode func(i int) PubSubNode - Workers int + NewNode func(i uint64) PubSubNode + Workers uint64 base pubSubNodeBase logger log.CtxLogger @@ -73,7 +73,7 @@ func (n *ParallelNode) Run(ctx context.Context) error { // buffered, so it blocks when all workers are busy workerJobs := make(chan parallelNodeJob) var workerWg sync.WaitGroup - for i := 0; i < n.Workers; i++ { + for i := uint64(0); i < n.Workers; i++ { node := n.NewNode(i) worker := newParallelNodeWorker(node, workerJobs, n.logger) workerWg.Add(1) diff --git a/pkg/pipeline/stream/parallel_test.go b/pkg/pipeline/stream/parallel_test.go index 09db1ebc7..a4a7fbf42 100644 --- a/pkg/pipeline/stream/parallel_test.go +++ b/pkg/pipeline/stream/parallel_test.go @@ -343,7 +343,7 @@ func TestParallelNode_Success(t *testing.T) { const workerCount = 10 - newPubSubNode := func(i int) PubSubNode { + newPubSubNode := func(i uint64) PubSubNode { return ¶llelTestNode{ Name: fmt.Sprintf("test-node-%d", i), F: func(ctx context.Context, sub <-chan *Message, pub chan<- *Message) error { @@ -422,7 +422,7 @@ func TestParallelNode_ErrorAll(t *testing.T) { const workerCount = 10 - newPubSubNode := func(i int) PubSubNode { + newPubSubNode := func(i uint64) PubSubNode { name := fmt.Sprintf("test-node-%d", i) return ¶llelTestNode{ Name: name, @@ -483,7 +483,7 @@ func TestParallelNode_ErrorSingle(t *testing.T) { const workerCount = 10 - newPubSubNode := func(i int) PubSubNode { + newPubSubNode := func(i uint64) PubSubNode { name := fmt.Sprintf("test-node-%d", i) return ¶llelTestNode{ Name: name, @@ -585,7 +585,7 @@ func TestParallelNode_Processor(t *testing.T) { Teardown(gomock.Any()). Times(workerCount) - newProcNode := func(i int) PubSubNode { + newProcNode := func(i uint64) PubSubNode { return &ProcessorNode{ Name: fmt.Sprintf("test-%d", i), Processor: proc, diff --git a/pkg/plugin/processor/standalone/host_module.go b/pkg/plugin/processor/standalone/host_module.go index caa21a9b1..339474522 100644 --- a/pkg/plugin/processor/standalone/host_module.go +++ b/pkg/plugin/processor/standalone/host_module.go @@ -106,7 +106,7 @@ func (*hostModuleInstance) Close(context.Context) error { return nil } // message. If the buffer is too small, it returns the size of the command // request message and parks the command request. The next call to this function // will return the same command request. -func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint32 { +func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint64 { m.logger.Trace(ctx).Msg("executing command_request") if m.parkedCommandRequest == nil { @@ -126,7 +126,7 @@ func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes Int("command_bytes", size). Int("allocated_bytes", len(buf)). Msgf("insufficient memory, command will be parked until next call to command_request") - return types.Uint32(size) + return types.Uint64(size) } // If the buffer is large enough, we marshal the command into the buffer and @@ -140,7 +140,7 @@ func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes m.parkedCommandRequest = nil m.logger.Trace(ctx).Msg("returning next command") - return types.Uint32(len(out)) + return types.Uint64(len(out)) } // commandResponse is the exported function that is called by the WASM module to diff --git a/pkg/processor/instance.go b/pkg/processor/instance.go index cce408b2b..d2d9e3f9e 100644 --- a/pkg/processor/instance.go +++ b/pkg/processor/instance.go @@ -94,5 +94,5 @@ type Parent struct { // Config holds configuration data for building a processor. type Config struct { Settings map[string]string - Workers int + Workers uint64 } diff --git a/pkg/processor/service.go b/pkg/processor/service.go index e0ab16ee8..cee87320e 100644 --- a/pkg/processor/service.go +++ b/pkg/processor/service.go @@ -119,9 +119,6 @@ func (s *Service) Create( pt ProvisionType, cond string, ) (*Instance, error) { - if cfg.Workers < 0 { - return nil, cerrors.New("processor workers can't be negative") - } if cfg.Workers == 0 { cfg.Workers = 1 } diff --git a/pkg/processor/service_test.go b/pkg/processor/service_test.go index cc2e69220..873d89c9b 100644 --- a/pkg/processor/service_test.go +++ b/pkg/processor/service_test.go @@ -197,27 +197,6 @@ func TestService_Create_BuilderFail(t *testing.T) { is.Equal(i, nil) } -func TestService_Create_WorkersNegative(t *testing.T) { - is := is.New(t) - ctx := context.Background() - db := &inmemory.DB{} - - service := NewService(log.Nop(), db, &proc_plugin.PluginService{}) - - got, err := service.Create( - ctx, - uuid.NewString(), - "processor-type", - Parent{}, - Config{Workers: -1}, - ProvisionTypeAPI, - "{{true}}", - ) - is.True(err != nil) // expected workers error - is.Equal("processor workers can't be negative", err.Error()) - is.Equal(got, nil) -} - func TestService_Delete_Success(t *testing.T) { is := is.New(t) ctx := context.Background() diff --git a/pkg/provisioning/config/parser.go b/pkg/provisioning/config/parser.go index b6cf10d72..30c421d7a 100644 --- a/pkg/provisioning/config/parser.go +++ b/pkg/provisioning/config/parser.go @@ -42,15 +42,15 @@ type Processor struct { ID string Plugin string Settings map[string]string - Workers int + Workers uint64 Condition string } type DLQ struct { Plugin string Settings map[string]string - WindowSize *int - WindowNackThreshold *int + WindowSize *uint64 + WindowNackThreshold *uint64 } // Classify fields as immutable, mutable or ignored. This is used by the diff --git a/pkg/provisioning/config/validate.go b/pkg/provisioning/config/validate.go index fc357e9fb..cf4a68091 100644 --- a/pkg/provisioning/config/validate.go +++ b/pkg/provisioning/config/validate.go @@ -97,9 +97,6 @@ func validateProcessors(mp []Processor) []error { if cfg.Plugin == "" { errs = append(errs, cerrors.Errorf("processor %q: \"plugin\" is mandatory: %w", cfg.ID, ErrMandatoryField)) } - if cfg.Workers < 0 { - errs = append(errs, cerrors.Errorf("processor %q: \"workers\" can't be negative: %w", cfg.ID, ErrInvalidField)) - } if ids[cfg.ID] { errs = append(errs, cerrors.Errorf("processor %q: \"id\" must be unique: %w", cfg.ID, ErrDuplicateID)) } diff --git a/pkg/provisioning/config/validate_test.go b/pkg/provisioning/config/validate_test.go index 40bcfb6e7..7ed34a1b4 100644 --- a/pkg/provisioning/config/validate_test.go +++ b/pkg/provisioning/config/validate_test.go @@ -209,22 +209,6 @@ func TestValidator_InvalidFields(t *testing.T) { }}, }, wantErr: ErrInvalidField, - }, { - name: "processor workers is negative", - config: Pipeline{ - ID: "pipeline1", - Status: "running", - Name: "pipeline1", - Description: "desc1", - Processors: []Processor{{ - ID: "proc1", - Plugin: "js", - Settings: map[string]string{}, - // invalid field - Workers: -1, - }}, - }, - wantErr: ErrInvalidField, }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/provisioning/config/yaml/parser_test.go b/pkg/provisioning/config/yaml/parser_test.go index 9852c3eb5..bf5f851da 100644 --- a/pkg/provisioning/config/yaml/parser_test.go +++ b/pkg/provisioning/config/yaml/parser_test.go @@ -35,7 +35,8 @@ func TestParser_V1_Success(t *testing.T) { is := is.New(t) parser := NewParser(log.Nop()) filepath := "./v1/testdata/pipelines1-success.yml" - intPtr := func(i int) *int { return &i } + + uint64Ptr := func(i uint64) *uint64 { return &i } want := Configurations{ v1.Configuration{ Version: "1.0", @@ -78,8 +79,8 @@ func TestParser_V1_Success(t *testing.T) { Settings: map[string]string{ "foo": "bar", }, - WindowSize: intPtr(4), - WindowNackThreshold: intPtr(2), + WindowSize: uint64Ptr(4), + WindowNackThreshold: uint64Ptr(2), }, }, }, @@ -274,7 +275,8 @@ func TestParser_V2_Success(t *testing.T) { is := is.New(t) parser := NewParser(log.Nop()) filepath := "./v2/testdata/pipelines1-success.yml" - intPtr := func(i int) *int { return &i } + + uint64Ptr := func(i uint64) *uint64 { return &i } want := Configurations{ v2.Configuration{ Version: "2.2", @@ -321,8 +323,8 @@ func TestParser_V2_Success(t *testing.T) { Settings: map[string]string{ "foo": "bar", }, - WindowSize: intPtr(4), - WindowNackThreshold: intPtr(2), + WindowSize: uint64Ptr(4), + WindowNackThreshold: uint64Ptr(2), }, }, }, diff --git a/pkg/provisioning/config/yaml/v1/model.go b/pkg/provisioning/config/yaml/v1/model.go index 429d52a75..ffd3aff5f 100644 --- a/pkg/provisioning/config/yaml/v1/model.go +++ b/pkg/provisioning/config/yaml/v1/model.go @@ -68,14 +68,14 @@ type Connector struct { type Processor struct { Type string `yaml:"type"` Settings map[string]string `yaml:"settings"` - Workers int `yaml:"workers"` + Workers uint64 `yaml:"workers"` } type DLQ struct { Plugin string `yaml:"plugin"` Settings map[string]string `yaml:"settings"` - WindowSize *int `yaml:"window-size"` - WindowNackThreshold *int `yaml:"window-nack-threshold"` + WindowSize *uint64 + WindowNackThreshold *uint64 `yaml:"window-nack-threshold"` } func (c Configuration) ToConfig() []config.Pipeline { diff --git a/pkg/provisioning/config/yaml/v2/config_test.go b/pkg/provisioning/config/yaml/v2/config_test.go index b0b90aa38..349f5e03e 100644 --- a/pkg/provisioning/config/yaml/v2/config_test.go +++ b/pkg/provisioning/config/yaml/v2/config_test.go @@ -34,7 +34,7 @@ func TestConfiguration_FromConfig(t *testing.T) { } func testPipelineConfigs() []config.Pipeline { - intPtr := func(i int) *int { return &i } + uint64Ptr := func(i uint64) *uint64 { return &i } return []config.Pipeline{ { @@ -88,8 +88,8 @@ func testPipelineConfigs() []config.Pipeline { "level": "error", "message": "record delivery failed", }, - WindowSize: intPtr(4), - WindowNackThreshold: intPtr(2), + WindowSize: uint64Ptr(4), + WindowNackThreshold: uint64Ptr(2), }, }, { @@ -124,7 +124,7 @@ func testPipelineConfigs() []config.Pipeline { } func expectedModelConfiguration() Configuration { - intPtr := func(i int) *int { return &i } + uint64Ptr := func(i uint64) *uint64 { return &i } return Configuration{ Version: "2.2", @@ -181,8 +181,8 @@ func expectedModelConfiguration() Configuration { "level": "error", "message": "record delivery failed", }, - WindowSize: intPtr(4), - WindowNackThreshold: intPtr(2), + WindowSize: uint64Ptr(4), + WindowNackThreshold: uint64Ptr(2), }, }, { diff --git a/pkg/provisioning/config/yaml/v2/model.go b/pkg/provisioning/config/yaml/v2/model.go index 696585201..a4032484e 100644 --- a/pkg/provisioning/config/yaml/v2/model.go +++ b/pkg/provisioning/config/yaml/v2/model.go @@ -94,14 +94,14 @@ type Processor struct { Plugin string `yaml:"plugin" json:"plugin"` Condition string `yaml:"condition" json:"condition"` Settings map[string]string `yaml:"settings" json:"settings"` - Workers int `yaml:"workers" json:"workers"` + Workers uint64 `yaml:"workers" json:"workers"` } type DLQ struct { Plugin string `yaml:"plugin" json:"plugin"` Settings map[string]string `yaml:"settings" json:"settings"` - WindowSize *int `yaml:"window-size" json:"window-size"` - WindowNackThreshold *int `yaml:"window-nack-threshold" json:"window-nack-threshold"` + WindowSize *uint64 `yaml:"window-size" json:"window-size"` + WindowNackThreshold *uint64 `yaml:"window-nack-threshold" json:"window-nack-threshold"` } func (c Configuration) ToConfig() []config.Pipeline { diff --git a/pkg/provisioning/config/yaml/v2/model_test.go b/pkg/provisioning/config/yaml/v2/model_test.go index f9406b8ff..9d8c23254 100644 --- a/pkg/provisioning/config/yaml/v2/model_test.go +++ b/pkg/provisioning/config/yaml/v2/model_test.go @@ -23,7 +23,7 @@ import ( func TestConfiguration_JSON(t *testing.T) { is := is.New(t) - intPtr := func(i int) *int { return &i } + uint64Ptr := func(i uint64) *uint64 { return &i } p := Pipeline{ ID: "pipeline1", Status: "running", @@ -67,8 +67,8 @@ func TestConfiguration_JSON(t *testing.T) { Settings: map[string]string{ "foo": "bar", }, - WindowSize: intPtr(4), - WindowNackThreshold: intPtr(2), + WindowSize: uint64Ptr(4), + WindowNackThreshold: uint64Ptr(2), }, } diff --git a/pkg/provisioning/import_actions_test.go b/pkg/provisioning/import_actions_test.go index 7cd60597b..f165a5218 100644 --- a/pkg/provisioning/import_actions_test.go +++ b/pkg/provisioning/import_actions_test.go @@ -42,8 +42,8 @@ func TestCreatePipelineAction_Do(t *testing.T) { DLQ: config.DLQ{ Plugin: "dlq-plugin", Settings: map[string]string{"foo": "bar"}, - WindowSize: intPtr(1), - WindowNackThreshold: intPtr(2), + WindowSize: uint64Ptr(1), + WindowNackThreshold: uint64Ptr(2), }, } wantCfg := pipeline.Config{ @@ -87,8 +87,8 @@ func TestCreatePipelineAction_Rollback(t *testing.T) { DLQ: config.DLQ{ Plugin: "dlq-plugin", Settings: map[string]string{"foo": "bar"}, - WindowSize: intPtr(1), - WindowNackThreshold: intPtr(2), + WindowSize: uint64Ptr(1), + WindowNackThreshold: uint64Ptr(2), }, } @@ -113,8 +113,8 @@ func TestUpdatePipelineAction(t *testing.T) { DLQ: config.DLQ{ Plugin: "dlq-plugin", Settings: map[string]string{"foo": "bar"}, - WindowSize: intPtr(1), - WindowNackThreshold: intPtr(2), + WindowSize: uint64Ptr(1), + WindowNackThreshold: uint64Ptr(2), }, } @@ -198,8 +198,8 @@ func TestDeletePipelineAction_Do(t *testing.T) { DLQ: config.DLQ{ Plugin: "dlq-plugin", Settings: map[string]string{"foo": "bar"}, - WindowSize: intPtr(1), - WindowNackThreshold: intPtr(2), + WindowSize: uint64Ptr(1), + WindowNackThreshold: uint64Ptr(2), }, } @@ -228,8 +228,8 @@ func TestDeletePipelineAction_Rollback(t *testing.T) { DLQ: config.DLQ{ Plugin: "dlq-plugin", Settings: map[string]string{"foo": "bar"}, - WindowSize: intPtr(1), - WindowNackThreshold: intPtr(2), + WindowSize: uint64Ptr(1), + WindowNackThreshold: uint64Ptr(2), }, } wantCfg := pipeline.Config{ diff --git a/pkg/provisioning/import_test.go b/pkg/provisioning/import_test.go index b32603b8f..22f20d91d 100644 --- a/pkg/provisioning/import_test.go +++ b/pkg/provisioning/import_test.go @@ -806,7 +806,7 @@ func TestActionsBuilder_PrepareProcessorActions_Recreate(t *testing.T) { // -- HELPERS -- // ------------- -func intPtr(i int) *int { return &i } +func uint64Ptr(i uint64) *uint64 { return &i } func newTestService(ctrl *gomock.Controller, logger log.CtxLogger) (*Service, *mock.PipelineService, *mock.ConnectorService, *mock.ProcessorService, *mock.ConnectorPluginService) { db := &inmemory.DB{} diff --git a/pkg/web/api/fromproto/pipeline.go b/pkg/web/api/fromproto/pipeline.go index 45556da30..ac2bc61e9 100644 --- a/pkg/web/api/fromproto/pipeline.go +++ b/pkg/web/api/fromproto/pipeline.go @@ -36,7 +36,7 @@ func PipelineDLQ(in *apiv1.Pipeline_DLQ) pipeline.DLQ { return pipeline.DLQ{ Plugin: in.Plugin, Settings: in.Settings, - WindowSize: int(in.WindowSize), - WindowNackThreshold: int(in.WindowNackThreshold), + WindowSize: in.WindowSize, + WindowNackThreshold: in.WindowNackThreshold, } } diff --git a/pkg/web/api/fromproto/processor.go b/pkg/web/api/fromproto/processor.go index 042af2e52..74137f150 100644 --- a/pkg/web/api/fromproto/processor.go +++ b/pkg/web/api/fromproto/processor.go @@ -32,7 +32,7 @@ func ProcessorConfig(in *apiv1.Processor_Config) processor.Config { } return processor.Config{ Settings: in.Settings, - Workers: int(in.Workers), + Workers: in.Workers, } } diff --git a/pkg/web/api/toproto/connector.go b/pkg/web/api/toproto/connector.go index 589c36c70..a7cd8dda0 100644 --- a/pkg/web/api/toproto/connector.go +++ b/pkg/web/api/toproto/connector.go @@ -57,7 +57,7 @@ func ConnectorConfig(in connector.Config) *apiv1.Connector_Config { } func ConnectorType(in connector.Type) apiv1.Connector_Type { - return apiv1.Connector_Type(in) + return apiv1.Connector_Type(in) //nolint:gosec // this is deprecated } func ConnectorDestinationState(in connector.DestinationState) *apiv1.Connector_DestinationState_ { diff --git a/pkg/web/api/toproto/pipeline.go b/pkg/web/api/toproto/pipeline.go index d324b91f5..69c99f85e 100644 --- a/pkg/web/api/toproto/pipeline.go +++ b/pkg/web/api/toproto/pipeline.go @@ -62,7 +62,7 @@ func PipelineDLQ(in pipeline.DLQ) *apiv1.Pipeline_DLQ { return &apiv1.Pipeline_DLQ{ Plugin: in.Plugin, Settings: in.Settings, - WindowSize: uint64(in.WindowSize), - WindowNackThreshold: uint64(in.WindowNackThreshold), + WindowSize: in.WindowSize, + WindowNackThreshold: in.WindowNackThreshold, } } diff --git a/pkg/web/api/toproto/plugin.go b/pkg/web/api/toproto/plugin.go index 5274d3cff..cc0b3ff93 100644 --- a/pkg/web/api/toproto/plugin.go +++ b/pkg/web/api/toproto/plugin.go @@ -60,7 +60,7 @@ func PluginParamsMap(in map[string]config.Parameter) map[string]*apiv1.PluginSpe out[k] = &apiv1.PluginSpecifications_Parameter{ Description: v.Description, Default: v.Default, - Type: apiv1.PluginSpecifications_Parameter_Type(v.Type), + Type: apiv1.PluginSpecifications_Parameter_Type(v.Type), //nolint:gosec // this is deprecated Validations: PluginParamValidations(v.Validations), } } diff --git a/pkg/web/api/toproto/processor.go b/pkg/web/api/toproto/processor.go index 0f40d20c7..736bcf139 100644 --- a/pkg/web/api/toproto/processor.go +++ b/pkg/web/api/toproto/processor.go @@ -42,7 +42,7 @@ func Processor(in *processor.Instance) *apiv1.Processor { func ProcessorConfig(in processor.Config) *apiv1.Processor_Config { return &apiv1.Processor_Config{ Settings: in.Settings, - Workers: int32(in.Workers), + Workers: in.Workers, } } diff --git a/pkg/web/openapi/swagger-ui/api/v1/api.swagger.json b/pkg/web/openapi/swagger-ui/api/v1/api.swagger.json index 128a2ddb3..55b1404ee 100644 --- a/pkg/web/openapi/swagger-ui/api/v1/api.swagger.json +++ b/pkg/web/openapi/swagger-ui/api/v1/api.swagger.json @@ -2316,8 +2316,8 @@ } }, "workers": { - "type": "integer", - "format": "int32" + "type": "string", + "format": "uint64" } } }, diff --git a/proto/api/v1/api.pb.go b/proto/api/v1/api.pb.go index 6764febaf..297704573 100644 --- a/proto/api/v1/api.pb.go +++ b/proto/api/v1/api.pb.go @@ -4236,7 +4236,7 @@ type Processor_Config struct { unknownFields protoimpl.UnknownFields Settings map[string]string `protobuf:"bytes,1,rep,name=settings,proto3" json:"settings,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Workers int32 `protobuf:"varint,2,opt,name=workers,proto3" json:"workers,omitempty"` + Workers uint64 `protobuf:"varint,2,opt,name=workers,proto3" json:"workers,omitempty"` } func (x *Processor_Config) Reset() { @@ -4278,7 +4278,7 @@ func (x *Processor_Config) GetSettings() map[string]string { return nil } -func (x *Processor_Config) GetWorkers() int32 { +func (x *Processor_Config) GetWorkers() uint64 { if x != nil { return x.Workers } @@ -4582,7 +4582,7 @@ var file_api_v1_api_proto_rawDesc = []byte{ 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x53, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x73, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x73, 0x12, 0x18, - 0x0a, 0x07, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x0a, 0x07, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x1a, 0x3b, 0x0a, 0x0d, 0x53, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, diff --git a/proto/api/v1/api.proto b/proto/api/v1/api.proto index a04196c7e..fd830d36d 100644 --- a/proto/api/v1/api.proto +++ b/proto/api/v1/api.proto @@ -154,7 +154,7 @@ message Processor { } message Config { map settings = 1; - int32 workers = 2; + uint64 workers = 2; } string id = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; diff --git a/proto/api/v1/api.swagger.json b/proto/api/v1/api.swagger.json index 128a2ddb3..55b1404ee 100644 --- a/proto/api/v1/api.swagger.json +++ b/proto/api/v1/api.swagger.json @@ -2316,8 +2316,8 @@ } }, "workers": { - "type": "integer", - "format": "int32" + "type": "string", + "format": "uint64" } } },