Skip to content

Commit

Permalink
conditional processor execution (#1349)
Browse files Browse the repository at this point in the history
* add periods as allowed ids + fix http log

* add "condition" field to processor + bump pipeline config file minor version

* generate swagger

* add docs for "condition" + generate files

* make generate

* generate swagger

* generate swagger 2

* make proto-generate

* update workflow buf version

* regenrate swagger

* log diff

* regenerate

* add line break

* delete swagger + regenerate

* regenerate

---------

Co-authored-by: Haris Osmanagic <[email protected]>
  • Loading branch information
maha-hajja and hariso authored Jan 26, 2024
1 parent 7c390fb commit 2c3dfaf
Show file tree
Hide file tree
Showing 25 changed files with 855 additions and 767 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/validate-generated-files.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ jobs:
uses: arduino/setup-protoc@v2

- name: Set up Buf
uses: bufbuild/buf-setup-action@v1
uses: bufbuild/buf-setup-action@v1.29.0

- name: Check generated files
run: |
export PATH=$PATH:$(go env GOPATH)/bin
make install-tools
make generate
make proto-generate
git diff
git diff --exit-code --numstat
8 changes: 4 additions & 4 deletions pkg/orchestrator/mock/orchestrator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type ConnectorService interface {
type ProcessorService interface {
List(ctx context.Context) map[string]*processor.Instance
Get(ctx context.Context, id string) (*processor.Instance, error)
Create(ctx context.Context, id string, procType string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType) (*processor.Instance, error)
Create(ctx context.Context, id string, procType string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType, condition string) (*processor.Instance, error)
Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error)
Delete(ctx context.Context, id string) error
}
Expand Down
1 change: 1 addition & 0 deletions pkg/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func TestPipelineSimple(t *testing.T) {
Type: processor.ParentTypePipeline,
},
processor.Config{},
"",
)
is.NoErr(err)

Expand Down
5 changes: 3 additions & 2 deletions pkg/orchestrator/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (p *ProcessorOrchestrator) Create(
procType string,
parent processor.Parent,
cfg processor.Config,
cond string,
) (*processor.Instance, error) {
var r rollback.R
defer r.MustExecute()
Expand All @@ -57,7 +58,7 @@ func (p *ProcessorOrchestrator) Create(
}

// create processor and add to pipeline or connector
proc, err := p.processors.Create(ctx, uuid.NewString(), procType, parent, cfg, processor.ProvisionTypeAPI)
proc, err := p.processors.Create(ctx, uuid.NewString(), procType, parent, cfg, processor.ProvisionTypeAPI, cond)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -212,7 +213,7 @@ func (p *ProcessorOrchestrator) Delete(ctx context.Context, id string) error {
return err
}
r.Append(func() error {
_, err = p.processors.Create(ctx, id, proc.Type, proc.Parent, proc.Config, processor.ProvisionTypeAPI)
_, err = p.processors.Create(ctx, id, proc.Type, proc.Parent, proc.Config, processor.ProvisionTypeAPI, proc.Condition)
return err
})

Expand Down
27 changes: 19 additions & 8 deletions pkg/orchestrator/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_Success(t *testing.T) {
Config: processor.Config{
Settings: map[string]string{"foo": "bar"},
},
Condition: "{{ true }}",
}

plsMock.EXPECT().
Expand All @@ -63,14 +64,15 @@ func TestProcessorOrchestrator_CreateOnPipeline_Success(t *testing.T) {
want.Parent,
want.Config,
processor.ProvisionTypeAPI,
want.Condition,
).
Return(want, nil)
plsMock.EXPECT().
AddProcessor(gomock.AssignableToTypeOf(ctxType), pl.ID, want.ID).
Return(pl, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, want.Type, want.Parent, want.Config)
got, err := orc.Processors.Create(ctx, want.Type, want.Parent, want.Config, want.Condition)
is.NoErr(err)
is.Equal(want, got)
}
Expand All @@ -92,7 +94,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_PipelineNotExist(t *testing.T) {
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{})
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{}, "")
is.True(err != nil)
is.True(cerrors.Is(err, wantErr)) // errors did not match
is.True(got == nil)
Expand All @@ -118,7 +120,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_PipelineRunning(t *testing.T) {
Return(pl, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{})
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{}, "")
is.True(err != nil)
is.Equal(pipeline.ErrPipelineRunning, err)
is.True(got == nil)
Expand All @@ -145,7 +147,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_PipelineProvisionedByConfig(t *t
Return(pl, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{})
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{}, "")
is.Equal(got, nil)
is.True(err != nil)
is.True(cerrors.Is(err, ErrImmutableProvisionedByConfig)) // expected ErrImmutableProvisionedByConfig
Expand Down Expand Up @@ -179,11 +181,12 @@ func TestProcessorOrchestrator_CreateOnPipeline_CreateProcessorError(t *testing.
parent,
processor.Config{},
processor.ProvisionTypeAPI,
"",
).
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{})
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{}, "")
is.True(err != nil)
is.True(cerrors.Is(err, wantErr)) // errors did not match
is.True(got == nil)
Expand All @@ -210,6 +213,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_AddProcessorError(t *testing.T)
Config: processor.Config{
Settings: map[string]string{"foo": "bar"},
},
Condition: "{{ true }}",
}
wantErr := cerrors.New("test error")

