diff --git a/internal/nha/activities/hari_test.go b/internal/nha/activities/hari_test.go index 51d396ba..382f0f76 100644 --- a/internal/nha/activities/hari_test.go +++ b/internal/nha/activities/hari_test.go @@ -12,11 +12,9 @@ import ( "time" temporalsdk_temporal "go.temporal.io/sdk/temporal" - "go.uber.org/mock/gomock" "gotest.tools/v3/assert" "gotest.tools/v3/fs" - collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake" "github.com/artefactual-labs/enduro/internal/nha" "github.com/artefactual-labs/enduro/internal/workflow/manager" ) @@ -497,14 +495,11 @@ func testError(t *testing.T, err error, wantErr string, wantNonRetryable bool) { func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *UpdateHARIActivity { t.Helper() - ctrl := gomock.NewController(t) - hooks := map[string]map[string]interface{}{ "hari": hariConfig, } manager := manager.NewManager( - collectionfake.NewMockService(ctrl), hooks, ) diff --git a/internal/nha/activities/prod_test.go b/internal/nha/activities/prod_test.go index 7598def1..34ea1fdb 100644 --- a/internal/nha/activities/prod_test.go +++ b/internal/nha/activities/prod_test.go @@ -7,11 +7,9 @@ import ( "testing" "time" - "go.uber.org/mock/gomock" "gotest.tools/v3/assert" "gotest.tools/v3/fs" - collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake" "github.com/artefactual-labs/enduro/internal/nha" "github.com/artefactual-labs/enduro/internal/workflow/manager" ) @@ -157,10 +155,7 @@ func TestProdActivity(t *testing.T) { func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *UpdateProductionSystemActivity { t.Helper() - ctrl := gomock.NewController(t) - manager := manager.NewManager( - collectionfake.NewMockService(ctrl), map[string]map[string]interface{}{ "prod": hookConfig, }, diff --git a/internal/workflow/manager/manager.go b/internal/workflow/manager/manager.go index 86f1480d..096c40d0 100644 --- a/internal/workflow/manager/manager.go +++ b/internal/workflow/manager/manager.go @@ -3,21 +3,17 @@ package manager import ( "fmt" "strings" - - "github.com/artefactual-labs/enduro/internal/collection" ) // Manager carries workflow and activity dependencies. type Manager struct { - Collection collection.Service - Hooks map[string]map[string]interface{} + Hooks map[string]map[string]interface{} } // NewManager returns a pointer to a new Manager. -func NewManager(colsvc collection.Service, hooks map[string]map[string]interface{}) *Manager { +func NewManager(hooks map[string]map[string]interface{}) *Manager { return &Manager{ - Collection: colsvc, - Hooks: hooks, + Hooks: hooks, } } diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 9757c222..ed52ef98 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -27,12 +27,13 @@ import ( type ProcessingWorkflow struct { manager *manager.Manager + colsvc collection.Service pipelineRegistry *pipeline.Registry logger logr.Logger } -func NewProcessingWorkflow(m *manager.Manager, pipelineRegistry *pipeline.Registry, l logr.Logger) *ProcessingWorkflow { - return &ProcessingWorkflow{manager: m, pipelineRegistry: pipelineRegistry, logger: l} +func NewProcessingWorkflow(m *manager.Manager, colsvc collection.Service, pipelineRegistry *pipeline.Registry, l logr.Logger) *ProcessingWorkflow { + return &ProcessingWorkflow{manager: m, colsvc: colsvc, pipelineRegistry: pipelineRegistry, logger: l} } // TransferInfo is shared state that is passed down to activities. It can be @@ -201,13 +202,13 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll var err error if req.CollectionID == 0 { - err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.logger, w.manager.Collection, &createPackageLocalActivityParams{ + err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.logger, w.colsvc, &createPackageLocalActivityParams{ Key: req.Key, Status: status, }).Get(activityOpts, &tinfo.CollectionID) } else { // TODO: investigate better way to reset the collection. - err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{ + err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{ CollectionID: req.CollectionID, Key: req.Key, PipelineID: "", @@ -234,7 +235,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll // Use disconnected context so it also runs after cancellation. dctx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx) activityOpts := withLocalActivityOpts(dctx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{ + _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{ CollectionID: tinfo.CollectionID, Key: tinfo.Key, PipelineID: tinfo.PipelineID, @@ -250,7 +251,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll if req.RejectDuplicates { var exists bool activityOpts := withLocalActivityOpts(ctx) - err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.logger, w.manager.Collection, tinfo.CollectionID).Get(activityOpts, &exists) + err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.logger, w.colsvc, tinfo.CollectionID).Get(activityOpts, &exists) if err != nil { return fmt.Errorf("error checking duplicate: %v", err) } @@ -274,7 +275,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll if nameInfo.Identifier != "" { activityOpts = withLocalActivityOpts(ctx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, setOriginalIDLocalActivity, w.manager.Collection, tinfo.CollectionID, nameInfo.Identifier).Get(activityOpts, nil) + _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, setOriginalIDLocalActivity, w.colsvc, tinfo.CollectionID, nameInfo.Identifier).Get(activityOpts, nil) } } @@ -432,7 +433,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context { var acquired bool var err error - acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.pipelineRegistry, tinfo.PipelineName, tinfo.CollectionID) + acquired, release, err = acquirePipeline(sessCtx, w.colsvc, w.pipelineRegistry, tinfo.PipelineName, tinfo.CollectionID) if acquired { defer func() { _ = release(sessCtx) @@ -596,7 +597,7 @@ func (w *ProcessingWorkflow) transfer(sessCtx temporalsdk_workflow.Context, tinf // Persist TransferID + PipelineID. { activityOpts := withLocalActivityOpts(sessCtx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{ + _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{ CollectionID: tinfo.CollectionID, Key: tinfo.Key, Status: collection.StatusInProgress, @@ -622,7 +623,7 @@ func (w *ProcessingWorkflow) transfer(sessCtx temporalsdk_workflow.Context, tinf // Persist SIPID. { activityOpts := withLocalActivityOpts(sessCtx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{ + _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{ CollectionID: tinfo.CollectionID, Key: tinfo.Key, TransferID: tinfo.TransferID, diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 65f7f88e..8178d0ad 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -31,10 +31,11 @@ type ProcessingWorkflowTestSuite struct { } func (s *ProcessingWorkflowTestSuite) SetupTest() { + ctrl := gomock.NewController(s.T()) s.env = s.NewTestWorkflowEnvironment() - s.manager = buildManager(s.T(), gomock.NewController(s.T())) + s.manager = buildManager(s.T(), ctrl) pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{}) - s.workflow = NewProcessingWorkflow(s.manager, pipelineRegistry, logr.Discard()) + s.workflow = NewProcessingWorkflow(s.manager, collectionfake.NewMockService(ctrl), pipelineRegistry, logr.Discard()) } func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) { @@ -124,7 +125,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager { t.Helper() return manager.NewManager( - collectionfake.NewMockService(ctrl), map[string]map[string]interface{}{ "prod": {"disabled": "false"}, "hari": {"disabled": "false"}, diff --git a/internal/workflow/receipts.go b/internal/workflow/receipts.go index cf271c23..cd9c2058 100644 --- a/internal/workflow/receipts.go +++ b/internal/workflow/receipts.go @@ -29,7 +29,7 @@ func (w *ProcessingWorkflow) sendReceipts(ctx temporalsdk_workflow.Context, para MaximumAttempts: 1, }, } - err := executeActivityWithAsyncErrorHandling(ctx, w.manager.Collection, params.CollectionID, opts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{ + err := executeActivityWithAsyncErrorHandling(ctx, w.colsvc, params.CollectionID, opts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{ SIPID: params.SIPID, StoredAt: params.StoredAt, FullPath: params.FullPath, @@ -48,7 +48,7 @@ func (w *ProcessingWorkflow) sendReceipts(ctx temporalsdk_workflow.Context, para MaximumAttempts: 1, }, } - err := executeActivityWithAsyncErrorHandling(ctx, w.manager.Collection, params.CollectionID, opts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{ + err := executeActivityWithAsyncErrorHandling(ctx, w.colsvc, params.CollectionID, opts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{ StoredAt: params.StoredAt, PipelineName: params.PipelineName, NameInfo: params.NameInfo, diff --git a/internal/workflow/receipts_test.go b/internal/workflow/receipts_test.go index e8ef2acf..89defa86 100644 --- a/internal/workflow/receipts_test.go +++ b/internal/workflow/receipts_test.go @@ -13,6 +13,7 @@ import ( "go.uber.org/mock/gomock" "gotest.tools/v3/assert" + collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake" "github.com/artefactual-labs/enduro/internal/nha" nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities" "github.com/artefactual-labs/enduro/internal/pipeline" @@ -24,10 +25,12 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) { wts := temporalsdk_testsuite.WorkflowTestSuite{} env := wts.NewTestWorkflowEnvironment() m := buildManager(t, gomock.NewController(t)) + ctrl := gomock.NewController(t) + colsvc := collectionfake.NewMockService(ctrl) pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{}) AsyncCompletionActivityName = uuid.New().String() + "-async-completion" - env.RegisterActivityWithOptions(NewAsyncCompletionActivity(m.Collection).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName}) + env.RegisterActivityWithOptions(NewAsyncCompletionActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName}) nha_activities.UpdateHARIActivityName = uuid.New().String() + "-update-hary" env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName}) @@ -60,7 +63,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) { uint(12345), ).Return("ABANDON", nil).Once() - env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, ¶ms) + env.ExecuteWorkflow(NewProcessingWorkflow(m, colsvc, pipelineRegistry, logr.Discard()).sendReceipts, ¶ms) assert.Equal(t, env.IsWorkflowCompleted(), true) assert.ErrorContains(t, env.GetWorkflowError(), "error sending hari receipt: user abandoned") @@ -71,6 +74,8 @@ func TestSendReceipts(t *testing.T) { wts := temporalsdk_testsuite.WorkflowTestSuite{} env := wts.NewTestWorkflowEnvironment() m := buildManager(t, gomock.NewController(t)) + ctrl := gomock.NewController(t) + colsvc := collectionfake.NewMockService(ctrl) pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{}) nha_activities.UpdateHARIActivityName = uuid.New().String() @@ -111,7 +116,7 @@ func TestSendReceipts(t *testing.T) { }, ).Return(nil).Once() - env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, ¶ms) + env.ExecuteWorkflow(NewProcessingWorkflow(m, colsvc, pipelineRegistry, logr.Discard()).sendReceipts, ¶ms) assert.Equal(t, env.IsWorkflowCompleted(), true) assert.NilError(t, env.GetWorkflowError()) diff --git a/main.go b/main.go index 52b3db26..38d21e1e 100644 --- a/main.go +++ b/main.go @@ -250,7 +250,7 @@ func main() { // TODO: this is a temporary workaround for dependency injection until we // figure out what's the depdencency tree is going to look like after POC. // The share-everything pattern should be avoided. - m := manager.NewManager(colsvc, config.Hooks) + m := manager.NewManager(config.Hooks) done := make(chan struct{}) w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{ @@ -262,7 +262,7 @@ func main() { os.Exit(1) } - w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, pipelineRegistry, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName}) + w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, colsvc, pipelineRegistry, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName}) w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName}) w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, pipelineRegistry, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}) w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})