diff --git a/pkg/provisioning/service.go b/pkg/provisioning/service.go index f305ff104..c49260135 100644 --- a/pkg/provisioning/service.go +++ b/pkg/provisioning/service.go @@ -102,6 +102,23 @@ func (s *Service) Init(ctx context.Context) error { allPls = append(allPls, duplicateID) } + // remove pipelines with duplicate IDs from API pipelines + var apiProvisioned []int + for i, pl := range configs { + pipelineInstance, err := s.pipelineService.Get(ctx, pl.ID) + if err != nil { + if !cerrors.Is(err, pipeline.ErrInstanceNotFound) { + multierr = multierror.Append(multierr, cerrors.Errorf("error getting pipeline instance with ID %q: %w", pl.ID, err)) + } + continue + } + if pipelineInstance.ProvisionedBy != pipeline.ProvisionTypeConfig { + multierr = multierror.Append(multierr, cerrors.Errorf("pipelines with ID %q will be skipped: %w", pl.ID, ErrNotProvisionedByConfig)) + apiProvisioned = append(apiProvisioned, i) + } + } + configs = s.deleteIndexes(configs, apiProvisioned) + // contains pipelineIDs of successfully provisioned pipelines. var successPls []string for _, cfg := range configs { diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index 5112c921e..f036b9709 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -110,10 +110,12 @@ func TestService_Init_Create(t *testing.T) { service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger) service.pipelinesPath = "./test/pipelines1" + // return a pipeline not provisioned by API + pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) + pipelineService.EXPECT().List(anyCtx) // pipeline doesn't exist pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) - // create pipeline pipelineService.EXPECT().CreateWithInstance(anyCtx, p1.P1) pipelineService.EXPECT().UpdateDLQ(anyCtx, p1.P1.ID, p1.P1.DLQ) @@ -144,6 +146,9 @@ func TestService_Init_Update(t *testing.T) { service.pipelinesPath = "./test/pipelines1" pipelineService.EXPECT().List(anyCtx) + // return a pipeline not provisioned by API + pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) + // pipeline exists pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(oldPipelineInstance, nil) @@ -204,6 +209,9 @@ func TestService_Init_NoRollbackOnFailedStart(t *testing.T) { service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger) service.pipelinesPath = "./test/pipelines1" + // return a pipeline not provisioned by API + pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) + pipelineService.EXPECT().List(anyCtx) pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) @@ -237,6 +245,9 @@ func TestService_Init_RollbackCreate(t *testing.T) { service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger) service.pipelinesPath = "./test/pipelines1" + // return a pipeline not provisioned by API + pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) + pipelineService.EXPECT().List(anyCtx) pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) @@ -273,6 +284,9 @@ func TestService_Init_RollbackUpdate(t *testing.T) { service, pipelineService, connService, procService, _ := newTestService(ctrl, logger) service.pipelinesPath = "./test/pipelines1" + // return a pipeline not provisioned by API + pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) + pipelineService.EXPECT().List(anyCtx) // pipeline exists pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(oldPipelineInstance, nil) @@ -310,6 +324,9 @@ func TestService_Init_MultiplePipelinesDuplicatedPipelineID(t *testing.T) { service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger) service.pipelinesPath = "./test/pipelines2" + // return a pipeline not provisioned by API + pipelineService.EXPECT().Get(anyCtx, p2.P2.ID).Return(nil, pipeline.ErrInstanceNotFound) + pipelineService.EXPECT().List(anyCtx) pipelineService.EXPECT().Get(anyCtx, p2.P2.ID).Return(nil, pipeline.ErrInstanceNotFound) @@ -336,6 +353,10 @@ func TestService_Init_MultiplePipelines(t *testing.T) { service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger) service.pipelinesPath = "./test/pipelines3" + // return a pipeline not provisioned by API + pipelineService.EXPECT().Get(anyCtx, p3.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) + pipelineService.EXPECT().Get(anyCtx, p3.P2.ID).Return(nil, pipeline.ErrInstanceNotFound) + pipelineService.EXPECT().List(anyCtx) pipelineService.EXPECT().Get(anyCtx, p3.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) pipelineService.EXPECT().Get(anyCtx, p3.P2.ID).Return(nil, pipeline.ErrInstanceNotFound) @@ -366,6 +387,63 @@ func TestService_Init_MultiplePipelines(t *testing.T) { is.NoErr(err) } +func TestService_Init_PipelineProvisionedFromAPI(t *testing.T) { + is := is.New(t) + logger := log.Nop() + ctrl := gomock.NewController(t) + + tmp := *oldPipelineInstance + APIPipelineInstance := &tmp + APIPipelineInstance.ProvisionedBy = pipeline.ProvisionTypeAPI // change the test pipeline to be API provisioned + + service, pipelineService, _, _, _ := newTestService(ctrl, logger) + service.pipelinesPath = "./test/pipelines1" + + // pipeline provisioned by API + pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(APIPipelineInstance, nil) + + pipelineService.EXPECT().List(anyCtx) + + err := service.Init(context.Background()) + is.True(cerrors.Is(err, ErrNotProvisionedByConfig)) +} + +func TestService_Init_PipelineProvisionedFromAPI_Error(t *testing.T) { + is := is.New(t) + logger := log.Nop() + ctrl := gomock.NewController(t) + otherErr := cerrors.New("GetError") + + service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger) + service.pipelinesPath = "./test/pipelines1" + + // error from calling Get + pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, otherErr) + + pipelineService.EXPECT().List(anyCtx) + // pipeline doesn't exist + pipelineService.EXPECT().Get(anyCtx, p1.P1.ID).Return(nil, pipeline.ErrInstanceNotFound) + // create pipeline + pipelineService.EXPECT().CreateWithInstance(anyCtx, p1.P1) + pipelineService.EXPECT().UpdateDLQ(anyCtx, p1.P1.ID, p1.P1.DLQ) + pipelineService.EXPECT().AddConnector(anyCtx, p1.P1.ID, p1.P1.ConnectorIDs[0]) + pipelineService.EXPECT().AddConnector(anyCtx, p1.P1.ID, p1.P1.ConnectorIDs[1]) + pipelineService.EXPECT().AddProcessor(anyCtx, p1.P1.ID, p1.P1.ProcessorIDs[0]) + + connService.EXPECT().CreateWithInstance(anyCtx, p1.P1C1) + connService.EXPECT().CreateWithInstance(anyCtx, p1.P1C2) + connService.EXPECT().AddProcessor(anyCtx, p1.P1C2.ID, p1.P1C2.ProcessorIDs[0]) + + procService.EXPECT().CreateWithInstance(anyCtx, p1.P1C2P1) + procService.EXPECT().CreateWithInstance(anyCtx, p1.P1P1) + + // start pipeline + pipelineService.EXPECT().Start(anyCtx, connService, procService, plugService, p1.P1.ID) + + err := service.Init(context.Background()) + is.True(cerrors.Is(err, otherErr)) +} + func TestService_Delete(t *testing.T) { is := is.New(t) logger := log.Nop() @@ -379,7 +457,6 @@ func TestService_Delete(t *testing.T) { connService.EXPECT().Get(anyCtx, oldConnector2Instance.ID).Return(oldConnector2Instance, nil) procService.EXPECT().Get(anyCtx, oldConnectorProcessorInstance.ID).Return(oldConnectorProcessorInstance, nil) procService.EXPECT().Get(anyCtx, oldPipelineProcessorInstance.ID).Return(oldPipelineProcessorInstance, nil) - // delete pipeline pipelineService.EXPECT().Delete(anyCtx, oldPipelineInstance.ID).Return(nil) connService.EXPECT().Delete(anyCtx, oldConnector1Instance.ID, plugService).Return(nil)