From bda2417de66b0b390d5b688193fda8061e37e102 Mon Sep 17 00:00:00 2001 From: camlyall Date: Thu, 21 Sep 2023 16:35:37 +0000 Subject: [PATCH] Remove collection service from manager --- internal/nha/activities/hari_test.go | 5 ----- internal/nha/activities/prod_test.go | 5 ----- internal/workflow/manager/manager.go | 17 +++++++---------- internal/workflow/processing.go | 21 +++++++++++---------- internal/workflow/processing_test.go | 6 +++--- internal/workflow/receipts.go | 4 ++-- internal/workflow/receipts_test.go | 11 ++++++++--- main.go | 4 ++-- 8 files changed, 33 insertions(+), 40 deletions(-) diff --git a/internal/nha/activities/hari_test.go b/internal/nha/activities/hari_test.go index a38e8272..b9e072ad 100644 --- a/internal/nha/activities/hari_test.go +++ b/internal/nha/activities/hari_test.go @@ -13,11 +13,9 @@ import ( "github.com/go-logr/logr" temporalsdk_temporal "go.temporal.io/sdk/temporal" - "go.uber.org/mock/gomock" "gotest.tools/v3/assert" "gotest.tools/v3/fs" - collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake" "github.com/artefactual-labs/enduro/internal/nha" "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/workflow/manager" @@ -499,15 +497,12 @@ func testError(t *testing.T, err error, wantErr string, wantNonRetryable bool) { func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *UpdateHARIActivity { t.Helper() - ctrl := gomock.NewController(t) - hooks := map[string]map[string]interface{}{ "hari": hariConfig, } manager := manager.NewManager( logr.Discard(), - collectionfake.NewMockService(ctrl), &pipeline.Registry{}, hooks, ) diff --git a/internal/nha/activities/prod_test.go b/internal/nha/activities/prod_test.go index 84d1566b..c3258745 100644 --- a/internal/nha/activities/prod_test.go +++ b/internal/nha/activities/prod_test.go @@ -8,11 +8,9 @@ import ( "time" "github.com/go-logr/logr" - "go.uber.org/mock/gomock" "gotest.tools/v3/assert" "gotest.tools/v3/fs" - collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake" "github.com/artefactual-labs/enduro/internal/nha" "github.com/artefactual-labs/enduro/internal/pipeline" "github.com/artefactual-labs/enduro/internal/workflow/manager" @@ -159,11 +157,8 @@ func TestProdActivity(t *testing.T) { func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *UpdateProductionSystemActivity { t.Helper() - ctrl := gomock.NewController(t) - manager := manager.NewManager( logr.Discard(), - collectionfake.NewMockService(ctrl), &pipeline.Registry{}, map[string]map[string]interface{}{ "prod": hookConfig, diff --git a/internal/workflow/manager/manager.go b/internal/workflow/manager/manager.go index a51a9f99..e85545e4 100644 --- a/internal/workflow/manager/manager.go +++ b/internal/workflow/manager/manager.go @@ -6,25 +6,22 @@ import ( "github.com/go-logr/logr" - "github.com/artefactual-labs/enduro/internal/collection" "github.com/artefactual-labs/enduro/internal/pipeline" ) // Manager carries workflow and activity dependencies. type Manager struct { - Logger logr.Logger - Collection collection.Service - Pipelines *pipeline.Registry - Hooks map[string]map[string]interface{} + Logger logr.Logger + Pipelines *pipeline.Registry + Hooks map[string]map[string]interface{} } // NewManager returns a pointer to a new Manager. -func NewManager(logger logr.Logger, colsvc collection.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager { +func NewManager(logger logr.Logger, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager { return &Manager{ - Logger: logger, - Collection: colsvc, - Pipelines: pipelines, - Hooks: hooks, + Logger: logger, + Pipelines: pipelines, + Hooks: hooks, } } diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 7b2558ed..5d74655f 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -26,10 +26,11 @@ import ( type ProcessingWorkflow struct { manager *manager.Manager + colsvc collection.Service } -func NewProcessingWorkflow(m *manager.Manager) *ProcessingWorkflow { - return &ProcessingWorkflow{manager: m} +func NewProcessingWorkflow(m *manager.Manager, colsvc collection.Service) *ProcessingWorkflow { + return &ProcessingWorkflow{manager: m, colsvc: colsvc} } // TransferInfo is shared state that is passed down to activities. It can be @@ -198,13 +199,13 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll var err error if req.CollectionID == 0 { - err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.manager.Logger, w.manager.Collection, &createPackageLocalActivityParams{ + err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.manager.Logger, w.colsvc, &createPackageLocalActivityParams{ Key: req.Key, Status: status, }).Get(activityOpts, &tinfo.CollectionID) } else { // TODO: investigate better way to reset the collection. - err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.manager.Collection, &updatePackageLocalActivityParams{ + err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.colsvc, &updatePackageLocalActivityParams{ CollectionID: req.CollectionID, Key: req.Key, PipelineID: "", @@ -231,7 +232,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll // Use disconnected context so it also runs after cancellation. dctx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx) activityOpts := withLocalActivityOpts(dctx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.manager.Collection, &updatePackageLocalActivityParams{ + _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.colsvc, &updatePackageLocalActivityParams{ CollectionID: tinfo.CollectionID, Key: tinfo.Key, PipelineID: tinfo.PipelineID, @@ -247,7 +248,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll if req.RejectDuplicates { var exists bool activityOpts := withLocalActivityOpts(ctx) - err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.manager.Logger, w.manager.Collection, tinfo.CollectionID).Get(activityOpts, &exists) + err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.manager.Logger, w.colsvc, tinfo.CollectionID).Get(activityOpts, &exists) if err != nil { return fmt.Errorf("error checking duplicate: %v", err) } @@ -271,7 +272,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll if nameInfo.Identifier != "" { activityOpts = withLocalActivityOpts(ctx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, setOriginalIDLocalActivity, w.manager.Collection, tinfo.CollectionID, nameInfo.Identifier).Get(activityOpts, nil) + _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, setOriginalIDLocalActivity, w.colsvc, tinfo.CollectionID, nameInfo.Identifier).Get(activityOpts, nil) } } @@ -429,7 +430,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context { var acquired bool var err error - acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.manager.Pipelines, tinfo.PipelineName, tinfo.CollectionID) + acquired, release, err = acquirePipeline(sessCtx, w.colsvc, w.manager.Pipelines, tinfo.PipelineName, tinfo.CollectionID) if acquired { defer func() { _ = release(sessCtx) @@ -593,7 +594,7 @@ func (w *ProcessingWorkflow) transfer(sessCtx temporalsdk_workflow.Context, tinf // Persist TransferID + PipelineID. { activityOpts := withLocalActivityOpts(sessCtx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.manager.Collection, &updatePackageLocalActivityParams{ + _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.colsvc, &updatePackageLocalActivityParams{ CollectionID: tinfo.CollectionID, Key: tinfo.Key, Status: collection.StatusInProgress, @@ -619,7 +620,7 @@ func (w *ProcessingWorkflow) transfer(sessCtx temporalsdk_workflow.Context, tinf // Persist SIPID. { activityOpts := withLocalActivityOpts(sessCtx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.manager.Collection, &updatePackageLocalActivityParams{ + _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.colsvc, &updatePackageLocalActivityParams{ CollectionID: tinfo.CollectionID, Key: tinfo.Key, TransferID: tinfo.TransferID, diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 50e0e65c..7eadcf01 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -31,9 +31,10 @@ type ProcessingWorkflowTestSuite struct { } func (s *ProcessingWorkflowTestSuite) SetupTest() { + ctrl := gomock.NewController(s.T()) s.env = s.NewTestWorkflowEnvironment() - s.manager = buildManager(s.T(), gomock.NewController(s.T())) - s.workflow = NewProcessingWorkflow(s.manager) + s.manager = buildManager(s.T(), ctrl) + s.workflow = NewProcessingWorkflow(s.manager, collectionfake.NewMockService(ctrl)) } func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) { @@ -124,7 +125,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager { return manager.NewManager( logr.Discard(), - collectionfake.NewMockService(ctrl), &pipeline.Registry{}, map[string]map[string]interface{}{ "prod": {"disabled": "false"}, diff --git a/internal/workflow/receipts.go b/internal/workflow/receipts.go index cf271c23..cd9c2058 100644 --- a/internal/workflow/receipts.go +++ b/internal/workflow/receipts.go @@ -29,7 +29,7 @@ func (w *ProcessingWorkflow) sendReceipts(ctx temporalsdk_workflow.Context, para MaximumAttempts: 1, }, } - err := executeActivityWithAsyncErrorHandling(ctx, w.manager.Collection, params.CollectionID, opts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{ + err := executeActivityWithAsyncErrorHandling(ctx, w.colsvc, params.CollectionID, opts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{ SIPID: params.SIPID, StoredAt: params.StoredAt, FullPath: params.FullPath, @@ -48,7 +48,7 @@ func (w *ProcessingWorkflow) sendReceipts(ctx temporalsdk_workflow.Context, para MaximumAttempts: 1, }, } - err := executeActivityWithAsyncErrorHandling(ctx, w.manager.Collection, params.CollectionID, opts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{ + err := executeActivityWithAsyncErrorHandling(ctx, w.colsvc, params.CollectionID, opts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{ StoredAt: params.StoredAt, PipelineName: params.PipelineName, NameInfo: params.NameInfo, diff --git a/internal/workflow/receipts_test.go b/internal/workflow/receipts_test.go index b07b7eeb..75778a2f 100644 --- a/internal/workflow/receipts_test.go +++ b/internal/workflow/receipts_test.go @@ -12,6 +12,7 @@ import ( "go.uber.org/mock/gomock" "gotest.tools/v3/assert" + collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake" "github.com/artefactual-labs/enduro/internal/nha" nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities" ) @@ -22,9 +23,11 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) { wts := temporalsdk_testsuite.WorkflowTestSuite{} env := wts.NewTestWorkflowEnvironment() m := buildManager(t, gomock.NewController(t)) + ctrl := gomock.NewController(t) + colsvc := collectionfake.NewMockService(ctrl) AsyncCompletionActivityName = uuid.New().String() + "-async-completion" - env.RegisterActivityWithOptions(NewAsyncCompletionActivity(m.Collection).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName}) + env.RegisterActivityWithOptions(NewAsyncCompletionActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName}) nha_activities.UpdateHARIActivityName = uuid.New().String() + "-update-hary" env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName}) @@ -57,7 +60,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) { uint(12345), ).Return("ABANDON", nil).Once() - env.ExecuteWorkflow(NewProcessingWorkflow(m).sendReceipts, ¶ms) + env.ExecuteWorkflow(NewProcessingWorkflow(m, colsvc).sendReceipts, ¶ms) assert.Equal(t, env.IsWorkflowCompleted(), true) assert.ErrorContains(t, env.GetWorkflowError(), "error sending hari receipt: user abandoned") @@ -68,6 +71,8 @@ func TestSendReceipts(t *testing.T) { wts := temporalsdk_testsuite.WorkflowTestSuite{} env := wts.NewTestWorkflowEnvironment() m := buildManager(t, gomock.NewController(t)) + ctrl := gomock.NewController(t) + colsvc := collectionfake.NewMockService(ctrl) nha_activities.UpdateHARIActivityName = uuid.New().String() env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName}) @@ -107,7 +112,7 @@ func TestSendReceipts(t *testing.T) { }, ).Return(nil).Once() - env.ExecuteWorkflow(NewProcessingWorkflow(m).sendReceipts, ¶ms) + env.ExecuteWorkflow(NewProcessingWorkflow(m, colsvc).sendReceipts, ¶ms) assert.Equal(t, env.IsWorkflowCompleted(), true) assert.NilError(t, env.GetWorkflowError()) diff --git a/main.go b/main.go index 849b9097..fc8f1f10 100644 --- a/main.go +++ b/main.go @@ -250,7 +250,7 @@ func main() { // TODO: this is a temporary workaround for dependency injection until we // figure out what's the depdencency tree is going to look like after POC. // The share-everything pattern should be avoided. - m := manager.NewManager(logger, colsvc, pipelineRegistry, config.Hooks) + m := manager.NewManager(logger, pipelineRegistry, config.Hooks) done := make(chan struct{}) w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{ @@ -262,7 +262,7 @@ func main() { os.Exit(1) } - w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName}) + w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, colsvc).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName}) w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName}) w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}) w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})