Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add Pending State and State Message in Webapi Agent #399

Closed
2 changes: 2 additions & 0 deletions go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase
taskInfo := &core.TaskInfo{}

switch resource.State {
case admin.State_PENDING:
return core.PhaseInfoQueuedWithTaskInfo(core.DefaultPhaseVersion, "Job is PENDING", taskInfo), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the message. This message does not convey any information to the user. Can the agent send this message

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kumare3

func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {
pi := phaseInfo(PhaseInitializing, version, nil, info, false)
pi.reason = reason
return pi
}

Is this solution suitable?

case admin.State_RUNNING:
return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil
case admin.State_PERMANENT_FAILURE:
Expand Down
76 changes: 75 additions & 1 deletion go/tasks/plugins/webapi/agent/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"testing"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/config"

"google.golang.org/grpc"

pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks"
webapiPlugin "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi/mocks"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
)
Expand All @@ -30,6 +31,7 @@ func TestPlugin(t *testing.T) {
metricScope: fakeSetupContext.MetricsScope(),
cfg: GetConfig(),
}

t.Run("get config", func(t *testing.T) {
err := SetConfig(&cfg)
assert.NoError(t, err)
Expand Down Expand Up @@ -99,4 +101,76 @@ func TestPlugin(t *testing.T) {
ctx, _ = getFinalContext(context.TODO(), "CreateTask", &Agent{Endpoint: "localhost:8080", Timeouts: map[string]config.Duration{"CreateTask": {Duration: 1 * time.Millisecond}}})
assert.NotEqual(t, context.TODO(), ctx)
})

t.Run("test PENDING Status", func(t *testing.T) {
taskContext := new(webapiPlugin.StatusContext)
taskContext.On("Resource").Return(&ResourceWrapper{
State: admin.State_PENDING,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, phase.Phase())
})

t.Run("test RUNNING Status", func(t *testing.T) {
taskContext := new(webapiPlugin.StatusContext)
taskContext.On("Resource").Return(&ResourceWrapper{
State: admin.State_RUNNING,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRunning, phase.Phase())
})

t.Run("test PERMANENT_FAILURE Status", func(t *testing.T) {
taskContext := new(webapiPlugin.StatusContext)
taskContext.On("Resource").Return(&ResourceWrapper{
State: admin.State_PERMANENT_FAILURE,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhasePermanentFailure, phase.Phase())
})

t.Run("test RETRYABLE_FAILURE Status", func(t *testing.T) {
taskContext := new(webapiPlugin.StatusContext)
taskContext.On("Resource").Return(&ResourceWrapper{
State: admin.State_RETRYABLE_FAILURE,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phase.Phase())
})

t.Run("test SUCCEEDED Status", func(t *testing.T) {
taskContext := new(webapiPlugin.StatusContext)
taskContext.On("Resource").Return(&ResourceWrapper{
State: admin.State_SUCCEEDED,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseSuccess, phase.Phase())
})

t.Run("test UNDEFINED Status", func(t *testing.T) {
taskContext := new(webapiPlugin.StatusContext)
taskContext.On("Resource").Return(&ResourceWrapper{
State: 5,
Outputs: nil,
})

phase, err := plugin.Status(context.Background(), taskContext)
assert.Error(t, err)
assert.Equal(t, pluginsCore.PhaseUndefined, phase.Phase())
})
}