Skip to content

Commit

Permalink
GetDynamicNodeWorkflow endpoint (#4689)
Browse files Browse the repository at this point in the history
* GetDynamicNodeWorkflow endpoint and tests

Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin authored Jan 24, 2024
1 parent 5e2224a commit 5e23add
Show file tree
Hide file tree
Showing 57 changed files with 20,661 additions and 3,429 deletions.
20 changes: 11 additions & 9 deletions cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@ import (
"context"
"net/http"
"os"

metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook"

_ "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
_ "gorm.io/driver/postgres" // Required to import database driver.
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics"

datacatalogConfig "github.com/flyteorg/flyte/datacatalog/pkg/config"
datacatalogRepo "github.com/flyteorg/flyte/datacatalog/pkg/repositories"
datacatalog "github.com/flyteorg/flyte/datacatalog/pkg/rpc/datacatalogservice"
Expand All @@ -29,14 +39,6 @@ import (
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"
_ "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
_ "gorm.io/driver/postgres" // Required to import database driver.
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

const defaultNamespace = "all"
Expand Down Expand Up @@ -121,7 +123,7 @@ func startPropeller(ctx context.Context, cfg Propeller) error {
SyncPeriod: &propellerCfg.DownstreamEval.Duration,
DefaultNamespaces: namespaceConfigs,
},
NewCache: executors.NewCache,
NewCache: executors.NewCache,
NewClient: executors.BuildNewClientFunc(propellerScope),
Metrics: metricsserver.Options{
// Disable metrics serving
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,4 @@ qualityOfService:
staging: MEDIUM
# by default production has an UNDEFINED tier when it is omitted from the configuration
namespace_mapping:
template: "{{ project }}-{{ domain }}" # Default namespace mapping template.
template: "{{ project }}-{{ domain }}" # Default namespace mapping template.
60 changes: 46 additions & 14 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,31 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi
return &admin.NodeExecutionEventResponse{}, nil
}

func (m *NodeExecutionManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
if err := validation.ValidateNodeExecutionIdentifier(request.Id); err != nil {
logger.Debugf(ctx, "can't get node execution data with invalid identifier [%+v]: %v", request.Id, err)
}

ctx = getNodeExecutionContext(ctx, request.Id)
nodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Id)
if err != nil {
logger.Errorf(ctx, "failed to get node execution with id [%+v] with err %v",
request.Id, err)
return nil, err
}

if nodeExecutionModel.DynamicWorkflowRemoteClosureReference == "" {
return &admin.DynamicNodeWorkflowResponse{}, errors.NewFlyteAdminErrorf(codes.NotFound, "node does not contain dynamic workflow")
}

closure, err := m.fetchDynamicWorkflowClosure(ctx, nodeExecutionModel.DynamicWorkflowRemoteClosureReference)
if err != nil {
return nil, err
}

return &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: closure}, nil
}

// Handles making additional database calls, if necessary, to populate IsParent & IsDynamic data using the historical pattern of
// preloading child node executions. Otherwise, simply calls transform on the input model.
func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context, nodeExecutionModel models.NodeExecution,
Expand Down Expand Up @@ -516,23 +541,15 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
}

if len(nodeExecutionModel.DynamicWorkflowRemoteClosureReference) > 0 {
closure := &core.CompiledWorkflowClosure{}
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecutionModel.DynamicWorkflowRemoteClosureReference), closure)
closure, err := m.fetchDynamicWorkflowClosure(ctx, nodeExecutionModel.DynamicWorkflowRemoteClosureReference)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"Unable to read WorkflowClosure from location %s : %v", nodeExecutionModel.DynamicWorkflowRemoteClosureReference, err)
return nil, err
}

