Skip to content

Commit

Permalink
remove missed inputs/outputs
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor committed Dec 29, 2023
1 parent 362bda3 commit 0abc124
Showing 1 changed file with 1 addition and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,7 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
// For now, don't append any additional information unless succeeded
if rawEvent.Phase != core.WorkflowExecution_SUCCEEDED {
return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: nil,
OutputInterface: nil,
RawEvent: rawEvent,

Check warning on line 139 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L139

Added line #L139 was not covered by tests
}, nil
}

Expand Down Expand Up @@ -181,39 +179,16 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
}
}

// Get inputs to the workflow execution
var inputs *core.LiteralMap
inputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, executionModel.InputsURI.String())
if err != nil {
logger.Warningf(ctx, "Error fetching input literal map %s", executionModel.InputsURI.String())
}
// The spec is used to retrieve metadata fields
spec := &admin.ExecutionSpec{}
err = proto.Unmarshal(executionModel.Spec, spec)
if err != nil {
fmt.Printf("there was an error with spec %v %v", err, executionModel.Spec)
}

// Get outputs from the workflow execution
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, c.storageClient, rawEvent.GetOutputUri())
if err != nil {
// gatepr: metric this
logger.Warningf(ctx, "Error fetching output literal map %v", rawEvent)
return nil, err
}
}

return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
Principal: spec.GetMetadata().Principal,
Expand Down Expand Up @@ -303,40 +278,6 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con
fmt.Printf("there was an error with spec %v %v", err, executionModel.Spec)
}

// Get inputs/outputs
// This will likely need to move to the artifact service side, given message size limits.
// Replace with call to GetNodeExecutionData
var inputs *core.LiteralMap
logger.Debugf(ctx, "RawEvent id %v", rawEvent.Id)
if len(rawEvent.GetInputUri()) > 0 {
inputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, rawEvent.GetInputUri())
logger.Debugf(ctx, "RawEvent input uri %v, %v", rawEvent.GetInputUri(), inputs)

if err != nil {
fmt.Printf("Error fetching input literal map %v", rawEvent)
}
} else if rawEvent.GetInputData() != nil {
inputs = rawEvent.GetInputData()
logger.Debugf(ctx, "RawEvent input data %v, %v", rawEvent.GetInputData(), inputs)
} else {
logger.Infof(ctx, "Node execution for node exec [%+v] has no input data", rawEvent.Id)
}

// This will likely need to move to the artifact service side, given message size limits.
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, rawEvent.GetOutputUri())
if err != nil {
fmt.Printf("Error fetching output literal map %v", rawEvent)
return nil, err
}
}

// Fetch the latest task execution if any, and pull out the task interface, if applicable.
// These are optional fields... if the node execution doesn't have a task execution then these will be empty.
var taskExecID *core.TaskExecutionIdentifier
Expand Down Expand Up @@ -369,13 +310,10 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con
taskExecID = lte.Id
}

logger.Debugf(ctx, "RawEvent inputs %v", inputs)
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
TaskExecId: taskExecID,
OutputData: outputs,
OutputInterface: typedInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
Principal: spec.GetMetadata().Principal,
LaunchPlanId: spec.LaunchPlan,
Expand Down

0 comments on commit 0abc124

Please sign in to comment.