From dc329c16e9bfdc5039e82fa9f3baeb5c91d30318 Mon Sep 17 00:00:00 2001 From: camlyall Date: Thu, 21 Sep 2023 17:00:48 +0000 Subject: [PATCH] Remove pipelineRegistry from manager --- internal/nha/activities/hari_test.go | 2 -- internal/nha/activities/prod_test.go | 2 -- internal/workflow/activities/download.go | 12 +++++++----- internal/workflow/activities/hide_package.go | 10 +++++----- internal/workflow/activities/transfer.go | 10 +++++----- internal/workflow/local_activities.go | 5 +++-- internal/workflow/manager/manager.go | 5 +---- internal/workflow/processing.go | 13 +++++++------ internal/workflow/processing_test.go | 6 +++--- internal/workflow/receipts_test.go | 8 ++++++-- main.go | 10 +++++----- 11 files changed, 42 insertions(+), 41 deletions(-) diff --git a/internal/nha/activities/hari_test.go b/internal/nha/activities/hari_test.go index a38e8272..e10a3818 100644 --- a/internal/nha/activities/hari_test.go +++ b/internal/nha/activities/hari_test.go @@ -19,7 +19,6 @@ import ( collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake" "github.com/artefactual-labs/enduro/internal/nha" - "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/workflow/manager" ) @@ -508,7 +507,6 @@ func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *Update manager := manager.NewManager( logr.Discard(), collectionfake.NewMockService(ctrl), - &pipeline.Registry{}, hooks, ) diff --git a/internal/nha/activities/prod_test.go b/internal/nha/activities/prod_test.go index 84d1566b..1c71c550 100644 --- a/internal/nha/activities/prod_test.go +++ b/internal/nha/activities/prod_test.go @@ -14,7 +14,6 @@ import ( collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake" "github.com/artefactual-labs/enduro/internal/nha" - "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/workflow/manager" ) @@ -164,7 +163,6 @@ func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *Update manager := manager.NewManager( logr.Discard(), collectionfake.NewMockService(ctrl), - &pipeline.Registry{}, map[string]map[string]interface{}{ "prod": hookConfig, }, diff --git a/internal/workflow/activities/download.go b/internal/workflow/activities/download.go index 88c363d2..f961c1ac 100644 --- a/internal/workflow/activities/download.go +++ b/internal/workflow/activities/download.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/temporal" "github.com/artefactual-labs/enduro/internal/watcher" "github.com/artefactual-labs/enduro/internal/workflow/manager" @@ -11,16 +12,17 @@ import ( // DownloadActivity downloads the blob into the pipeline processing directory. type DownloadActivity struct { - manager *manager.Manager - wsvc watcher.Service + manager *manager.Manager + wsvc watcher.Service + pipelineRegistry *pipeline.Registry } -func NewDownloadActivity(m *manager.Manager, wsvc watcher.Service) *DownloadActivity { - return &DownloadActivity{manager: m, wsvc: wsvc} +func NewDownloadActivity(m *manager.Manager, pipelineRegistry *pipeline.Registry, wsvc watcher.Service) *DownloadActivity { + return &DownloadActivity{manager: m, pipelineRegistry: pipelineRegistry, wsvc: wsvc} } func (a *DownloadActivity) Execute(ctx context.Context, pipelineName, watcherName, key string) (string, error) { - p, err := a.manager.Pipelines.ByName(pipelineName) + p, err := a.pipelineRegistry.ByName(pipelineName) if err != nil { return "", temporal.NewNonRetryableError(err) } diff --git a/internal/workflow/activities/hide_package.go b/internal/workflow/activities/hide_package.go index 6a73dcc9..ea4a7db6 100644 --- a/internal/workflow/activities/hide_package.go +++ b/internal/workflow/activities/hide_package.go @@ -4,20 +4,20 @@ import ( "context" "fmt" + "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/temporal" - "github.com/artefactual-labs/enduro/internal/workflow/manager" ) type HidePackageActivity struct { - manager *manager.Manager + pipelineRegistry *pipeline.Registry } -func NewHidePackageActivity(m *manager.Manager) *HidePackageActivity { - return &HidePackageActivity{manager: m} +func NewHidePackageActivity(pipelineRegistry *pipeline.Registry) *HidePackageActivity { + return &HidePackageActivity{pipelineRegistry: pipelineRegistry} } func (a *HidePackageActivity) Execute(ctx context.Context, unitID, unitType, pipelineName string) error { - p, err := a.manager.Pipelines.ByName(pipelineName) + p, err := a.pipelineRegistry.ByName(pipelineName) if err != nil { return temporal.NewNonRetryableError(err) } diff --git a/internal/workflow/activities/transfer.go b/internal/workflow/activities/transfer.go index 6d7b5bc0..dacdc8bb 100644 --- a/internal/workflow/activities/transfer.go +++ b/internal/workflow/activities/transfer.go @@ -7,8 +7,8 @@ import ( "go.artefactual.dev/amclient" + "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/temporal" - "github.com/artefactual-labs/enduro/internal/workflow/manager" ) // TransferActivity submits the transfer to Archivematica and returns its ID. @@ -16,11 +16,11 @@ import ( // This is our first interaction with Archivematica. The workflow ends here // after authentication errors. type TransferActivity struct { - manager *manager.Manager + pipelineRegistry *pipeline.Registry } -func NewTransferActivity(m *manager.Manager) *TransferActivity { - return &TransferActivity{manager: m} +func NewTransferActivity(pipelineRegistry *pipeline.Registry) *TransferActivity { + return &TransferActivity{pipelineRegistry: pipelineRegistry} } type TransferActivityParams struct { @@ -40,7 +40,7 @@ type TransferActivityResponse struct { } func (a *TransferActivity) Execute(ctx context.Context, params *TransferActivityParams) (*TransferActivityResponse, error) { - p, err := a.manager.Pipelines.ByName(params.PipelineName) + p, err := a.pipelineRegistry.ByName(params.PipelineName) if err != nil { return nil, temporal.NewNonRetryableError(err) } diff --git a/internal/workflow/local_activities.go b/internal/workflow/local_activities.go index e88731da..ea12cf31 100644 --- a/internal/workflow/local_activities.go +++ b/internal/workflow/local_activities.go @@ -8,6 +8,7 @@ import ( temporalsdk_activity "go.temporal.io/sdk/activity" "github.com/artefactual-labs/enduro/internal/collection" + "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/workflow/manager" ) @@ -64,8 +65,8 @@ func checkDuplicatePackageLocalActivity(ctx context.Context, logger logr.Logger, return colsvc.CheckDuplicate(ctx, id) } -func loadConfigLocalActivity(ctx context.Context, m *manager.Manager, pipeline string, tinfo *TransferInfo) (*TransferInfo, error) { - p, err := m.Pipelines.ByName(pipeline) +func loadConfigLocalActivity(ctx context.Context, m *manager.Manager, pipelineRegistry *pipeline.Registry, pipeline string, tinfo *TransferInfo) (*TransferInfo, error) { + p, err := pipelineRegistry.ByName(pipeline) if err != nil { m.Logger.Error(err, "Error loading local configuration") return nil, err diff --git a/internal/workflow/manager/manager.go b/internal/workflow/manager/manager.go index a51a9f99..911bfdeb 100644 --- a/internal/workflow/manager/manager.go +++ b/internal/workflow/manager/manager.go @@ -7,23 +7,20 @@ import ( "github.com/go-logr/logr" "github.com/artefactual-labs/enduro/internal/collection" - "github.com/artefactual-labs/enduro/internal/pipeline" ) // Manager carries workflow and activity dependencies. type Manager struct { Logger logr.Logger Collection collection.Service - Pipelines *pipeline.Registry Hooks map[string]map[string]interface{} } // NewManager returns a pointer to a new Manager. -func NewManager(logger logr.Logger, colsvc collection.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager { +func NewManager(logger logr.Logger, colsvc collection.Service, hooks map[string]map[string]interface{}) *Manager { return &Manager{ Logger: logger, Collection: colsvc, - Pipelines: pipelines, Hooks: hooks, } } diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 7b2558ed..a66b374b 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -25,11 +25,12 @@ import ( ) type ProcessingWorkflow struct { - manager *manager.Manager + manager *manager.Manager + pipelineRegistry *pipeline.Registry } -func NewProcessingWorkflow(m *manager.Manager) *ProcessingWorkflow { - return &ProcessingWorkflow{manager: m} +func NewProcessingWorkflow(m *manager.Manager, pipelineRegistry *pipeline.Registry) *ProcessingWorkflow { + return &ProcessingWorkflow{manager: m, pipelineRegistry: pipelineRegistry} } // TransferInfo is shared state that is passed down to activities. It can be @@ -282,7 +283,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll if err := temporalsdk_workflow.SideEffect(ctx, func(ctx temporalsdk_workflow.Context) interface{} { names := req.PipelineNames if len(names) < 1 { - names = w.manager.Pipelines.Names() + names = w.pipelineRegistry.Names() if len(names) < 1 { return "" } @@ -300,7 +301,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll // Load pipeline configuration and hooks. { activityOpts := withLocalActivityWithoutRetriesOpts(ctx) - err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, loadConfigLocalActivity, w.manager, tinfo.PipelineName, tinfo).Get(activityOpts, &tinfo) + err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, loadConfigLocalActivity, w.manager, w.pipelineRegistry, tinfo.PipelineName, tinfo).Get(activityOpts, &tinfo) if err != nil { return fmt.Errorf("error loading configuration: %v", err) } @@ -429,7 +430,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context { var acquired bool var err error - acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.manager.Pipelines, tinfo.PipelineName, tinfo.CollectionID) + acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.pipelineRegistry, tinfo.PipelineName, tinfo.CollectionID) if acquired { defer func() { _ = release(sessCtx) diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 50e0e65c..b09945a6 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -33,7 +33,8 @@ type ProcessingWorkflowTestSuite struct { func (s *ProcessingWorkflowTestSuite) SetupTest() { s.env = s.NewTestWorkflowEnvironment() s.manager = buildManager(s.T(), gomock.NewController(s.T())) - s.workflow = NewProcessingWorkflow(s.manager) + pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{}) + s.workflow = NewProcessingWorkflow(s.manager, pipelineRegistry) } func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) { @@ -52,7 +53,7 @@ func (s *ProcessingWorkflowTestSuite) TestParseErrorIsIgnored() { s.env.OnActivity(nha_activities.ParseNameLocalActivity, mock.Anything, "key").Return(nil, errors.New("parse error")).Once() // loadConfig is executed (workflow continued), returning an error. - s.env.OnActivity(loadConfigLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("pipeline is unavailable")).Once() + s.env.OnActivity(loadConfigLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("pipeline is unavailable")).Once() // Defer updates the package with the error status before returning. s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, &updatePackageLocalActivityParams{ @@ -125,7 +126,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager { return manager.NewManager( logr.Discard(), collectionfake.NewMockService(ctrl), - &pipeline.Registry{}, map[string]map[string]interface{}{ "prod": {"disabled": "false"}, "hari": {"disabled": "false"}, diff --git a/internal/workflow/receipts_test.go b/internal/workflow/receipts_test.go index b07b7eeb..0d926602 100644 --- a/internal/workflow/receipts_test.go +++ b/internal/workflow/receipts_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/go-logr/logr" "github.com/google/uuid" "github.com/stretchr/testify/mock" temporalsdk_activity "go.temporal.io/sdk/activity" @@ -14,6 +15,7 @@ import ( "github.com/artefactual-labs/enduro/internal/nha" nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities" + "github.com/artefactual-labs/enduro/internal/pipeline" ) // sendReceipts exits immediately after an activity error, ensuring that @@ -22,6 +24,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) { wts := temporalsdk_testsuite.WorkflowTestSuite{} env := wts.NewTestWorkflowEnvironment() m := buildManager(t, gomock.NewController(t)) + pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{}) AsyncCompletionActivityName = uuid.New().String() + "-async-completion" env.RegisterActivityWithOptions(NewAsyncCompletionActivity(m.Collection).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName}) @@ -57,7 +60,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) { uint(12345), ).Return("ABANDON", nil).Once() - env.ExecuteWorkflow(NewProcessingWorkflow(m).sendReceipts, ¶ms) + env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry).sendReceipts, ¶ms) assert.Equal(t, env.IsWorkflowCompleted(), true) assert.ErrorContains(t, env.GetWorkflowError(), "error sending hari receipt: user abandoned") @@ -68,6 +71,7 @@ func TestSendReceipts(t *testing.T) { wts := temporalsdk_testsuite.WorkflowTestSuite{} env := wts.NewTestWorkflowEnvironment() m := buildManager(t, gomock.NewController(t)) + pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{}) nha_activities.UpdateHARIActivityName = uuid.New().String() env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName}) @@ -107,7 +111,7 @@ func TestSendReceipts(t *testing.T) { }, ).Return(nil).Once() - env.ExecuteWorkflow(NewProcessingWorkflow(m).sendReceipts, ¶ms) + env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry).sendReceipts, ¶ms) assert.Equal(t, env.IsWorkflowCompleted(), true) assert.NilError(t, env.GetWorkflowError()) diff --git a/main.go b/main.go index 849b9097..33266d32 100644 --- a/main.go +++ b/main.go @@ -250,7 +250,7 @@ func main() { // TODO: this is a temporary workaround for dependency injection until we // figure out what's the depdencency tree is going to look like after POC. // The share-everything pattern should be avoided. - m := manager.NewManager(logger, colsvc, pipelineRegistry, config.Hooks) + m := manager.NewManager(logger, colsvc, config.Hooks) done := make(chan struct{}) w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{ @@ -262,16 +262,16 @@ func main() { os.Exit(1) } - w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName}) + w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, pipelineRegistry).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName}) w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName}) - w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}) + w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, pipelineRegistry, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}) w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}) w.RegisterActivityWithOptions(activities.NewValidateTransferActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ValidateTransferActivityName}) - w.RegisterActivityWithOptions(activities.NewTransferActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName}) + w.RegisterActivityWithOptions(activities.NewTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName}) w.RegisterActivityWithOptions(activities.NewPollTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollTransferActivityName}) w.RegisterActivityWithOptions(activities.NewPollIngestActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollIngestActivityName}) w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName}) - w.RegisterActivityWithOptions(activities.NewHidePackageActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName}) + w.RegisterActivityWithOptions(activities.NewHidePackageActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName}) w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName}) w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName}) w.RegisterActivityWithOptions(activities.NewPopulateMetadataActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PopulateMetadataActivityName})