fix: gosec linting errors #1805

Closed
2 changes: 1 addition & 1 deletion .golangci.yml
@@ -62,7 +62,7 @@ linters:
# inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint
disable-all: true
enable:
-# We plan to enable all of the linters which are commented out.
+# We plan to enable all the linters which are commented out.
# However, we want to enable them one by one (so we don't have to fix many issues at once).
- bodyclose
- depguard
4 changes: 2 additions & 2 deletions pkg/pipeline/instance.go
@@ -83,8 +83,8 @@ type DLQ struct {
Plugin string
Settings map[string]string

-WindowSize int
-WindowNackThreshold int
+WindowSize uint64
+WindowNackThreshold uint64
}

var DefaultDLQ = DLQ{
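
For context on why the type change is itself the gosec fix: G115 flags integer conversions that can overflow or wrap, and converting a negative signed value to an unsigned type is exactly such a case. The snippet below is a self-contained illustration, not code from this PR; it shows the wraparound G115 guards against and why an end-to-end `uint64` field makes the removed "must be non-negative" checks unnecessary.

```go
package main

import "fmt"

func main() {
	// A negative signed value wraps around when converted to an unsigned
	// type; this is the class of conversion gosec G115 warns about.
	size := -1
	fmt.Println(uint64(size)) // 18446744073709551615

	// Keeping the field unsigned end-to-end (as this PR does for WindowSize
	// and WindowNackThreshold) removes the conversion entirely, and a value
	// that cannot be negative needs no "must be non-negative" validation.
	var windowSize uint64 = 4
	fmt.Println(windowSize) // 4
}
```
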
4 changes: 2 additions & 2 deletions pkg/pipeline/lifecycle.go
@@ -324,9 +324,9 @@ func (s *Service) buildParallelProcessorNode(
) *stream.ParallelNode {
return &stream.ParallelNode{
Name: proc.ID + "-parallel",
-NewNode: func(i int) stream.PubSubNode {
+NewNode: func(i uint64) stream.PubSubNode {
n := s.buildProcessorNode(pl, proc)
n.Name = n.Name + "-" + strconv.Itoa(i) // add suffix to name
n.Name = n.Name + "-" + strconv.FormatUint(i, 10) // add suffix to name
return n
},
Workers: proc.Config.Workers,
6 changes: 0 additions & 6 deletions pkg/pipeline/service.go
@@ -187,12 +187,6 @@ func (s *Service) UpdateDLQ(ctx context.Context, pipelineID string, cfg DLQ) (*I
if cfg.Plugin == "" {
return nil, cerrors.New("DLQ plugin must be provided")
}
-if cfg.WindowSize < 0 {
-return nil, cerrors.New("DLQ window size must be non-negative")
-}
-if cfg.WindowNackThreshold < 0 {
-return nil, cerrors.New("DLQ window nack threshold must be non-negative")
-}
if cfg.WindowSize > 0 && cfg.WindowSize <= cfg.WindowNackThreshold {
return nil, cerrors.New("DLQ window nack threshold must be lower than window size")
}
12 changes: 6 additions & 6 deletions pkg/pipeline/stream/dlq.go
@@ -42,8 +42,8 @@ type DLQHandlerNode struct {
Name string
Handler DLQHandler

-WindowSize int
-WindowNackThreshold int
+WindowSize uint64
+WindowNackThreshold uint64

Timer metrics.Timer
Histogram metrics.RecordBytesHistogram
@@ -212,13 +212,13 @@ type dlqWindow struct {
// nackThreshold represents the number of tolerated nacks, if the threshold
// is exceeded the window is frozen and returns an error for all further
// nacks.
-nackThreshold int
+nackThreshold uint64

-ackCount int
-nackCount int
+ackCount uint64
+nackCount uint64
}

-func newDLQWindow(size, threshold int) *dlqWindow {
+func newDLQWindow(size, threshold uint64) *dlqWindow {
if size > 0 && threshold == 0 {
// optimization - if threshold is 0 the window size does not matter,
// setting it to 1 ensures we don't use more memory than needed
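
The doc comment above spells out the window semantics: results are tracked over a sliding window, up to nackThreshold nacks are tolerated, and once the threshold is exceeded the window freezes and rejects every further nack. A rough sketch of that behavior follows; it is illustrative only, not the Conduit implementation, and the `window` type, `newWindow` constructor, `store` method, and ring-buffer layout are all assumptions.

```go
package main

import "fmt"

// window tolerates up to nackThreshold nacks among the last len(buf) results.
type window struct {
	buf           []bool // ring buffer of recent results, true = nack
	cursor        int
	nackThreshold uint64
	nackCount     uint64
	frozen        bool
}

func newWindow(size, threshold uint64) *window {
	return &window{buf: make([]bool, size), nackThreshold: threshold}
}

// store pushes a result into the ring buffer, evicting the oldest one.
func (w *window) store(nacked bool) {
	if len(w.buf) == 0 {
		return // window disabled
	}
	if w.buf[w.cursor] {
		w.nackCount-- // the evicted entry was a nack
	}
	w.buf[w.cursor] = nacked
	if nacked {
		w.nackCount++
	}
	w.cursor = (w.cursor + 1) % len(w.buf)
}

func (w *window) Ack() {
	if !w.frozen {
		w.store(false)
	}
}

// Nack reports whether the nack is still tolerated by the window.
func (w *window) Nack() bool {
	if w.frozen {
		return false
	}
	w.store(true)
	if w.nackCount > w.nackThreshold {
		w.frozen = true // threshold exceeded, reject all further nacks
		return false
	}
	return true
}

func main() {
	w := newWindow(2, 1)
	fmt.Println(w.Nack()) // true: one nack is tolerated
	fmt.Println(w.Nack()) // false: threshold exceeded, window freezes
	w.Ack()
	fmt.Println(w.Nack()) // false: acks no longer unfreeze the window
}
```

This mirrors the expectations in the tests that follow: nacks up to the threshold are accepted, the nack that exceeds it freezes the window, and later acks do not unfreeze it.
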
14 changes: 7 additions & 7 deletions pkg/pipeline/stream/dlq_test.go
@@ -236,7 +236,7 @@ func TestDLQHandlerNode_Nack_ForwardToDLQ_Success(t *testing.T) {
is.NoErr(err)
}()

-for i := 0; i < n.WindowNackThreshold; i++ {
+for i := uint64(0); i < n.WindowNackThreshold; i++ {
msg := &Message{
Ctx: ctx,
Record: opencdc.Record{Position: []byte(uuid.NewString())},
@@ -326,8 +326,8 @@ func TestDLQWindow_WindowDisabled(t *testing.T) {

func TestDLQWindow_NackThresholdExceeded(t *testing.T) {
testCases := []struct {
-windowSize int
-nackThreshold int
+windowSize uint64
+nackThreshold uint64
}{
{1, 0},
{2, 0},
@@ -344,19 +344,19 @@ func TestDLQWindow_NackThresholdExceeded(t *testing.T) {
w := newDLQWindow(tc.windowSize, tc.nackThreshold)

// fill up window with nacks up to the threshold
-for i := 0; i < tc.nackThreshold; i++ {
+for i := uint64(0); i < tc.nackThreshold; i++ {
ok := w.Nack()
is.True(ok)
}

// fill up window again with acks
-for i := 0; i < tc.windowSize; i++ {
+for i := uint64(0); i < tc.windowSize; i++ {
w.Ack()
}

// since window is full of acks we should be able to fill up
// the window with nacks again
-for i := 0; i < tc.nackThreshold; i++ {
+for i := uint64(0); i < tc.nackThreshold; i++ {
ok := w.Nack()
is.True(ok)
}
@@ -367,7 +367,7 @@

// adding acks after that should make no difference, all nacks
// need to fail after the threshold is reached
-for i := 0; i < tc.windowSize; i++ {
+for i := uint64(0); i < tc.windowSize; i++ {
w.Ack()
}
ok = w.Nack()
6 changes: 3 additions & 3 deletions pkg/pipeline/stream/parallel.go
@@ -32,8 +32,8 @@ type ParallelNode struct {
Name string
// NewNode is the constructor of the wrapped PubSubNode, it should create
// the i-th node (useful for distinguishing nodes in logs).
-NewNode func(i int) PubSubNode
-Workers int
+NewNode func(i uint64) PubSubNode
+Workers uint64

base pubSubNodeBase
logger log.CtxLogger
@@ -73,7 +73,7 @@ func (n *ParallelNode) Run(ctx context.Context) error {
// buffered, so it blocks when all workers are busy
workerJobs := make(chan parallelNodeJob)
var workerWg sync.WaitGroup
-for i := 0; i < n.Workers; i++ {
+for i := uint64(0); i < n.Workers; i++ {
node := n.NewNode(i)
worker := newParallelNodeWorker(node, workerJobs, n.logger)
workerWg.Add(1)
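
The fields above define the fan-out contract: `NewNode(i)` builds the i-th wrapped node so workers can be told apart in logs, and `Run` hands jobs to the workers over an unbuffered channel, so dispatching blocks while every worker is busy. A stripped-down sketch of that pattern follows; it is illustrative only, not the ParallelNode implementation, and `newNode` plus the plain `int` job payload are stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers uint64 = 3

	// Unbuffered: sending a job blocks until some worker is free to take it.
	jobs := make(chan int)

	// newNode stands in for ParallelNode.NewNode: the index i lets each
	// worker carry a distinguishable name.
	newNode := func(i uint64) string {
		return fmt.Sprintf("node-%d", i)
	}

	var wg sync.WaitGroup
	for i := uint64(0); i < workers; i++ {
		name := newNode(i)
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				fmt.Printf("%s handled job %d\n", name, job)
			}
		}()
	}

	for j := 0; j < 9; j++ {
		jobs <- j
	}
	close(jobs)
	wg.Wait()
}
```
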
8 changes: 4 additions & 4 deletions pkg/pipeline/stream/parallel_test.go
@@ -343,7 +343,7 @@ func TestParallelNode_Success(t *testing.T) {

const workerCount = 10

-newPubSubNode := func(i int) PubSubNode {
+newPubSubNode := func(i uint64) PubSubNode {
return &parallelTestNode{
Name: fmt.Sprintf("test-node-%d", i),
F: func(ctx context.Context, sub <-chan *Message, pub chan<- *Message) error {
@@ -422,7 +422,7 @@ func TestParallelNode_ErrorAll(t *testing.T) {

const workerCount = 10

-newPubSubNode := func(i int) PubSubNode {
+newPubSubNode := func(i uint64) PubSubNode {
name := fmt.Sprintf("test-node-%d", i)
return &parallelTestNode{
Name: name,
@@ -483,7 +483,7 @@ func TestParallelNode_ErrorSingle(t *testing.T) {

const workerCount = 10

-newPubSubNode := func(i int) PubSubNode {
+newPubSubNode := func(i uint64) PubSubNode {
name := fmt.Sprintf("test-node-%d", i)
return &parallelTestNode{
Name: name,
@@ -585,7 +585,7 @@ func TestParallelNode_Processor(t *testing.T) {
Teardown(gomock.Any()).
Times(workerCount)

-newProcNode := func(i int) PubSubNode {
+newProcNode := func(i uint64) PubSubNode {
return &ProcessorNode{
Name: fmt.Sprintf("test-%d", i),
Processor: proc,
6 changes: 3 additions & 3 deletions pkg/plugin/processor/standalone/host_module.go
@@ -106,7 +106,7 @@
// message. If the buffer is too small, it returns the size of the command
// request message and parks the command request. The next call to this function
// will return the same command request.
-func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint32 {
+func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint64 {
m.logger.Trace(ctx).Msg("executing command_request")

if m.parkedCommandRequest == nil {
@@ -126,7 +126,7 @@
Int("command_bytes", size).
Int("allocated_bytes", len(buf)).
Msgf("insufficient memory, command will be parked until next call to command_request")
-return types.Uint32(size)
+return types.Uint64(size)
}

// If the buffer is large enough, we marshal the command into the buffer and
@@ -140,7 +140,7 @@
m.parkedCommandRequest = nil

m.logger.Trace(ctx).Msg("returning next command")
-return types.Uint32(len(out))
+return types.Uint64(len(out))
}

// commandResponse is the exported function that is called by the WASM module to
@@ -202,7 +202,7 @@
Int("response_bytes", size).
Int("allocated_bytes", len(buf)).
Msgf("insufficient memory, response will be parked until next call to %s", serviceMethod)
return types.Uint32(size)
Check failure on line 205 in pkg/plugin/processor/standalone/host_module.go (GitHub Actions / golangci-lint): G115: integer overflow conversion int -> uint32 (gosec)
}

out, err := proto.MarshalOptions{}.MarshalAppend(buf[:0], parkedResponse)
@@ -214,7 +214,7 @@
m.lastRequestBytes[serviceMethod] = nil
m.parkedResponses[serviceMethod] = nil

return types.Uint32(len(out))
Check failure on line 217 in pkg/plugin/processor/standalone/host_module.go (GitHub Actions / golangci-lint): G115: integer overflow conversion int -> uint32 (gosec)
}

// createSchema is the exported function that is called by the WASM module to
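
Two things are worth noting here. First, the commandRequest doc comment describes a two-call protocol: when the caller's buffer is too small, the function returns the required size and parks the request so the next call can deliver it. Second, the annotations above show why CI still fails at this point: the response path keeps two types.Uint32 conversions from int, which gosec continues to flag as G115. The sketch below is illustrative only (the `host` struct and `sizeToUint32` helper are invented, not part of this PR): it mirrors the park-and-retry idea and shows a bounds-checked int to uint32 conversion of the kind typically used to make such narrowing explicit; whether that alone satisfies the linter would need to be verified.

```go
package main

import (
	"fmt"
	"math"
)

// host keeps a request parked until the caller comes back with a buffer
// large enough to hold it, mirroring the doc comment above.
// (Illustrative sketch only, not the Conduit host module.)
type host struct {
	parked []byte
}

// sizeToUint32 narrows an int to uint32 with explicit bounds handling
// instead of a silent wraparound.
func sizeToUint32(n int) uint32 {
	switch {
	case n < 0:
		return 0
	case int64(n) > math.MaxUint32: // int64 comparison is safe on any platform
		return math.MaxUint32
	default:
		return uint32(n)
	}
}

// commandRequest copies the parked request into buf if it fits; otherwise it
// returns the required size and keeps the request parked for the next call.
func (h *host) commandRequest(buf []byte) uint32 {
	if len(buf) < len(h.parked) {
		return sizeToUint32(len(h.parked)) // caller should retry with this size
	}
	n := copy(buf, h.parked)
	h.parked = nil
	return sizeToUint32(n)
}

func main() {
	h := &host{parked: []byte("serialized command")}

	small := make([]byte, 4)
	need := h.commandRequest(small) // too small: returns required size, stays parked
	fmt.Println("need", need, "bytes")

	big := make([]byte, need)
	n := h.commandRequest(big) // large enough: request is written and un-parked
	fmt.Println("read", n, "bytes:", string(big[:n]))
}
```
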
2 changes: 1 addition & 1 deletion pkg/processor/instance.go
@@ -94,5 +94,5 @@ type Parent struct {
// Config holds configuration data for building a processor.
type Config struct {
Settings map[string]string
-Workers int
+Workers uint64
}
3 changes: 0 additions & 3 deletions pkg/processor/service.go
@@ -119,9 +119,6 @@ func (s *Service) Create(
pt ProvisionType,
cond string,
) (*Instance, error) {
-if cfg.Workers < 0 {
-return nil, cerrors.New("processor workers can't be negative")
-}
if cfg.Workers == 0 {
cfg.Workers = 1
}
21 changes: 0 additions & 21 deletions pkg/processor/service_test.go
@@ -197,27 +197,6 @@ func TestService_Create_BuilderFail(t *testing.T) {
is.Equal(i, nil)
}

-func TestService_Create_WorkersNegative(t *testing.T) {
-is := is.New(t)
-ctx := context.Background()
-db := &inmemory.DB{}
-
-service := NewService(log.Nop(), db, &proc_plugin.PluginService{})
-
-got, err := service.Create(
-ctx,
-uuid.NewString(),
-"processor-type",
-Parent{},
-Config{Workers: -1},
-ProvisionTypeAPI,
-"{{true}}",
-)
-is.True(err != nil) // expected workers error
-is.Equal("processor workers can't be negative", err.Error())
-is.Equal(got, nil)
-}

func TestService_Delete_Success(t *testing.T) {
is := is.New(t)
ctx := context.Background()
6 changes: 3 additions & 3 deletions pkg/provisioning/config/parser.go
@@ -42,15 +42,15 @@ type Processor struct {
ID string
Plugin string
Settings map[string]string
-Workers int
+Workers uint64
Condition string
}

type DLQ struct {
Plugin string
Settings map[string]string
-WindowSize *int
-WindowNackThreshold *int
+WindowSize *uint64
+WindowNackThreshold *uint64
}

// Classify fields as immutable, mutable or ignored. This is used by the
3 changes: 0 additions & 3 deletions pkg/provisioning/config/validate.go
@@ -97,9 +97,6 @@ func validateProcessors(mp []Processor) []error {
if cfg.Plugin == "" {
errs = append(errs, cerrors.Errorf("processor %q: \"plugin\" is mandatory: %w", cfg.ID, ErrMandatoryField))
}
-if cfg.Workers < 0 {
-errs = append(errs, cerrors.Errorf("processor %q: \"workers\" can't be negative: %w", cfg.ID, ErrInvalidField))
-}
if ids[cfg.ID] {
errs = append(errs, cerrors.Errorf("processor %q: \"id\" must be unique: %w", cfg.ID, ErrDuplicateID))
}
16 changes: 0 additions & 16 deletions pkg/provisioning/config/validate_test.go
@@ -209,22 +209,6 @@ func TestValidator_InvalidFields(t *testing.T) {
}},
},
wantErr: ErrInvalidField,
-}, {
-name: "processor workers is negative",
-config: Pipeline{
-ID: "pipeline1",
-Status: "running",
-Name: "pipeline1",
-Description: "desc1",
-Processors: []Processor{{
-ID: "proc1",
-Plugin: "js",
-Settings: map[string]string{},
-// invalid field
-Workers: -1,
-}},
-},
-wantErr: ErrInvalidField,
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
14 changes: 8 additions & 6 deletions pkg/provisioning/config/yaml/parser_test.go
@@ -35,7 +35,8 @@ func TestParser_V1_Success(t *testing.T) {
is := is.New(t)
parser := NewParser(log.Nop())
filepath := "./v1/testdata/pipelines1-success.yml"
-intPtr := func(i int) *int { return &i }
+
+uint64Ptr := func(i uint64) *uint64 { return &i }
want := Configurations{
v1.Configuration{
Version: "1.0",
@@ -78,8 +79,8 @@
Settings: map[string]string{
"foo": "bar",
},
-WindowSize: intPtr(4),
-WindowNackThreshold: intPtr(2),
+WindowSize: uint64Ptr(4),
+WindowNackThreshold: uint64Ptr(2),
},
},
},
@@ -274,7 +275,7 @@ func TestParser_V2_Success(t *testing.T) {
is := is.New(t)
parser := NewParser(log.Nop())
filepath := "./v2/testdata/pipelines1-success.yml"
-intPtr := func(i int) *int { return &i }
+
+uint64Ptr := func(i uint64) *uint64 { return &i }
want := Configurations{
v2.Configuration{
Version: "2.2",
@@ -321,8 +323,8 @@
Settings: map[string]string{
"foo": "bar",
},
-WindowSize: intPtr(4),
-WindowNackThreshold: intPtr(2),
+WindowSize: uint64Ptr(4),
+WindowNackThreshold: uint64Ptr(2),
},
},
},
6 changes: 3 additions & 3 deletions pkg/provisioning/config/yaml/v1/model.go
@@ -68,14 +68,14 @@ type Connector struct {
type Processor struct {
Type string `yaml:"type"`
Settings map[string]string `yaml:"settings"`
-Workers int `yaml:"workers"`
+Workers uint64 `yaml:"workers"`
}

type DLQ struct {
Plugin string `yaml:"plugin"`
Settings map[string]string `yaml:"settings"`
-WindowSize *int `yaml:"window-size"`
-WindowNackThreshold *int `yaml:"window-nack-threshold"`
+WindowSize *uint64 `yaml:"window-size"`
+WindowNackThreshold *uint64 `yaml:"window-nack-threshold"`
}

func (c Configuration) ToConfig() []config.Pipeline {