diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index db02fabf07..dc36a308c4 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -47,6 +47,7 @@ import ( ) const childContainerQueueKey = "child_queue" +const artifactTrackerKey = "_ua" // Map of [project] -> map of [domain] -> stop watch type projectDomainScopedStopWatchMap = map[string]map[string]*promutils.StopWatch @@ -700,31 +701,26 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se } } -// ExtractArtifactKeys pulls out artifact keys from Literals for lineage -// todo: rename this function to be less confusing -func (m *ExecutionManager) ExtractArtifactKeys(input *core.Literal) []string { - var artifactKeys []string +// ExtractArtifactTrackers pulls out artifact tracker strings from Literals for lineage +func (m *ExecutionManager) ExtractArtifactTrackers(artifactTrackers map[string]string, input *core.Literal) { if input == nil { - return artifactKeys + return } if input.GetMetadata() != nil { - if artifactKey, ok := input.GetMetadata()["_ua"]; ok { - artifactKeys = append(artifactKeys, artifactKey) + if tracker, ok := input.GetMetadata()[artifactTrackerKey]; ok { + artifactTrackers[tracker] = "" } } if input.GetCollection() != nil { for _, v := range input.GetCollection().Literals { - mapKeys := m.ExtractArtifactKeys(v) - artifactKeys = append(artifactKeys, mapKeys...) + m.ExtractArtifactTrackers(artifactTrackers, v) } } else if input.GetMap() != nil { for _, v := range input.GetMap().Literals { - mapKeys := m.ExtractArtifactKeys(v) - artifactKeys = append(artifactKeys, mapKeys...) + m.ExtractArtifactTrackers(artifactTrackers, v) } } - return artifactKeys } // getStringFromInput should be called when a tag or partition value is a binding to an input. the input is looked up @@ -976,7 +972,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( // TODO: Artifact feature gate, remove when ready var lpExpectedInputs *core.ParameterMap - var artifactTrackers []string + var artifactTrackers = make(map[string]string) var usedArtifactIDs []*core.ArtifactID if m.artifactRegistry.GetClient() != nil { // Literals may have an artifact key in the metadata field. This is something the artifact service should have @@ -988,9 +984,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( fixedInputMap := &core.Literal{ Value: &core.Literal_Map{Map: launchPlan.Spec.FixedInputs}, } - artifactTrackers = m.ExtractArtifactKeys(requestInputMap) - fixedInputArtifactKeys := m.ExtractArtifactKeys(fixedInputMap) - artifactTrackers = append(artifactTrackers, fixedInputArtifactKeys...) + m.ExtractArtifactTrackers(artifactTrackers, requestInputMap) + m.ExtractArtifactTrackers(artifactTrackers, fixedInputMap) // Put together the inputs that we've already resolved so that the artifact querying bit can fill them in. // This is to support artifact queries that depend on other inputs using the {{ .inputs.var }} construct. @@ -1018,7 +1013,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( } logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, lpExpectedInputs) - logger.Debugf(ctx, "Found artifact keys: %v", artifactTrackers) + logger.Debugf(ctx, "Found artifact trackers: %v", artifactTrackers) logger.Debugf(ctx, "Found artifact IDs: %v", usedArtifactIDs) } else { @@ -1173,6 +1168,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( // Publish of event is also gated on the artifact client being available, even though it's not directly required. // TODO: Artifact feature gate, remove when ready if m.artifactRegistry.GetClient() != nil { + // TODO: Add principal m.publishExecutionStart(ctx, workflowExecutionID, request.Spec.LaunchPlan, workflow.Id, artifactTrackers, usedArtifactIDs) } @@ -1227,17 +1223,23 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( // publishExecutionStart is an event that Admin publishes for artifact lineage. func (m *ExecutionManager) publishExecutionStart(ctx context.Context, executionID core.WorkflowExecutionIdentifier, - launchPlanID *core.Identifier, workflowID *core.Identifier, inputArtifactKeys []string, usedArtifactIDs []*core.ArtifactID) { + launchPlanID *core.Identifier, workflowID *core.Identifier, artifactTrackers map[string]string, usedArtifactIDs []*core.ArtifactID) { + + var artifactTrackerList []string + // Use a list instead of the fake set + for k := range artifactTrackers { + artifactTrackerList = append(artifactTrackerList, k) + } - if len(inputArtifactKeys) > 0 || len(usedArtifactIDs) > 0 { - logger.Debugf(ctx, "Sending execution start event for execution [%+v] with input artifact keys [%+v] and used artifact ids [%+v]", executionID, inputArtifactKeys, usedArtifactIDs) + if len(artifactTrackerList) > 0 || len(usedArtifactIDs) > 0 { + logger.Debugf(ctx, "Sending execution start event for execution [%+v] with trackers [%+v] and artifact ids [%+v]", executionID, artifactTrackerList, usedArtifactIDs) request := event.CloudEventExecutionStart{ - ExecutionId: &executionID, - LaunchPlanId: launchPlanID, - WorkflowId: workflowID, - ArtifactIds: usedArtifactIDs, - ArtifactKeys: inputArtifactKeys, + ExecutionId: &executionID, + LaunchPlanId: launchPlanID, + WorkflowId: workflowID, + ArtifactIds: usedArtifactIDs, + ArtifactTrackers: artifactTrackerList, } go func() { ceCtx := context.TODO()