Skip to content

Commit

Permalink
Pipeline created from API gets overwritten by config file with duplic…
Browse files Browse the repository at this point in the history
…ate ID (#1229)
  • Loading branch information
AdamHaffar authored Oct 13, 2023
1 parent f077271 commit c40884a
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 2 deletions.
17 changes: 17 additions & 0 deletions pkg/provisioning/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,23 @@ func (s *Service) Init(ctx context.Context) error {
allPls = append(allPls, duplicateID)
}

// remove pipelines with duplicate IDs from API pipelines
var apiProvisioned []int
for i, pl := range configs {
pipelineInstance, err := s.pipelineService.Get(ctx, pl.ID)
if err != nil {
if !cerrors.Is(err, pipeline.ErrInstanceNotFound) {
multierr = multierror.Append(multierr, cerrors.Errorf("error getting pipeline instance with ID %q: %w", pl.ID, err))
}
continue
}
if pipelineInstance.ProvisionedBy != pipeline.ProvisionTypeConfig {
multierr = multierror.Append(multierr, cerrors.Errorf("pipelines with ID %q will be skipped: %w", pl.ID, ErrNotProvisionedByConfig))
apiProvisioned = append(apiProvisioned, i)
}
}
configs = s.deleteIndexes(configs, apiProvisioned)

// contains pipelineIDs of successfully provisioned pipelines.
var successPls []string
for _, cfg := range configs {
Expand Down
81 changes: 79 additions & 2 deletions pkg/provisioning/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,12 @@ func TestService_Init_Create(t *testing.T) {
service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger)
service.pipelinesPath = "./test/pipelines1"

// return a pipeline not provisioned by API
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)

pipelineService.EXPECT().List(anyCtx)
// pipeline doesn't exist
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)

// create pipeline
pipelineService.EXPECT().CreateWithInstance(anyCtx, p1.P1)
pipelineService.EXPECT().UpdateDLQ(anyCtx, p1.P1.ID, p1.P1.DLQ)
Expand Down Expand Up @@ -144,6 +146,9 @@ func TestService_Init_Update(t *testing.T) {
service.pipelinesPath = "./test/pipelines1"

pipelineService.EXPECT().List(anyCtx)
// return a pipeline not provisioned by API
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)

// pipeline exists
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(oldPipelineInstance, nil)

Expand Down Expand Up @@ -204,6 +209,9 @@ func TestService_Init_NoRollbackOnFailedStart(t *testing.T) {
service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger)
service.pipelinesPath = "./test/pipelines1"

// return a pipeline not provisioned by API
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)

pipelineService.EXPECT().List(anyCtx)
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)

Expand Down Expand Up @@ -237,6 +245,9 @@ func TestService_Init_RollbackCreate(t *testing.T) {
service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger)
service.pipelinesPath = "./test/pipelines1"

// return a pipeline not provisioned by API
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)

pipelineService.EXPECT().List(anyCtx)
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)

Expand Down Expand Up @@ -273,6 +284,9 @@ func TestService_Init_RollbackUpdate(t *testing.T) {
service, pipelineService, connService, procService, _ := newTestService(ctrl, logger)
service.pipelinesPath = "./test/pipelines1"

// return a pipeline not provisioned by API
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)

pipelineService.EXPECT().List(anyCtx)
// pipeline exists
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(oldPipelineInstance, nil)
Expand Down Expand Up @@ -310,6 +324,9 @@ func TestService_Init_MultiplePipelinesDuplicatedPipelineID(t *testing.T) {
service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger)
service.pipelinesPath = "./test/pipelines2"

// return a pipeline not provisioned by API
pipelineService.EXPECT().Get(anyCtx, p2.P2.ID).Return(nil, pipeline.ErrInstanceNotFound)

pipelineService.EXPECT().List(anyCtx)
pipelineService.EXPECT().Get(anyCtx, p2.P2.ID).Return(nil, pipeline.ErrInstanceNotFound)

