diff --git a/flyteadmin/pkg/manager/impl/task_execution_manager_test.go b/flyteadmin/pkg/manager/impl/task_execution_manager_test.go index 74b286b9683..7e256541e49 100644 --- a/flyteadmin/pkg/manager/impl/task_execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/task_execution_manager_test.go @@ -426,46 +426,52 @@ func TestCreateTaskEvent_UpdateTerminalEventError(t *testing.T) { }, nil }) - // request w/ non terminal phase - taskEventRequest.Event.Phase = core.TaskExecution_RUNNING - taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil, nil) - resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) - assert.Nil(t, resp) - adminError := err.(flyteAdminErrors.FlyteAdminError) - assert.Equal(t, adminError.Code(), codes.FailedPrecondition) - details, ok := adminError.GRPCStatus().Details()[0].(*admin.EventFailureReason) - assert.True(t, ok) - _, ok = details.GetReason().(*admin.EventFailureReason_AlreadyInTerminalState) - assert.True(t, ok) - - // request w/ different terminal phase - taskEventRequest.Event.Phase = core.TaskExecution_FAILED - taskEventRequest.Event.PhaseVersion = uint32(0) - taskExecManager = NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil, nil) - resp, err = taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) - assert.Nil(t, resp) - adminError = err.(flyteAdminErrors.FlyteAdminError) - assert.Equal(t, adminError.Code(), codes.FailedPrecondition) - details, ok = adminError.GRPCStatus().Details()[0].(*admin.EventFailureReason) - assert.True(t, ok) - _, ok = details.GetReason().(*admin.EventFailureReason_AlreadyInTerminalState) - assert.True(t, ok) - - // request w/ same terminal phase, not a later version - taskEventRequest.Event.Phase = core.TaskExecution_SUCCEEDED - taskEventRequest.Event.PhaseVersion = uint32(0) - taskExecManager = NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil, nil) - resp, err = taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) - assert.Nil(t, resp) - adminError = err.(flyteAdminErrors.FlyteAdminError) - assert.Equal(t, adminError.Code(), codes.AlreadyExists) + t.Run("CreateExecutionEvent_NonTerminalPhase", func(t *testing.T) { + taskEventRequest.Event.Phase = core.TaskExecution_RUNNING + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil, nil) + resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) + assert.Nil(t, resp) + adminError := err.(flyteAdminErrors.FlyteAdminError) + assert.Equal(t, adminError.Code(), codes.FailedPrecondition) + details, ok := adminError.GRPCStatus().Details()[0].(*admin.EventFailureReason) + assert.True(t, ok) + _, ok = details.GetReason().(*admin.EventFailureReason_AlreadyInTerminalState) + assert.True(t, ok) + }) - // request w/ same terminal phase, later version - taskEventRequest.Event.Phase = core.TaskExecution_SUCCEEDED - taskEventRequest.Event.PhaseVersion = uint32(1) - taskExecManager = NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, &mockPublisher, &mockPublisher) - _, err = taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) - assert.Nil(t, err) + t.Run("CreateExecutionEvent_DifferentTerminalPhase", func(t *testing.T) { + taskEventRequest.Event.Phase = core.TaskExecution_FAILED + taskEventRequest.Event.PhaseVersion = uint32(0) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil, nil) + resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) + assert.Nil(t, resp) + adminError := err.(flyteAdminErrors.FlyteAdminError) + assert.Equal(t, adminError.Code(), codes.FailedPrecondition) + details, ok := adminError.GRPCStatus().Details()[0].(*admin.EventFailureReason) + assert.True(t, ok) + _, ok = details.GetReason().(*admin.EventFailureReason_AlreadyInTerminalState) + assert.True(t, ok) + }) + + t.Run("CreateExecutionEvent_SameTerminalPhase_OldVersion", func(t *testing.T) { + // request w/ same terminal phase, not a later version + taskEventRequest.Event.Phase = core.TaskExecution_SUCCEEDED + taskEventRequest.Event.PhaseVersion = uint32(0) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil, nil) + resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) + assert.Nil(t, resp) + adminError := err.(flyteAdminErrors.FlyteAdminError) + assert.Equal(t, adminError.Code(), codes.AlreadyExists) + }) + + t.Run("CreateExecutionEvent_SameTerminalPhase_NewVersion", func(t *testing.T) { + // request w/ same terminal phase, later version + taskEventRequest.Event.Phase = core.TaskExecution_SUCCEEDED + taskEventRequest.Event.PhaseVersion = uint32(1) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, &mockPublisher, &mockPublisher) + _, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) + assert.Nil(t, err) + }) } func TestCreateTaskEvent_PhaseVersionChange(t *testing.T) {