Skip to content

Commit

Permalink
integration test + make generate
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Jan 20, 2024
1 parent 64ac9b4 commit 4f86c5f
Show file tree
Hide file tree
Showing 16 changed files with 1,422 additions and 862 deletions.
5 changes: 3 additions & 2 deletions flyteadmin/tests/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/testutils"
"github.com/flyteorg/flyte/flyteidl/clients/go/coreutils"
Expand Down Expand Up @@ -48,7 +49,7 @@ func insertTasksForTests(t *testing.T, client service.AdminServiceClient) {
}

_, err := client.CreateTask(ctx, &req)
assert.Nil(t, err)
require.NoError(t, err)
}
}
}
Expand Down Expand Up @@ -105,7 +106,7 @@ func insertWorkflowsForTests(t *testing.T, client service.AdminServiceClient) {
}

_, err := client.CreateWorkflow(ctx, &req)
assert.Nil(t, err, "Failed to create workflow test data with err %v", err)
require.NoError(t, err, "Failed to create workflow test data with err %v", err)
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions flyteadmin/tests/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
ptypesStruct "github.com/golang/protobuf/ptypes/struct"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/testutils"
Expand Down Expand Up @@ -51,7 +52,7 @@ func createTaskAndNodeExecution(
Id: taskIdentifier,
Spec: testutils.GetValidTaskRequest().Spec,
})
assert.Nil(t, err)
require.NoError(t, err)

_, err = client.CreateNodeEvent(ctx, &admin.NodeExecutionEventRequest{
RequestId: "request id",
Expand All @@ -64,7 +65,7 @@ func createTaskAndNodeExecution(
OccurredAt: occurredAtProto,
},
})
assert.Nil(t, err)
require.NoError(t, err)
}

func TestCreateTaskExecution(t *testing.T) {
Expand Down
122 changes: 122 additions & 0 deletions flyteadmin/tests/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@ package tests
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/testutils"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
)

func TestCreateWorkflow(t *testing.T) {
Expand Down Expand Up @@ -81,6 +86,123 @@ func TestGetWorkflows(t *testing.T) {
t.Run("TestListWorkflow_FiltersHTTP", testListWorkflow_FiltersHTTP)
}

func TestGetDynamicNodeWorkflow(t *testing.T) {
ctx := context.Background()
truncateAllTablesForTestingOnly()
populateWorkflowExecutionForTestingOnly(project, domain, name)
client, conn := GetTestAdminServiceClient()
defer conn.Close()

occurredAt := time.Now()
occurredAtProto := timestamppb.New(occurredAt)
childOccurredAt := occurredAt.Add(time.Minute)
childOccurredAtProto := timestamppb.New(childOccurredAt)

createTaskAndNodeExecution(ctx, t, client, conn, occurredAtProto)

_, err := client.CreateTaskEvent(ctx, &admin.TaskExecutionEventRequest{
RequestId: "request id",
Event: &event.TaskExecutionEvent{
TaskId: taskIdentifier,
ParentNodeExecutionId: nodeExecutionId,
Phase: core.TaskExecution_RUNNING,
RetryAttempt: 1,
OccurredAt: occurredAtProto,
},
})
require.NoError(t, err)

wfIdentifier := core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: "admintests",
Domain: "development",
Name: "name",
Version: "version",
}
wfClosure := &core.CompiledWorkflowClosure{
Primary: &core.CompiledWorkflow{
Template: &core.WorkflowTemplate{
Id: &wfIdentifier,
Interface: &core.TypedInterface{},
Nodes: []*core.Node{
{
Id: "I'm a node",
Target: &core.Node_TaskNode{
TaskNode: &core.TaskNode{
Reference: &core.TaskNode_ReferenceId{
ReferenceId: taskIdentifier,
},
},
},
},
},
},
},
}
childNodeExecutionID := &core.NodeExecutionIdentifier{
NodeId: "child_node",
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
_, err = client.CreateNodeEvent(ctx, &admin.NodeExecutionEventRequest{
RequestId: "request id",
Event: &event.NodeExecutionEvent{
Id: childNodeExecutionID,
Phase: core.NodeExecution_RUNNING,
InputValue: &event.NodeExecutionEvent_InputUri{
InputUri: inputURI,
},
OccurredAt: childOccurredAtProto,
ParentTaskMetadata: &event.ParentTaskExecutionMetadata{
Id: taskExecutionIdentifier,
},
IsDynamic: true,
IsParent: true,
TargetMetadata: &event.NodeExecutionEvent_TaskNodeMetadata{
TaskNodeMetadata: &event.TaskNodeMetadata{
DynamicWorkflow: &event.DynamicWorkflowNodeMetadata{
Id: &wfIdentifier,
CompiledWorkflow: wfClosure,
DynamicJobSpecUri: "s3://bla-bla",
},
},
},
},
})
require.NoError(t, err)

t.Run("TestGetDynamicNodeWorkflowGrpc", func(t *testing.T) {
resp, err := client.GetDynamicNodeWorkflow(ctx, &admin.GetDynamicNodeWorkflowRequest{
Id: childNodeExecutionID,
})

assert.NoError(t, err)
assert.True(t, proto.Equal(wfClosure, resp.GetCompiledWorkflow()))
})

t.Run("TestGetDynamicNodeWorkflowHttp", func(t *testing.T) {
url := fmt.Sprintf("%s/api/v1/dynamic_node_workflow/project/domain/execution%%20name/child_node", GetTestHostEndpoint())
getRequest, err := http.NewRequest(http.MethodGet, url, nil)
require.NoError(t, err)
addHTTPRequestHeaders(getRequest)

httpClient := &http.Client{}
resp, err := httpClient.Do(getRequest)
require.NoError(t, err)
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode, "unexpected resp: %s", string(body))
wfResp := &admin.DynamicNodeWorkflowResponse{}
require.NoError(t, proto.Unmarshal(body, wfResp))
assert.True(t, proto.Equal(wfClosure, wfResp.GetCompiledWorkflow()))
})
}

func testGetWorkflowGrpc(t *testing.T) {
ctx := context.Background()
client, conn := GetTestAdminServiceClient()
Expand Down
Loading

0 comments on commit 4f86c5f

Please sign in to comment.