Skip to content

Commit

Permalink
Remove collection service from manager
Browse files Browse the repository at this point in the history
  • Loading branch information
camlyall committed Sep 21, 2023
1 parent f25ec6b commit bda2417
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 40 deletions.
5 changes: 0 additions & 5 deletions internal/nha/activities/hari_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ import (

"github.com/go-logr/logr"
temporalsdk_temporal "go.temporal.io/sdk/temporal"
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
Expand Down Expand Up @@ -499,15 +497,12 @@ func testError(t *testing.T, err error, wantErr string, wantNonRetryable bool) {
func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *UpdateHARIActivity {
t.Helper()

ctrl := gomock.NewController(t)

hooks := map[string]map[string]interface{}{
"hari": hariConfig,
}

manager := manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
&pipeline.Registry{},
hooks,
)
Expand Down
5 changes: 0 additions & 5 deletions internal/nha/activities/prod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import (
"time"

"github.com/go-logr/logr"
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
Expand Down Expand Up @@ -159,11 +157,8 @@ func TestProdActivity(t *testing.T) {
func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *UpdateProductionSystemActivity {
t.Helper()

ctrl := gomock.NewController(t)

manager := manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": hookConfig,
Expand Down
17 changes: 7 additions & 10 deletions internal/workflow/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,22 @@ import (

"github.com/go-logr/logr"

"github.com/artefactual-labs/enduro/internal/collection"
"github.com/artefactual-labs/enduro/internal/pipeline"
)

// Manager carries workflow and activity dependencies.
type Manager struct {
Logger logr.Logger
Collection collection.Service
Pipelines *pipeline.Registry
Hooks map[string]map[string]interface{}
Logger logr.Logger
Pipelines *pipeline.Registry
Hooks map[string]map[string]interface{}
}

// NewManager returns a pointer to a new Manager.
func NewManager(logger logr.Logger, colsvc collection.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager {
func NewManager(logger logr.Logger, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager {
return &Manager{
Logger: logger,
Collection: colsvc,
Pipelines: pipelines,
Hooks: hooks,
Logger: logger,
Pipelines: pipelines,
Hooks: hooks,
}
}

Expand Down
21 changes: 11 additions & 10 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (

type ProcessingWorkflow struct {
manager *manager.Manager
colsvc collection.Service
}

func NewProcessingWorkflow(m *manager.Manager) *ProcessingWorkflow {
return &ProcessingWorkflow{manager: m}
func NewProcessingWorkflow(m *manager.Manager, colsvc collection.Service) *ProcessingWorkflow {
return &ProcessingWorkflow{manager: m, colsvc: colsvc}
}

// TransferInfo is shared state that is passed down to activities. It can be
Expand Down Expand Up @@ -198,13 +199,13 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll
var err error

if req.CollectionID == 0 {
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.manager.Logger, w.manager.Collection, &createPackageLocalActivityParams{
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.manager.Logger, w.colsvc, &createPackageLocalActivityParams{
Key: req.Key,
Status: status,
}).Get(activityOpts, &tinfo.CollectionID)
} else {
// TODO: investigate better way to reset the collection.
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.manager.Collection, &updatePackageLocalActivityParams{
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.colsvc, &updatePackageLocalActivityParams{

Check warning on line 208 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L208

Added line #L208 was not covered by tests
CollectionID: req.CollectionID,
Key: req.Key,
PipelineID: "",
Expand All @@ -231,7 +232,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll
// Use disconnected context so it also runs after cancellation.
dctx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx)
activityOpts := withLocalActivityOpts(dctx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.manager.Collection, &updatePackageLocalActivityParams{
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.colsvc, &updatePackageLocalActivityParams{
CollectionID: tinfo.CollectionID,
Key: tinfo.Key,
PipelineID: tinfo.PipelineID,
Expand All @@ -247,7 +248,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll
if req.RejectDuplicates {
var exists bool
activityOpts := withLocalActivityOpts(ctx)
err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.manager.Logger, w.manager.Collection, tinfo.CollectionID).Get(activityOpts, &exists)
err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.manager.Logger, w.colsvc, tinfo.CollectionID).Get(activityOpts, &exists)

Check warning on line 251 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L251

Added line #L251 was not covered by tests
if err != nil {
return fmt.Errorf("error checking duplicate: %v", err)
}
Expand All @@ -271,7 +272,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll

if nameInfo.Identifier != "" {
activityOpts = withLocalActivityOpts(ctx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, setOriginalIDLocalActivity, w.manager.Collection, tinfo.CollectionID, nameInfo.Identifier).Get(activityOpts, nil)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, setOriginalIDLocalActivity, w.colsvc, tinfo.CollectionID, nameInfo.Identifier).Get(activityOpts, nil)

Check warning on line 275 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L275

Added line #L275 was not covered by tests
}
}

Expand Down Expand Up @@ -429,7 +430,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
{
var acquired bool
var err error
acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.manager.Pipelines, tinfo.PipelineName, tinfo.CollectionID)
acquired, release, err = acquirePipeline(sessCtx, w.colsvc, w.manager.Pipelines, tinfo.PipelineName, tinfo.CollectionID)

Check warning on line 433 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L433

Added line #L433 was not covered by tests
if acquired {
defer func() {
_ = release(sessCtx)
Expand Down Expand Up @@ -593,7 +594,7 @@ func (w *ProcessingWorkflow) transfer(sessCtx temporalsdk_workflow.Context, tinf
// Persist TransferID + PipelineID.
{
activityOpts := withLocalActivityOpts(sessCtx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.manager.Collection, &updatePackageLocalActivityParams{
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.colsvc, &updatePackageLocalActivityParams{

Check warning on line 597 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L597

Added line #L597 was not covered by tests
CollectionID: tinfo.CollectionID,
Key: tinfo.Key,
Status: collection.StatusInProgress,
Expand All @@ -619,7 +620,7 @@ func (w *ProcessingWorkflow) transfer(sessCtx temporalsdk_workflow.Context, tinf
// Persist SIPID.
{
activityOpts := withLocalActivityOpts(sessCtx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.manager.Collection, &updatePackageLocalActivityParams{
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.manager.Logger, w.colsvc, &updatePackageLocalActivityParams{

Check warning on line 623 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L623

Added line #L623 was not covered by tests
CollectionID: tinfo.CollectionID,
Key: tinfo.Key,
TransferID: tinfo.TransferID,
Expand Down
6 changes: 3 additions & 3 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ type ProcessingWorkflowTestSuite struct {
}

func (s *ProcessingWorkflowTestSuite) SetupTest() {
ctrl := gomock.NewController(s.T())
s.env = s.NewTestWorkflowEnvironment()
s.manager = buildManager(s.T(), gomock.NewController(s.T()))
s.workflow = NewProcessingWorkflow(s.manager)
s.manager = buildManager(s.T(), ctrl)
s.workflow = NewProcessingWorkflow(s.manager, collectionfake.NewMockService(ctrl))
}

func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand Down Expand Up @@ -124,7 +125,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager {

return manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": {"disabled": "false"},
Expand Down
4 changes: 2 additions & 2 deletions internal/workflow/receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (w *ProcessingWorkflow) sendReceipts(ctx temporalsdk_workflow.Context, para
MaximumAttempts: 1,
},
}
err := executeActivityWithAsyncErrorHandling(ctx, w.manager.Collection, params.CollectionID, opts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{
err := executeActivityWithAsyncErrorHandling(ctx, w.colsvc, params.CollectionID, opts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{
SIPID: params.SIPID,
StoredAt: params.StoredAt,
FullPath: params.FullPath,
Expand All @@ -48,7 +48,7 @@ func (w *ProcessingWorkflow) sendReceipts(ctx temporalsdk_workflow.Context, para
MaximumAttempts: 1,
},
}
err := executeActivityWithAsyncErrorHandling(ctx, w.manager.Collection, params.CollectionID, opts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{
err := executeActivityWithAsyncErrorHandling(ctx, w.colsvc, params.CollectionID, opts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{
StoredAt: params.StoredAt,
PipelineName: params.PipelineName,
NameInfo: params.NameInfo,
Expand Down
11 changes: 8 additions & 3 deletions internal/workflow/receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities"
)
Expand All @@ -22,9 +23,11 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) {
wts := temporalsdk_testsuite.WorkflowTestSuite{}
env := wts.NewTestWorkflowEnvironment()
m := buildManager(t, gomock.NewController(t))
ctrl := gomock.NewController(t)
colsvc := collectionfake.NewMockService(ctrl)

AsyncCompletionActivityName = uuid.New().String() + "-async-completion"
env.RegisterActivityWithOptions(NewAsyncCompletionActivity(m.Collection).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName})
env.RegisterActivityWithOptions(NewAsyncCompletionActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName})

nha_activities.UpdateHARIActivityName = uuid.New().String() + "-update-hary"
env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName})
Expand Down Expand Up @@ -57,7 +60,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) {
uint(12345),
).Return("ABANDON", nil).Once()

env.ExecuteWorkflow(NewProcessingWorkflow(m).sendReceipts, &params)
env.ExecuteWorkflow(NewProcessingWorkflow(m, colsvc).sendReceipts, &params)

assert.Equal(t, env.IsWorkflowCompleted(), true)
assert.ErrorContains(t, env.GetWorkflowError(), "error sending hari receipt: user abandoned")
Expand All @@ -68,6 +71,8 @@ func TestSendReceipts(t *testing.T) {
wts := temporalsdk_testsuite.WorkflowTestSuite{}
env := wts.NewTestWorkflowEnvironment()
m := buildManager(t, gomock.NewController(t))
ctrl := gomock.NewController(t)
colsvc := collectionfake.NewMockService(ctrl)

nha_activities.UpdateHARIActivityName = uuid.New().String()
env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName})
Expand Down Expand Up @@ -107,7 +112,7 @@ func TestSendReceipts(t *testing.T) {
},
).Return(nil).Once()

env.ExecuteWorkflow(NewProcessingWorkflow(m).sendReceipts, &params)
env.ExecuteWorkflow(NewProcessingWorkflow(m, colsvc).sendReceipts, &params)

assert.Equal(t, env.IsWorkflowCompleted(), true)
assert.NilError(t, env.GetWorkflowError())
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func main() {
// TODO: this is a temporary workaround for dependency injection until we
// figure out what's the depdencency tree is going to look like after POC.
// The share-everything pattern should be avoided.
m := manager.NewManager(logger, colsvc, pipelineRegistry, config.Hooks)
m := manager.NewManager(logger, pipelineRegistry, config.Hooks)

done := make(chan struct{})
w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{
Expand All @@ -262,7 +262,7 @@ func main() {
os.Exit(1)
}

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, colsvc).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
Expand Down

0 comments on commit bda2417

Please sign in to comment.