Expand All @@ -224,6 +228,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_AddProcessorError(t *testing.T)
proc.Parent,
proc.Config,
processor.ProvisionTypeAPI,
proc.Condition,
).
Return(proc, nil)
plsMock.EXPECT().
Expand All @@ -235,7 +240,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_AddProcessorError(t *testing.T)
Return(nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, proc.Type, proc.Parent, proc.Config)
got, err := orc.Processors.Create(ctx, proc.Type, proc.Parent, proc.Config, proc.Condition)
is.True(err != nil)
is.True(cerrors.Is(err, wantErr)) // errors did not match
is.True(got == nil)
Expand Down Expand Up @@ -266,6 +271,7 @@ func TestProcessorOrchestrator_CreateOnConnector_Success(t *testing.T) {
Config: processor.Config{
Settings: map[string]string{"foo": "bar"},
},
Condition: "{{ true }}",
}

consMock.EXPECT().
Expand All @@ -282,14 +288,15 @@ func TestProcessorOrchestrator_CreateOnConnector_Success(t *testing.T) {
want.Parent,
want.Config,
processor.ProvisionTypeAPI,
want.Condition,
).
Return(want, nil)
consMock.EXPECT().
AddProcessor(gomock.AssignableToTypeOf(ctxType), conn.ID, want.ID).
Return(conn, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, want.Type, want.Parent, want.Config)
got, err := orc.Processors.Create(ctx, want.Type, want.Parent, want.Config, want.Condition)
is.NoErr(err)
is.Equal(want, got)
}
Expand All @@ -311,7 +318,7 @@ func TestProcessorOrchestrator_CreateOnConnector_ConnectorNotExist(t *testing.T)
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{})
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{}, "")
is.True(err != nil)
is.True(cerrors.Is(err, wantErr)) // errors did not match
is.True(got == nil)
Expand Down Expand Up @@ -746,6 +753,7 @@ func TestProcessorOrchestrator_DeleteOnPipeline_RemoveProcessorFail(t *testing.T
Config: processor.Config{
Settings: map[string]string{"foo": "bar"},
},
Condition: "{{ true }}",
}

wantErr := cerrors.New("couldn't remove the processor")
Expand All @@ -770,6 +778,7 @@ func TestProcessorOrchestrator_DeleteOnPipeline_RemoveProcessorFail(t *testing.T
want.Parent,
want.Config,
processor.ProvisionTypeAPI,
want.Condition,
).
Return(want, nil)

Expand Down Expand Up @@ -803,6 +812,7 @@ func TestProcessorOrchestrator_DeleteOnConnector_Fail(t *testing.T) {
Config: processor.Config{
Settings: map[string]string{"foo": "bar"},
},
Condition: "{{ true }}",
}

wantErr := cerrors.New("couldn't remove processor from connector")
Expand Down Expand Up @@ -830,6 +840,7 @@ func TestProcessorOrchestrator_DeleteOnConnector_Fail(t *testing.T) {
want.Parent,
want.Config,
processor.ProvisionTypeAPI,
want.Condition,
).
Return(want, nil)

