Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upstream/node event update #5528

Merged
merged 2 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions flyteidl/gen/pb-es/flyteidl/event/event_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions flyteidl/gen/pb-go/flyteidl/event/event.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions flyteidl/gen/pb_rust/flyteidl.event.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions flyteidl/protos/flyteidl/event/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,12 @@ message NodeExecutionEvent {
// Indicates if this node is an ArrayNode.
bool is_array = 22;

// Holding this field here for now, this will be upstreamed soon.
// So that Admin doesn't have to rebuild the node execution graph to find the target entity, propeller will fill this
// in optionally - currently this is only filled in for subworkflows. This is the ID of the subworkflow corresponding
// to this node execution. It is difficult to find because Admin only sees one node at a time. A subworkflow could be
// nested multiple layers deep, and you'd need to access the correct workflow template to know the target subworkflow.
core.Identifier target_entity = 23;

// Holding this field here for now, this will be upstreamed soon.
// Tasks and subworkflows (but not launch plans) that are run within a dynamic task are effectively independent of
// the tasks that are registered in Admin's db. Confusingly, they are often identical, but sometimes they are not
// even registered at all. Similar to the target_entity field, at the time Admin receives this event, it has no idea
Expand Down
13 changes: 10 additions & 3 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,18 @@
return in.OutputAliases
}

// In functions below, explicitly strip out nil type information because NodeSpec's WorkflowNode is a struct type,
// not interface and downstream nil checks will not pass.
// See the test in TestPointersForNodeSpec for more information.

