Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artifacts shell 2 #4649

Merged
merged 14 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flyteidl-buf-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Publish flyteidl Buf Package
on:
push:
branches:
- artifacts-shell
- artifacts-shell-2
- artifacts
- master
paths:
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/pkg/artifacts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func NewArtifactRegistry(ctx context.Context, connCfg *admin2.Config, _ ...grpc.
client: nil,
}
}
var cfg = connCfg
clients, err := admin2.NewClientsetBuilder().WithConfig(cfg).Build(ctx)

clients, err := admin2.NewClientsetBuilder().WithConfig(connCfg).Build(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to create Artifact client")
// too many calls to this function to update, just panic for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,7 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
// For now, don't append any additional information unless succeeded
if rawEvent.Phase != core.WorkflowExecution_SUCCEEDED {
return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: nil,
OutputInterface: nil,
RawEvent: rawEvent,
}, nil
}

Expand Down Expand Up @@ -181,39 +179,16 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
}
}

// Get inputs to the workflow execution
var inputs *core.LiteralMap
inputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, executionModel.InputsURI.String())
if err != nil {
logger.Warningf(ctx, "Error fetching input literal map %s", executionModel.InputsURI.String())
}
// The spec is used to retrieve metadata fields
spec := &admin.ExecutionSpec{}
err = proto.Unmarshal(executionModel.Spec, spec)
if err != nil {
fmt.Printf("there was an error with spec %v %v", err, executionModel.Spec)
}

// Get outputs from the workflow execution
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, c.storageClient, rawEvent.GetOutputUri())
if err != nil {
// gatepr: metric this
logger.Warningf(ctx, "Error fetching output literal map %v", rawEvent)
return nil, err
}
}

return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
Principal: spec.GetMetadata().Principal,
Expand Down Expand Up @@ -303,40 +278,6 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con
fmt.Printf("there was an error with spec %v %v", err, executionModel.Spec)
}

// Get inputs/outputs
// This will likely need to move to the artifact service side, given message size limits.
// Replace with call to GetNodeExecutionData
var inputs *core.LiteralMap
logger.Debugf(ctx, "RawEvent id %v", rawEvent.Id)
if len(rawEvent.GetInputUri()) > 0 {
inputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, rawEvent.GetInputUri())
logger.Debugf(ctx, "RawEvent input uri %v, %v", rawEvent.GetInputUri(), inputs)

if err != nil {
fmt.Printf("Error fetching input literal map %v", rawEvent)
}
} else if rawEvent.GetInputData() != nil {
inputs = rawEvent.GetInputData()
logger.Debugf(ctx, "RawEvent input data %v, %v", rawEvent.GetInputData(), inputs)
} else {
logger.Infof(ctx, "Node execution for node exec [%+v] has no input data", rawEvent.Id)
}

// This will likely need to move to the artifact service side, given message size limits.
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, rawEvent.GetOutputUri())
if err != nil {
fmt.Printf("Error fetching output literal map %v", rawEvent)
return nil, err
}
}

// Fetch the latest task execution if any, and pull out the task interface, if applicable.
// These are optional fields... if the node execution doesn't have a task execution then these will be empty.
var taskExecID *core.TaskExecutionIdentifier
Expand Down Expand Up @@ -369,13 +310,10 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con
taskExecID = lte.Id
}

logger.Debugf(ctx, "RawEvent inputs %v", inputs)
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
TaskExecId: taskExecID,
OutputData: outputs,
OutputInterface: typedInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
Principal: spec.GetMetadata().Principal,
LaunchPlanId: spec.LaunchPlan,
Expand Down
12 changes: 8 additions & 4 deletions flyteadmin/pkg/manager/impl/exec_manager_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,16 @@ func TestTrackingBitExtract(t *testing.T) {
},
}

trackers := execManager.ExtractArtifactKeys(&lit)
var trackers = make(map[string]string)
execManager.ExtractArtifactTrackers(trackers, &lit)
assert.Equal(t, 1, len(trackers))

trackers = execManager.ExtractArtifactKeys(&core.Literal{Value: &core.Literal_Map{Map: &inputMap}})
trackers = make(map[string]string)
execManager.ExtractArtifactTrackers(trackers, &core.Literal{Value: &core.Literal_Map{Map: &inputMap}})
assert.Equal(t, 1, len(trackers))
trackers = execManager.ExtractArtifactKeys(&core.Literal{Value: &core.Literal_Collection{Collection: &inputColl}})

trackers = make(map[string]string)
execManager.ExtractArtifactTrackers(trackers, &core.Literal{Value: &core.Literal_Collection{Collection: &inputColl}})
assert.Equal(t, 1, len(trackers))
assert.Equal(t, "proj/domain/name@version", trackers[0])
assert.Equal(t, "", trackers["proj/domain/name@version"])
}
52 changes: 27 additions & 25 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
)

const childContainerQueueKey = "child_queue"
const artifactTrackerKey = "_ua"

// Map of [project] -> map of [domain] -> stop watch
type projectDomainScopedStopWatchMap = map[string]map[string]*promutils.StopWatch
Expand Down Expand Up @@ -700,31 +701,26 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se
}
}

