Skip to content

Commit

Permalink
Merge commit 'c639c98cc26616bc91da4e7374176cf8738596df' into artf/lin…
Browse files Browse the repository at this point in the history
…eage-and-fixes

Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor committed Jan 3, 2024
2 parents 858e274 + c639c98 commit d650b58
Show file tree
Hide file tree
Showing 13 changed files with 755 additions and 2,278 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flyteidl-buf-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Publish flyteidl Buf Package
on:
push:
branches:
- artifacts-shell
- artifacts-shell-2
- artifacts
- master
paths:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,7 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
// For now, don't append any additional information unless succeeded
if rawEvent.Phase != core.WorkflowExecution_SUCCEEDED {
return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: nil,
OutputInterface: nil,
RawEvent: rawEvent,
}, nil
}

Expand Down
52 changes: 27 additions & 25 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
)

const childContainerQueueKey = "child_queue"
const artifactTrackerKey = "_ua"

// Map of [project] -> map of [domain] -> stop watch
type projectDomainScopedStopWatchMap = map[string]map[string]*promutils.StopWatch
Expand Down Expand Up @@ -700,31 +701,26 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se
}
}

// ExtractArtifactKeys pulls out artifact keys from Literals for lineage
// todo: rename this function to be less confusing
func (m *ExecutionManager) ExtractArtifactKeys(input *core.Literal) []string {
var artifactKeys []string
// ExtractArtifactTrackers pulls out artifact tracker strings from Literals for lineage
func (m *ExecutionManager) ExtractArtifactTrackers(artifactTrackers map[string]string, input *core.Literal) {

if input == nil {
return artifactKeys
return
}
if input.GetMetadata() != nil {
if artifactKey, ok := input.GetMetadata()["_ua"]; ok {
artifactKeys = append(artifactKeys, artifactKey)
if tracker, ok := input.GetMetadata()[artifactTrackerKey]; ok {
artifactTrackers[tracker] = ""
}
}
if input.GetCollection() != nil {
for _, v := range input.GetCollection().Literals {
mapKeys := m.ExtractArtifactKeys(v)
artifactKeys = append(artifactKeys, mapKeys...)
m.ExtractArtifactTrackers(artifactTrackers, v)
}
} else if input.GetMap() != nil {
for _, v := range input.GetMap().Literals {
mapKeys := m.ExtractArtifactKeys(v)
artifactKeys = append(artifactKeys, mapKeys...)
m.ExtractArtifactTrackers(artifactTrackers, v)
}
}
return artifactKeys
}

// getStringFromInput should be called when a tag or partition value is a binding to an input. the input is looked up
Expand Down Expand Up @@ -976,7 +972,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

// TODO: Artifact feature gate, remove when ready
var lpExpectedInputs *core.ParameterMap
var artifactTrackers []string
var artifactTrackers = make(map[string]string)
var usedArtifactIDs []*core.ArtifactID
if m.artifactRegistry.GetClient() != nil {
// Literals may have an artifact key in the metadata field. This is something the artifact service should have
Expand All @@ -988,9 +984,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
fixedInputMap := &core.Literal{
Value: &core.Literal_Map{Map: launchPlan.Spec.FixedInputs},
}
artifactTrackers = m.ExtractArtifactKeys(requestInputMap)
fixedInputArtifactKeys := m.ExtractArtifactKeys(fixedInputMap)
artifactTrackers = append(artifactTrackers, fixedInputArtifactKeys...)
m.ExtractArtifactTrackers(artifactTrackers, requestInputMap)
m.ExtractArtifactTrackers(artifactTrackers, fixedInputMap)

// Put together the inputs that we've already resolved so that the artifact querying bit can fill them in.
// This is to support artifact queries that depend on other inputs using the {{ .inputs.var }} construct.
Expand Down Expand Up @@ -1018,7 +1013,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
}

logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, lpExpectedInputs)
logger.Debugf(ctx, "Found artifact keys: %v", artifactTrackers)
logger.Debugf(ctx, "Found artifact trackers: %v", artifactTrackers)
logger.Debugf(ctx, "Found artifact IDs: %v", usedArtifactIDs)

} else {
Expand Down Expand Up @@ -1173,6 +1168,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
// Publish of event is also gated on the artifact client being available, even though it's not directly required.
// TODO: Artifact feature gate, remove when ready
if m.artifactRegistry.GetClient() != nil {
// TODO: Add principal
m.publishExecutionStart(ctx, workflowExecutionID, request.Spec.LaunchPlan, workflow.Id, artifactTrackers, usedArtifactIDs)
}

Expand Down Expand Up @@ -1227,17 +1223,23 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

// publishExecutionStart is an event that Admin publishes for artifact lineage.
func (m *ExecutionManager) publishExecutionStart(ctx context.Context, executionID core.WorkflowExecutionIdentifier,
launchPlanID *core.Identifier, workflowID *core.Identifier, inputArtifactKeys []string, usedArtifactIDs []*core.ArtifactID) {
launchPlanID *core.Identifier, workflowID *core.Identifier, artifactTrackers map[string]string, usedArtifactIDs []*core.ArtifactID) {

var artifactTrackerList []string
// Use a list instead of the fake set
for k := range artifactTrackers {
artifactTrackerList = append(artifactTrackerList, k)
}

if len(inputArtifactKeys) > 0 || len(usedArtifactIDs) > 0 {
logger.Debugf(ctx, "Sending execution start event for execution [%+v] with input artifact keys [%+v] and used artifact ids [%+v]", executionID, inputArtifactKeys, usedArtifactIDs)
if len(artifactTrackerList) > 0 || len(usedArtifactIDs) > 0 {
logger.Debugf(ctx, "Sending execution start event for execution [%+v] with trackers [%+v] and artifact ids [%+v]", executionID, artifactTrackerList, usedArtifactIDs)

request := event.CloudEventExecutionStart{
ExecutionId: &executionID,
LaunchPlanId: launchPlanID,
WorkflowId: workflowID,
ArtifactIds: usedArtifactIDs,
ArtifactKeys: inputArtifactKeys,
ExecutionId: &executionID,
LaunchPlanId: launchPlanID,
WorkflowId: workflowID,
ArtifactIds: usedArtifactIDs,
ArtifactTrackers: artifactTrackerList,
}
go func() {
ceCtx := context.TODO()
Expand Down
Loading

0 comments on commit d650b58

Please sign in to comment.