Skip to content

Commit

Permalink
Use Yicheng-Lu-llll's thought
Browse files Browse the repository at this point in the history
Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Yicheng-Lu-llll <[email protected]>
  • Loading branch information
Future-Outlier and Yicheng-Lu-llll committed Oct 7, 2024
1 parent 0f3190f commit 2fdb802
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 29 deletions.
12 changes: 12 additions & 0 deletions flyteadmin/dataproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ func (s Service) CreateDownloadLink(ctx context.Context, req *service.CreateDown
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "no deckUrl found for request [%+v]", req)
}

// Check if the native url exists
metadata, err := s.dataStore.Head(ctx, storage.DataReference(nativeURL))
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Failed to check the existence of the URL [%s]. Error: %v", nativeURL, err)
}
if !metadata.Exists() {
return nil, errors.NewFlyteAdminErrorf(
codes.NotFound,
"URL [%s] does not exist yet. Please try again later. If you are using the real-time deck, this could be because the 'persist' function has not been called yet.",
nativeURL)
}

signedURLResp, err := s.dataStore.CreateSignedURL(ctx, storage.DataReference(nativeURL), storage.SignedURLProperties{
Scope: stow.ClientMethodGet,
ExpiresIn: req.ExpiresIn.AsDuration(),
Expand Down
1 change: 1 addition & 0 deletions flyteadmin/pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution
"failed to marshal occurredAt into a timestamp proto with error: %v", err)
}
closure.StartedAt = startedAtProto
closure.DeckUri = request.Event.DeckUri
return nil
}

Expand Down
96 changes: 67 additions & 29 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func getPluginMetricKey(pluginID, taskType string) string {
return taskType + "_" + pluginID
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {
func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {
p.ttype = handler.TransitionTypeEphemeral
p.pInfo = pluginCore.PhaseInfoSuccess(nil)
p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
}

func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) {
Expand Down Expand Up @@ -144,10 +144,13 @@ func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
DeckURI: deckPath,
func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
}
} else {
p.execInfo.OutputInfo.OutputURI = outputPath
}

p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{
Expand All @@ -171,7 +174,43 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle
}

logger.Debugf(ctx, "Task still running")
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil
}

// AddDeckURI incorporates the deck URI into the plugin execution info regardless of whether the URI exists in remote storage or not.
func (p *pluginRequestedTransition) AddDeckURI(ctx context.Context, tCtx *taskExecutionContext) {
var deckURI *storage.DataReference
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{
DeckURI: deckURI,
}
} else {
p.execInfo.OutputInfo.DeckURI = deckURI
}

}

// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage.
func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error {
reader := tCtx.ow.GetReader()
if reader == nil && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
return nil
}

exists, err := reader.DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return regErrors.Wrapf(err, "failed to check existence of deck file")
}

if !exists && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
}

return nil
}

// The plugin interface available especially for testing.
Expand Down Expand Up @@ -463,8 +502,20 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
}
}

// Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality.
// The deck should be accessible even if the task is still running or has failed.
// It's possible that the deck URI may not exist in remote storage yet or will never be exist.
// So, it is console's responsibility to handle the case when the deck URI actually does not exist.
pluginTrns.AddDeckURI(ctx, tCtx)

switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if err != nil {
return pluginTrns, err
}

// -------------------------------------
// TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes
// This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute
Expand All @@ -490,6 +541,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
outputCommitter := ioutils.NewRemoteFileOutputWriter(ctx, tCtx.DataStore(), tCtx.OutputWriter())
ee, err := t.ValidateOutput(ctx, tCtx.NodeID(), tCtx.InputReader(), tCtx.ow.GetReader(),
outputCommitter, tCtx.ExecutionContext().GetExecutionConfig(), tCtx.tr)

if err != nil {
return nil, err
}
Expand All @@ -500,39 +552,25 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
} else {
var deckURI *storage.DataReference
if tCtx.ow.GetReader() != nil {
exists, err := tCtx.ow.GetReader().DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file")
} else if exists {
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue
}
}
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI,
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(),
&event.TaskNodeMetadata{
CacheStatus: pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.GetCacheStatus(),
CatalogKey: pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.GetCatalogKey(),
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
}
case pluginCore.PhaseRetryableFailure:
fallthrough
case pluginCore.PhasePermanentFailure:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseFailure).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if err != nil {
return pluginTrns, err
}
pluginTrns.ObservedFailure(
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
// s3://my-s3-bucket/metadata/propeller/flytesnacks-development-a5sj4d6vs9s8r2ltn6hr/n0/data/0/deck.html
case pluginCore.PhaseRunning:
var deckURI *storage.DataReference
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue

pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI,
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
}

return pluginTrns, nil
Expand Down

0 comments on commit 2fdb802

Please sign in to comment.