Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove collection service from manager #582

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions internal/nha/activities/hari_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ import (
"time"

temporalsdk_temporal "go.temporal.io/sdk/temporal"
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)
Expand Down Expand Up @@ -497,14 +495,11 @@ func testError(t *testing.T, err error, wantErr string, wantNonRetryable bool) {
func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *UpdateHARIActivity {
t.Helper()

ctrl := gomock.NewController(t)

hooks := map[string]map[string]interface{}{
"hari": hariConfig,
}

manager := manager.NewManager(
collectionfake.NewMockService(ctrl),
hooks,
)

Expand Down
5 changes: 0 additions & 5 deletions internal/nha/activities/prod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"testing"
"time"

"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)
Expand Down Expand Up @@ -157,10 +155,7 @@ func TestProdActivity(t *testing.T) {
func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *UpdateProductionSystemActivity {
t.Helper()

ctrl := gomock.NewController(t)

manager := manager.NewManager(
collectionfake.NewMockService(ctrl),
map[string]map[string]interface{}{
"prod": hookConfig,
},
Expand Down
10 changes: 3 additions & 7 deletions internal/workflow/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,17 @@ package manager
import (
"fmt"
"strings"

"github.com/artefactual-labs/enduro/internal/collection"
)

// Manager carries workflow and activity dependencies.
type Manager struct {
Collection collection.Service
Hooks map[string]map[string]interface{}
Hooks map[string]map[string]interface{}
}

// NewManager returns a pointer to a new Manager.
func NewManager(colsvc collection.Service, hooks map[string]map[string]interface{}) *Manager {
func NewManager(hooks map[string]map[string]interface{}) *Manager {
return &Manager{
Collection: colsvc,
Hooks: hooks,
Hooks: hooks,
}
}

Expand Down
21 changes: 11 additions & 10 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@

type ProcessingWorkflow struct {
manager *manager.Manager
colsvc collection.Service
pipelineRegistry *pipeline.Registry
logger logr.Logger
}

func NewProcessingWorkflow(m *manager.Manager, pipelineRegistry *pipeline.Registry, l logr.Logger) *ProcessingWorkflow {
return &ProcessingWorkflow{manager: m, pipelineRegistry: pipelineRegistry, logger: l}
func NewProcessingWorkflow(m *manager.Manager, colsvc collection.Service, pipelineRegistry *pipeline.Registry, l logr.Logger) *ProcessingWorkflow {
return &ProcessingWorkflow{manager: m, colsvc: colsvc, pipelineRegistry: pipelineRegistry, logger: l}
}

// TransferInfo is shared state that is passed down to activities. It can be
Expand Down Expand Up @@ -201,13 +202,13 @@
var err error

if req.CollectionID == 0 {
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.logger, w.manager.Collection, &createPackageLocalActivityParams{
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.logger, w.colsvc, &createPackageLocalActivityParams{
Key: req.Key,
Status: status,
}).Get(activityOpts, &tinfo.CollectionID)
} else {
// TODO: investigate better way to reset the collection.
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{
err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{

Check warning on line 211 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L211

Added line #L211 was not covered by tests
CollectionID: req.CollectionID,
Key: req.Key,
PipelineID: "",
Expand All @@ -234,7 +235,7 @@
// Use disconnected context so it also runs after cancellation.
dctx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx)
activityOpts := withLocalActivityOpts(dctx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{
CollectionID: tinfo.CollectionID,
Key: tinfo.Key,
PipelineID: tinfo.PipelineID,
Expand All @@ -250,7 +251,7 @@
if req.RejectDuplicates {
var exists bool
activityOpts := withLocalActivityOpts(ctx)
err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.logger, w.manager.Collection, tinfo.CollectionID).Get(activityOpts, &exists)
err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.logger, w.colsvc, tinfo.CollectionID).Get(activityOpts, &exists)

Check warning on line 254 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L254

Added line #L254 was not covered by tests
if err != nil {
return fmt.Errorf("error checking duplicate: %v", err)
}
Expand All @@ -274,7 +275,7 @@

if nameInfo.Identifier != "" {
activityOpts = withLocalActivityOpts(ctx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, setOriginalIDLocalActivity, w.manager.Collection, tinfo.CollectionID, nameInfo.Identifier).Get(activityOpts, nil)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, setOriginalIDLocalActivity, w.colsvc, tinfo.CollectionID, nameInfo.Identifier).Get(activityOpts, nil)

Check warning on line 278 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L278

Added line #L278 was not covered by tests
}
}

Expand Down Expand Up @@ -432,7 +433,7 @@
{
var acquired bool
var err error
acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.pipelineRegistry, tinfo.PipelineName, tinfo.CollectionID)
acquired, release, err = acquirePipeline(sessCtx, w.colsvc, w.pipelineRegistry, tinfo.PipelineName, tinfo.CollectionID)

Check warning on line 436 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L436

Added line #L436 was not covered by tests
if acquired {
defer func() {
_ = release(sessCtx)
Expand Down Expand Up @@ -596,7 +597,7 @@
// Persist TransferID + PipelineID.
{
activityOpts := withLocalActivityOpts(sessCtx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{

Check warning on line 600 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L600

Added line #L600 was not covered by tests
CollectionID: tinfo.CollectionID,
Key: tinfo.Key,
Status: collection.StatusInProgress,
Expand All @@ -622,7 +623,7 @@
// Persist SIPID.
{
activityOpts := withLocalActivityOpts(sessCtx)
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.manager.Collection, &updatePackageLocalActivityParams{
_ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.colsvc, &updatePackageLocalActivityParams{

Check warning on line 626 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L626

Added line #L626 was not covered by tests
CollectionID: tinfo.CollectionID,
Key: tinfo.Key,
TransferID: tinfo.TransferID,
Expand Down
6 changes: 3 additions & 3 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ type ProcessingWorkflowTestSuite struct {
}

func (s *ProcessingWorkflowTestSuite) SetupTest() {
ctrl := gomock.NewController(s.T())
s.env = s.NewTestWorkflowEnvironment()
s.manager = buildManager(s.T(), gomock.NewController(s.T()))
s.manager = buildManager(s.T(), ctrl)
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})
s.workflow = NewProcessingWorkflow(s.manager, pipelineRegistry, logr.Discard())
s.workflow = NewProcessingWorkflow(s.manager, collectionfake.NewMockService(ctrl), pipelineRegistry, logr.Discard())
}

func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand Down Expand Up @@ -124,7 +125,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager {
t.Helper()

return manager.NewManager(
collectionfake.NewMockService(ctrl),
map[string]map[string]interface{}{
"prod": {"disabled": "false"},
"hari": {"disabled": "false"},
Expand Down
4 changes: 2 additions & 2 deletions internal/workflow/receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (w *ProcessingWorkflow) sendReceipts(ctx temporalsdk_workflow.Context, para
MaximumAttempts: 1,
},
}
err := executeActivityWithAsyncErrorHandling(ctx, w.manager.Collection, params.CollectionID, opts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{
err := executeActivityWithAsyncErrorHandling(ctx, w.colsvc, params.CollectionID, opts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{
SIPID: params.SIPID,
StoredAt: params.StoredAt,
FullPath: params.FullPath,
Expand All @@ -48,7 +48,7 @@ func (w *ProcessingWorkflow) sendReceipts(ctx temporalsdk_workflow.Context, para
MaximumAttempts: 1,
},
}
err := executeActivityWithAsyncErrorHandling(ctx, w.manager.Collection, params.CollectionID, opts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{
err := executeActivityWithAsyncErrorHandling(ctx, w.colsvc, params.CollectionID, opts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{
StoredAt: params.StoredAt,
PipelineName: params.PipelineName,
NameInfo: params.NameInfo,
Expand Down
11 changes: 8 additions & 3 deletions internal/workflow/receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities"
"github.com/artefactual-labs/enduro/internal/pipeline"
Expand All @@ -24,10 +25,12 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) {
wts := temporalsdk_testsuite.WorkflowTestSuite{}
env := wts.NewTestWorkflowEnvironment()
m := buildManager(t, gomock.NewController(t))
ctrl := gomock.NewController(t)
colsvc := collectionfake.NewMockService(ctrl)
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})

AsyncCompletionActivityName = uuid.New().String() + "-async-completion"
env.RegisterActivityWithOptions(NewAsyncCompletionActivity(m.Collection).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName})
env.RegisterActivityWithOptions(NewAsyncCompletionActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName})

nha_activities.UpdateHARIActivityName = uuid.New().String() + "-update-hary"
env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName})
Expand Down Expand Up @@ -60,7 +63,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) {
uint(12345),
).Return("ABANDON", nil).Once()

env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, &params)
env.ExecuteWorkflow(NewProcessingWorkflow(m, colsvc, pipelineRegistry, logr.Discard()).sendReceipts, &params)

assert.Equal(t, env.IsWorkflowCompleted(), true)
assert.ErrorContains(t, env.GetWorkflowError(), "error sending hari receipt: user abandoned")
Expand All @@ -71,6 +74,8 @@ func TestSendReceipts(t *testing.T) {
wts := temporalsdk_testsuite.WorkflowTestSuite{}
env := wts.NewTestWorkflowEnvironment()
m := buildManager(t, gomock.NewController(t))
ctrl := gomock.NewController(t)
colsvc := collectionfake.NewMockService(ctrl)
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})

nha_activities.UpdateHARIActivityName = uuid.New().String()
Expand Down Expand Up @@ -111,7 +116,7 @@ func TestSendReceipts(t *testing.T) {
},
).Return(nil).Once()

env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, &params)
env.ExecuteWorkflow(NewProcessingWorkflow(m, colsvc, pipelineRegistry, logr.Discard()).sendReceipts, &params)

assert.Equal(t, env.IsWorkflowCompleted(), true)
assert.NilError(t, env.GetWorkflowError())
Expand Down
7 changes: 2 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,7 @@ func main() {

// Workflow and activity worker.
{
// TODO: this is a temporary workaround for dependency injection until we
// figure out what's the depdencency tree is going to look like after POC.
// The share-everything pattern should be avoided.
m := manager.NewManager(colsvc, config.Hooks)
m := manager.NewManager(config.Hooks)

done := make(chan struct{})
w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{
Expand All @@ -262,7 +259,7 @@ func main() {
os.Exit(1)
}

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, pipelineRegistry, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, colsvc, pipelineRegistry, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, pipelineRegistry, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
Expand Down
Loading