func (in *NodeSpec) GetWorkflowNode() ExecutableWorkflowNode {
if in.WorkflowNode == nil {
return nil
if in != nil {
if in.WorkflowNode == nil {
return nil
}
return in.WorkflowNode

Check warning on line 236 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes.go#L236

Added line #L236 was not covered by tests
}
return in.WorkflowNode
return nil

Check warning on line 238 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes.go#L238

Added line #L238 was not covered by tests
}

func (in *NodeSpec) GetBranchNode() ExecutableBranchNode {
Expand Down
51 changes: 51 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package v1alpha1

import (
"testing"

"github.com/stretchr/testify/assert"
)

type CanDo interface {
MyDo() int
}

type Concrete struct {
Doer CanDo
}

func (c *Concrete) MyDo() int {
return 1
}

type Parent struct {
Concrete *Concrete
}

func (p *Parent) GetDoer() CanDo {
return p.Concrete
}

func (p *Parent) GetConcreteDoer() *Concrete {
return p.Concrete
}

func TestPointersForNodeSpec(t *testing.T) {
p := &Parent{
Concrete: nil,
}
// GetDoer returns a fake nil because it carries type information
// assert.NotNil(t, p.GetDoer()) funnily enough doesn't work, so use a regular if statement
if p.GetDoer() == nil {
assert.Fail(t, "GetDoer")
}

assert.Nil(t, p.GetConcreteDoer())
}

func TestNodeSpec(t *testing.T) {
n := &NodeSpec{
WorkflowNode: nil,
}
assert.Nil(t, n.GetWorkflowNode())
}
10 changes: 8 additions & 2 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@
}

func (in *WorkflowNodeSpec) GetLaunchPlanRefID() *LaunchPlanRefID {
return in.LaunchPlanRefID
if in != nil {
return in.LaunchPlanRefID

Check warning on line 19 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go#L18-L19

Added lines #L18 - L19 were not covered by tests
}
return nil

Check warning on line 21 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go#L21

Added line #L21 was not covered by tests
}

func (in *WorkflowNodeSpec) GetSubWorkflowRef() *WorkflowID {
return in.SubWorkflowReference
if in != nil {
return in.SubWorkflowReference

Check warning on line 26 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go#L25-L26

Added lines #L25 - L26 were not covered by tests
}
return nil

Check warning on line 28 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go#L28

Added line #L28 was not covered by tests
}
17 changes: 12 additions & 5 deletions flytepropeller/pkg/controller/executors/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ParentInfoGetter interface {
type ImmutableParentInfo interface {
GetUniqueID() v1alpha1.NodeID
CurrentAttempt() uint32
IsInDynamicChain() bool
}

type ControlFlow interface {
Expand Down Expand Up @@ -60,14 +61,19 @@ func (e execContext) GetParentInfo() ImmutableParentInfo {
}

type parentExecutionInfo struct {
uniqueID v1alpha1.NodeID
currentAttempts uint32
uniqueID v1alpha1.NodeID
currentAttempts uint32
isInDynamicChain bool
}

func (p *parentExecutionInfo) GetUniqueID() v1alpha1.NodeID {
return p.uniqueID
}

func (p *parentExecutionInfo) IsInDynamicChain() bool {
return p.isInDynamicChain
}

func (p *parentExecutionInfo) CurrentAttempt() uint32 {
return p.currentAttempts
}
Expand Down Expand Up @@ -129,10 +135,11 @@ func NewExecutionContext(immExecContext ImmutableExecutionContext, tasksGetter T
}
}

func NewParentInfo(uniqueID string, currentAttempts uint32) ImmutableParentInfo {
func NewParentInfo(uniqueID string, currentAttempts uint32, isInDynamicChain bool) ImmutableParentInfo {
return &parentExecutionInfo{
currentAttempts: currentAttempts,
uniqueID: uniqueID,
currentAttempts: currentAttempts,
uniqueID: uniqueID,
isInDynamicChain: isInDynamicChain,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,22 @@ func TestExecutionContext(t *testing.T) {

func TestParentExecutionInfo_GetUniqueID(t *testing.T) {
expectedID := "testID"
parentInfo := NewParentInfo(expectedID, 1)
parentInfo := NewParentInfo(expectedID, 1, false)
assert.Equal(t, expectedID, parentInfo.GetUniqueID())
}

func TestParentExecutionInfo_CurrentAttempt(t *testing.T) {
expectedAttempt := uint32(123465)
parentInfo := NewParentInfo("testID", expectedAttempt)
parentInfo := NewParentInfo("testID", expectedAttempt, false)
assert.Equal(t, expectedAttempt, parentInfo.CurrentAttempt())
}

func TestParentExecutionInfo_DynamicChain(t *testing.T) {
expectedAttempt := uint32(123465)
parentInfo := NewParentInfo("testID", expectedAttempt, true)
assert.True(t, parentInfo.IsInDynamicChain())
}

func TestControlFlow_ControlFlowParallelism(t *testing.T) {
cFlow := InitializeControlFlow().(*controlFlow)
assert.Equal(t, uint32(0), cFlow.CurrentParallelism())
Expand All @@ -88,7 +94,7 @@ func TestControlFlow_ControlFlowParallelism(t *testing.T) {
func TestNewParentInfo(t *testing.T) {
expectedID := "testID"
expectedAttempt := uint32(123465)
parentInfo := NewParentInfo(expectedID, expectedAttempt).(*parentExecutionInfo)
parentInfo := NewParentInfo(expectedID, expectedAttempt, false).(*parentExecutionInfo)
assert.Equal(t, expectedID, parentInfo.uniqueID)
assert.Equal(t, expectedAttempt, parentInfo.currentAttempts)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions flytepropeller/pkg/controller/nodes/array/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@
timestamp := ptypes.TimestampNow()
workflowExecutionID := nCtx.ExecutionContext().GetExecutionID().WorkflowExecutionIdentifier

// send NodeExecutionEvent
// Extract dynamic chain information.
var dynamic = false
if nCtx.ExecutionContext() != nil && nCtx.ExecutionContext().GetParentInfo() != nil && nCtx.ExecutionContext().GetParentInfo().IsInDynamicChain() {
dynamic = true

Check warning on line 226 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L226

Added line #L226 was not covered by tests
}
nodeExecutionEvent := &event.NodeExecutionEvent{
Id: &idlcore.NodeExecutionIdentifier{
NodeId: subNodeID,
Expand All @@ -231,14 +235,15 @@
ParentNodeMetadata: &event.ParentNodeExecutionMetadata{
NodeId: nCtx.NodeID(),
},
ReportedAt: timestamp,
ReportedAt: timestamp,
IsInDynamicChain: dynamic,
}

if err := eventRecorder.RecordNodeEvent(ctx, nodeExecutionEvent, eventConfig); err != nil {
return err
}

// send TaskExeucutionEvent
// send TaskExecutionEvent
taskExecutionEvent := &event.TaskExecutionEvent{
TaskId: &idlcore.Identifier{
ResourceType: idlcore.ResourceType_TASK,
Expand Down
3 changes: 2 additions & 1 deletion flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut
} else {
// record events transitioning subNodes to aborted
retryAttempt := uint32(arrayNodeState.SubNodeRetryAttempts.GetItem(i))

if err := sendEvents(ctx, nCtx, i, retryAttempt, idlcore.NodeExecution_ABORTED, idlcore.TaskExecution_ABORTED, eventRecorder, a.eventConfig); err != nil {
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}
Expand Down Expand Up @@ -707,7 +708,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter
// initialize mocks
arrayNodeLookup := newArrayNodeLookup(nCtx.ContextualNodeLookup(), subNodeID, &subNodeSpec, subNodeStatus)

newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt())
newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt(), false)
if err != nil {
return nil, nil, nil, nil, nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (b *branchHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecutio
}

func (b *branchHandler) getExecutionContextForDownstream(nCtx interfaces.NodeExecutionContext) (executors.ExecutionContext, error) {
newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt())
newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt(), false)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading