diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index 6b36dc05db..de28612c54 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -324,10 +324,21 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter logger.Errorf(ctx, "failed to initialize Admin client, err :%s", err.Error()) return nil, err } + + sCfg := storage.GetConfig() + if sCfg == nil { + logger.Errorf(ctx, "Storage configuration missing.") + } + + store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore")) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create Metadata storage") + } + var launchPlanActor launchplan.FlyteAdmin if cfg.EnableAdminLauncher { launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, cfg.DownstreamEval.Duration, - launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher")) + launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher"), store) if err != nil { logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error()) return nil, err @@ -401,16 +412,6 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter flytek8s.DefaultPodTemplateStore.SetDefaultNamespace(podNamespace) - sCfg := storage.GetConfig() - if sCfg == nil { - logger.Errorf(ctx, "Storage configuration missing.") - } - - store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore")) - if err != nil { - return nil, errors.Wrapf(err, "Failed to create Metadata storage") - } - logger.Info(ctx, "Setting up Catalog client.") catalogClient, err := catalog.NewCatalogClient(ctx, authOpts...) if err != nil { diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go index 0c552560a9..336f095e90 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -19,6 +19,7 @@ import ( stdErr "github.com/flyteorg/flyte/flytestdlib/errors" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/flyteorg/flyte/flytestdlib/storage" ) var isRecovery = true @@ -33,6 +34,7 @@ func IsWorkflowTerminated(p core.WorkflowExecution_Phase) bool { type adminLaunchPlanExecutor struct { adminClient service.AdminServiceClient cache cache.AutoRefresh + store *storage.DataStore } type executionCacheItem struct { @@ -258,7 +260,7 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc continue } - var outputs *core.LiteralMap + var outputs = &core.LiteralMap{} // Retrieve potential outputs only when the workflow succeeded. // TODO: We can optimize further by only retrieving the outputs when the workflow has output variables in the // interface. @@ -266,21 +268,31 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc execData, err := a.adminClient.GetExecutionData(ctx, &admin.WorkflowExecutionGetDataRequest{ Id: &exec.WorkflowExecutionIdentifier, }) + if err != nil || execData.GetFullOutputs() == nil || execData.GetFullOutputs().GetLiterals() == nil { + outputURI := res.GetClosure().GetOutputs().GetUri() + // attempt remote storage read on GetExecutionData failure + if outputURI != "" { + err = a.store.ReadProtobuf(ctx, storage.DataReference(outputURI), outputs) + if err != nil { + logger.Errorf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputURI, err) + } + } + if err != nil { + resp = append(resp, cache.ItemSyncResponse{ + ID: obj.GetID(), + Item: executionCacheItem{ + WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier, + SyncError: err, + }, + Action: cache.Update, + }) + + continue + } - if err != nil { - resp = append(resp, cache.ItemSyncResponse{ - ID: obj.GetID(), - Item: executionCacheItem{ - WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier, - SyncError: err, - }, - Action: cache.Update, - }) - - continue + } else { + outputs = execData.GetFullOutputs() } - - outputs = execData.GetFullOutputs() } // Update the cache with the retrieved status @@ -299,9 +311,10 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc } func NewAdminLaunchPlanExecutor(_ context.Context, client service.AdminServiceClient, - syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope) (FlyteAdmin, error) { + syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope, store *storage.DataStore) (FlyteAdmin, error) { exec := &adminLaunchPlanExecutor{ adminClient: client, + store: store, } rateLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(cfg.TPS), cfg.Burst)} diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go index 0af8d1eb16..1f359c1d6a 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go @@ -12,11 +12,16 @@ import ( "google.golang.org/grpc/status" "github.com/flyteorg/flyte/flyteidl/clients/go/admin/mocks" + "github.com/flyteorg/flyte/flyteidl/clients/go/coreutils" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/cache" mocks2 "github.com/flyteorg/flyte/flytestdlib/cache/mocks" + "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" + "github.com/flyteorg/flyte/flytestdlib/storage" + storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks" ) func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { @@ -28,9 +33,12 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { } var result *admin.ExecutionClosure + memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + t.Run("happy", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) mockClient.On("GetExecution", ctx, @@ -42,25 +50,6 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { assert.Equal(t, result, s) }) - t.Run("terminal-sync", func(t *testing.T) { - mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope()) - assert.NoError(t, err) - iwMock := &mocks2.ItemWrapper{} - i := executionCacheItem{ExecutionClosure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED, WorkflowId: &core.Identifier{Project: "p"}}} - iwMock.OnGetItem().Return(i) - iwMock.OnGetID().Return("id") - adminExec := exec.(*adminLaunchPlanExecutor) - v, err := adminExec.syncItem(ctx, cache.Batch{ - iwMock, - }) - assert.NoError(t, err) - assert.NotNil(t, v) - assert.Len(t, v, 1) - assert.Equal(t, v[0].ID, "id") - assert.Equal(t, v[0].Item, i) - }) - t.Run("notFound", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} @@ -76,7 +65,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }), ).Return(nil, status.Error(codes.NotFound, "")) - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) assert.NoError(t, exec.Initialize(ctx)) @@ -122,7 +111,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }), ).Return(nil, status.Error(codes.Canceled, "")) - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) assert.NoError(t, exec.Initialize(ctx)) @@ -162,11 +151,13 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { Domain: "d", Project: "p", } + memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) t.Run("happy", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("CreateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { @@ -204,7 +195,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { Name: "orig", }, } - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("RecoverExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionRecoverRequest) bool { @@ -240,7 +231,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { Name: "orig", }, } - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) recoveryErr := status.Error(codes.NotFound, "foo") @@ -282,7 +273,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { t.Run("notFound", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("CreateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return true }), @@ -310,7 +301,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { t.Run("other", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("CreateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return true }), @@ -343,12 +334,14 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { Domain: "d", Project: "p", } + memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) const reason = "reason" t.Run("happy", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("TerminateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }), @@ -361,7 +354,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { t.Run("notFound", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("TerminateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }), @@ -374,7 +367,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { t.Run("other", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) mockClient.On("TerminateExecution", ctx, mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }), @@ -395,10 +388,12 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { Project: "p", Version: "v", } + memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) t.Run("launch plan found", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) mockClient.OnGetLaunchPlanMatch( ctx, @@ -411,7 +406,7 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { t.Run("launch plan not found", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore) assert.NoError(t, err) mockClient.OnGetLaunchPlanMatch( ctx, @@ -423,6 +418,165 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { }) } +type test struct { + name string + cacheItem executionCacheItem + getExecutionResp *admin.Execution + getExecutionError error + getExecutionDataResp *admin.WorkflowExecutionGetDataResponse + getExecutionDataError error + storageReadError error + expectSuccess bool + expectError bool + expectedOutputs *core.LiteralMap + expectedErrorContains string +} + +func TestAdminLaunchPlanExecutorScenarios(t *testing.T) { + ctx := context.TODO() + + mockExecutionRespWithOutputs := &admin.Execution{ + Closure: &admin.ExecutionClosure{ + Phase: core.WorkflowExecution_SUCCEEDED, + OutputResult: &admin.ExecutionClosure_Outputs{ + Outputs: &admin.LiteralMapBlob{ + Data: &admin.LiteralMapBlob_Uri{ + Uri: "s3://foo/bar", + }, + }, + }, + }, + } + mockExecutionRespWithoutOutputs := &admin.Execution{ + Closure: &admin.ExecutionClosure{ + Phase: core.WorkflowExecution_SUCCEEDED, + }, + } + outputLiteral := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo-1": coreutils.MustMakeLiteral("foo-value-1"), + }, + } + + tests := []test{ + { + name: "terminal-sync", + expectSuccess: true, + cacheItem: executionCacheItem{ + ExecutionClosure: &admin.ExecutionClosure{ + Phase: core.WorkflowExecution_SUCCEEDED, + WorkflowId: &core.Identifier{Project: "p"}}, + ExecutionOutputs: outputLiteral, + }, + expectedOutputs: outputLiteral, + }, + { + name: "GetExecution-fails", + expectError: true, + cacheItem: executionCacheItem{}, + getExecutionError: status.Error(codes.NotFound, ""), + expectedErrorContains: RemoteErrorNotFound, + }, + { + name: "GetExecution-fails-system", + expectError: true, + cacheItem: executionCacheItem{}, + getExecutionError: status.Error(codes.Internal, ""), + expectedErrorContains: RemoteErrorSystem, + }, + { + name: "GetExecutionData-succeeds", + expectSuccess: true, + cacheItem: executionCacheItem{}, + expectedOutputs: outputLiteral, + getExecutionDataResp: &admin.WorkflowExecutionGetDataResponse{ + FullOutputs: outputLiteral, + }, + getExecutionDataError: nil, + getExecutionResp: mockExecutionRespWithOutputs, + }, + { + name: "GetExecutionData-error-no-retry", + expectError: true, + cacheItem: executionCacheItem{}, + getExecutionDataError: status.Error(codes.NotFound, ""), + expectedErrorContains: codes.NotFound.String(), + getExecutionResp: mockExecutionRespWithoutOutputs, + }, + { + name: "GetExecutionData-error-retry-fails", + expectError: true, + cacheItem: executionCacheItem{}, + getExecutionDataError: status.Error(codes.NotFound, ""), + storageReadError: status.Error(codes.Internal, ""), + expectedErrorContains: codes.Internal.String(), + getExecutionResp: mockExecutionRespWithOutputs, + }, + { + name: "GetExecutionData-empty-retry-fails", + expectError: true, + cacheItem: executionCacheItem{}, + getExecutionDataResp: &admin.WorkflowExecutionGetDataResponse{ + FullOutputs: &core.LiteralMap{}, + }, + getExecutionDataError: nil, + storageReadError: status.Error(codes.Internal, ""), + expectedErrorContains: codes.Internal.String(), + getExecutionResp: mockExecutionRespWithOutputs, + }, + { + name: "GetExecutionData-empty-retry-succeeds", + expectSuccess: true, + cacheItem: executionCacheItem{}, + getExecutionDataResp: &admin.WorkflowExecutionGetDataResponse{ + FullOutputs: &core.LiteralMap{}, + }, + getExecutionDataError: nil, + expectedOutputs: &core.LiteralMap{}, + getExecutionResp: mockExecutionRespWithOutputs, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockClient := &mocks.AdminServiceClient{} + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(tc.storageReadError) + storageClient := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient) + assert.NoError(t, err) + + iwMock := &mocks2.ItemWrapper{} + i := tc.cacheItem + iwMock.OnGetItem().Return(i) + iwMock.OnGetID().Return("id") + + mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(tc.getExecutionResp, tc.getExecutionError) + mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(tc.getExecutionDataResp, tc.getExecutionDataError) + + adminExec := exec.(*adminLaunchPlanExecutor) + + v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) + assert.NoError(t, err) + assert.Len(t, v, 1) + item, ok := v[0].Item.(executionCacheItem) + assert.True(t, ok) + + if tc.expectSuccess { + assert.Nil(t, item.SyncError) + assert.Equal(t, tc.expectedOutputs, item.ExecutionOutputs) + } + if tc.expectError { + assert.NotNil(t, item.SyncError) + assert.Contains(t, item.SyncError.Error(), tc.expectedErrorContains) + } + }) + } +} + func TestIsWorkflowTerminated(t *testing.T) { assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_SUCCEEDED)) assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_ABORTED)) @@ -434,3 +588,7 @@ func TestIsWorkflowTerminated(t *testing.T) { assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_QUEUED)) assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_UNDEFINED)) } + +func init() { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) +}