Expand Down
6 changes: 5 additions & 1 deletion pkg/processor/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ type Instance struct {
UpdatedAt time.Time
ProvisionedBy ProvisionType

Type string
Type string
// Condition is a goTemplate formatted string, the value provided to the template is a sdk.Record, it should evaluate
// to a boolean value, indicating a condition to run the processor for a specific record or not. (template functions
// provided by `sprig` are injected)
Condition string
Parent Parent
Config Config
Processor Interface
Expand Down
2 changes: 2 additions & 0 deletions pkg/processor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (s *Service) Create(
parent Parent,
cfg Config,
pt ProvisionType,
cond string,
) (*Instance, error) {
if cfg.Workers < 0 {
return nil, cerrors.New("processor workers can't be negative")
Expand Down Expand Up @@ -119,6 +120,7 @@ func (s *Service) Create(
Parent: parent,
Config: cfg,
Processor: p,
Condition: cond,
}

// persist instance
Expand Down
19 changes: 11 additions & 8 deletions pkg/processor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestService_Init_Success(t *testing.T) {
service := processor.NewService(log.Nop(), db, registry)

// create a processor instance
_, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI)
_, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI, "")
is.NoErr(err)

want := service.List(ctx)
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestService_Create_Success(t *testing.T) {
registry := newTestBuilderRegistry(is, map[string]processor.Interface{want.Type: p})
service := processor.NewService(log.Nop(), db, registry)

got, err := service.Create(ctx, want.ID, want.Type, want.Parent, want.Config, processor.ProvisionTypeAPI)
got, err := service.Create(ctx, want.ID, want.Type, want.Parent, want.Config, processor.ProvisionTypeAPI, "")
is.NoErr(err)
want.ID = got.ID // uuid is random
want.CreatedAt = got.CreatedAt
Expand All @@ -139,6 +139,7 @@ func TestService_Create_BuilderNotFound(t *testing.T) {
processor.Parent{},
processor.Config{},
processor.ProvisionTypeAPI,
"{{true}}",
)

is.True(err != nil)
Expand Down Expand Up @@ -171,6 +172,7 @@ func TestService_Create_BuilderFail(t *testing.T) {
processor.Parent{},
processor.Config{},
processor.ProvisionTypeAPI,
"{{true}}",
)
is.True(cerrors.Is(err, wantErr)) // expected builder error
is.Equal(got, nil)
Expand All @@ -191,6 +193,7 @@ func TestService_Create_WorkersNegative(t *testing.T) {
processor.Parent{},
processor.Config{},
processor.ProvisionTypeAPI,
"{{true}}",
)
is.True(err != nil) // expected workers error
is.Equal(got, nil)
Expand All @@ -210,7 +213,7 @@ func TestService_Delete_Success(t *testing.T) {
service := processor.NewService(log.Nop(), db, registry)

// create a processor instance
i, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI)
i, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI, "cond")
is.NoErr(err)

err = service.Delete(ctx, i.ID)
Expand Down Expand Up @@ -244,7 +247,7 @@ func TestService_Get_Success(t *testing.T) {
service := processor.NewService(log.Nop(), db, registry)

// create a processor instance
want, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI)
want, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI, "cond")
is.NoErr(err)

got, err := service.Get(ctx, want.ID)
Expand Down Expand Up @@ -287,11 +290,11 @@ func TestService_List_Some(t *testing.T) {
service := processor.NewService(log.Nop(), db, registry)

// create a couple of processor instances
i1, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI)
i1, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI, "")
is.NoErr(err)
i2, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI)
i2, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI, "")
is.NoErr(err)
i3, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI)
i3, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI, "")
is.NoErr(err)

instances := service.List(ctx)
Expand All @@ -311,7 +314,7 @@ func TestService_Update_Success(t *testing.T) {
service := processor.NewService(log.Nop(), db, registry)

// create a processor instance
want, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI)
want, err := service.Create(ctx, uuid.NewString(), procType, processor.Parent{}, processor.Config{}, processor.ProvisionTypeAPI, "")
is.NoErr(err)

newConfig := processor.Config{
Expand Down
11 changes: 6 additions & 5 deletions pkg/provisioning/config/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type Connector struct {
}

type Processor struct {
ID string
Type string
Settings map[string]string
Workers int
ID string
Type string
Settings map[string]string
Workers int
Condition string
}

type DLQ struct {
Expand All @@ -64,7 +65,7 @@ var (
ConnectorMutableFields = []string{"Name", "Settings", "Processors"}

ProcessorImmutableFields = []string{"Type"}
ProcessorMutableFields = []string{"Settings", "Workers"}
ProcessorMutableFields = []string{"Settings", "Workers", "Condition"}
)

// Parser reads data from reader and parses all pipelines defined in the
Expand Down
Loading

0 comments on commit 2c3dfaf

Please sign in to comment.