diff --git a/internal/nha/activities/hari_test.go b/internal/nha/activities/hari_test.go index bf34414f..51d396ba 100644 --- a/internal/nha/activities/hari_test.go +++ b/internal/nha/activities/hari_test.go @@ -18,7 +18,6 @@ import ( collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake" "github.com/artefactual-labs/enduro/internal/nha" - "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/workflow/manager" ) @@ -506,7 +505,6 @@ func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *Update manager := manager.NewManager( collectionfake.NewMockService(ctrl), - &pipeline.Registry{}, hooks, ) diff --git a/internal/nha/activities/prod_test.go b/internal/nha/activities/prod_test.go index 98e5c562..7598def1 100644 --- a/internal/nha/activities/prod_test.go +++ b/internal/nha/activities/prod_test.go @@ -13,7 +13,6 @@ import ( collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake" "github.com/artefactual-labs/enduro/internal/nha" - "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/workflow/manager" ) @@ -162,7 +161,6 @@ func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *Update manager := manager.NewManager( collectionfake.NewMockService(ctrl), - &pipeline.Registry{}, map[string]map[string]interface{}{ "prod": hookConfig, }, diff --git a/internal/workflow/activities/download.go b/internal/workflow/activities/download.go index 88c363d2..f961c1ac 100644 --- a/internal/workflow/activities/download.go +++ b/internal/workflow/activities/download.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/temporal" "github.com/artefactual-labs/enduro/internal/watcher" "github.com/artefactual-labs/enduro/internal/workflow/manager" @@ -11,16 +12,17 @@ import ( // DownloadActivity downloads the blob into the pipeline processing directory. type DownloadActivity struct { - manager *manager.Manager - wsvc watcher.Service + manager *manager.Manager + wsvc watcher.Service + pipelineRegistry *pipeline.Registry } -func NewDownloadActivity(m *manager.Manager, wsvc watcher.Service) *DownloadActivity { - return &DownloadActivity{manager: m, wsvc: wsvc} +func NewDownloadActivity(m *manager.Manager, pipelineRegistry *pipeline.Registry, wsvc watcher.Service) *DownloadActivity { + return &DownloadActivity{manager: m, pipelineRegistry: pipelineRegistry, wsvc: wsvc} } func (a *DownloadActivity) Execute(ctx context.Context, pipelineName, watcherName, key string) (string, error) { - p, err := a.manager.Pipelines.ByName(pipelineName) + p, err := a.pipelineRegistry.ByName(pipelineName) if err != nil { return "", temporal.NewNonRetryableError(err) } diff --git a/internal/workflow/activities/hide_package.go b/internal/workflow/activities/hide_package.go index 6a73dcc9..ea4a7db6 100644 --- a/internal/workflow/activities/hide_package.go +++ b/internal/workflow/activities/hide_package.go @@ -4,20 +4,20 @@ import ( "context" "fmt" + "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/temporal" - "github.com/artefactual-labs/enduro/internal/workflow/manager" ) type HidePackageActivity struct { - manager *manager.Manager + pipelineRegistry *pipeline.Registry } -func NewHidePackageActivity(m *manager.Manager) *HidePackageActivity { - return &HidePackageActivity{manager: m} +func NewHidePackageActivity(pipelineRegistry *pipeline.Registry) *HidePackageActivity { + return &HidePackageActivity{pipelineRegistry: pipelineRegistry} } func (a *HidePackageActivity) Execute(ctx context.Context, unitID, unitType, pipelineName string) error { - p, err := a.manager.Pipelines.ByName(pipelineName) + p, err := a.pipelineRegistry.ByName(pipelineName) if err != nil { return temporal.NewNonRetryableError(err) } diff --git a/internal/workflow/activities/transfer.go b/internal/workflow/activities/transfer.go index 6d7b5bc0..dacdc8bb 100644 --- a/internal/workflow/activities/transfer.go +++ b/internal/workflow/activities/transfer.go @@ -7,8 +7,8 @@ import ( "go.artefactual.dev/amclient" + "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/temporal" - "github.com/artefactual-labs/enduro/internal/workflow/manager" ) // TransferActivity submits the transfer to Archivematica and returns its ID. @@ -16,11 +16,11 @@ import ( // This is our first interaction with Archivematica. The workflow ends here // after authentication errors. type TransferActivity struct { - manager *manager.Manager + pipelineRegistry *pipeline.Registry } -func NewTransferActivity(m *manager.Manager) *TransferActivity { - return &TransferActivity{manager: m} +func NewTransferActivity(pipelineRegistry *pipeline.Registry) *TransferActivity { + return &TransferActivity{pipelineRegistry: pipelineRegistry} } type TransferActivityParams struct { @@ -40,7 +40,7 @@ type TransferActivityResponse struct { } func (a *TransferActivity) Execute(ctx context.Context, params *TransferActivityParams) (*TransferActivityResponse, error) { - p, err := a.manager.Pipelines.ByName(params.PipelineName) + p, err := a.pipelineRegistry.ByName(params.PipelineName) if err != nil { return nil, temporal.NewNonRetryableError(err) } diff --git a/internal/workflow/local_activities.go b/internal/workflow/local_activities.go index bb367332..2d2698dd 100644 --- a/internal/workflow/local_activities.go +++ b/internal/workflow/local_activities.go @@ -8,6 +8,7 @@ import ( temporalsdk_activity "go.temporal.io/sdk/activity" "github.com/artefactual-labs/enduro/internal/collection" + "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/workflow/manager" ) @@ -64,8 +65,8 @@ func checkDuplicatePackageLocalActivity(ctx context.Context, logger logr.Logger, return colsvc.CheckDuplicate(ctx, id) } -func loadConfigLocalActivity(ctx context.Context, m *manager.Manager, logger logr.Logger, pipeline string, tinfo *TransferInfo) (*TransferInfo, error) { - p, err := m.Pipelines.ByName(pipeline) +func loadConfigLocalActivity(ctx context.Context, m *manager.Manager, pipelineRegistry *pipeline.Registry, logger logr.Logger, pipeline string, tinfo *TransferInfo) (*TransferInfo, error) { + p, err := pipelineRegistry.ByName(pipeline) if err != nil { logger.Error(err, "Error loading local configuration") return nil, err diff --git a/internal/workflow/manager/manager.go b/internal/workflow/manager/manager.go index 8b2b3274..86f1480d 100644 --- a/internal/workflow/manager/manager.go +++ b/internal/workflow/manager/manager.go @@ -5,21 +5,18 @@ import ( "strings" "github.com/artefactual-labs/enduro/internal/collection" - "github.com/artefactual-labs/enduro/internal/pipeline" ) // Manager carries workflow and activity dependencies. type Manager struct { Collection collection.Service - Pipelines *pipeline.Registry Hooks map[string]map[string]interface{} } // NewManager returns a pointer to a new Manager. -func NewManager(colsvc collection.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager { +func NewManager(colsvc collection.Service, hooks map[string]map[string]interface{}) *Manager { return &Manager{ Collection: colsvc, - Pipelines: pipelines, Hooks: hooks, } } diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 8db22d3f..9757c222 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -26,12 +26,13 @@ import ( ) type ProcessingWorkflow struct { - manager *manager.Manager - logger logr.Logger + manager *manager.Manager + pipelineRegistry *pipeline.Registry + logger logr.Logger } -func NewProcessingWorkflow(m *manager.Manager, l logr.Logger) *ProcessingWorkflow { - return &ProcessingWorkflow{manager: m, logger: l} +func NewProcessingWorkflow(m *manager.Manager, pipelineRegistry *pipeline.Registry, l logr.Logger) *ProcessingWorkflow { + return &ProcessingWorkflow{manager: m, pipelineRegistry: pipelineRegistry, logger: l} } // TransferInfo is shared state that is passed down to activities. It can be @@ -284,7 +285,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll if err := temporalsdk_workflow.SideEffect(ctx, func(ctx temporalsdk_workflow.Context) interface{} { names := req.PipelineNames if len(names) < 1 { - names = w.manager.Pipelines.Names() + names = w.pipelineRegistry.Names() if len(names) < 1 { return "" } @@ -302,7 +303,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll // Load pipeline configuration and hooks. { activityOpts := withLocalActivityWithoutRetriesOpts(ctx) - err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, loadConfigLocalActivity, w.manager, w.logger, tinfo.PipelineName, tinfo).Get(activityOpts, &tinfo) + err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, loadConfigLocalActivity, w.manager, w.pipelineRegistry, w.logger, tinfo.PipelineName, tinfo).Get(activityOpts, &tinfo) if err != nil { return fmt.Errorf("error loading configuration: %v", err) } @@ -431,7 +432,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context { var acquired bool var err error - acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.manager.Pipelines, tinfo.PipelineName, tinfo.CollectionID) + acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.pipelineRegistry, tinfo.PipelineName, tinfo.CollectionID) if acquired { defer func() { _ = release(sessCtx) diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 45bbaa21..65f7f88e 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -33,7 +33,8 @@ type ProcessingWorkflowTestSuite struct { func (s *ProcessingWorkflowTestSuite) SetupTest() { s.env = s.NewTestWorkflowEnvironment() s.manager = buildManager(s.T(), gomock.NewController(s.T())) - s.workflow = NewProcessingWorkflow(s.manager, logr.Discard()) + pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{}) + s.workflow = NewProcessingWorkflow(s.manager, pipelineRegistry, logr.Discard()) } func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) { @@ -52,7 +53,7 @@ func (s *ProcessingWorkflowTestSuite) TestParseErrorIsIgnored() { s.env.OnActivity(nha_activities.ParseNameLocalActivity, mock.Anything, "key").Return(nil, errors.New("parse error")).Once() // loadConfig is executed (workflow continued), returning an error. - s.env.OnActivity(loadConfigLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("pipeline is unavailable")).Once() + s.env.OnActivity(loadConfigLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("pipeline is unavailable")).Once() // Defer updates the package with the error status before returning. s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, &updatePackageLocalActivityParams{ @@ -124,7 +125,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager { return manager.NewManager( collectionfake.NewMockService(ctrl), - &pipeline.Registry{}, map[string]map[string]interface{}{ "prod": {"disabled": "false"}, "hari": {"disabled": "false"}, diff --git a/internal/workflow/receipts_test.go b/internal/workflow/receipts_test.go index a6a7447c..e8ef2acf 100644 --- a/internal/workflow/receipts_test.go +++ b/internal/workflow/receipts_test.go @@ -15,6 +15,7 @@ import ( "github.com/artefactual-labs/enduro/internal/nha" nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities" + "github.com/artefactual-labs/enduro/internal/pipeline" ) // sendReceipts exits immediately after an activity error, ensuring that @@ -23,6 +24,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) { wts := temporalsdk_testsuite.WorkflowTestSuite{} env := wts.NewTestWorkflowEnvironment() m := buildManager(t, gomock.NewController(t)) + pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{}) AsyncCompletionActivityName = uuid.New().String() + "-async-completion" env.RegisterActivityWithOptions(NewAsyncCompletionActivity(m.Collection).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName}) @@ -58,7 +60,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) { uint(12345), ).Return("ABANDON", nil).Once() - env.ExecuteWorkflow(NewProcessingWorkflow(m, logr.Discard()).sendReceipts, ¶ms) + env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, ¶ms) assert.Equal(t, env.IsWorkflowCompleted(), true) assert.ErrorContains(t, env.GetWorkflowError(), "error sending hari receipt: user abandoned") @@ -69,6 +71,7 @@ func TestSendReceipts(t *testing.T) { wts := temporalsdk_testsuite.WorkflowTestSuite{} env := wts.NewTestWorkflowEnvironment() m := buildManager(t, gomock.NewController(t)) + pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{}) nha_activities.UpdateHARIActivityName = uuid.New().String() env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName}) @@ -108,7 +111,7 @@ func TestSendReceipts(t *testing.T) { }, ).Return(nil).Once() - env.ExecuteWorkflow(NewProcessingWorkflow(m, logr.Discard()).sendReceipts, ¶ms) + env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, ¶ms) assert.Equal(t, env.IsWorkflowCompleted(), true) assert.NilError(t, env.GetWorkflowError()) diff --git a/main.go b/main.go index 5ee929bb..52b3db26 100644 --- a/main.go +++ b/main.go @@ -250,7 +250,7 @@ func main() { // TODO: this is a temporary workaround for dependency injection until we // figure out what's the depdencency tree is going to look like after POC. // The share-everything pattern should be avoided. - m := manager.NewManager(colsvc, pipelineRegistry, config.Hooks) + m := manager.NewManager(colsvc, config.Hooks) done := make(chan struct{}) w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{ @@ -262,16 +262,16 @@ func main() { os.Exit(1) } - w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName}) + w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, pipelineRegistry, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName}) w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName}) - w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}) + w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, pipelineRegistry, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}) w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}) w.RegisterActivityWithOptions(activities.NewValidateTransferActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ValidateTransferActivityName}) - w.RegisterActivityWithOptions(activities.NewTransferActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName}) + w.RegisterActivityWithOptions(activities.NewTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName}) w.RegisterActivityWithOptions(activities.NewPollTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollTransferActivityName}) w.RegisterActivityWithOptions(activities.NewPollIngestActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollIngestActivityName}) w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName}) - w.RegisterActivityWithOptions(activities.NewHidePackageActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName}) + w.RegisterActivityWithOptions(activities.NewHidePackageActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName}) w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName}) w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName}) w.RegisterActivityWithOptions(activities.NewPopulateMetadataActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PopulateMetadataActivityName})