Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Retry fetching subworkflow output data on failure #4602

Merged
merged 18 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,21 @@
logger.Errorf(ctx, "failed to initialize Admin client, err :%s", err.Error())
return nil, err
}

sCfg := storage.GetConfig()
if sCfg == nil {
logger.Errorf(ctx, "Storage configuration missing.")
}

Check warning on line 331 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L328-L331

Added lines #L328 - L331 were not covered by tests

store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore"))
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Metadata storage")
}

Check warning on line 336 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L333-L336

Added lines #L333 - L336 were not covered by tests

var launchPlanActor launchplan.FlyteAdmin
if cfg.EnableAdminLauncher {
launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, cfg.DownstreamEval.Duration,
launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher"))
launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher"), store)

Check warning on line 341 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L341

Added line #L341 was not covered by tests
if err != nil {
logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error())
return nil, err
Expand Down Expand Up @@ -401,16 +412,6 @@

flytek8s.DefaultPodTemplateStore.SetDefaultNamespace(podNamespace)

sCfg := storage.GetConfig()
if sCfg == nil {
logger.Errorf(ctx, "Storage configuration missing.")
}

store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore"))
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Metadata storage")
}

logger.Info(ctx, "Setting up Catalog client.")
catalogClient, err := catalog.NewCatalogClient(ctx, authOpts...)
if err != nil {
Expand Down
43 changes: 28 additions & 15 deletions flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
stdErr "github.com/flyteorg/flyte/flytestdlib/errors"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

var isRecovery = true
Expand All @@ -33,6 +34,7 @@ func IsWorkflowTerminated(p core.WorkflowExecution_Phase) bool {
// adminLaunchPlanExecutor is the concrete type built by NewAdminLaunchPlanExecutor
// and returned to callers as a FlyteAdmin. It talks to the Admin service to track
// launched workflow executions and, for succeeded executions, to obtain their outputs.
type adminLaunchPlanExecutor struct {
// adminClient is the Admin service client; syncItem calls GetExecutionData on it
// to fetch the outputs of a succeeded execution.
adminClient service.AdminServiceClient
// cache is an auto-refreshing cache.
// NOTE(review): presumably it holds the executionCacheItem entries that syncItem
// refreshes in batches — confirm against the constructor body.
cache cache.AutoRefresh
// store is the data store used as a fallback source for outputs: when
// GetExecutionData fails or returns no literals and the execution closure carries
// a non-empty output URI, syncItem reads the outputs from that URI via ReadProtobuf.
store *storage.DataStore
}

type executionCacheItem struct {
Expand Down Expand Up @@ -258,29 +260,39 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
continue
}

var outputs *core.LiteralMap
var outputs = &core.LiteralMap{}
// Retrieve potential outputs only when the workflow succeeded.
// TODO: We can optimize further by only retrieving the outputs when the workflow has output variables in the
// interface.
if res.GetClosure().GetPhase() == core.WorkflowExecution_SUCCEEDED {
execData, err := a.adminClient.GetExecutionData(ctx, &admin.WorkflowExecutionGetDataRequest{
Id: &exec.WorkflowExecutionIdentifier,
})
if err != nil || execData.GetFullOutputs() == nil || execData.GetFullOutputs().GetLiterals() == nil {
outputURI := res.GetClosure().GetOutputs().GetUri()
// attempt remote storage read on GetExecutionData failure
if outputURI != "" {
err = a.store.ReadProtobuf(ctx, storage.DataReference(outputURI), outputs)
if err != nil {
logger.Errorf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputURI, err)
}
}
if err != nil {
resp = append(resp, cache.ItemSyncResponse{
ID: obj.GetID(),
Item: executionCacheItem{
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
SyncError: err,
},
Action: cache.Update,
})

continue
}

if err != nil {
resp = append(resp, cache.ItemSyncResponse{
ID: obj.GetID(),
Item: executionCacheItem{
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
SyncError: err,
},
Action: cache.Update,
})

continue
} else {
outputs = execData.GetFullOutputs()
}

outputs = execData.GetFullOutputs()
}

// Update the cache with the retrieved status
Expand All @@ -299,9 +311,10 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
}

func NewAdminLaunchPlanExecutor(_ context.Context, client service.AdminServiceClient,
syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope) (FlyteAdmin, error) {
syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope, store *storage.DataStore) (FlyteAdmin, error) {
exec := &adminLaunchPlanExecutor{
adminClient: client,
store: store,
}

rateLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(cfg.TPS), cfg.Burst)}
Expand Down
Loading
Loading