From d76bd9c2cd883b24c38b3bca1f511f9a3d99d902 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Mon, 4 Sep 2023 21:49:20 +0800 Subject: [PATCH 01/11] Add Pending State in Webapi Agent Signed-off-by: Future Outlier --- go/tasks/pluginmachinery/core/phase.go | 6 ++++++ go/tasks/plugins/webapi/agent/plugin.go | 2 ++ 2 files changed, 8 insertions(+) diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index 51d3e4e81..37e7e4e7c 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -25,6 +25,8 @@ const ( PhaseQueued // The system has started the pre-execution process, like container download, cluster startup etc PhaseInitializing + // Indicates that the task is awaiting resource allocation + PhasePending // Indicates that the task has started executing PhaseRunning // Indicates that the task has completed successfully @@ -238,6 +240,10 @@ func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInf return phaseInfo(p, DefaultPhaseVersion, err, info, false) } +func PhaseInfoPending(version uint32, info *TaskInfo) PhaseInfo { + return phaseInfo(PhasePending, version, nil, info, false) +} + func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo { return phaseInfo(PhaseRunning, version, nil, info, false) } diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 128753b74..029964014 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -162,6 +162,8 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase taskInfo := &core.TaskInfo{} switch resource.State { + case admin.State_PENDING: + return core.PhaseInfoPending(core.DefaultPhaseVersion, taskInfo), nil case admin.State_RUNNING: return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil case admin.State_PERMANENT_FAILURE: From ec85912e7bce8ff1343d29eba34725530da7f7df Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Tue, 5 Sep 2023 12:21:16 +0800 Subject: [PATCH 02/11] change return core state Signed-off-by: Future Outlier --- go/tasks/pluginmachinery/core/phase.go | 6 ------ go/tasks/plugins/webapi/agent/plugin.go | 3 ++- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index 37e7e4e7c..51d3e4e81 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -25,8 +25,6 @@ const ( PhaseQueued // The system has started the pre-execution process, like container download, cluster startup etc PhaseInitializing - // Indicates that the task is awaiting resource allocation - PhasePending // Indicates that the task has started executing PhaseRunning // Indicates that the task has completed successfully @@ -240,10 +238,6 @@ func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInf return phaseInfo(p, DefaultPhaseVersion, err, info, false) } -func PhaseInfoPending(version uint32, info *TaskInfo) PhaseInfo { - return phaseInfo(PhasePending, version, nil, info, false) -} - func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo { return phaseInfo(PhaseRunning, version, nil, info, false) } diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 029964014..3417a5f5d 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -5,6 +5,7 @@ import ( "crypto/x509" "encoding/gob" "fmt" + "time" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/config" @@ -163,7 +164,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase switch resource.State { case admin.State_PENDING: - return core.PhaseInfoPending(core.DefaultPhaseVersion, taskInfo), nil + return core.PhaseInfoQueued(time.Now(), core.DefaultPhaseVersion, "Job Created"), nil case admin.State_RUNNING: return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil case admin.State_PERMANENT_FAILURE: From 763eaa70717490d6fcd1613b8e81a63a5ae17ca5 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Tue, 5 Sep 2023 16:26:04 +0800 Subject: [PATCH 03/11] add Status Test Signed-off-by: Future Outlier --- go/tasks/plugins/webapi/agent/plugin_test.go | 111 ++++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) diff --git a/go/tasks/plugins/webapi/agent/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go index 180a0d6e6..4656b0b28 100644 --- a/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -5,12 +5,13 @@ import ( "testing" "time" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/config" - "google.golang.org/grpc" pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" + webapiPlugin "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi/mocks" "github.com/flyteorg/flytestdlib/promutils" "github.com/stretchr/testify/assert" ) @@ -99,4 +100,112 @@ func TestPlugin(t *testing.T) { ctx, _ = getFinalContext(context.TODO(), "CreateTask", &Agent{Endpoint: "localhost:8080", Timeouts: map[string]config.Duration{"CreateTask": {Duration: 1 * time.Millisecond}}}) assert.NotEqual(t, context.TODO(), ctx) }) + + t.Run("test PENDING Status", func(t *testing.T) { + plugin := Plugin{ + metricScope: fakeSetupContext.MetricsScope(), + cfg: GetConfig(), + } + taskContext := new(webapiPlugin.StatusContext) + + taskContext.On("Resource").Return(&ResourceWrapper{ + State: admin.State_PENDING, + Outputs: nil, + }) + + plugin.Status(context.Background(), taskContext) + phase, err := plugin.Status(context.Background(), taskContext) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseQueued, phase.Phase()) + }) + + t.Run("test RUNNING Status", func(t *testing.T) { + plugin := Plugin{ + metricScope: fakeSetupContext.MetricsScope(), + cfg: GetConfig(), + } + taskContext := new(webapiPlugin.StatusContext) + + taskContext.On("Resource").Return(&ResourceWrapper{ + State: admin.State_RUNNING, + Outputs: nil, + }) + + plugin.Status(context.Background(), taskContext) + phase, err := plugin.Status(context.Background(), taskContext) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseRunning, phase.Phase()) + }) + + t.Run("test PERMANENT_FAILURE Status", func(t *testing.T) { + plugin := Plugin{ + metricScope: fakeSetupContext.MetricsScope(), + cfg: GetConfig(), + } + taskContext := new(webapiPlugin.StatusContext) + + taskContext.On("Resource").Return(&ResourceWrapper{ + State: admin.State_PERMANENT_FAILURE, + Outputs: nil, + }) + + plugin.Status(context.Background(), taskContext) + phase, err := plugin.Status(context.Background(), taskContext) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhasePermanentFailure, phase.Phase()) + }) + + t.Run("test RETRYABLE_FAILURE Status", func(t *testing.T) { + plugin := Plugin{ + metricScope: fakeSetupContext.MetricsScope(), + cfg: GetConfig(), + } + taskContext := new(webapiPlugin.StatusContext) + + taskContext.On("Resource").Return(&ResourceWrapper{ + State: admin.State_RETRYABLE_FAILURE, + Outputs: nil, + }) + + plugin.Status(context.Background(), taskContext) + phase, err := plugin.Status(context.Background(), taskContext) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseRetryableFailure, phase.Phase()) + }) + + t.Run("test SUCCEEDED Status", func(t *testing.T) { + plugin := Plugin{ + metricScope: fakeSetupContext.MetricsScope(), + cfg: GetConfig(), + } + taskContext := new(webapiPlugin.StatusContext) + + taskContext.On("Resource").Return(&ResourceWrapper{ + State: admin.State_SUCCEEDED, + Outputs: nil, + }) + + plugin.Status(context.Background(), taskContext) + phase, err := plugin.Status(context.Background(), taskContext) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseSuccess, phase.Phase()) + }) + + t.Run("test UNDEFINED Status", func(t *testing.T) { + plugin := Plugin{ + metricScope: fakeSetupContext.MetricsScope(), + cfg: GetConfig(), + } + taskContext := new(webapiPlugin.StatusContext) + + taskContext.On("Resource").Return(&ResourceWrapper{ + State: 5, + Outputs: nil, + }) + + plugin.Status(context.Background(), taskContext) + phase, err := plugin.Status(context.Background(), taskContext) + assert.Error(t, err) + assert.Equal(t, pluginsCore.PhaseUndefined, phase.Phase()) + }) } From 79975b25d44dc8de8189c0423da6d0bcd1e437c9 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Tue, 5 Sep 2023 16:31:42 +0800 Subject: [PATCH 04/11] lint Signed-off-by: Future Outlier --- go/tasks/plugins/webapi/agent/plugin_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go index 4656b0b28..e9383d088 100644 --- a/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -113,7 +113,6 @@ func TestPlugin(t *testing.T) { Outputs: nil, }) - plugin.Status(context.Background(), taskContext) phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseQueued, phase.Phase()) @@ -131,7 +130,6 @@ func TestPlugin(t *testing.T) { Outputs: nil, }) - plugin.Status(context.Background(), taskContext) phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRunning, phase.Phase()) @@ -149,7 +147,6 @@ func TestPlugin(t *testing.T) { Outputs: nil, }) - plugin.Status(context.Background(), taskContext) phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhasePermanentFailure, phase.Phase()) @@ -167,7 +164,6 @@ func TestPlugin(t *testing.T) { Outputs: nil, }) - plugin.Status(context.Background(), taskContext) phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRetryableFailure, phase.Phase()) @@ -185,7 +181,6 @@ func TestPlugin(t *testing.T) { Outputs: nil, }) - plugin.Status(context.Background(), taskContext) phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseSuccess, phase.Phase()) @@ -203,7 +198,6 @@ func TestPlugin(t *testing.T) { Outputs: nil, }) - plugin.Status(context.Background(), taskContext) phase, err := plugin.Status(context.Background(), taskContext) assert.Error(t, err) assert.Equal(t, pluginsCore.PhaseUndefined, phase.Phase()) From 53610087c29b009e051d3e4b314a56dfad58bb15 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Wed, 6 Sep 2023 01:25:58 +0800 Subject: [PATCH 05/11] refactor Signed-off-by: Future Outlier --- go/tasks/plugins/webapi/agent/plugin_test.go | 31 +------------------- 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go index e9383d088..8fbe3d4f2 100644 --- a/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -31,6 +31,7 @@ func TestPlugin(t *testing.T) { metricScope: fakeSetupContext.MetricsScope(), cfg: GetConfig(), } + t.Run("get config", func(t *testing.T) { err := SetConfig(&cfg) assert.NoError(t, err) @@ -102,12 +103,7 @@ func TestPlugin(t *testing.T) { }) t.Run("test PENDING Status", func(t *testing.T) { - plugin := Plugin{ - metricScope: fakeSetupContext.MetricsScope(), - cfg: GetConfig(), - } taskContext := new(webapiPlugin.StatusContext) - taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_PENDING, Outputs: nil, @@ -119,12 +115,7 @@ func TestPlugin(t *testing.T) { }) t.Run("test RUNNING Status", func(t *testing.T) { - plugin := Plugin{ - metricScope: fakeSetupContext.MetricsScope(), - cfg: GetConfig(), - } taskContext := new(webapiPlugin.StatusContext) - taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_RUNNING, Outputs: nil, @@ -136,12 +127,7 @@ func TestPlugin(t *testing.T) { }) t.Run("test PERMANENT_FAILURE Status", func(t *testing.T) { - plugin := Plugin{ - metricScope: fakeSetupContext.MetricsScope(), - cfg: GetConfig(), - } taskContext := new(webapiPlugin.StatusContext) - taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_PERMANENT_FAILURE, Outputs: nil, @@ -153,12 +139,7 @@ func TestPlugin(t *testing.T) { }) t.Run("test RETRYABLE_FAILURE Status", func(t *testing.T) { - plugin := Plugin{ - metricScope: fakeSetupContext.MetricsScope(), - cfg: GetConfig(), - } taskContext := new(webapiPlugin.StatusContext) - taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_RETRYABLE_FAILURE, Outputs: nil, @@ -170,12 +151,7 @@ func TestPlugin(t *testing.T) { }) t.Run("test SUCCEEDED Status", func(t *testing.T) { - plugin := Plugin{ - metricScope: fakeSetupContext.MetricsScope(), - cfg: GetConfig(), - } taskContext := new(webapiPlugin.StatusContext) - taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_SUCCEEDED, Outputs: nil, @@ -187,12 +163,7 @@ func TestPlugin(t *testing.T) { }) t.Run("test UNDEFINED Status", func(t *testing.T) { - plugin := Plugin{ - metricScope: fakeSetupContext.MetricsScope(), - cfg: GetConfig(), - } taskContext := new(webapiPlugin.StatusContext) - taskContext.On("Resource").Return(&ResourceWrapper{ State: 5, Outputs: nil, From 503d1be56eeca69b29606082905d8c3f3dbb86d7 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Wed, 6 Sep 2023 10:38:34 +0800 Subject: [PATCH 06/11] change PENDING return function PhaseInfoQueuedWithTaskInfo Signed-off-by: Future Outlier --- go/tasks/plugins/webapi/agent/plugin.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 3417a5f5d..38222bb21 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -5,7 +5,6 @@ import ( "crypto/x509" "encoding/gob" "fmt" - "time" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/config" @@ -164,7 +163,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase switch resource.State { case admin.State_PENDING: - return core.PhaseInfoQueued(time.Now(), core.DefaultPhaseVersion, "Job Created"), nil + return core.PhaseInfoQueuedWithTaskInfo(core.DefaultPhaseVersion, "Job is PENDING", taskInfo), nil case admin.State_RUNNING: return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil case admin.State_PERMANENT_FAILURE: From fa36634b8582c29674ded4f2ea4909cf1df346aa Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Sat, 9 Sep 2023 22:53:46 +0800 Subject: [PATCH 07/11] change phase queue to phase intializing Signed-off-by: Future Outlier --- go/tasks/pluginmachinery/core/phase.go | 1 - go/tasks/plugins/webapi/agent/plugin.go | 3 ++- go/tasks/plugins/webapi/agent/plugin_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index 51d3e4e81..1ecb8ca72 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -218,7 +218,6 @@ func PhaseInfoQueuedWithTaskInfo(version uint32, reason string, info *TaskInfo) } func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo { - pi := phaseInfo(PhaseInitializing, version, nil, info, false) pi.reason = reason return pi diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 38222bb21..5d47147dc 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -5,6 +5,7 @@ import ( "crypto/x509" "encoding/gob" "fmt" + "time" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/config" @@ -163,7 +164,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase switch resource.State { case admin.State_PENDING: - return core.PhaseInfoQueuedWithTaskInfo(core.DefaultPhaseVersion, "Job is PENDING", taskInfo), nil + return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, "job submitted", taskInfo), nil case admin.State_RUNNING: return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil case admin.State_PERMANENT_FAILURE: diff --git a/go/tasks/plugins/webapi/agent/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go index 8fbe3d4f2..2f5bfff4d 100644 --- a/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -111,7 +111,7 @@ func TestPlugin(t *testing.T) { phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) - assert.Equal(t, pluginsCore.PhaseQueued, phase.Phase()) + assert.Equal(t, pluginsCore.PhaseInitializing, phase.Phase()) }) t.Run("test RUNNING Status", func(t *testing.T) { From 6b69092cfc585a9bc3c260c7d964ae450992d9a2 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Thu, 14 Sep 2023 09:19:23 +0800 Subject: [PATCH 08/11] Add Get Message in Agent Signed-off-by: Future Outlier --- go/tasks/plugins/webapi/agent/plugin.go | 4 +++- go/tasks/plugins/webapi/agent/plugin_test.go | 6 ++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 5d47147dc..4ce90a5bb 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -38,6 +38,7 @@ type Plugin struct { type ResourceWrapper struct { State admin.State Outputs *flyteIdl.LiteralMap + Message string } type ResourceMetaWrapper struct { @@ -133,6 +134,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba return &ResourceWrapper{ State: res.Resource.State, Outputs: res.Resource.Outputs, + Message: res.Resource.Message, }, nil } @@ -164,7 +166,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase switch resource.State { case admin.State_PENDING: - return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, "job submitted", taskInfo), nil + return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, resource.Message, taskInfo), nil case admin.State_RUNNING: return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil case admin.State_PERMANENT_FAILURE: diff --git a/go/tasks/plugins/webapi/agent/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go index 2f5bfff4d..2bbfc4ccd 100644 --- a/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -107,6 +107,7 @@ func TestPlugin(t *testing.T) { taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_PENDING, Outputs: nil, + Message: "Waiting for cluster", }) phase, err := plugin.Status(context.Background(), taskContext) @@ -119,6 +120,7 @@ func TestPlugin(t *testing.T) { taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_RUNNING, Outputs: nil, + Message: "", }) phase, err := plugin.Status(context.Background(), taskContext) @@ -131,6 +133,7 @@ func TestPlugin(t *testing.T) { taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_PERMANENT_FAILURE, Outputs: nil, + Message: "", }) phase, err := plugin.Status(context.Background(), taskContext) @@ -143,6 +146,7 @@ func TestPlugin(t *testing.T) { taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_RETRYABLE_FAILURE, Outputs: nil, + Message: "", }) phase, err := plugin.Status(context.Background(), taskContext) @@ -155,6 +159,7 @@ func TestPlugin(t *testing.T) { taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_SUCCEEDED, Outputs: nil, + Message: "", }) phase, err := plugin.Status(context.Background(), taskContext) @@ -167,6 +172,7 @@ func TestPlugin(t *testing.T) { taskContext.On("Resource").Return(&ResourceWrapper{ State: 5, Outputs: nil, + Message: "", }) phase, err := plugin.Status(context.Background(), taskContext) From 6835580efc6569d49f731aa525052db852e4dcd4 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Thu, 14 Sep 2023 16:16:25 +0800 Subject: [PATCH 09/11] Add log on flyte console! Signed-off-by: Future Outlier --- go/tasks/plugins/webapi/agent/plugin.go | 13 +++++++++++-- go/tasks/plugins/webapi/agent/plugin_test.go | 1 + 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 4ce90a5bb..2afdab52c 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -14,7 +14,7 @@ import ( "google.golang.org/grpc/grpclog" - flyteIdl "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + flyteIdlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" pluginErrors "github.com/flyteorg/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery" @@ -37,7 +37,7 @@ type Plugin struct { type ResourceWrapper struct { State admin.State - Outputs *flyteIdl.LiteralMap + Outputs *flyteIdlCore.LiteralMap Message string } @@ -166,6 +166,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase switch resource.State { case admin.State_PENDING: + taskInfo.Logs = createTaskLog(resource.Message) return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, resource.Message, taskInfo), nil case admin.State_RUNNING: return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil @@ -185,6 +186,14 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase return core.PhaseInfoUndefined, pluginErrors.Errorf(core.SystemErrorCode, "unknown execution phase [%v].", resource.State) } +func createTaskLog(message string) []*flyteIdlCore.TaskLog { + return []*flyteIdlCore.TaskLog{ + { + Name: message, + }, + } +} + func getFinalAgent(taskType string, cfg *Config) (*Agent, error) { if id, exists := cfg.AgentForTaskTypes[taskType]; exists { if agent, exists := cfg.Agents[id]; exists { diff --git a/go/tasks/plugins/webapi/agent/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go index 2bbfc4ccd..15a1b142a 100644 --- a/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -113,6 +113,7 @@ func TestPlugin(t *testing.T) { phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseInitializing, phase.Phase()) + assert.Equal(t, "Waiting for cluster", phase.Info().Logs[0].Name) }) t.Run("test RUNNING Status", func(t *testing.T) { From 409b2b98810012ba6f41631d2b1923dec7445aed Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Thu, 14 Sep 2023 16:25:14 +0800 Subject: [PATCH 10/11] Add Running Message on console Signed-off-by: Future Outlier --- go/tasks/plugins/webapi/agent/plugin.go | 1 + go/tasks/plugins/webapi/agent/plugin_test.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 2afdab52c..6d7a9ce66 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -169,6 +169,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase taskInfo.Logs = createTaskLog(resource.Message) return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, resource.Message, taskInfo), nil case admin.State_RUNNING: + taskInfo.Logs = createTaskLog(resource.Message) return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil case admin.State_PERMANENT_FAILURE: return core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "failed to run the job", taskInfo), nil diff --git a/go/tasks/plugins/webapi/agent/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go index 15a1b142a..73e40560b 100644 --- a/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -121,12 +121,13 @@ func TestPlugin(t *testing.T) { taskContext.On("Resource").Return(&ResourceWrapper{ State: admin.State_RUNNING, Outputs: nil, - Message: "", + Message: "Job is running", }) phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRunning, phase.Phase()) + assert.Equal(t, "Job is running", phase.Info().Logs[0].Name) }) t.Run("test PERMANENT_FAILURE Status", func(t *testing.T) { From 90cff0bb5480ae5d160b60f6694b14825b03b062 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Wed, 27 Sep 2023 14:18:26 +0800 Subject: [PATCH 11/11] use reason on the log Signed-off-by: Future Outlier --- go/tasks/plugins/webapi/agent/plugin.go | 2 -- go/tasks/plugins/webapi/agent/plugin_test.go | 16 +--------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 8cfd1e46f..f59e94d6f 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -177,10 +177,8 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase switch resource.State { case admin.State_PENDING: - taskInfo.Logs = createTaskLog(resource.Message) return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, resource.Message, taskInfo), nil case admin.State_RUNNING: - taskInfo.Logs = createTaskLog(resource.Message) return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil case admin.State_PERMANENT_FAILURE: return core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "failed to run the job", taskInfo), nil diff --git a/go/tasks/plugins/webapi/agent/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go index 0322d2a31..2915f3893 100644 --- a/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -113,7 +113,7 @@ func TestPlugin(t *testing.T) { phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseInitializing, phase.Phase()) - assert.Equal(t, "Waiting for cluster", phase.Info().Logs[0].Name) + assert.Equal(t, "Waiting for cluster", phase.Reason()) }) t.Run("test RUNNING Status", func(t *testing.T) { @@ -127,7 +127,6 @@ func TestPlugin(t *testing.T) { phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRunning, phase.Phase()) - assert.Equal(t, "Job is running", phase.Info().Logs[0].Name) }) t.Run("test PERMANENT_FAILURE Status", func(t *testing.T) { @@ -156,19 +155,6 @@ func TestPlugin(t *testing.T) { assert.Equal(t, pluginsCore.PhaseRetryableFailure, phase.Phase()) }) - t.Run("test SUCCEEDED Status", func(t *testing.T) { - taskContext := new(webapiPlugin.StatusContext) - taskContext.On("Resource").Return(&ResourceWrapper{ - State: admin.State_SUCCEEDED, - Outputs: nil, - Message: "", - }) - - phase, err := plugin.Status(context.Background(), taskContext) - assert.NoError(t, err) - assert.Equal(t, pluginsCore.PhaseSuccess, phase.Phase()) - }) - t.Run("test UNDEFINED Status", func(t *testing.T) { taskContext := new(webapiPlugin.StatusContext) taskContext.On("Resource").Return(&ResourceWrapper{