Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get Workflow Node Executions endpoint wip #4641

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions flyteadmin/flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ flyteadmin:
useOffloadedWorkflowClosure: false
database:
postgres:
port: 30001
port: 5432
username: postgres
password: postgres
password: ""
host: 127.0.0.1
dbname: flyte
dbname: flyteadmin
options: "sslmode=disable"
scheduler:
eventScheduler:
Expand Down Expand Up @@ -123,13 +123,13 @@ storage:
region: us-east-1
disable_ssl: true
v2_signing: true
endpoint: http://localhost:30002
endpoint: http://localhost:4566
auth_type: accesskey
access_key_id: minio
secret_key: miniostorage
signedUrl:
stowConfigOverride:
endpoint: http://localhost:30002
endpoint: http://localhost:4566
cache:
max_size_mbs: 10
target_gc_percent: 100
Expand Down Expand Up @@ -212,4 +212,4 @@ qualityOfService:
staging: MEDIUM
# by default production has an UNDEFINED tier when it is omitted from the configuration
namespace_mapping:
template: "{{ project }}-{{ domain }}" # Default namespace mapping template.
template: "{{ project }}-{{ domain }}" # Default namespace mapping template.
24 changes: 6 additions & 18 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,21 +463,17 @@ func (m *ExecutionManager) launchSingleTaskExecution(
// Prepare a skeleton workflow
taskIdentifier := request.Spec.LaunchPlan
workflowModel, err :=
util.CreateOrGetWorkflowModel(ctx, request, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task)
util.CreateOrGetWorkflowModel(ctx, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task)
if err != nil {
logger.Debugf(ctx, "Failed to created skeleton workflow for [%+v] with err: %v", taskIdentifier, err)
return nil, nil, err
}
workflow, err := transformers.FromWorkflowModel(*workflowModel)
if err != nil {
return nil, nil, err
}
closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier)

workflow, err := util.TransformWorkflowWithClosure(ctx, m.storageClient, workflowModel)
if err != nil {
return nil, nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure

// Also prepare a skeleton launch plan.
launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier,
workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec)
Expand Down Expand Up @@ -1042,21 +1038,13 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

workflowModel, err := util.GetWorkflowModel(ctx, m.db, *launchPlan.Spec.WorkflowId)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
}
workflow, err := transformers.FromWorkflowModel(workflowModel)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
}
closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier)

workflow, err := util.TransformWorkflowWithClosure(ctx, m.storageClient, workflowModel)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure

name := util.GetExecutionName(request)
workflowExecutionID := core.WorkflowExecutionIdentifier{
Expand Down
170 changes: 157 additions & 13 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,24 +361,21 @@ func (m *NodeExecutionManager) GetNodeExecution(
return nodeExecution, nil
}

func (m *NodeExecutionManager) listNodeExecutions(
ctx context.Context, identifierFilters []common.InlineFilter,
requestFilters string, limit uint32, requestToken string, sortBy *admin.Sort, mapFilters []common.MapFilter) (
*admin.NodeExecutionList, error) {

func (m *NodeExecutionManager) listNodeExecutionsModels(ctx context.Context, identifierFilters []common.InlineFilter,
requestFilters string, limit uint32, requestToken string, sortBy *admin.Sort, mapFilters []common.MapFilter) (repoInterfaces.NodeExecutionCollectionOutput, error) {
filters, err := util.AddRequestFilters(requestFilters, common.NodeExecution, identifierFilters)
if err != nil {
return nil, err
return repoInterfaces.NodeExecutionCollectionOutput{}, err
}

sortParameter, err := common.NewSortParameter(sortBy, models.NodeExecutionColumns)
if err != nil {
return nil, err
return repoInterfaces.NodeExecutionCollectionOutput{}, err
}

offset, err := validation.ValidateToken(requestToken)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
return repoInterfaces.NodeExecutionCollectionOutput{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"invalid pagination token %s for ListNodeExecutions", requestToken)
}
listInput := repoInterfaces.ListResourceInput{
Expand All @@ -391,14 +388,30 @@ func (m *NodeExecutionManager) listNodeExecutions(
listInput.MapFilters = mapFilters
output, err := m.db.NodeExecutionRepo().List(ctx, listInput)
if err != nil {
logger.Debugf(ctx, "Failed to list node executions for request with err %v", err)
return nil, err
logger.Errorf(ctx, "failed to list node executions for request with err %v", err)
return repoInterfaces.NodeExecutionCollectionOutput{}, err
}

var token string
if len(output.NodeExecutions) == int(limit) {
token = strconv.Itoa(offset + len(output.NodeExecutions))
output.Token = strconv.Itoa(offset + len(output.NodeExecutions))
}
return output, nil
}

func (m *NodeExecutionManager) listNodeExecutions(
ctx context.Context,
identifierFilters []common.InlineFilter,
requestFilters string,
limit uint32,
requestToken string,
sortBy *admin.Sort,
mapFilters []common.MapFilter,
) (*admin.NodeExecutionList, error) {
output, err := m.listNodeExecutionsModels(ctx, identifierFilters, requestFilters, limit, requestToken, sortBy, mapFilters)
if err != nil {
return nil, err
}

nodeExecutionList, err := m.transformNodeExecutionModelList(ctx, output.NodeExecutions)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution models for request with err: %v", err)
Expand All @@ -407,7 +420,7 @@ func (m *NodeExecutionManager) listNodeExecutions(

return &admin.NodeExecutionList{
NodeExecutions: nodeExecutionList,
Token: token,
Token: output.Token,
}, nil
}

Expand Down Expand Up @@ -546,6 +559,137 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return response, nil
}

func (m *NodeExecutionManager) GetWorkflowNodeExecutions(ctx context.Context, request admin.WorkflowNodeExecutionsGetRequest) (*admin.WorkflowNodeExecutionsGetResponse, error) {
err := validation.ValidateWorkflowExecutionIdentifier(request.GetExecutionId())
if err != nil {
logger.Debugf(ctx, "can't get node execution data with invalid identifier [%+v]: %v", request.GetExecutionId(), err)
}

ctx = getExecutionContext(ctx, request.ExecutionId)

identifierFilters, err := util.GetWorkflowExecutionIdentifierFilters(ctx, *request.ExecutionId)
if err != nil {
return nil, err
}
var mapFilters []common.MapFilter
var parentNodeExecution *models.NodeExecution
parentNodeID := request.ParentNodeId
if parentNodeID == "" {
parentNodeID = request.DynamicNodeId
}
if parentNodeID != "" {
logger.Debugf(ctx, "fetching node execution for requested parent node id '%s'", parentNodeID)
parentNodeExecution, err = util.GetNodeExecutionModel(ctx, m.db, &core.NodeExecutionIdentifier{
ExecutionId: request.ExecutionId,
NodeId: parentNodeID,
})
if err != nil {
logger.Errorf(ctx, "failed to fetch node execution for requested parent node id '%s': %v", parentNodeID, err)
return nil, err
}
parentIDFilter, err := common.NewSingleValueFilter(
common.NodeExecution, common.Equal, shared.ParentID, parentNodeExecution.ID)
if err != nil {
return nil, err
}
identifierFilters = append(identifierFilters, parentIDFilter)
} else {
mapFilters = []common.MapFilter{isParent}
}

logger.Debugf(ctx, "fetching node executions")
output, err := m.listNodeExecutionsModels(ctx,
identifierFilters,
"",
10000,
"",
nil,
mapFilters)
if err != nil {
logger.Errorf(ctx, "failed to fetch node executions: %v", err)
return &admin.WorkflowNodeExecutionsGetResponse{}, err
}
if len(output.NodeExecutions) == 0 {
logger.Debugf(ctx, "fetch empty node executions")
return &admin.WorkflowNodeExecutionsGetResponse{}, nil
}

var workflowClosure *core.CompiledWorkflowClosure

if request.DynamicNodeId != "" {
// client should not send both dynamic_node_id and workflow_name+version pair
if request.WorkflowName != "" {
return nil, shared.GetInvalidArgumentError("workflow_name")
}
if request.WorkflowVersion != "" {
return nil, shared.GetMissingArgumentError("workflow_version")
}

nodeExecResource := repoInterfaces.NodeExecutionResource{
NodeExecutionIdentifier: core.NodeExecutionIdentifier{
NodeId: request.DynamicNodeId,
ExecutionId: request.ExecutionId,
},
}
nodeExecModel, err := m.db.NodeExecutionRepo().Get(ctx, nodeExecResource)
if err != nil {
logger.Errorf(ctx, "failed to fetch dynamic node execution: %v", err)
return nil, err
}
remoteReference := nodeExecModel.DynamicWorkflowRemoteClosureReference
if remoteReference != "" {
logger.Debugf(ctx, "fetching dynamic workflow closure from node")

closure := &core.CompiledWorkflowClosure{}
err = m.storageClient.ReadProtobuf(ctx, storage.DataReference(remoteReference), closure)
if err != nil {
logger.Errorf(ctx, "failed to fetch dynamic workflow closure from node: %v", err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"unable to read WorkflowClosure from location %s : %v", remoteReference, err)
}
workflowClosure = closure
} else {
logger.Warnf(ctx, "empty dynamic workflow reference in dynamic node")
}
} else {
if request.WorkflowName == "" {
return nil, shared.GetMissingArgumentError("workflow_name")
}
if request.WorkflowVersion == "" {
return nil, shared.GetMissingArgumentError("workflow_version")
}

workflowID := core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: request.ExecutionId.Project,
Domain: request.ExecutionId.Domain,
Name: request.WorkflowName,
Version: request.WorkflowVersion,
}
workflow, err := util.GetWorkflow(ctx, m.db, m.storageClient, workflowID)
if err != nil {
logger.Errorf(ctx, "failed to fetch workflow: %v", err)
return nil, err
}

workflowClosure = workflow.GetClosure().GetCompiledWorkflow()
}

// TODO return workflow closure sub-tree only for request.ParentNodeId.
// If its not provided, return 1st level nodes only

nodeExecutionList, err := m.transformNodeExecutionModelList(ctx, output.NodeExecutions)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution models for request with err: %v", err)
return nil, err
}

return &admin.WorkflowNodeExecutionsGetResponse{
Closure: workflowClosure,
NodeExecutions: nodeExecutionList,
}, nil
}

func NewNodeExecutionManager(db repoInterfaces.Repository, config runtimeInterfaces.Configuration,
storagePrefix []string, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface,
eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher,
Expand Down
28 changes: 18 additions & 10 deletions flyteadmin/pkg/manager/impl/util/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,15 @@ func FetchAndGetWorkflowClosure(ctx context.Context,
return closure, nil
}

func GetWorkflow(
ctx context.Context,
repo repoInterfaces.Repository,
func TransformWorkflowWithClosure(ctx context.Context,
store *storage.DataStore,
identifier core.Identifier) (*admin.Workflow, error) {
workflowModel, err := GetWorkflowModel(ctx, repo, identifier)
model models.Workflow,
) (*admin.Workflow, error) {
workflow, err := transformers.FromWorkflowModel(model)
if err != nil {
return nil, err
}
workflow, err := transformers.FromWorkflowModel(workflowModel)
if err != nil {
return nil, err
}
closure, err := FetchAndGetWorkflowClosure(ctx, store, workflowModel.RemoteClosureIdentifier)
closure, err := FetchAndGetWorkflowClosure(ctx, store, model.RemoteClosureIdentifier)
if err != nil {
return nil, err
}
Expand All @@ -92,6 +87,19 @@ func GetWorkflow(
return &workflow, nil
}

func GetWorkflow(
ctx context.Context,
repo repoInterfaces.Repository,
store *storage.DataStore,
identifier core.Identifier,
) (*admin.Workflow, error) {
workflowModel, err := GetWorkflowModel(ctx, repo, identifier)
if err != nil {
return nil, err
}
return TransformWorkflowWithClosure(ctx, store, workflowModel)
}

func GetLaunchPlanModel(
ctx context.Context, repo repoInterfaces.Repository, identifier core.Identifier) (models.LaunchPlan, error) {
launchPlanModel, err := (repo).LaunchPlanRepo().Get(ctx, repoInterfaces.Identifier{
Expand Down
14 changes: 7 additions & 7 deletions flyteadmin/pkg/manager/impl/util/single_task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func generateBindings(outputs core.VariableMap, nodeID string) []*core.Binding {
}

func CreateOrGetWorkflowModel(
ctx context.Context, request admin.ExecutionCreateRequest, db repositoryInterfaces.Repository,
ctx context.Context, db repositoryInterfaces.Repository,
workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, taskIdentifier *core.Identifier,
task *admin.Task) (*models.Workflow, error) {
task *admin.Task) (models.Workflow, error) {
workflowIdentifier := core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: taskIdentifier.Project,
Expand All @@ -87,7 +87,7 @@ func CreateOrGetWorkflowModel(

if err != nil {
if ferr, ok := err.(errors.FlyteAdminError); !ok || ferr.Code() != codes.NotFound {
return nil, err
return models.Workflow{}, err
}
// If we got this far, there is no existing workflow. Create a skeleton one now.
workflowSpec := admin.WorkflowSpec{
Expand Down Expand Up @@ -124,7 +124,7 @@ func CreateOrGetWorkflowModel(
// In the case of race conditions, if the workflow already exists we can safely ignore the corresponding
// error.
if ferr, ok := err.(errors.FlyteAdminError); !ok || ferr.Code() != codes.AlreadyExists {
return nil, err
return models.Workflow{}, err
}
}
// Now, set the newly created skeleton workflow to 'SYSTEM_GENERATED'.
Expand All @@ -139,7 +139,7 @@ func CreateOrGetWorkflowModel(
})
if err != nil {
logger.Warningf(ctx, "Failed to set skeleton workflow state to system-generated: %v", err)
return nil, err
return models.Workflow{}, err
}
workflowModel, err = db.WorkflowRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: workflowIdentifier.Project,
Expand All @@ -150,11 +150,11 @@ func CreateOrGetWorkflowModel(
if err != nil {
// This is unexpected - at this point we've successfully just created the skeleton workflow.
logger.Warningf(ctx, "Failed to fetch newly created workflow model from db store: %v", err)
return nil, err
return models.Workflow{}, err
}
}

return &workflowModel, nil
return workflowModel, nil
}

func CreateOrGetLaunchPlan(ctx context.Context,
Expand Down
Loading
Loading