Skip to content

Commit

Permalink
[flytepropeller][flyteadmin] Streaming Decks
Browse files Browse the repository at this point in the history
Signed-off-by: Future-Outlier <[email protected]>
  • Loading branch information
Future-Outlier committed Oct 2, 2024
1 parent 66ff152 commit d3cf4b8
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,27 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
// s3://my-s3-bucket/metadata/propeller/flytesnacks-development-a5sj4d6vs9s8r2ltn6hr/n0/data/0/deck.html
case pluginCore.PhaseRunning:
var deckURI *storage.DataReference
if tCtx.ow.GetReader() != nil {
exists, err := tCtx.ow.GetReader().DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file")
} else if exists {
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue

Check warning on line 536 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L530-L536

Added lines #L530 - L536 were not covered by tests
}
}

deckURIValue := storage.DataReference("s3://my-s3-bucket/metadata/propeller/flytesnacks-development-a5sj4d6vs9s8r2ltn6hr/n0/data/0/deck.html")
deckURI = &deckURIValue

pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI,
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
}

return pluginTrns, nil
Expand Down

0 comments on commit d3cf4b8

Please sign in to comment.