Skip to content

Commit

Permalink
Remove collection service from manager
Browse files Browse the repository at this point in the history
  • Loading branch information
camlyall committed Sep 25, 2023
1 parent 2dc58c5 commit f28ee4d
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 40 deletions.
5 changes: 0 additions & 5 deletions internal/nha/activities/hari_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ import (
"time"

temporalsdk_temporal "go.temporal.io/sdk/temporal"
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)
Expand Down Expand Up @@ -497,14 +495,11 @@ func testError(t *testing.T, err error, wantErr string, wantNonRetryable bool) {
func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *UpdateHARIActivity {
t.Helper()

ctrl := gomock.NewController(t)

hooks := map[string]map[string]interface{}{
"hari": hariConfig,
}

manager := manager.NewManager(
collectionfake.NewMockService(ctrl),
hooks,
)

Expand Down
5 changes: 0 additions & 5 deletions internal/nha/activities/prod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"testing"
"time"

"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)
Expand Down Expand Up @@ -157,10 +155,7 @@ func TestProdActivity(t *testing.T) {
func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *UpdateProductionSystemActivity {
t.Helper()

ctrl := gomock.NewController(t)

manager := manager.NewManager(
collectionfake.NewMockService(ctrl),
map[string]map[string]interface{}{
"prod": hookConfig,
},
Expand Down
10 changes: 3 additions & 7 deletions internal/workflow/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,17 @@ package manager
import (
"fmt"
"strings"

"github.com/artefactual-labs/enduro/internal/collection"
)

// Manager carries workflow and activity dependencies.
type Manager struct {
Collection collection.Service
Hooks map[string]map[string]interface{}
Hooks map[string]map[string]interface{}
}

// NewManager returns a pointer to a new Manager.
func NewManager(colsvc collection.Service, hooks map[string]map[string]interface{}) *Manager {
func NewManager(hooks map[string]map[string]interface{}) *Manager {
return &Manager{
Collection: colsvc,
Hooks: hooks,
Hooks: hooks,
}
}

Expand Down
21 changes: 11 additions & 10 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ import (

type ProcessingWorkflow struct {
manager *manager.Manager
colsvc collection.Service
pipelineRegistry *pipeline.Registry
logger logr.Logger
}

func NewProcessingWorkflow(m *manager.Manager, pipelineRegistry *pipeline.Registry, l logr.Logger) *ProcessingWorkflow {
return &ProcessingWorkflow{manager: m, pipelineRegistry: pipelineRegistry, logger: l}
func NewProcessingWorkflow(m *manager.Manager, colsvc collection.Service, pipelineRegistry *pipeline.Registry, l logr.Logger) *ProcessingWorkflow {
return &ProcessingWorkflow{manager: m, colsvc: colsvc, pipelineRegistry: pipelineRegistry, logger: l}
}

// TransferInfo is shared state that is passed down to activities. It can be
Expand Down Expand Up @@ -201,13 +202,13 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll
var err error

if req.CollectionID == 0 {
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.logger, w.manager.Collection, &createPackageLocalActivityParams{
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.logger, w.colsvc, &createPackageLocalActivityParams{
Key: req.Key,
Status: status,
}).Get(activityOpts, &tinfo.CollectionID)
} else {
// TODO: investigate better way to reset the collection.
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{

Check warning on line 211 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L211

Added line #L211 was not covered by tests
CollectionID: req.CollectionID,
Key: req.Key,
PipelineID: "",
Expand All @@ -234,7 +235,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll
// Use disconnected context so it also runs after cancellation.
dctx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx)
activityOpts := withLocalActivityOpts(dctx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{
CollectionID: tinfo.CollectionID,
Key: tinfo.Key,
PipelineID: tinfo.PipelineID,
Expand All @@ -250,7 +251,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll
if req.RejectDuplicates {
var exists bool
activityOpts := withLocalActivityOpts(ctx)
err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.logger, w.manager.Collection, tinfo.CollectionID).Get(activityOpts, &exists)
err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.logger, w.colsvc, tinfo.CollectionID).Get(activityOpts, &exists)

Check warning on line 254 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L254

Added line #L254 was not covered by tests
if err != nil {
return fmt.Errorf("error checking duplicate: %v", err)
}
Expand All @@ -274,7 +275,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll

if nameInfo.Identifier != "" {
activityOpts = withLocalActivityOpts(ctx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, setOriginalIDLocalActivity, w.manager.Collection, tinfo.CollectionID, nameInfo.Identifier).Get(activityOpts, nil)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, setOriginalIDLocalActivity, w.colsvc, tinfo.CollectionID, nameInfo.Identifier).Get(activityOpts, nil)

Check warning on line 278 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L278

Added line #L278 was not covered by tests
}
}

Expand Down Expand Up @@ -432,7 +433,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
{
var acquired bool
var err error
acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.pipelineRegistry, tinfo.PipelineName, tinfo.CollectionID)
acquired, release, err = acquirePipeline(sessCtx, w.colsvc, w.pipelineRegistry, tinfo.PipelineName, tinfo.CollectionID)

Check warning on line 436 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L436

Added line #L436 was not covered by tests
if acquired {
defer func() {
_ = release(sessCtx)
Expand Down Expand Up @@ -596,7 +597,7 @@ func (w *ProcessingWorkflow) transfer(sessCtx temporalsdk_workflow.Context, tinf
// Persist TransferID + PipelineID.
{
activityOpts := withLocalActivityOpts(sessCtx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{

Check warning on line 600 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L600

Added line #L600 was not covered by tests
CollectionID: tinfo.CollectionID,
Key: tinfo.Key,
Status: collection.StatusInProgress,
Expand All @@ -622,7 +623,7 @@ func (w *ProcessingWorkflow) transfer(sessCtx temporalsdk_workflow.Context, tinf
// Persist SIPID.
{
activityOpts := withLocalActivityOpts(sessCtx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{

Check warning on line 626 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L626

Added line #L626 was not covered by tests
CollectionID: tinfo.CollectionID,
Key: tinfo.Key,
TransferID: tinfo.TransferID,
Expand Down
6 changes: 3 additions & 3 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ type ProcessingWorkflowTestSuite struct {
}

func (s *ProcessingWorkflowTestSuite) SetupTest() {
ctrl := gomock.NewController(s.T())
s.env = s.NewTestWorkflowEnvironment()
s.manager = buildManager(s.T(), gomock.NewController(s.T()))
s.manager = buildManager(s.T(), ctrl)
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})
s.workflow = NewProcessingWorkflow(s.manager, pipelineRegistry, logr.Discard())
s.workflow = NewProcessingWorkflow(s.manager, collectionfake.NewMockService(ctrl), pipelineRegistry, logr.Discard())
}

func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand Down Expand Up @@ -124,7 +125,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager {
t.Helper()

return manager.NewManager(
collectionfake.NewMockService(ctrl),
map[string]map[string]interface{}{
"prod": {"disabled": "false"},
"hari": {"disabled": "false"},
Expand Down
4 changes: 2 additions & 2 deletions internal/workflow/receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (w *ProcessingWorkflow) sendReceipts(ctx temporalsdk_workflow.Context, para
MaximumAttempts: 1,
},
}
err := executeActivityWithAsyncErrorHandling(ctx, w.manager.Collection, params.CollectionID, opts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{
err := executeActivityWithAsyncErrorHandling(ctx, w.colsvc, params.CollectionID, opts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{
SIPID: params.SIPID,
StoredAt: params.StoredAt,
FullPath: params.FullPath,
Expand All @@ -48,7 +48,7 @@ func (w *ProcessingWorkflow) sendReceipts(ctx temporalsdk_workflow.Context, para
MaximumAttempts: 1,
},
}
err := executeActivityWithAsyncErrorHandling(ctx, w.manager.Collection, params.CollectionID, opts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{
err := executeActivityWithAsyncErrorHandling(ctx, w.colsvc, params.CollectionID, opts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{
StoredAt: params.StoredAt,
PipelineName: params.PipelineName,
NameInfo: params.NameInfo,
Expand Down
11 changes: 8 additions & 3 deletions internal/workflow/receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities"
"github.com/artefactual-labs/enduro/internal/pipeline"
Expand All @@ -24,10 +25,12 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) {
wts := temporalsdk_testsuite.WorkflowTestSuite{}
env := wts.NewTestWorkflowEnvironment()
m := buildManager(t, gomock.NewController(t))
ctrl := gomock.NewController(t)
colsvc := collectionfake.NewMockService(ctrl)
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})

AsyncCompletionActivityName = uuid.New().String() + "-async-completion"
env.RegisterActivityWithOptions(NewAsyncCompletionActivity(m.Collection).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName})
env.RegisterActivityWithOptions(NewAsyncCompletionActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName})

nha_activities.UpdateHARIActivityName = uuid.New().String() + "-update-hary"
env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName})
Expand Down Expand Up @@ -60,7 +63,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) {
uint(12345),
).Return("ABANDON", nil).Once()

env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, &params)
env.ExecuteWorkflow(NewProcessingWorkflow(m, colsvc, pipelineRegistry, logr.Discard()).sendReceipts, &params)

assert.Equal(t, env.IsWorkflowCompleted(), true)
assert.ErrorContains(t, env.GetWorkflowError(), "error sending hari receipt: user abandoned")
Expand All @@ -71,6 +74,8 @@ func TestSendReceipts(t *testing.T) {
wts := temporalsdk_testsuite.WorkflowTestSuite{}
env := wts.NewTestWorkflowEnvironment()
m := buildManager(t, gomock.NewController(t))
ctrl := gomock.NewController(t)
colsvc := collectionfake.NewMockService(ctrl)
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})

nha_activities.UpdateHARIActivityName = uuid.New().String()
Expand Down Expand Up @@ -111,7 +116,7 @@ func TestSendReceipts(t *testing.T) {
},
).Return(nil).Once()

env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, &params)
env.ExecuteWorkflow(NewProcessingWorkflow(m, colsvc, pipelineRegistry, logr.Discard()).sendReceipts, &params)

assert.Equal(t, env.IsWorkflowCompleted(), true)
assert.NilError(t, env.GetWorkflowError())
Expand Down
7 changes: 2 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,7 @@ func main() {

// Workflow and activity worker.
{
// TODO: this is a temporary workaround for dependency injection until we
// figure out what's the depdencency tree is going to look like after POC.
// The share-everything pattern should be avoided.
m := manager.NewManager(colsvc, config.Hooks)
m := manager.NewManager(config.Hooks)

done := make(chan struct{})
w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{
Expand All @@ -262,7 +259,7 @@ func main() {
os.Exit(1)
}

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, pipelineRegistry, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, colsvc, pipelineRegistry, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, pipelineRegistry, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
Expand Down

0 comments on commit f28ee4d

Please sign in to comment.