if wf := closure.Primary; wf == nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow definition in loaded dynamic workflow model.")
} else if template := wf.Template; template == nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow template in loaded dynamic workflow model.")
} else {
response.DynamicWorkflow = &admin.DynamicWorkflowNodeMetadata{
Id: closure.Primary.Template.Id,
CompiledWorkflow: closure,
DynamicJobSpecUri: nodeExecution.Closure.DynamicJobSpecUri,
}
response.DynamicWorkflow = &admin.DynamicWorkflowNodeMetadata{
Id: closure.Primary.Template.Id,
CompiledWorkflow: closure,
DynamicJobSpecUri: nodeExecution.Closure.DynamicJobSpecUri,
}
}

Expand All @@ -546,6 +563,21 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return response, nil
}

func (m *NodeExecutionManager) fetchDynamicWorkflowClosure(ctx context.Context, ref string) (*core.CompiledWorkflowClosure, error) {
closure := &core.CompiledWorkflowClosure{}
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(ref), closure)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Unable to read WorkflowClosure from location %s : %v", ref, err)
}

if wf := closure.Primary; wf == nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow definition in loaded dynamic workflow model.")
} else if template := wf.Template; template == nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow template in loaded dynamic workflow model.")
}
return closure, nil
}

func NewNodeExecutionManager(db repoInterfaces.Repository, config runtimeInterfaces.Configuration,
storagePrefix []string, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface,
eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher,
Expand Down
147 changes: 147 additions & 0 deletions flyteadmin/pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

eventWriterMocks "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
Expand Down Expand Up @@ -1319,3 +1321,148 @@ func TestGetNodeExecutionData(t *testing.T) {
},
}, dataResponse))
}

func Test_GetDynamicNodeWorkflow_Success(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
expectedClosure := testutils.GetWorkflowClosure().CompiledWorkflow
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
bytes, err := proto.Marshal(expectedClosure)
require.NoError(t, err)
return proto.Unmarshal(bytes, msg)
}
ctx := context.TODO()
nodeExecManager := NewNodeExecutionManager(repo,
getMockExecutionsConfigProvider(),
storagePrefix,
mockStorageClient,
mockScope.NewTestScope(),
mockNodeExecutionRemoteURL,
nil, nil,
&eventWriterMocks.NodeExecutionEventWriter{})
expected := &admin.DynamicNodeWorkflowResponse{
CompiledWorkflow: expectedClosure,
}

resp, err := nodeExecManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.NoError(t, err)
assert.True(t, proto.Equal(expected, resp))
}

func Test_GetDynamicNodeWorkflow_DBError(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
expectedErr := errors.New("failure")
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{}, expectedErr
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
nodeExecManager := NewNodeExecutionManager(repo,
getMockExecutionsConfigProvider(),
storagePrefix,
mockStorageClient,
mockScope.NewTestScope(),
mockNodeExecutionRemoteURL,
nil, nil,
&eventWriterMocks.NodeExecutionEventWriter{})

resp, err := nodeExecManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.Equal(t, expectedErr, err)
assert.Empty(t, resp)
}

func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: ""}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
nodeExecManager := NewNodeExecutionManager(repo,
getMockExecutionsConfigProvider(),
storagePrefix,
mockStorageClient,
mockScope.NewTestScope(),
mockNodeExecutionRemoteURL,
nil, nil,
&eventWriterMocks.NodeExecutionEventWriter{})

resp, err := nodeExecManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.NotFound, st.Code())
assert.Equal(t, "node does not contain dynamic workflow", st.Message())
assert.Empty(t, resp)
}

func Test_GetDynamicNodeWorkflow_StorageError(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
return errors.New("failure")
}
ctx := context.TODO()
nodeExecManager := NewNodeExecutionManager(repo,
getMockExecutionsConfigProvider(),
storagePrefix,
mockStorageClient,
mockScope.NewTestScope(),
mockNodeExecutionRemoteURL,
nil, nil,
&eventWriterMocks.NodeExecutionEventWriter{})

