From 2fdb802c688d2e87b072b5ab8bf52769277192ab Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Mon, 7 Oct 2024 14:37:42 +0800 Subject: [PATCH] Use Yicheng-Lu-llll's thought Signed-off-by: Future-Outlier Co-authored-by: Yicheng-Lu-llll --- flyteadmin/dataproxy/service.go | 12 +++ .../transformers/node_execution.go | 1 + .../pkg/controller/nodes/task/handler.go | 96 +++++++++++++------ 3 files changed, 80 insertions(+), 29 deletions(-) diff --git a/flyteadmin/dataproxy/service.go b/flyteadmin/dataproxy/service.go index d61998835f..c6b098fca3 100644 --- a/flyteadmin/dataproxy/service.go +++ b/flyteadmin/dataproxy/service.go @@ -181,6 +181,18 @@ func (s Service) CreateDownloadLink(ctx context.Context, req *service.CreateDown return nil, errors.NewFlyteAdminErrorf(codes.Internal, "no deckUrl found for request [%+v]", req) } + // Check if the native url exists + metadata, err := s.dataStore.Head(ctx, storage.DataReference(nativeURL)) + if err != nil { + return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Failed to check the existence of the URL [%s]. Error: %v", nativeURL, err) + } + if !metadata.Exists() { + return nil, errors.NewFlyteAdminErrorf( + codes.NotFound, + "URL [%s] does not exist yet. Please try again later. 
If you are using the real-time deck, this could be because the 'persist' function has not been called yet.", + nativeURL) + } + signedURLResp, err := s.dataStore.CreateSignedURL(ctx, storage.DataReference(nativeURL), storage.SignedURLProperties{ Scope: stow.ClientMethodGet, ExpiresIn: req.ExpiresIn.AsDuration(), diff --git a/flyteadmin/pkg/repositories/transformers/node_execution.go b/flyteadmin/pkg/repositories/transformers/node_execution.go index 817f53290a..9b1932efeb 100644 --- a/flyteadmin/pkg/repositories/transformers/node_execution.go +++ b/flyteadmin/pkg/repositories/transformers/node_execution.go @@ -42,6 +42,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution "failed to marshal occurredAt into a timestamp proto with error: %v", err) } closure.StartedAt = startedAtProto + closure.DeckUri = request.Event.DeckUri return nil } diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 4ac6576241..0f0825c5e1 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -71,10 +71,10 @@ func getPluginMetricKey(pluginID, taskType string) string { return taskType + "_" + pluginID } -func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) { +func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) { p.ttype = handler.TransitionTypeEphemeral p.pInfo = pluginCore.PhaseInfoSuccess(nil) - p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) + p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) } func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) { @@ -144,10 +144,13 @@ 
func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp return ToTaskExecutionEvent(input) } -func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) { - p.execInfo.OutputInfo = &handler.OutputInfo{ - OutputURI: outputPath, - DeckURI: deckPath, +func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) { + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{ + OutputURI: outputPath, + } + } else { + p.execInfo.OutputInfo.OutputURI = outputPath } p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{ @@ -171,7 +174,43 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle } logger.Debugf(ctx, "Task still running") - return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil + return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil +} + +// AddDeckURI incorporates the deck URI into the plugin execution info regardless of whether the URI exists in remote storage or not. +func (p *pluginRequestedTransition) AddDeckURI(ctx context.Context, tCtx *taskExecutionContext) { + var deckURI *storage.DataReference + deckURIValue := tCtx.ow.GetDeckPath() + deckURI = &deckURIValue + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{ + DeckURI: deckURI, + } + } else { + p.execInfo.OutputInfo.DeckURI = deckURI + } + +} + +// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage. 
+func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error { + reader := tCtx.ow.GetReader() + if reader == nil && p.execInfo.OutputInfo != nil { + p.execInfo.OutputInfo.DeckURI = nil + return nil + } + + exists, err := reader.DeckExists(ctx) + if err != nil { + logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) + return regErrors.Wrapf(err, "failed to check existence of deck file") + } + + if !exists && p.execInfo.OutputInfo != nil { + p.execInfo.OutputInfo.DeckURI = nil + } + + return nil } // The plugin interface available especially for testing. @@ -463,8 +502,20 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta } } + // Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality. + // The deck should be accessible even if the task is still running or has failed. + // It's possible that the deck URI may not exist in remote storage yet or will never exist. + // So, it is the console's responsibility to handle the case when the deck URI actually does not exist. + pluginTrns.AddDeckURI(ctx, tCtx) + switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: + // This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess). + err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if err != nil { + return pluginTrns, err + } + // ------------------------------------- // TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes // This code only exists to support Dynamic tasks. 
Eventually dynamic tasks will use closure nodes to execute @@ -490,6 +541,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta outputCommitter := ioutils.NewRemoteFileOutputWriter(ctx, tCtx.DataStore(), tCtx.OutputWriter()) ee, err := t.ValidateOutput(ctx, tCtx.NodeID(), tCtx.InputReader(), tCtx.ow.GetReader(), outputCommitter, tCtx.ExecutionContext().GetExecutionConfig(), tCtx.tr) + if err != nil { return nil, err } @@ -500,39 +552,25 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) } else { - var deckURI *storage.DataReference - if tCtx.ow.GetReader() != nil { - exists, err := tCtx.ow.GetReader().DeckExists(ctx) - if err != nil { - logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) - return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file") - } else if exists { - deckURIValue := tCtx.ow.GetDeckPath() - deckURI = &deckURIValue - } - } - pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI, + pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), &event.TaskNodeMetadata{ + CacheStatus: pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.GetCacheStatus(), + CatalogKey: pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.GetCatalogKey(), CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) } case pluginCore.PhaseRetryableFailure: fallthrough case pluginCore.PhasePermanentFailure: + // This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseFailure). 
+ err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if err != nil { + return pluginTrns, err + } pluginTrns.ObservedFailure( &event.TaskNodeMetadata{ CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) - // s3://my-s3-bucket/metadata/propeller/flytesnacks-development-a5sj4d6vs9s8r2ltn6hr/n0/data/0/deck.html - case pluginCore.PhaseRunning: - var deckURI *storage.DataReference - deckURIValue := tCtx.ow.GetDeckPath() - deckURI = &deckURIValue - - pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI, - &event.TaskNodeMetadata{ - CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), - }) } return pluginTrns, nil