diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 214540ac07..d1595890d8 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -6,7 +6,6 @@ import ( "runtime/debug" "time" - "github.com/golang/protobuf/ptypes" regErrors "github.com/pkg/errors" "k8s.io/client-go/kubernetes" @@ -836,23 +835,28 @@ func (t Handler) Abort(ctx context.Context, nCtx interfaces.NodeExecutionContext } } - taskExecID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID() - nodeExecutionID, err := getParentNodeExecIDForTask(&taskExecID, nCtx.ExecutionContext()) + phaseInfo := pluginCore.PhaseInfoFailed(pluginCore.PhaseAborted, &core.ExecutionError{ + Code: "Task Aborted", + Message: reason, + }, nil) + evInfo, err := ToTaskExecutionEvent(ToTaskExecutionEventInputs{ + TaskExecContext: tCtx, + InputReader: nCtx.InputReader(), + EventConfig: t.eventConfig, + OutputWriter: tCtx.ow, + Info: phaseInfo, + NodeExecutionMetadata: nCtx.NodeExecutionMetadata(), + ExecContext: nCtx.ExecutionContext(), + TaskType: ttype, + PluginID: p.GetID(), + ResourcePoolInfo: tCtx.rm.GetResourcePoolInfo(), + ClusterID: t.clusterID, + OccurredAt: time.Now(), + }) if err != nil { return err } - if err := evRecorder.RecordTaskEvent(ctx, &event.TaskExecutionEvent{ - TaskId: taskExecID.TaskId, - ParentNodeExecutionId: nodeExecutionID, - RetryAttempt: nCtx.CurrentAttempt(), - Phase: core.TaskExecution_ABORTED, - OccurredAt: ptypes.TimestampNow(), - OutputResult: &event.TaskExecutionEvent_Error{ - Error: &core.ExecutionError{ - Code: "Task Aborted", - Message: reason, - }}, - }, t.eventConfig); err != nil && !eventsErr.IsNotFound(err) && !eventsErr.IsEventIncompatibleClusterError(err) { + if err := evRecorder.RecordTaskEvent(ctx, evInfo, t.eventConfig); err != nil && !eventsErr.IsNotFound(err) && !eventsErr.IsEventIncompatibleClusterError(err) { // If a prior workflow/node/task execution event has failed because of an invalid cluster error, don't stall the abort // at this point in the clean-up. logger.Errorf(ctx, "failed to send event to Admin. error: %s", err.Error()) diff --git a/flytepropeller/pkg/controller/nodes/task/handler_test.go b/flytepropeller/pkg/controller/nodes/task/handler_test.go index 22e1d7451f..145daf806a 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/task/handler_test.go @@ -777,6 +777,7 @@ func Test_task_Abort(t *testing.T) { Kind: "sample", Name: "name", }) + nm.OnIsInterruptible().Return(false) taskID := &core.Identifier{} tr := &nodeMocks.TaskReader{} @@ -891,6 +892,7 @@ func Test_task_Abort(t *testing.T) { defaultPlugin: m, resourceManager: noopRm, agentService: &pluginCore.AgentService{}, + eventConfig: eventConfig, } nCtx := createNodeCtx(tt.args.ev) if err := tk.Abort(context.TODO(), nCtx, "reason"); (err != nil) != tt.wantErr { @@ -939,6 +941,7 @@ func Test_task_Abort_v1(t *testing.T) { Kind: "sample", Name: "name", }) + nm.OnIsInterruptible().Return(false) taskID := &core.Identifier{} tr := &nodeMocks.TaskReader{} @@ -1053,6 +1056,7 @@ func Test_task_Abort_v1(t *testing.T) { defaultPlugin: m, resourceManager: noopRm, agentService: &pluginCore.AgentService{}, + eventConfig: eventConfig, } nCtx := createNodeCtx(tt.args.ev) if err := tk.Abort(context.TODO(), nCtx, "reason"); (err != nil) != tt.wantErr {