Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Dec 29, 2023
1 parent 81ce71e commit feec8f4
Show file tree
Hide file tree
Showing 53 changed files with 26,856 additions and 7,369 deletions.
12 changes: 6 additions & 6 deletions flyteadmin/flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ flyteadmin:
useOffloadedWorkflowClosure: false
database:
postgres:
port: 30001
port: 5432
username: postgres
password: postgres
password: ""
host: 127.0.0.1
dbname: flyte
dbname: flyteadmin
options: "sslmode=disable"
scheduler:
eventScheduler:
Expand Down Expand Up @@ -123,13 +123,13 @@ storage:
region: us-east-1
disable_ssl: true
v2_signing: true
endpoint: http://localhost:30002
endpoint: http://localhost:4566
auth_type: accesskey
access_key_id: minio
secret_key: miniostorage
signedUrl:
stowConfigOverride:
endpoint: http://localhost:30002
endpoint: http://localhost:4566
cache:
max_size_mbs: 10
target_gc_percent: 100
Expand Down Expand Up @@ -212,4 +212,4 @@ qualityOfService:
staging: MEDIUM
# by default production has an UNDEFINED tier when it is omitted from the configuration
namespace_mapping:
template: "{{ project }}-{{ domain }}" # Default namespace mapping template.
template: "{{ project }}-{{ domain }}" # Default namespace mapping template.
24 changes: 6 additions & 18 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,21 +463,17 @@ func (m *ExecutionManager) launchSingleTaskExecution(
// Prepare a skeleton workflow
taskIdentifier := request.Spec.LaunchPlan
workflowModel, err :=
util.CreateOrGetWorkflowModel(ctx, request, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task)
util.CreateOrGetWorkflowModel(ctx, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task)
if err != nil {
logger.Debugf(ctx, "Failed to created skeleton workflow for [%+v] with err: %v", taskIdentifier, err)
return nil, nil, err
}
workflow, err := transformers.FromWorkflowModel(*workflowModel)
if err != nil {
return nil, nil, err
}
closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier)

workflow, err := util.TransformWorkflowWithClosure(ctx, m.storageClient, workflowModel)
if err != nil {
return nil, nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure

// Also prepare a skeleton launch plan.
launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier,
workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec)
Expand Down Expand Up @@ -1042,21 +1038,13 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

workflowModel, err := util.GetWorkflowModel(ctx, m.db, *launchPlan.Spec.WorkflowId)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
}
workflow, err := transformers.FromWorkflowModel(workflowModel)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
}
closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier)

workflow, err := util.TransformWorkflowWithClosure(ctx, m.storageClient, workflowModel)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure

