Skip to content

Commit

Permalink
Merge branch 'master' into vraiyani/tasklog-template
Browse files Browse the repository at this point in the history
  • Loading branch information
pingsutw authored Jan 8, 2024
2 parents c887275 + 842d3a8 commit 8bbdd44
Show file tree
Hide file tree
Showing 39 changed files with 1,737 additions and 3,146 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flyteidl-buf-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Publish flyteidl Buf Package
on:
push:
branches:
- artifacts-shell
- artifacts-shell-2
- artifacts
- master
paths:
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/pkg/artifacts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func NewArtifactRegistry(ctx context.Context, connCfg *admin2.Config, _ ...grpc.
client: nil,
}
}
var cfg = connCfg
clients, err := admin2.NewClientsetBuilder().WithConfig(cfg).Build(ctx)

clients, err := admin2.NewClientsetBuilder().WithConfig(connCfg).Build(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to create Artifact client")
// too many calls to this function to update, just panic for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,7 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
// For now, don't append any additional information unless succeeded
if rawEvent.Phase != core.WorkflowExecution_SUCCEEDED {
return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: nil,
OutputInterface: nil,
RawEvent: rawEvent,
}, nil
}

Expand Down Expand Up @@ -181,39 +179,16 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
}
}

// Get inputs to the workflow execution
var inputs *core.LiteralMap
inputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, executionModel.InputsURI.String())
if err != nil {
logger.Warningf(ctx, "Error fetching input literal map %s", executionModel.InputsURI.String())
}
// The spec is used to retrieve metadata fields
spec := &admin.ExecutionSpec{}
err = proto.Unmarshal(executionModel.Spec, spec)
if err != nil {
fmt.Printf("there was an error with spec %v %v", err, executionModel.Spec)
}

// Get outputs from the workflow execution
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, c.storageClient, rawEvent.GetOutputUri())
if err != nil {
// gatepr: metric this
logger.Warningf(ctx, "Error fetching output literal map %v", rawEvent)
return nil, err
}
}

return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
Principal: spec.GetMetadata().Principal,
Expand Down Expand Up @@ -303,40 +278,6 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con
fmt.Printf("there was an error with spec %v %v", err, executionModel.Spec)
}

// Get inputs/outputs
// This will likely need to move to the artifact service side, given message size limits.
// Replace with call to GetNodeExecutionData
var inputs *core.LiteralMap
logger.Debugf(ctx, "RawEvent id %v", rawEvent.Id)
if len(rawEvent.GetInputUri()) > 0 {
inputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, rawEvent.GetInputUri())
logger.Debugf(ctx, "RawEvent input uri %v, %v", rawEvent.GetInputUri(), inputs)

if err != nil {
fmt.Printf("Error fetching input literal map %v", rawEvent)
}
} else if rawEvent.GetInputData() != nil {
inputs = rawEvent.GetInputData()
logger.Debugf(ctx, "RawEvent input data %v, %v", rawEvent.GetInputData(), inputs)
} else {
logger.Infof(ctx, "Node execution for node exec [%+v] has no input data", rawEvent.Id)
}

// This will likely need to move to the artifact service side, given message size limits.
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, rawEvent.GetOutputUri())
if err != nil {
fmt.Printf("Error fetching output literal map %v", rawEvent)
return nil, err
}
}

// Fetch the latest task execution if any, and pull out the task interface, if applicable.
// These are optional fields... if the node execution doesn't have a task execution then these will be empty.
var taskExecID *core.TaskExecutionIdentifier
Expand Down Expand Up @@ -369,13 +310,10 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con
taskExecID = lte.Id
}

logger.Debugf(ctx, "RawEvent inputs %v", inputs)
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
TaskExecId: taskExecID,
OutputData: outputs,
OutputInterface: typedInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
Principal: spec.GetMetadata().Principal,
LaunchPlanId: spec.LaunchPlan,
Expand Down
12 changes: 8 additions & 4 deletions flyteadmin/pkg/manager/impl/exec_manager_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,16 @@ func TestTrackingBitExtract(t *testing.T) {
},
}

trackers := execManager.ExtractArtifactKeys(&lit)
var trackers = make(map[string]string)
execManager.ExtractArtifactTrackers(trackers, &lit)
assert.Equal(t, 1, len(trackers))

trackers = execManager.ExtractArtifactKeys(&core.Literal{Value: &core.Literal_Map{Map: &inputMap}})
trackers = make(map[string]string)
execManager.ExtractArtifactTrackers(trackers, &core.Literal{Value: &core.Literal_Map{Map: &inputMap}})
assert.Equal(t, 1, len(trackers))
trackers = execManager.ExtractArtifactKeys(&core.Literal{Value: &core.Literal_Collection{Collection: &inputColl}})

trackers = make(map[string]string)
execManager.ExtractArtifactTrackers(trackers, &core.Literal{Value: &core.Literal_Collection{Collection: &inputColl}})
assert.Equal(t, 1, len(trackers))
assert.Equal(t, "proj/domain/name@version", trackers[0])
assert.Equal(t, "", trackers["proj/domain/name@version"])
}
52 changes: 27 additions & 25 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
)

const childContainerQueueKey = "child_queue"
const artifactTrackerKey = "_ua"

// Map of [project] -> map of [domain] -> stop watch
type projectDomainScopedStopWatchMap = map[string]map[string]*promutils.StopWatch
Expand Down Expand Up @@ -700,31 +701,26 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se
}
}