// ExtractArtifactKeys pulls out artifact keys from Literals for lineage
// todo: rename this function to be less confusing
func (m *ExecutionManager) ExtractArtifactKeys(input *core.Literal) []string {
var artifactKeys []string
// ExtractArtifactTrackers pulls out artifact tracker strings from Literals for lineage
func (m *ExecutionManager) ExtractArtifactTrackers(artifactTrackers map[string]string, input *core.Literal) {

if input == nil {
return artifactKeys
return
}
if input.GetMetadata() != nil {
if artifactKey, ok := input.GetMetadata()["_ua"]; ok {
artifactKeys = append(artifactKeys, artifactKey)
if tracker, ok := input.GetMetadata()[artifactTrackerKey]; ok {
artifactTrackers[tracker] = ""
}
}
if input.GetCollection() != nil {
for _, v := range input.GetCollection().Literals {
mapKeys := m.ExtractArtifactKeys(v)
artifactKeys = append(artifactKeys, mapKeys...)
m.ExtractArtifactTrackers(artifactTrackers, v)
}
} else if input.GetMap() != nil {
for _, v := range input.GetMap().Literals {
mapKeys := m.ExtractArtifactKeys(v)
artifactKeys = append(artifactKeys, mapKeys...)
m.ExtractArtifactTrackers(artifactTrackers, v)
}
}
return artifactKeys
}

// getStringFromInput should be called when a tag or partition value is a binding to an input. the input is looked up
Expand Down Expand Up @@ -976,7 +972,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

// TODO: Artifact feature gate, remove when ready
var lpExpectedInputs *core.ParameterMap
var artifactTrackers []string
var artifactTrackers = make(map[string]string)
var usedArtifactIDs []*core.ArtifactID
if m.artifactRegistry.GetClient() != nil {
// Literals may have an artifact key in the metadata field. This is something the artifact service should have
Expand All @@ -988,9 +984,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
fixedInputMap := &core.Literal{
Value: &core.Literal_Map{Map: launchPlan.Spec.FixedInputs},
}
artifactTrackers = m.ExtractArtifactKeys(requestInputMap)
fixedInputArtifactKeys := m.ExtractArtifactKeys(fixedInputMap)
artifactTrackers = append(artifactTrackers, fixedInputArtifactKeys...)
m.ExtractArtifactTrackers(artifactTrackers, requestInputMap)
m.ExtractArtifactTrackers(artifactTrackers, fixedInputMap)

// Put together the inputs that we've already resolved so that the artifact querying bit can fill them in.
// This is to support artifact queries that depend on other inputs using the {{ .inputs.var }} construct.
Expand Down Expand Up @@ -1018,7 +1013,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
}

logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, lpExpectedInputs)
logger.Debugf(ctx, "Found artifact keys: %v", artifactTrackers)
logger.Debugf(ctx, "Found artifact trackers: %v", artifactTrackers)
logger.Debugf(ctx, "Found artifact IDs: %v", usedArtifactIDs)

} else {
Expand Down Expand Up @@ -1173,6 +1168,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
// Publish of event is also gated on the artifact client being available, even though it's not directly required.
// TODO: Artifact feature gate, remove when ready
if m.artifactRegistry.GetClient() != nil {
// TODO: Add principal
m.publishExecutionStart(ctx, workflowExecutionID, request.Spec.LaunchPlan, workflow.Id, artifactTrackers, usedArtifactIDs)
}

Expand Down Expand Up @@ -1227,17 +1223,23 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

// publishExecutionStart is an event that Admin publishes for artifact lineage.
func (m *ExecutionManager) publishExecutionStart(ctx context.Context, executionID core.WorkflowExecutionIdentifier,
launchPlanID *core.Identifier, workflowID *core.Identifier, inputArtifactKeys []string, usedArtifactIDs []*core.ArtifactID) {
launchPlanID *core.Identifier, workflowID *core.Identifier, artifactTrackers map[string]string, usedArtifactIDs []*core.ArtifactID) {

var artifactTrackerList []string
// Use a list instead of the fake set
for k := range artifactTrackers {
artifactTrackerList = append(artifactTrackerList, k)
}

if len(inputArtifactKeys) > 0 || len(usedArtifactIDs) > 0 {
logger.Debugf(ctx, "Sending execution start event for execution [%+v] with input artifact keys [%+v] and used artifact ids [%+v]", executionID, inputArtifactKeys, usedArtifactIDs)
if len(artifactTrackerList) > 0 || len(usedArtifactIDs) > 0 {
logger.Debugf(ctx, "Sending execution start event for execution [%+v] with trackers [%+v] and artifact ids [%+v]", executionID, artifactTrackerList, usedArtifactIDs)

request := event.CloudEventExecutionStart{
ExecutionId: &executionID,
LaunchPlanId: launchPlanID,
WorkflowId: workflowID,
ArtifactIds: usedArtifactIDs,
ArtifactKeys: inputArtifactKeys,
ExecutionId: &executionID,
LaunchPlanId: launchPlanID,
WorkflowId: workflowID,
ArtifactIds: usedArtifactIDs,
ArtifactTrackers: artifactTrackerList,
}
go func() {
ceCtx := context.TODO()
Expand Down
Loading
Loading