Expand All @@ -336,6 +353,10 @@ func TestService_Init_MultiplePipelines(t *testing.T) {
service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger)
service.pipelinesPath = "./test/pipelines3"

// return a pipeline not provisioned by API
pipelineService.EXPECT().Get(anyCtx, p3.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)
pipelineService.EXPECT().Get(anyCtx, p3.P2.ID).Return(nil, pipeline.ErrInstanceNotFound)

pipelineService.EXPECT().List(anyCtx)
pipelineService.EXPECT().Get(anyCtx, p3.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)
pipelineService.EXPECT().Get(anyCtx, p3.P2.ID).Return(nil, pipeline.ErrInstanceNotFound)
Expand Down Expand Up @@ -366,6 +387,63 @@ func TestService_Init_MultiplePipelines(t *testing.T) {
is.NoErr(err)
}

func TestService_Init_PipelineProvisionedFromAPI(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)

tmp := *oldPipelineInstance
APIPipelineInstance := &tmp
APIPipelineInstance.ProvisionedBy = pipeline.ProvisionTypeAPI // change the test pipeline to be API provisioned

service, pipelineService, _, _, _ := newTestService(ctrl, logger)
service.pipelinesPath = "./test/pipelines1"

// pipeline provisioned by API
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(APIPipelineInstance, nil)

pipelineService.EXPECT().List(anyCtx)

err := service.Init(context.Background())
is.True(cerrors.Is(err, ErrNotProvisionedByConfig))
}

func TestService_Init_PipelineProvisionedFromAPI_Error(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)
otherErr := cerrors.New("GetError")

service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger)
service.pipelinesPath = "./test/pipelines1"

// error from calling Get
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, otherErr)

pipelineService.EXPECT().List(anyCtx)
// pipeline doesn't exist
pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound)
// create pipeline
pipelineService.EXPECT().CreateWithInstance(anyCtx, p1.P1)
pipelineService.EXPECT().UpdateDLQ(anyCtx, p1.P1.ID, p1.P1.DLQ)
pipelineService.EXPECT().AddConnector(anyCtx, p1.P1.ID, p1.P1.ConnectorIDs[0])
pipelineService.EXPECT().AddConnector(anyCtx, p1.P1.ID, p1.P1.ConnectorIDs[1])
pipelineService.EXPECT().AddProcessor(anyCtx, p1.P1.ID, p1.P1.ProcessorIDs[0])

connService.EXPECT().CreateWithInstance(anyCtx, p1.P1C1)
connService.EXPECT().CreateWithInstance(anyCtx, p1.P1C2)
connService.EXPECT().AddProcessor(anyCtx, p1.P1C2.ID, p1.P1C2.ProcessorIDs[0])

procService.EXPECT().CreateWithInstance(anyCtx, p1.P1C2P1)
procService.EXPECT().CreateWithInstance(anyCtx, p1.P1P1)

// start pipeline
pipelineService.EXPECT().Start(anyCtx, connService, procService, plugService, p1.P1.ID)

err := service.Init(context.Background())
is.True(cerrors.Is(err, otherErr))
}

func TestService_Delete(t *testing.T) {
is := is.New(t)
logger := log.Nop()
Expand All @@ -379,7 +457,6 @@ func TestService_Delete(t *testing.T) {
connService.EXPECT().Get(anyCtx, oldConnector2Instance.ID).Return(oldConnector2Instance, nil)
procService.EXPECT().Get(anyCtx, oldConnectorProcessorInstance.ID).Return(oldConnectorProcessorInstance, nil)
procService.EXPECT().Get(anyCtx, oldPipelineProcessorInstance.ID).Return(oldPipelineProcessorInstance, nil)

// delete pipeline
pipelineService.EXPECT().Delete(anyCtx, oldPipelineInstance.ID).Return(nil)
connService.EXPECT().Delete(anyCtx, oldConnector1Instance.ID, plugService).Return(nil)
Expand Down

0 comments on commit c40884a

Please sign in to comment.