resp, err := nodeExecManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.Internal, st.Code())
assert.Equal(t, "Unable to read WorkflowClosure from location s3://flyte/metadata/admin/remote closure id : failure", st.Message())
assert.Empty(t, resp)
}
1 change: 1 addition & 0 deletions flyteadmin/pkg/manager/interfaces/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ type NodeExecutionInterface interface {
ListNodeExecutionsForTask(ctx context.Context, request admin.NodeExecutionForTaskListRequest) (*admin.NodeExecutionList, error)
GetNodeExecutionData(
ctx context.Context, request admin.NodeExecutionGetDataRequest) (*admin.NodeExecutionGetDataResponse, error)
GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error)
}
4 changes: 4 additions & 0 deletions flyteadmin/pkg/manager/mocks/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ func (m *MockNodeExecutionManager) GetNodeExecutionData(
}
return nil, nil
}

func (m *MockNodeExecutionManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
return nil, nil
}
28 changes: 15 additions & 13 deletions flyteadmin/pkg/rpc/adminservice/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ type namedEntityEndpointMetrics struct {
type nodeExecutionEndpointMetrics struct {
scope promutils.Scope

createEvent util.RequestMetrics
get util.RequestMetrics
getData util.RequestMetrics
getMetrics util.RequestMetrics
list util.RequestMetrics
listChildren util.RequestMetrics
createEvent util.RequestMetrics
get util.RequestMetrics
getData util.RequestMetrics
getMetrics util.RequestMetrics
list util.RequestMetrics
listChildren util.RequestMetrics
getDynamicNodeWorkflow util.RequestMetrics
}

type projectEndpointMetrics struct {
Expand Down Expand Up @@ -161,13 +162,14 @@ func InitMetrics(adminScope promutils.Scope) AdminMetrics {
update: util.NewRequestMetrics(adminScope, "update_named_entity"),
},
nodeExecutionEndpointMetrics: nodeExecutionEndpointMetrics{
scope: adminScope,
createEvent: util.NewRequestMetrics(adminScope, "create_node_execution_event"),
get: util.NewRequestMetrics(adminScope, "get_node_execution"),
getData: util.NewRequestMetrics(adminScope, "get_node_execution_data"),
getMetrics: util.NewRequestMetrics(adminScope, "get_node_execution_metrics"),
list: util.NewRequestMetrics(adminScope, "list_node_execution"),
listChildren: util.NewRequestMetrics(adminScope, "list_children_node_executions"),
scope: adminScope,
createEvent: util.NewRequestMetrics(adminScope, "create_node_execution_event"),
get: util.NewRequestMetrics(adminScope, "get_node_execution"),
getData: util.NewRequestMetrics(adminScope, "get_node_execution_data"),
getMetrics: util.NewRequestMetrics(adminScope, "get_node_execution_metrics"),
list: util.NewRequestMetrics(adminScope, "list_node_execution"),
listChildren: util.NewRequestMetrics(adminScope, "list_children_node_executions"),
getDynamicNodeWorkflow: util.NewRequestMetrics(adminScope, "get_dynamic_node_workflow"),
},
projectEndpointMetrics: projectEndpointMetrics{
scope: adminScope,
Expand Down
18 changes: 18 additions & 0 deletions flyteadmin/pkg/rpc/adminservice/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,24 @@ func (m *AdminService) GetNodeExecution(
return response, nil
}

func (m *AdminService) GetDynamicNodeWorkflow(ctx context.Context, request *admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}

var response *admin.DynamicNodeWorkflowResponse
var err error
m.Metrics.nodeExecutionEndpointMetrics.getDynamicNodeWorkflow.Time(func() {
response, err = m.NodeExecutionManager.GetDynamicNodeWorkflow(ctx, *request)
})
if err != nil {
return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.get)
}
m.Metrics.nodeExecutionEndpointMetrics.getDynamicNodeWorkflow.Success()
return response, nil
}

func (m *AdminService) ListNodeExecutions(
ctx context.Context, request *admin.NodeExecutionListRequest) (*admin.NodeExecutionList, error) {
defer m.interceptPanic(ctx, request)
Expand Down
Loading

0 comments on commit 5e23add

Please sign in to comment.