Skip to content

Commit

Permalink
Remove pipelineRegistry from manager
Browse files Browse the repository at this point in the history
  • Loading branch information
camlyall authored and sevein committed Sep 22, 2023
1 parent 5910139 commit 2dc58c5
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 42 deletions.
2 changes: 0 additions & 2 deletions internal/nha/activities/hari_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -506,7 +505,6 @@ func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *Update

manager := manager.NewManager(
collectionfake.NewMockService(ctrl),
&pipeline.Registry{},
hooks,
)

Expand Down
2 changes: 0 additions & 2 deletions internal/nha/activities/prod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -162,7 +161,6 @@ func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *Update

manager := manager.NewManager(
collectionfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": hookConfig,
},
Expand Down
12 changes: 7 additions & 5 deletions internal/workflow/activities/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ import (
"context"
"fmt"

"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/temporal"
"github.com/artefactual-labs/enduro/internal/watcher"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

// DownloadActivity downloads the blob into the pipeline processing directory.
type DownloadActivity struct {
manager *manager.Manager
wsvc watcher.Service
manager *manager.Manager
wsvc watcher.Service
pipelineRegistry *pipeline.Registry
}

func NewDownloadActivity(m *manager.Manager, wsvc watcher.Service) *DownloadActivity {
return &DownloadActivity{manager: m, wsvc: wsvc}
func NewDownloadActivity(m *manager.Manager, pipelineRegistry *pipeline.Registry, wsvc watcher.Service) *DownloadActivity {
return &DownloadActivity{manager: m, pipelineRegistry: pipelineRegistry, wsvc: wsvc}
}

func (a *DownloadActivity) Execute(ctx context.Context, pipelineName, watcherName, key string) (string, error) {
p, err := a.manager.Pipelines.ByName(pipelineName)
p, err := a.pipelineRegistry.ByName(pipelineName)
if err != nil {
return "", temporal.NewNonRetryableError(err)
}
Expand Down
10 changes: 5 additions & 5 deletions internal/workflow/activities/hide_package.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import (
"context"
"fmt"

"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/temporal"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

type HidePackageActivity struct {
manager *manager.Manager
pipelineRegistry *pipeline.Registry
}

func NewHidePackageActivity(m *manager.Manager) *HidePackageActivity {
return &HidePackageActivity{manager: m}
func NewHidePackageActivity(pipelineRegistry *pipeline.Registry) *HidePackageActivity {
return &HidePackageActivity{pipelineRegistry: pipelineRegistry}
}

func (a *HidePackageActivity) Execute(ctx context.Context, unitID, unitType, pipelineName string) error {
p, err := a.manager.Pipelines.ByName(pipelineName)
p, err := a.pipelineRegistry.ByName(pipelineName)
if err != nil {
return temporal.NewNonRetryableError(err)
}
Expand Down
10 changes: 5 additions & 5 deletions internal/workflow/activities/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ import (

"go.artefactual.dev/amclient"

"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/temporal"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

// TransferActivity submits the transfer to Archivematica and returns its ID.
//
// This is our first interaction with Archivematica. The workflow ends here
// after authentication errors.
type TransferActivity struct {
manager *manager.Manager
pipelineRegistry *pipeline.Registry
}

func NewTransferActivity(m *manager.Manager) *TransferActivity {
return &TransferActivity{manager: m}
func NewTransferActivity(pipelineRegistry *pipeline.Registry) *TransferActivity {
return &TransferActivity{pipelineRegistry: pipelineRegistry}
}

type TransferActivityParams struct {
Expand All @@ -40,7 +40,7 @@ type TransferActivityResponse struct {
}

func (a *TransferActivity) Execute(ctx context.Context, params *TransferActivityParams) (*TransferActivityResponse, error) {
p, err := a.manager.Pipelines.ByName(params.PipelineName)
p, err := a.pipelineRegistry.ByName(params.PipelineName)
if err != nil {
return nil, temporal.NewNonRetryableError(err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/workflow/local_activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
temporalsdk_activity "go.temporal.io/sdk/activity"

"github.com/artefactual-labs/enduro/internal/collection"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -64,8 +65,8 @@ func checkDuplicatePackageLocalActivity(ctx context.Context, logger logr.Logger,
return colsvc.CheckDuplicate(ctx, id)
}

func loadConfigLocalActivity(ctx context.Context, m *manager.Manager, logger logr.Logger, pipeline string, tinfo *TransferInfo) (*TransferInfo, error) {
p, err := m.Pipelines.ByName(pipeline)
func loadConfigLocalActivity(ctx context.Context, m *manager.Manager, pipelineRegistry *pipeline.Registry, logger logr.Logger, pipeline string, tinfo *TransferInfo) (*TransferInfo, error) {
p, err := pipelineRegistry.ByName(pipeline)
if err != nil {
logger.Error(err, "Error loading local configuration")
return nil, err
Expand Down
5 changes: 1 addition & 4 deletions internal/workflow/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,18 @@ import (
"strings"

"github.com/artefactual-labs/enduro/internal/collection"
"github.com/artefactual-labs/enduro/internal/pipeline"
)

// Manager carries workflow and activity dependencies.
type Manager struct {
Collection collection.Service
Pipelines *pipeline.Registry
Hooks map[string]map[string]interface{}
}

// NewManager returns a pointer to a new Manager.
func NewManager(colsvc collection.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager {
func NewManager(colsvc collection.Service, hooks map[string]map[string]interface{}) *Manager {
return &Manager{
Collection: colsvc,
Pipelines: pipelines,
Hooks: hooks,
}
}
Expand Down
15 changes: 8 additions & 7 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
)

type ProcessingWorkflow struct {
manager *manager.Manager
logger logr.Logger
manager *manager.Manager
pipelineRegistry *pipeline.Registry
logger logr.Logger
}

func NewProcessingWorkflow(m *manager.Manager, l logr.Logger) *ProcessingWorkflow {
return &ProcessingWorkflow{manager: m, logger: l}
func NewProcessingWorkflow(m *manager.Manager, pipelineRegistry *pipeline.Registry, l logr.Logger) *ProcessingWorkflow {
return &ProcessingWorkflow{manager: m, pipelineRegistry: pipelineRegistry, logger: l}
}

// TransferInfo is shared state that is passed down to activities. It can be
Expand Down Expand Up @@ -284,7 +285,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll
if err := temporalsdk_workflow.SideEffect(ctx, func(ctx temporalsdk_workflow.Context) interface{} {
names := req.PipelineNames
if len(names) < 1 {
names = w.manager.Pipelines.Names()
names = w.pipelineRegistry.Names()
if len(names) < 1 {
return ""
}
Expand All @@ -302,7 +303,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll
// Load pipeline configuration and hooks.
{
activityOpts := withLocalActivityWithoutRetriesOpts(ctx)
err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, loadConfigLocalActivity, w.manager, w.logger, tinfo.PipelineName, tinfo).Get(activityOpts, &tinfo)
err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, loadConfigLocalActivity, w.manager, w.pipelineRegistry, w.logger, tinfo.PipelineName, tinfo).Get(activityOpts, &tinfo)
if err != nil {
return fmt.Errorf("error loading configuration: %v", err)
}
Expand Down Expand Up @@ -431,7 +432,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
{
var acquired bool
var err error
acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.manager.Pipelines, tinfo.PipelineName, tinfo.CollectionID)
acquired, release, err = acquirePipeline(sessCtx, w.manager.Collection, w.pipelineRegistry, tinfo.PipelineName, tinfo.CollectionID)
if acquired {
defer func() {
_ = release(sessCtx)
Expand Down
6 changes: 3 additions & 3 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ type ProcessingWorkflowTestSuite struct {
func (s *ProcessingWorkflowTestSuite) SetupTest() {
s.env = s.NewTestWorkflowEnvironment()
s.manager = buildManager(s.T(), gomock.NewController(s.T()))
s.workflow = NewProcessingWorkflow(s.manager, logr.Discard())
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})
s.workflow = NewProcessingWorkflow(s.manager, pipelineRegistry, logr.Discard())
}

func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand All @@ -52,7 +53,7 @@ func (s *ProcessingWorkflowTestSuite) TestParseErrorIsIgnored() {
s.env.OnActivity(nha_activities.ParseNameLocalActivity, mock.Anything, "key").Return(nil, errors.New("parse error")).Once()

// loadConfig is executed (workflow continued), returning an error.
s.env.OnActivity(loadConfigLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("pipeline is unavailable")).Once()
s.env.OnActivity(loadConfigLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("pipeline is unavailable")).Once()

// Defer updates the package with the error status before returning.
s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, &updatePackageLocalActivityParams{
Expand Down Expand Up @@ -124,7 +125,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager {

return manager.NewManager(
collectionfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": {"disabled": "false"},
"hari": {"disabled": "false"},
Expand Down
7 changes: 5 additions & 2 deletions internal/workflow/receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/artefactual-labs/enduro/internal/nha"
nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities"
"github.com/artefactual-labs/enduro/internal/pipeline"
)

// sendReceipts exits immediately after an activity error, ensuring that
Expand All @@ -23,6 +24,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) {
wts := temporalsdk_testsuite.WorkflowTestSuite{}
env := wts.NewTestWorkflowEnvironment()
m := buildManager(t, gomock.NewController(t))
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})

AsyncCompletionActivityName = uuid.New().String() + "-async-completion"
env.RegisterActivityWithOptions(NewAsyncCompletionActivity(m.Collection).Execute, temporalsdk_activity.RegisterOptions{Name: AsyncCompletionActivityName})
Expand Down Expand Up @@ -58,7 +60,7 @@ func TestSendReceiptsSequentialBehavior(t *testing.T) {
uint(12345),
).Return("ABANDON", nil).Once()

env.ExecuteWorkflow(NewProcessingWorkflow(m, logr.Discard()).sendReceipts, &params)
env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, &params)

assert.Equal(t, env.IsWorkflowCompleted(), true)
assert.ErrorContains(t, env.GetWorkflowError(), "error sending hari receipt: user abandoned")
Expand All @@ -69,6 +71,7 @@ func TestSendReceipts(t *testing.T) {
wts := temporalsdk_testsuite.WorkflowTestSuite{}
env := wts.NewTestWorkflowEnvironment()
m := buildManager(t, gomock.NewController(t))
pipelineRegistry, _ := pipeline.NewPipelineRegistry(logr.Discard(), []pipeline.Config{})

nha_activities.UpdateHARIActivityName = uuid.New().String()
env.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName})
Expand Down Expand Up @@ -108,7 +111,7 @@ func TestSendReceipts(t *testing.T) {
},
).Return(nil).Once()

env.ExecuteWorkflow(NewProcessingWorkflow(m, logr.Discard()).sendReceipts, &params)
env.ExecuteWorkflow(NewProcessingWorkflow(m, pipelineRegistry, logr.Discard()).sendReceipts, &params)

assert.Equal(t, env.IsWorkflowCompleted(), true)
assert.NilError(t, env.GetWorkflowError())
Expand Down
10 changes: 5 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func main() {
// TODO: this is a temporary workaround for dependency injection until we
// figure out what's the depdencency tree is going to look like after POC.
// The share-everything pattern should be avoided.
m := manager.NewManager(colsvc, pipelineRegistry, config.Hooks)
m := manager.NewManager(colsvc, config.Hooks)

done := make(chan struct{})
w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{
Expand All @@ -262,16 +262,16 @@ func main() {
os.Exit(1)
}

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m, pipelineRegistry, logger).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, pipelineRegistry, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
w.RegisterActivityWithOptions(activities.NewValidateTransferActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ValidateTransferActivityName})
w.RegisterActivityWithOptions(activities.NewTransferActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName})
w.RegisterActivityWithOptions(activities.NewTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName})
w.RegisterActivityWithOptions(activities.NewPollTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollTransferActivityName})
w.RegisterActivityWithOptions(activities.NewPollIngestActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollIngestActivityName})
w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})
w.RegisterActivityWithOptions(activities.NewHidePackageActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName})
w.RegisterActivityWithOptions(activities.NewHidePackageActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName})
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewPopulateMetadataActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PopulateMetadataActivityName})
Expand Down

0 comments on commit 2dc58c5

Please sign in to comment.