Skip to content

Commit

Permalink
expose delete method in provisioning service (#1230)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Oct 5, 2023
1 parent fc541df commit dca7d58
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 32 deletions.
3 changes: 2 additions & 1 deletion pkg/provisioning/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ package provisioning
import "github.com/conduitio/conduit/pkg/foundation/cerrors"

var (
ErrDuplicatedPipelineID = cerrors.New("duplicated pipeline ID")
ErrDuplicatedPipelineID = cerrors.New("duplicated pipeline ID")
ErrNotProvisionedByConfig = cerrors.New("entity was not provisioned by a config file and therefore can't be mutated by the provisioning service")
)
55 changes: 34 additions & 21 deletions pkg/provisioning/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,28 @@ func (s *Service) Init(ctx context.Context) error {
return multierr
}

// Delete exposes a way to delete pipelines provisioned using the provisioning
// service.
func (s *Service) Delete(ctx context.Context, id string) error {
pl, err := s.pipelineService.Get(ctx, id)
if err != nil {
return cerrors.Errorf("could not get pipeline %q: %w", id, err)
}
if pl.ProvisionedBy != pipeline.ProvisionTypeConfig {
return ErrNotProvisionedByConfig
}
oldConfig, err := s.Export(ctx, id)
if err != nil {
return cerrors.Errorf("failed to export pipeline: %w", err)
}
actions := s.newActionsBuilder().Build(oldConfig, config.Pipeline{})
_, err = s.executeActions(ctx, actions)
if err != nil {
return cerrors.Errorf("failed to delete pipeline: %w", err)
}
return nil
}

// getYamlFiles recursively reads folders in the path and collects paths to all
// files that end with .yml or .yaml.
func (s *Service) getYamlFiles(path string) ([]string, error) {
Expand Down Expand Up @@ -223,31 +245,22 @@ func (s *Service) provisionPipeline(ctx context.Context, cfg config.Pipeline) er
return nil
}

func (s *Service) deleteOldPipelines(ctx context.Context, ids []string) []string {
func (s *Service) deleteOldPipelines(ctx context.Context, keepIDs []string) []string {
var deletedIDs []string
pipelines := s.pipelineService.List(ctx)
for id, pl := range pipelines {
if !slices.Contains(ids, id) && pl.ProvisionedBy == pipeline.ProvisionTypeConfig {
oldConfig, err := s.Export(ctx, id)
if err != nil {
s.logger.Warn(ctx).
Err(err).
Str(log.PipelineIDField, id).
Msg("failed to delete a pipeline provisioned by a config file, the pipeline is probably in a broken state, Conduit will try to remove it again next time it runs")
continue
}
actions := s.newActionsBuilder().Build(oldConfig, config.Pipeline{})
_, err = s.executeActions(ctx, actions)
if err != nil {
s.logger.Warn(ctx).
Err(err).
Str(log.PipelineIDField, id).
Msg("failed to delete a pipeline provisioned by a config file, the pipeline is probably in a broken state, Conduit will try to remove it again next time it runs")
continue
}

deletedIDs = append(deletedIDs, id)
if slices.Contains(keepIDs, id) || pl.ProvisionedBy != pipeline.ProvisionTypeConfig {
continue
}
err := s.Delete(ctx, id)
if err != nil {
s.logger.Warn(ctx).
Err(err).
Str(log.PipelineIDField, id).
Msg("failed to delete a pipeline provisioned by a config file, the pipeline is probably in a broken state, Conduit will try to remove it again next time it runs")
continue
}
deletedIDs = append(deletedIDs, id)
}
return deletedIDs
}
45 changes: 35 additions & 10 deletions pkg/provisioning/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var (
}
)

func TestProvision_Create(t *testing.T) {
func TestService_Init_Create(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestProvision_Create(t *testing.T) {
is.NoErr(err)
}

func TestProvision_Update(t *testing.T) {
func TestService_Init_Update(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestProvision_Update(t *testing.T) {
is.NoErr(err)
}

func TestProvision_Delete(t *testing.T) {
func TestService_Init_Delete(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)
Expand All @@ -179,7 +179,7 @@ func TestProvision_Delete(t *testing.T) {
pipelineService.EXPECT().List(anyCtx).Return(map[string]*pipeline.Instance{oldPipelineInstance.ID: oldPipelineInstance})

// export pipeline
pipelineService.EXPECT().Get(anyCtx, oldPipelineInstance.ID).Return(oldPipelineInstance, nil)
pipelineService.EXPECT().Get(anyCtx, oldPipelineInstance.ID).Return(oldPipelineInstance, nil).Times(2)
connService.EXPECT().Get(anyCtx, oldConnector1Instance.ID).Return(oldConnector1Instance, nil)
connService.EXPECT().Get(anyCtx, oldConnector2Instance.ID).Return(oldConnector2Instance, nil)
procService.EXPECT().Get(anyCtx, oldConnectorProcessorInstance.ID).Return(oldConnectorProcessorInstance, nil)
Expand All @@ -196,7 +196,7 @@ func TestProvision_Delete(t *testing.T) {
is.NoErr(err)
}

func TestProvision_NoRollbackOnFailedStart(t *testing.T) {
func TestService_Init_NoRollbackOnFailedStart(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestProvision_NoRollbackOnFailedStart(t *testing.T) {
is.True(cerrors.Is(err, wantErr))
}

func TestProvision_RollbackCreate(t *testing.T) {
func TestService_Init_RollbackCreate(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestProvision_RollbackCreate(t *testing.T) {
is.True(cerrors.Is(err, wantErr))
}

func TestProvision_RollbackUpdate(t *testing.T) {
func TestService_Init_RollbackUpdate(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestProvision_RollbackUpdate(t *testing.T) {
is.True(cerrors.Is(err, wantErr))
}

func TestProvision_MultiplePipelinesDuplicatedPipelineID(t *testing.T) {
func TestService_Init_MultiplePipelinesDuplicatedPipelineID(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)
Expand All @@ -328,7 +328,7 @@ func TestProvision_MultiplePipelinesDuplicatedPipelineID(t *testing.T) {
is.True(cerrors.Is(err, ErrDuplicatedPipelineID)) // duplicated pipeline id
}

func TestProvision_MultiplePipelines(t *testing.T) {
func TestService_Init_MultiplePipelines(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -366,7 +366,32 @@ func TestProvision_MultiplePipelines(t *testing.T) {
is.NoErr(err)
}

func TestProvision_IntegrationTestServices(t *testing.T) {
func TestService_Delete(t *testing.T) {
is := is.New(t)
logger := log.Nop()
ctrl := gomock.NewController(t)

service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger)

// export pipeline
pipelineService.EXPECT().Get(anyCtx, oldPipelineInstance.ID).Return(oldPipelineInstance, nil).Times(2)
connService.EXPECT().Get(anyCtx, oldConnector1Instance.ID).Return(oldConnector1Instance, nil)
connService.EXPECT().Get(anyCtx, oldConnector2Instance.ID).Return(oldConnector2Instance, nil)
procService.EXPECT().Get(anyCtx, oldConnectorProcessorInstance.ID).Return(oldConnectorProcessorInstance, nil)
procService.EXPECT().Get(anyCtx, oldPipelineProcessorInstance.ID).Return(oldPipelineProcessorInstance, nil)

// delete pipeline
pipelineService.EXPECT().Delete(anyCtx, oldPipelineInstance.ID).Return(nil)
connService.EXPECT().Delete(anyCtx, oldConnector1Instance.ID, plugService).Return(nil)
connService.EXPECT().Delete(anyCtx, oldConnector2Instance.ID, plugService).Return(nil)
procService.EXPECT().Delete(anyCtx, oldConnectorProcessorInstance.ID).Return(nil)
procService.EXPECT().Delete(anyCtx, oldPipelineProcessorInstance.ID).Return(nil)

err := service.Delete(context.Background(), oldPipelineInstance.ID)
is.NoErr(err)
}

func TestService_IntegrationTestServices(t *testing.T) {
is := is.New(t)
ctx, killAll := context.WithCancel(context.Background())
defer killAll()
Expand Down

0 comments on commit dca7d58

Please sign in to comment.