name := util.GetExecutionName(request)
workflowExecutionID := core.WorkflowExecutionIdentifier{
Expand Down
165 changes: 152 additions & 13 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,24 +361,21 @@ func (m *NodeExecutionManager) GetNodeExecution(
return nodeExecution, nil
}

func (m *NodeExecutionManager) listNodeExecutions(
ctx context.Context, identifierFilters []common.InlineFilter,
requestFilters string, limit uint32, requestToken string, sortBy *admin.Sort, mapFilters []common.MapFilter) (
*admin.NodeExecutionList, error) {

func (m *NodeExecutionManager) listNodeExecutionsModels(ctx context.Context, identifierFilters []common.InlineFilter,
requestFilters string, limit uint32, requestToken string, sortBy *admin.Sort, mapFilters []common.MapFilter) (repoInterfaces.NodeExecutionCollectionOutput, error) {
filters, err := util.AddRequestFilters(requestFilters, common.NodeExecution, identifierFilters)
if err != nil {
return nil, err
return repoInterfaces.NodeExecutionCollectionOutput{}, err
}

sortParameter, err := common.NewSortParameter(sortBy, models.NodeExecutionColumns)
if err != nil {
return nil, err
return repoInterfaces.NodeExecutionCollectionOutput{}, err
}

offset, err := validation.ValidateToken(requestToken)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
return repoInterfaces.NodeExecutionCollectionOutput{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"invalid pagination token %s for ListNodeExecutions", requestToken)
}
listInput := repoInterfaces.ListResourceInput{
Expand All @@ -391,14 +388,30 @@ func (m *NodeExecutionManager) listNodeExecutions(
listInput.MapFilters = mapFilters
output, err := m.db.NodeExecutionRepo().List(ctx, listInput)
if err != nil {
logger.Debugf(ctx, "Failed to list node executions for request with err %v", err)
return nil, err
logger.Errorf(ctx, "failed to list node executions for request with err %v", err)
return repoInterfaces.NodeExecutionCollectionOutput{}, err
}

var token string
if len(output.NodeExecutions) == int(limit) {
token = strconv.Itoa(offset + len(output.NodeExecutions))
output.Token = strconv.Itoa(offset + len(output.NodeExecutions))
}
return output, nil
}

func (m *NodeExecutionManager) listNodeExecutions(
ctx context.Context,
identifierFilters []common.InlineFilter,
requestFilters string,
limit uint32,
requestToken string,
sortBy *admin.Sort,
mapFilters []common.MapFilter,
) (*admin.NodeExecutionList, error) {
output, err := m.listNodeExecutionsModels(ctx, identifierFilters, requestFilters, limit, requestToken, sortBy, mapFilters)
if err != nil {
return nil, err
}

nodeExecutionList, err := m.transformNodeExecutionModelList(ctx, output.NodeExecutions)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution models for request with err: %v", err)
Expand All @@ -407,7 +420,7 @@ func (m *NodeExecutionManager) listNodeExecutions(

return &admin.NodeExecutionList{
NodeExecutions: nodeExecutionList,
Token: token,
Token: output.Token,
}, nil
}

Expand Down Expand Up @@ -546,6 +559,132 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return response, nil
}

func (m *NodeExecutionManager) GetWorkflowNodeExecutions(ctx context.Context, request admin.WorkflowNodeExecutionsGetRequest) (*admin.WorkflowNodeExecutionsGetResponse, error) {
err := validation.ValidateWorkflowExecutionIdentifier(request.GetExecutionId())
if err != nil {
logger.Debugf(ctx, "can't get node execution data with invalid identifier [%+v]: %v", request.GetExecutionId(), err)
}

ctx = getExecutionContext(ctx, request.ExecutionId)

identifierFilters, err := util.GetWorkflowExecutionIdentifierFilters(ctx, *request.ExecutionId)
if err != nil {
return nil, err
}
var mapFilters []common.MapFilter
var parentNodeExecution *models.NodeExecution
if request.ParentNodeId != "" {
logger.Debugf(ctx, "fetching node execution for requested parent node id '%s'", request.ParentNodeId)
parentNodeExecution, err = util.GetNodeExecutionModel(ctx, m.db, &core.NodeExecutionIdentifier{
ExecutionId: request.ExecutionId,
NodeId: request.ParentNodeId,
})
if err != nil {
logger.Errorf(ctx, "failed to fetch node execution for requested parent node id '%s': %v",
request.ParentNodeId, err)
return nil, err
}
parentIDFilter, err := common.NewSingleValueFilter(
common.NodeExecution, common.Equal, shared.ParentID, parentNodeExecution.ID)
if err != nil {
return nil, err
}
identifierFilters = append(identifierFilters, parentIDFilter)
} else {
mapFilters = []common.MapFilter{isParent}
}

logger.Debugf(ctx, "fetching node executions")
output, err := m.listNodeExecutionsModels(ctx,
identifierFilters,
"",
10000,
"",
nil,
mapFilters)
if err != nil {
logger.Errorf(ctx, "failed to fetch node executions: %v", err)
return &admin.WorkflowNodeExecutionsGetResponse{}, err
}
if len(output.NodeExecutions) == 0 {
logger.Debugf(ctx, "fetch empty node executions")
return &admin.WorkflowNodeExecutionsGetResponse{}, nil
}

var workflowClosure *core.CompiledWorkflowClosure

if request.DynamicNodeId != "" {
// client should not send both dynamic_node_id and workflow_name+version pair
if request.WorkflowName != "" {
return nil, shared.GetInvalidArgumentError("workflow_name")
}
if request.WorkflowVersion != "" {
return nil, shared.GetMissingArgumentError("workflow_version")
}

nodeExecResource := repoInterfaces.NodeExecutionResource{
NodeExecutionIdentifier: core.NodeExecutionIdentifier{
NodeId: request.DynamicNodeId,
ExecutionId: request.ExecutionId,
},
}
nodeExecModel, err := m.db.NodeExecutionRepo().Get(ctx, nodeExecResource)
if err != nil {
logger.Errorf(ctx, "failed to fetch dynamic node execution: %v", err)
return nil, err
}
remoteReference := nodeExecModel.DynamicWorkflowRemoteClosureReference
if remoteReference != "" {
logger.Debugf(ctx, "fetching dynamic workflow closure from node")

err = m.storageClient.ReadProtobuf(ctx, storage.DataReference(remoteReference), workflowClosure)
if err != nil {
logger.Errorf(ctx, "failed to fetch dynamic workflow closure from node: %v", err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"unable to read WorkflowClosure from location %s : %v", remoteReference, err)
}
} else {
logger.Warnf(ctx, "empty dynamic workflow reference in dynamic node")
}
} else {
if request.WorkflowName == "" {
return nil, shared.GetMissingArgumentError("workflow_name")
}
if request.WorkflowVersion == "" {
return nil, shared.GetMissingArgumentError("workflow_version")
}

workflowID := core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: request.ExecutionId.Project,
Domain: request.ExecutionId.Domain,
Name: request.WorkflowName,
Version: request.WorkflowVersion,
}
workflow, err := util.GetWorkflow(ctx, m.db, m.storageClient, workflowID)
if err != nil {
logger.Errorf(ctx, "failed to fetch workflow: %v", err)
return nil, err
}

workflowClosure = workflow.GetClosure().GetCompiledWorkflow()
}

// TODO return workflow closure sub-tree only for request.ParentNodeId.
// If its not provided, return 1st level nodes only

nodeExecutionList, err := m.transformNodeExecutionModelList(ctx, output.NodeExecutions)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution models for request with err: %v", err)
return nil, err
}

return &admin.WorkflowNodeExecutionsGetResponse{
Closure: workflowClosure,
NodeExecutions: nodeExecutionList,
}, nil
}

func NewNodeExecutionManager(db repoInterfaces.Repository, config runtimeInterfaces.Configuration,
storagePrefix []string, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface,
eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher,
Expand Down
28 changes: 18 additions & 10 deletions flyteadmin/pkg/manager/impl/util/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,15 @@ func FetchAndGetWorkflowClosure(ctx context.Context,
return closure, nil
}

func GetWorkflow(
ctx context.Context,
repo repoInterfaces.Repository,
func TransformWorkflowWithClosure(ctx context.Context,
store *storage.DataStore,
identifier core.Identifier) (*admin.Workflow, error) {
workflowModel, err := GetWorkflowModel(ctx, repo, identifier)
model models.Workflow,
) (*admin.Workflow, error) {
workflow, err := transformers.FromWorkflowModel(model)
if err != nil {
return nil, err
}
workflow, err := transformers.FromWorkflowModel(workflowModel)
if err != nil {
return nil, err
}
closure, err := FetchAndGetWorkflowClosure(ctx, store, workflowModel.RemoteClosureIdentifier)
closure, err := FetchAndGetWorkflowClosure(ctx, store, model.RemoteClosureIdentifier)
if err != nil {
return nil, err
}
Expand All @@ -92,6 +87,19 @@ func GetWorkflow(
return &workflow, nil
}

func GetWorkflow(
ctx context.Context,
repo repoInterfaces.Repository,
store *storage.DataStore,
identifier core.Identifier,
) (*admin.Workflow, error) {
workflowModel, err := GetWorkflowModel(ctx, repo, identifier)
if err != nil {
return nil, err
}
return TransformWorkflowWithClosure(ctx, store, workflowModel)
}

func GetLaunchPlanModel(
ctx context.Context, repo repoInterfaces.Repository, identifier core.Identifier) (models.LaunchPlan, error) {
launchPlanModel, err := (repo).LaunchPlanRepo().Get(ctx, repoInterfaces.Identifier{
Expand Down
14 changes: 7 additions & 7 deletions flyteadmin/pkg/manager/impl/util/single_task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func generateBindings(outputs core.VariableMap, nodeID string) []*core.Binding {
}

func CreateOrGetWorkflowModel(
ctx context.Context, request admin.ExecutionCreateRequest, db repositoryInterfaces.Repository,
ctx context.Context, db repositoryInterfaces.Repository,
workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, taskIdentifier *core.Identifier,
task *admin.Task) (*models.Workflow, error) {
task *admin.Task) (models.Workflow, error) {
workflowIdentifier := core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: taskIdentifier.Project,
Expand All @@ -87,7 +87,7 @@ func CreateOrGetWorkflowModel(

if err != nil {
if ferr, ok := err.(errors.FlyteAdminError); !ok || ferr.Code() != codes.NotFound {
return nil, err
return models.Workflow{}, err
}
// If we got this far, there is no existing workflow. Create a skeleton one now.
workflowSpec := admin.WorkflowSpec{
Expand Down Expand Up @@ -124,7 +124,7 @@ func CreateOrGetWorkflowModel(
// In the case of race conditions, if the workflow already exists we can safely ignore the corresponding
// error.
if ferr, ok := err.(errors.FlyteAdminError); !ok || ferr.Code() != codes.AlreadyExists {
return nil, err
return models.Workflow{}, err
}
}
// Now, set the newly created skeleton workflow to 'SYSTEM_GENERATED'.
Expand All @@ -139,7 +139,7 @@ func CreateOrGetWorkflowModel(
})
if err != nil {
logger.Warningf(ctx, "Failed to set skeleton workflow state to system-generated: %v", err)
return nil, err
return models.Workflow{}, err
}
workflowModel, err = db.WorkflowRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: workflowIdentifier.Project,
Expand All @@ -150,11 +150,11 @@ func CreateOrGetWorkflowModel(
if err != nil {
// This is unexpected - at this point we've successfully just created the skeleton workflow.
logger.Warningf(ctx, "Failed to fetch newly created workflow model from db store: %v", err)
return nil, err
return models.Workflow{}, err
}
}

return &workflowModel, nil
return workflowModel, nil
}

func CreateOrGetLaunchPlan(ctx context.Context,
Expand Down
Loading

0 comments on commit feec8f4

Please sign in to comment.