From 69044be5fd34f8e59617810607fb5bb3f71ac561 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Thu, 11 Jan 2024 22:32:04 -0800 Subject: [PATCH] attempt remote storage output retrieval if fetching subworkflow execution data fails Signed-off-by: Paul Dittamo --- flytepropeller/pkg/controller/controller.go | 23 +++++----- .../nodes/subworkflow/launchplan/admin.go | 41 +++++++++++------ .../subworkflow/launchplan/admin_test.go | 44 +++++++++++++------ 3 files changed, 69 insertions(+), 39 deletions(-) diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index 6b36dc05db..de28612c54 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -324,10 +324,21 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter logger.Errorf(ctx, "failed to initialize Admin client, err :%s", err.Error()) return nil, err } + + sCfg := storage.GetConfig() + if sCfg == nil { + logger.Errorf(ctx, "Storage configuration missing.") + } + + store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore")) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create Metadata storage") + } + var launchPlanActor launchplan.FlyteAdmin if cfg.EnableAdminLauncher { launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, cfg.DownstreamEval.Duration, - launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher")) + launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher"), store) if err != nil { logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error()) return nil, err @@ -401,16 +412,6 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter flytek8s.DefaultPodTemplateStore.SetDefaultNamespace(podNamespace) - sCfg := storage.GetConfig() - if sCfg == nil { - logger.Errorf(ctx, "Storage configuration missing.") - } - - store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore")) - if err != nil { - return nil, errors.Wrapf(err, "Failed to create Metadata storage") - } - logger.Info(ctx, "Setting up Catalog client.") catalogClient, err := catalog.NewCatalogClient(ctx, authOpts...) if err != nil { diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go index 0c552560a9..b6fc03b8c9 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -19,6 +19,7 @@ import ( stdErr "github.com/flyteorg/flyte/flytestdlib/errors" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/flyteorg/flyte/flytestdlib/storage" ) var isRecovery = true @@ -33,6 +34,7 @@ func IsWorkflowTerminated(p core.WorkflowExecution_Phase) bool { type adminLaunchPlanExecutor struct { adminClient service.AdminServiceClient cache cache.AutoRefresh + store *storage.DataStore } type executionCacheItem struct { @@ -258,7 +260,7 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc continue } - var outputs *core.LiteralMap + var outputs = &core.LiteralMap{} // Retrieve potential outputs only when the workflow succeeded. // TODO: We can optimize further by only retrieving the outputs when the workflow has output variables in the // interface. @@ -266,21 +268,31 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc execData, err := a.adminClient.GetExecutionData(ctx, &admin.WorkflowExecutionGetDataRequest{ Id: &exec.WorkflowExecutionIdentifier, }) - if err != nil { - resp = append(resp, cache.ItemSyncResponse{ - ID: obj.GetID(), - Item: executionCacheItem{ - WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier, - SyncError: err, - }, - Action: cache.Update, - }) + outputUri := res.GetClosure().GetOutputs().GetUri() + // attempt remote storage read on GetExecutionData failure + if outputUri != "" { + err = a.store.ReadProtobuf(ctx, storage.DataReference(outputUri), outputs) + if err != nil { + logger.Errorf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputUri, err) + } + } + if err != nil { + resp = append(resp, cache.ItemSyncResponse{ + ID: obj.GetID(), + Item: executionCacheItem{ + WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier, + SyncError: err, + }, + Action: cache.Update, + }) + + continue + } - continue + } else { + outputs = execData.GetFullOutputs() } - - outputs = execData.GetFullOutputs() } // Update the cache with the retrieved status @@ -299,9 +311,10 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc } func NewAdminLaunchPlanExecutor(_ context.Context, client service.AdminServiceClient, - syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope) (FlyteAdmin, error) { + syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope, store *storage.DataStore) (FlyteAdmin, error) { exec := &adminLaunchPlanExecutor{ adminClient: client, + store: store, } rateLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(cfg.TPS), cfg.Burst)} diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go index 0af8d1eb16..e21013b3cd 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go @@ -16,7 +16,10 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/cache" mocks2 "github.com/flyteorg/flyte/flytestdlib/cache/mocks" + "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" + "github.com/flyteorg/flyte/flytestdlib/storage" ) func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { @@ -28,9 +31,12 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { } var result *admin.ExecutionClosure + memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + t.Run("happy", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) mockClient.On("GetExecution", ctx, @@ -44,7 +50,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { t.Run("terminal-sync", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) iwMock := &mocks2.ItemWrapper{} i := executionCacheItem{ExecutionClosure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED, WorkflowId: &core.Identifier{Project: "p"}}} @@ -76,7 +82,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }), ).Return(nil, status.Error(codes.NotFound, "")) - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) assert.NoError(t, exec.Initialize(ctx)) @@ -122,7 +128,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }), ).Return(nil, status.Error(codes.Canceled, "")) - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) assert.NoError(t, exec.Initialize(ctx)) @@ -162,11 +168,13 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { Domain: "d", Project: "p", } + memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) t.Run("happy", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("CreateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { @@ -204,7 +212,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { Name: "orig", }, } - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("RecoverExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionRecoverRequest) bool { @@ -240,7 +248,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { Name: "orig", }, } - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) recoveryErr := status.Error(codes.NotFound, "foo") @@ -282,7 +290,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { t.Run("notFound", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("CreateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return true }), @@ -310,7 +318,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { t.Run("other", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("CreateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return true }), @@ -343,12 +351,14 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { Domain: "d", Project: "p", } + memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) const reason = "reason" t.Run("happy", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("TerminateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }), @@ -361,7 +371,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { t.Run("notFound", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("TerminateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }), @@ -374,7 +384,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { t.Run("other", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("TerminateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }), @@ -395,10 +405,12 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { Project: "p", Version: "v", } + memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) t.Run("launch plan found", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) mockClient.OnGetLaunchPlanMatch( ctx, @@ -411,7 +423,7 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { t.Run("launch plan not found", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) mockClient.OnGetLaunchPlanMatch( ctx, @@ -434,3 +446,7 @@ func TestIsWorkflowTerminated(t *testing.T) { assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_QUEUED)) assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_UNDEFINED)) } + +func init() { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) +}