// ExtractArtifactKeys pulls out artifact keys from Literals for lineage
// todo: rename this function to be less confusing
func (m *ExecutionManager) ExtractArtifactKeys(input *core.Literal) []string {
var artifactKeys []string
// ExtractArtifactTrackers pulls out artifact tracker strings from Literals for lineage
func (m *ExecutionManager) ExtractArtifactTrackers(artifactTrackers map[string]string, input *core.Literal) {

if input == nil {
return artifactKeys
return
}
if input.GetMetadata() != nil {
if artifactKey, ok := input.GetMetadata()["_ua"]; ok {
artifactKeys = append(artifactKeys, artifactKey)
if tracker, ok := input.GetMetadata()[artifactTrackerKey]; ok {
artifactTrackers[tracker] = ""
}
}
if input.GetCollection() != nil {
for _, v := range input.GetCollection().Literals {
mapKeys := m.ExtractArtifactKeys(v)
artifactKeys = append(artifactKeys, mapKeys...)
m.ExtractArtifactTrackers(artifactTrackers, v)
}
} else if input.GetMap() != nil {
for _, v := range input.GetMap().Literals {
mapKeys := m.ExtractArtifactKeys(v)
artifactKeys = append(artifactKeys, mapKeys...)
m.ExtractArtifactTrackers(artifactTrackers, v)
}
}
return artifactKeys
}

// getStringFromInput should be called when a tag or partition value is a binding to an input. the input is looked up
Expand Down Expand Up @@ -976,7 +972,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

// TODO: Artifact feature gate, remove when ready
var lpExpectedInputs *core.ParameterMap
var artifactTrackers []string
var artifactTrackers = make(map[string]string)
var usedArtifactIDs []*core.ArtifactID
if m.artifactRegistry.GetClient() != nil {
// Literals may have an artifact key in the metadata field. This is something the artifact service should have
Expand All @@ -988,9 +984,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
fixedInputMap := &core.Literal{
Value: &core.Literal_Map{Map: launchPlan.Spec.FixedInputs},
}
artifactTrackers = m.ExtractArtifactKeys(requestInputMap)
fixedInputArtifactKeys := m.ExtractArtifactKeys(fixedInputMap)
artifactTrackers = append(artifactTrackers, fixedInputArtifactKeys...)
m.ExtractArtifactTrackers(artifactTrackers, requestInputMap)
m.ExtractArtifactTrackers(artifactTrackers, fixedInputMap)

// Put together the inputs that we've already resolved so that the artifact querying bit can fill them in.
// This is to support artifact queries that depend on other inputs using the {{ .inputs.var }} construct.
Expand Down Expand Up @@ -1018,7 +1013,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
}

logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, lpExpectedInputs)
logger.Debugf(ctx, "Found artifact keys: %v", artifactTrackers)
logger.Debugf(ctx, "Found artifact trackers: %v", artifactTrackers)
logger.Debugf(ctx, "Found artifact IDs: %v", usedArtifactIDs)

} else {
Expand Down Expand Up @@ -1173,6 +1168,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
// Publish of event is also gated on the artifact client being available, even though it's not directly required.
// TODO: Artifact feature gate, remove when ready
if m.artifactRegistry.GetClient() != nil {
// TODO: Add principal
m.publishExecutionStart(ctx, workflowExecutionID, request.Spec.LaunchPlan, workflow.Id, artifactTrackers, usedArtifactIDs)
}

Expand Down Expand Up @@ -1227,17 +1223,23 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

// publishExecutionStart is an event that Admin publishes for artifact lineage.
func (m *ExecutionManager) publishExecutionStart(ctx context.Context, executionID core.WorkflowExecutionIdentifier,
launchPlanID *core.Identifier, workflowID *core.Identifier, inputArtifactKeys []string, usedArtifactIDs []*core.ArtifactID) {
launchPlanID *core.Identifier, workflowID *core.Identifier, artifactTrackers map[string]string, usedArtifactIDs []*core.ArtifactID) {

var artifactTrackerList []string
// Use a list instead of the fake set
for k := range artifactTrackers {
artifactTrackerList = append(artifactTrackerList, k)
}

if len(inputArtifactKeys) > 0 || len(usedArtifactIDs) > 0 {
logger.Debugf(ctx, "Sending execution start event for execution [%+v] with input artifact keys [%+v] and used artifact ids [%+v]", executionID, inputArtifactKeys, usedArtifactIDs)
if len(artifactTrackerList) > 0 || len(usedArtifactIDs) > 0 {
logger.Debugf(ctx, "Sending execution start event for execution [%+v] with trackers [%+v] and artifact ids [%+v]", executionID, artifactTrackerList, usedArtifactIDs)

request := event.CloudEventExecutionStart{
ExecutionId: &executionID,
LaunchPlanId: launchPlanID,
WorkflowId: workflowID,
ArtifactIds: usedArtifactIDs,
ArtifactKeys: inputArtifactKeys,
ExecutionId: &executionID,
LaunchPlanId: launchPlanID,
WorkflowId: workflowID,
ArtifactIds: usedArtifactIDs,
ArtifactTrackers: artifactTrackerList,
}
go func() {
ceCtx := context.TODO()
Expand Down
Loading

0 comments on commit 8bbdd44

Please sign in to comment.