Skip to content

Commit

Permalink
Remove watcher from manager
Browse files Browse the repository at this point in the history
  • Loading branch information
camlyall authored and sevein committed Sep 21, 2023
1 parent ff4c1cb commit f25ec6b
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 56 deletions.
2 changes: 0 additions & 2 deletions internal/nha/activities/hari_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/pipeline"
watcherfake "github.com/artefactual-labs/enduro/internal/watcher/fake"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -509,7 +508,6 @@ func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *Update
manager := manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
watcherfake.NewMockService(ctrl),
&pipeline.Registry{},
hooks,
)
Expand Down
2 changes: 0 additions & 2 deletions internal/nha/activities/prod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/pipeline"
watcherfake "github.com/artefactual-labs/enduro/internal/watcher/fake"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -165,7 +164,6 @@ func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *Update
manager := manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
watcherfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": hookConfig,
Expand Down
9 changes: 4 additions & 5 deletions internal/workflow/activities/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ import (
"github.com/artefactual-labs/enduro/internal/bundler"
"github.com/artefactual-labs/enduro/internal/temporal"
"github.com/artefactual-labs/enduro/internal/watcher"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

type BundleActivity struct {
manager *manager.Manager
wsvc watcher.Service
}

func NewBundleActivity(m *manager.Manager) *BundleActivity {
return &BundleActivity{manager: m}
func NewBundleActivity(wsvc watcher.Service) *BundleActivity {
return &BundleActivity{wsvc: wsvc}
}

type BundleActivityParams struct {
Expand Down Expand Up @@ -74,7 +73,7 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara
}
} else if params.IsDir {
var w watcher.Watcher
w, err = a.manager.Watcher.ByName(params.WatcherName)
w, err = a.wsvc.ByName(params.WatcherName)
if err == nil {
src := filepath.Join(w.Path(), params.Key)
dst := params.TransferDir
Expand Down
16 changes: 1 addition & 15 deletions internal/workflow/activities/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,13 @@ import (
"syscall"
"testing"

"github.com/go-logr/logr"
temporalsdk_testsuite "go.temporal.io/sdk/testsuite"
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/watcher"
watcherfake "github.com/artefactual-labs/enduro/internal/watcher/fake"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

func TestBundleActivity(t *testing.T) {
Expand All @@ -26,17 +22,7 @@ func TestBundleActivity(t *testing.T) {
t.Run("Excludes hidden files", func(t *testing.T) {
ctrl := gomock.NewController(t)
wsvc := watcherfake.NewMockService(ctrl)
m := manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
wsvc,
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": {"disabled": "false"},
"hari": {"disabled": "false"},
},
)
activity := NewBundleActivity(m)
activity := NewBundleActivity(wsvc)
ts := &temporalsdk_testsuite.WorkflowTestSuite{}
env := ts.NewTestActivityEnvironment()
env.RegisterActivity(activity.Execute)
Expand Down
10 changes: 3 additions & 7 deletions internal/workflow/activities/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@ import (
"context"
"fmt"
"os"

"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

// CleanUpActivity removes the contents that we've created in the TS location.
type CleanUpActivity struct {
manager *manager.Manager
}
type CleanUpActivity struct{}

func NewCleanUpActivity(m *manager.Manager) *CleanUpActivity {
return &CleanUpActivity{manager: m}
func NewCleanUpActivity() *CleanUpActivity {
return &CleanUpActivity{}
}

type CleanUpActivityParams struct {
Expand Down
10 changes: 5 additions & 5 deletions internal/workflow/activities/delete_original.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ import (
"os"
"path/filepath"

"github.com/artefactual-labs/enduro/internal/workflow/manager"
"github.com/artefactual-labs/enduro/internal/watcher"
)

type DeleteOriginalActivity struct {
manager *manager.Manager
wsvc watcher.Service
}

func NewDeleteOriginalActivity(m *manager.Manager) *DeleteOriginalActivity {
return &DeleteOriginalActivity{manager: m}
func NewDeleteOriginalActivity(wsvc watcher.Service) *DeleteOriginalActivity {
return &DeleteOriginalActivity{wsvc: wsvc}
}

func (a *DeleteOriginalActivity) Execute(ctx context.Context, watcherName, batchDir, key string) error {
if batchDir != "" {
return deleteOriginalFromBatch(batchDir, key)
}
return a.manager.Watcher.Delete(ctx, watcherName, key)
return a.wsvc.Delete(ctx, watcherName, key)
}

func deleteOriginalFromBatch(batchDir, key string) error {
Expand Down
10 changes: 5 additions & 5 deletions internal/workflow/activities/dispose_original.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ import (
"path/filepath"

"github.com/artefactual-labs/enduro/internal/fsutil"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
"github.com/artefactual-labs/enduro/internal/watcher"
)

type DisposeOriginalActivity struct {
manager *manager.Manager
wsvc watcher.Service
}

func NewDisposeOriginalActivity(m *manager.Manager) *DisposeOriginalActivity {
return &DisposeOriginalActivity{manager: m}
func NewDisposeOriginalActivity(wsvc watcher.Service) *DisposeOriginalActivity {
return &DisposeOriginalActivity{wsvc: wsvc}
}

func (a *DisposeOriginalActivity) Execute(ctx context.Context, watcherName, completedDir, batchDir, key string) error {
if batchDir != "" {
return disposeOriginalFromBatch(completedDir, batchDir, key)
}
return a.manager.Watcher.Dispose(ctx, watcherName, key)
return a.wsvc.Dispose(ctx, watcherName, key)
}

func disposeOriginalFromBatch(completedDir, batchDir, key string) error {
Expand Down
8 changes: 5 additions & 3 deletions internal/workflow/activities/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (
"fmt"

"github.com/artefactual-labs/enduro/internal/temporal"
"github.com/artefactual-labs/enduro/internal/watcher"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

// DownloadActivity downloads the blob into the pipeline processing directory.
type DownloadActivity struct {
manager *manager.Manager
wsvc watcher.Service
}

func NewDownloadActivity(m *manager.Manager) *DownloadActivity {
return &DownloadActivity{manager: m}
func NewDownloadActivity(m *manager.Manager, wsvc watcher.Service) *DownloadActivity {
return &DownloadActivity{manager: m, wsvc: wsvc}
}

func (a *DownloadActivity) Execute(ctx context.Context, pipelineName, watcherName, key string) (string, error) {
Expand All @@ -29,7 +31,7 @@ func (a *DownloadActivity) Execute(ctx context.Context, pipelineName, watcherNam
}
defer file.Close()

if err := a.manager.Watcher.Download(ctx, file, watcherName, key); err != nil {
if err := a.wsvc.Download(ctx, file, watcherName, key); err != nil {
return "", temporal.NewNonRetryableError(fmt.Errorf("error downloading blob: %v", err))
}

Expand Down
5 changes: 1 addition & 4 deletions internal/workflow/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,21 @@ import (

"github.com/artefactual-labs/enduro/internal/collection"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/watcher"
)

// Manager carries workflow and activity dependencies.
type Manager struct {
Logger logr.Logger
Collection collection.Service
Watcher watcher.Service
Pipelines *pipeline.Registry
Hooks map[string]map[string]interface{}
}

// NewManager returns a pointer to a new Manager.
func NewManager(logger logr.Logger, colsvc collection.Service, wsvc watcher.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager {
func NewManager(logger logr.Logger, colsvc collection.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager {
return &Manager{
Logger: logger,
Collection: colsvc,
Watcher: wsvc,
Pipelines: pipelines,
Hooks: hooks,
}
Expand Down
2 changes: 0 additions & 2 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities"
"github.com/artefactual-labs/enduro/internal/pipeline"
watcherfake "github.com/artefactual-labs/enduro/internal/watcher/fake"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -126,7 +125,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager {
return manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
watcherfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": {"disabled": "false"},
Expand Down
12 changes: 6 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func main() {
// TODO: this is a temporary workaround for dependency injection until we
// figure out what's the depdencency tree is going to look like after POC.
// The share-everything pattern should be avoided.
m := manager.NewManager(logger, colsvc, wsvc, pipelineRegistry, config.Hooks)
m := manager.NewManager(logger, colsvc, pipelineRegistry, config.Hooks)

done := make(chan struct{})
w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{
Expand All @@ -264,16 +264,16 @@ func main() {

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
w.RegisterActivityWithOptions(activities.NewValidateTransferActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ValidateTransferActivityName})
w.RegisterActivityWithOptions(activities.NewTransferActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName})
w.RegisterActivityWithOptions(activities.NewPollTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollTransferActivityName})
w.RegisterActivityWithOptions(activities.NewPollIngestActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollIngestActivityName})
w.RegisterActivityWithOptions(activities.NewCleanUpActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})
w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})
w.RegisterActivityWithOptions(activities.NewHidePackageActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName})
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewPopulateMetadataActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PopulateMetadataActivityName})

w.RegisterActivityWithOptions(workflow.NewAsyncCompletionActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: workflow.AsyncCompletionActivityName})
Expand Down

0 comments on commit f25ec6b

Please sign in to comment.