Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove pipelineRegistry from manager #583

Merged
merged 1 commit into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions internal/nha/activities/hari_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -506,7 +505,6 @@ func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *Update

manager := manager.NewManager(
collectionfake.NewMockService(ctrl),
&pipeline.Registry{},
hooks,
)

Expand Down
2 changes: 0 additions & 2 deletions internal/nha/activities/prod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -162,7 +161,6 @@ func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *Update

manager := manager.NewManager(
collectionfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": hookConfig,
},
Expand Down
12 changes: 7 additions & 5 deletions internal/workflow/activities/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@
"context"
"fmt"

"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/temporal"
"github.com/artefactual-labs/enduro/internal/watcher"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

// DownloadActivity downloads the blob into the pipeline processing directory.
type DownloadActivity struct {
manager *manager.Manager
wsvc watcher.Service
manager *manager.Manager
wsvc watcher.Service
pipelineRegistry *pipeline.Registry
}

func NewDownloadActivity(m *manager.Manager, wsvc watcher.Service) *DownloadActivity {
return &DownloadActivity{manager: m, wsvc: wsvc}
func NewDownloadActivity(m *manager.Manager, pipelineRegistry *pipeline.Registry, wsvc watcher.Service) *DownloadActivity {
return &DownloadActivity{manager: m, pipelineRegistry: pipelineRegistry, wsvc: wsvc}

Check warning on line 21 in internal/workflow/activities/download.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/download.go#L21

Added line #L21 was not covered by tests
}

func (a *DownloadActivity) Execute(ctx context.Context, pipelineName, watcherName, key string) (string, error) {
p, err := a.manager.Pipelines.ByName(pipelineName)
p, err := a.pipelineRegistry.ByName(pipelineName)

Check warning on line 25 in internal/workflow/activities/download.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/download.go#L25

Added line #L25 was not covered by tests
if err != nil {
return "", temporal.NewNonRetryableError(err)
}
Expand Down
10 changes: 5 additions & 5 deletions internal/workflow/activities/hide_package.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
"context"
"fmt"

"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/temporal"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

type HidePackageActivity struct {
manager *manager.Manager
pipelineRegistry *pipeline.Registry
}

func NewHidePackageActivity(m *manager.Manager) *HidePackageActivity {
return &HidePackageActivity{manager: m}
func NewHidePackageActivity(pipelineRegistry *pipeline.Registry) *HidePackageActivity {
return &HidePackageActivity{pipelineRegistry: pipelineRegistry}

Check warning on line 16 in internal/workflow/activities/hide_package.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/hide_package.go#L16

Added line #L16 was not covered by tests
}

func (a *HidePackageActivity) Execute(ctx context.Context, unitID, unitType, pipelineName string) error {
p, err := a.manager.Pipelines.ByName(pipelineName)
p, err := a.pipelineRegistry.ByName(pipelineName)

Check warning on line 20 in internal/workflow/activities/hide_package.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/hide_package.go#L20

Added line #L20 was not covered by tests
if err != nil {
return temporal.NewNonRetryableError(err)
}
Expand Down
10 changes: 5 additions & 5 deletions internal/workflow/activities/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@

"go.artefactual.dev/amclient"

"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/temporal"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

// TransferActivity submits the transfer to Archivematica and returns its ID.
//
// This is our first interaction with Archivematica. The workflow ends here
// after authentication errors.
type TransferActivity struct {
manager *manager.Manager
pipelineRegistry *pipeline.Registry
}

func NewTransferActivity(m *manager.Manager) *TransferActivity {
return &TransferActivity{manager: m}
func NewTransferActivity(pipelineRegistry *pipeline.Registry) *TransferActivity {
return &TransferActivity{pipelineRegistry: pipelineRegistry}

Check warning on line 23 in internal/workflow/activities/transfer.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/transfer.go#L23

Added line #L23 was not covered by tests
}

type TransferActivityParams struct {
Expand All @@ -40,7 +40,7 @@
}

func (a *TransferActivity) Execute(ctx context.Context, params *TransferActivityParams) (*TransferActivityResponse, error) {
p, err := a.manager.Pipelines.ByName(params.PipelineName)
p, err := a.pipelineRegistry.ByName(params.PipelineName)

Check warning on line 43 in internal/workflow/activities/transfer.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/transfer.go#L43

Added line #L43 was not covered by tests
if err != nil {
return nil, temporal.NewNonRetryableError(err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/workflow/local_activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
temporalsdk_activity "go.temporal.io/sdk/activity"

"github.com/artefactual-labs/enduro/internal/collection"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -64,8 +65,8 @@
return colsvc.CheckDuplicate(ctx, id)
}

func loadConfigLocalActivity(ctx context.Context, m *manager.Manager, logger logr.Logger, pipeline string, tinfo *TransferInfo) (*TransferInfo, error) {
p, err := m.Pipelines.ByName(pipeline)
func loadConfigLocalActivity(ctx context.Context, m *manager.Manager, pipelineRegistry *pipeline.Registry, logger logr.Logger, pipeline string, tinfo *TransferInfo) (*TransferInfo, error) {
p, err := pipelineRegistry.ByName(pipeline)

Check warning on line 69 in internal/workflow/local_activities.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/local_activities.go#L69

Added line #L69 was not covered by tests
if err != nil {
logger.Error(err, "Error loading local configuration")
return nil, err
Expand Down
5 changes: 1 addition & 4 deletions internal/workflow/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,18 @@ import (
"strings"

"github.com/artefactual-labs/enduro/internal/collection"
"github.com/artefactual-labs/enduro/internal/pipeline"
)

// Manager carries workflow and activity dependencies.
type Manager struct {
Collection collection.Service
Pipelines *pipeline.Registry
Hooks map[string]map[string]interface{}
}

// NewManager returns a pointer to a new Manager.
func NewManager(colsvc collection.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager {
func NewManager(colsvc collection.Service, hooks map[string]map[string]interface{}) *Manager {
return &Manager{
Collection: colsvc,
Pipelines: pipelines,
Hooks: hooks,
}
}
Expand Down
15 changes: 8 additions & 7 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@
)

type ProcessingWorkflow struct {
manager *manager.Manager
logger logr.Logger
manager *manager.Manager
pipelineRegistry *pipeline.Registry
logger logr.Logger
}

func NewProcessingWorkflow(m *manager.Manager, l logr.Logger) *ProcessingWorkflow {
return &ProcessingWorkflow{manager: m, logger: l}
func NewProcessingWorkflow(m *manager.Manager, pipelineRegistry *pipeline.Registry, l logr.Logger) *ProcessingWorkflow {
return &ProcessingWorkflow{manager: m, pipelineRegistry: pipelineRegistry, logger: l}
}

// TransferInfo is shared state that is passed down to activities. It can be
Expand Down Expand Up @@ -284,7 +285,7 @@
if err := temporalsdk_workflow.SideEffect(ctx, func(ctx temporalsdk_workflow.Context) interface{} {
names := req.PipelineNames
if len(names) < 1 {
names = w.manager.Pipelines.Names()
names = w.pipelineRegistry.Names()

Check warning on line 288 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L288

Added line #L288 was not covered by tests
if len(names) < 1 {
return ""
}
Expand All @@ -302,7 +303,7 @@
// Load pipeline configuration and hooks.
{
activityOpts := withLocalActivityWithoutRetriesOpts(ctx)
err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, loadConfigLocalActivity, w.manager, w.logger, tinfo.PipelineName, tinfo).Get(activityOpts, &tinfo)
err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, loadConfigLocalActivity, w.manager, w.pipelineRegistry, w.logger, tinfo.PipelineName, tinfo).Get(activityOpts, &tinfo)
if err != nil {
return fmt.Errorf("error loading configuration: %v", err)
}
Expand Down Expand Up @@ -431,7 +432,7 @@
{
var acquired bool
var err error
acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.manager.Pipelines, tinfo.PipelineName, tinfo.CollectionID)
acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.pipelineRegistry, tinfo.PipelineName, tinfo.CollectionID)

Check warning on line 435 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L435

Added line #L435 was not covered by tests
if acquired {
defer func() {
_ = release(sessCtx)
Expand Down
6 changes: 3 additions & 3 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ type ProcessingWorkflowTestSuite struct {
func (s *ProcessingWorkflowTestSuite) SetupTest() {
s.env = s.NewTestWorkflowEnvironment()
s.manager = buildManager(s.T(), gomock.NewController(s.T()))
s.workflow = NewProcessingWorkflow(s.manager, logr.Discard())
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})
s.workflow = NewProcessingWorkflow(s.manager, pipelineRegistry, logr.Discard())
}

func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand All @@ -52,7 +53,7 @@ func (s *ProcessingWorkflowTestSuite) TestParseErrorIsIgnored() {
s.env.OnActivity(nha_activities.ParseNameLocalActivity, mock.Anything, "key").Return(nil, errors.New("parse error")).Once()

// loadConfig is executed (workflow continued), returning an error.
s.env.OnActivity(loadConfigLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("pipeline is unavailable")).Once()
s.env.OnActivity(loadConfigLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("pipeline is unavailable")).Once()

// Defer updates the package with the error status before returning.
s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, &updatePackageLocalActivityParams{
Expand Down Expand Up @@ -124,7 +125,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager {

return manager.NewManager(
collectionfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": {"disabled": "false"},
"hari": {"disabled": "false"},
Expand Down
7 changes: 5 additions & 2 deletions internal/workflow/receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/artefactual-labs/enduro/internal/nha"
nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities"
"github.com/artefactual-labs/enduro/internal/pipeline"
)

// sendReceipts exits immediately after an activity error, ensuring that
Expand All @@ -23,6 +24,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) {
wts := temporalsdk_testsuite.WorkflowTestSuite{}
env := wts.NewTestWorkflowEnvironment()
m := buildManager(t, gomock.NewController(t))
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})

AsyncCompletionActivityName = uuid.New().String() + "-async-completion"
env.RegisterActivityWithOptions(NewAsyncCompletionActivity(m.Collection).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName})
Expand Down Expand Up @@ -58,7 +60,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) {
uint(12345),
).Return("ABANDON", nil).Once()

env.ExecuteWorkflow(NewProcessingWorkflow(m, logr.Discard()).sendReceipts, &params)
env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, &params)

assert.Equal(t, env.IsWorkflowCompleted(), true)
assert.ErrorContains(t, env.GetWorkflowError(), "error sending hari receipt: user abandoned")
Expand All @@ -69,6 +71,7 @@ func TestSendReceipts(t *testing.T) {
wts := temporalsdk_testsuite.WorkflowTestSuite{}
env := wts.NewTestWorkflowEnvironment()
m := buildManager(t, gomock.NewController(t))
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})

nha_activities.UpdateHARIActivityName = uuid.New().String()
env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName})
Expand Down Expand Up @@ -108,7 +111,7 @@ func TestSendReceipts(t *testing.T) {
},
).Return(nil).Once()

env.ExecuteWorkflow(NewProcessingWorkflow(m, logr.Discard()).sendReceipts, &params)
env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, &params)

assert.Equal(t, env.IsWorkflowCompleted(), true)
assert.NilError(t, env.GetWorkflowError())
Expand Down
10 changes: 5 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func main() {
// TODO: this is a temporary workaround for dependency injection until we
// figure out what's the depdencency tree is going to look like after POC.
// The share-everything pattern should be avoided.
m := manager.NewManager(colsvc, pipelineRegistry, config.Hooks)
m := manager.NewManager(colsvc, config.Hooks)

done := make(chan struct{})
w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{
Expand All @@ -262,16 +262,16 @@ func main() {
os.Exit(1)
}

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, pipelineRegistry, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, pipelineRegistry, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
w.RegisterActivityWithOptions(activities.NewValidateTransferActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ValidateTransferActivityName})
w.RegisterActivityWithOptions(activities.NewTransferActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName})
w.RegisterActivityWithOptions(activities.NewTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName})
w.RegisterActivityWithOptions(activities.NewPollTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollTransferActivityName})
w.RegisterActivityWithOptions(activities.NewPollIngestActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollIngestActivityName})
w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})
w.RegisterActivityWithOptions(activities.NewHidePackageActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName})
w.RegisterActivityWithOptions(activities.NewHidePackageActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName})
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewPopulateMetadataActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PopulateMetadataActivityName})
Expand Down
Loading