Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Dec 27, 2023
1 parent 81ce71e commit ccb3171
Show file tree
Hide file tree
Showing 51 changed files with 24,394 additions and 6,249 deletions.
12 changes: 6 additions & 6 deletions flyteadmin/flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ flyteadmin:
useOffloadedWorkflowClosure: false
database:
postgres:
port: 30001
port: 5432
username: postgres
password: postgres
password: ""
host: 127.0.0.1
dbname: flyte
dbname: flyteadmin
options: "sslmode=disable"
scheduler:
eventScheduler:
Expand Down Expand Up @@ -123,13 +123,13 @@ storage:
region: us-east-1
disable_ssl: true
v2_signing: true
endpoint: http://localhost:30002
endpoint: http://localhost:4566
auth_type: accesskey
access_key_id: minio
secret_key: miniostorage
signedUrl:
stowConfigOverride:
endpoint: http://localhost:30002
endpoint: http://localhost:4566
cache:
max_size_mbs: 10
target_gc_percent: 100
Expand Down Expand Up @@ -212,4 +212,4 @@ qualityOfService:
staging: MEDIUM
# by default production has an UNDEFINED tier when it is omitted from the configuration
namespace_mapping:
template: "{{ project }}-{{ domain }}" # Default namespace mapping template.
template: "{{ project }}-{{ domain }}" # Default namespace mapping template.
24 changes: 6 additions & 18 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,21 +463,17 @@ func (m *ExecutionManager) launchSingleTaskExecution(
// Prepare a skeleton workflow
taskIdentifier := request.Spec.LaunchPlan
workflowModel, err :=
util.CreateOrGetWorkflowModel(ctx, request, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task)
util.CreateOrGetWorkflowModel(ctx, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task)
if err != nil {
logger.Debugf(ctx, "Failed to created skeleton workflow for [%+v] with err: %v", taskIdentifier, err)
return nil, nil, err
}
workflow, err := transformers.FromWorkflowModel(*workflowModel)
if err != nil {
return nil, nil, err
}
closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier)

workflow, err := util.TransformWorkflowWithClosure(ctx, m.storageClient, workflowModel)
if err != nil {
return nil, nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure

// Also prepare a skeleton launch plan.
launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier,
workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec)
Expand Down Expand Up @@ -1042,21 +1038,13 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

workflowModel, err := util.GetWorkflowModel(ctx, m.db, *launchPlan.Spec.WorkflowId)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
}
workflow, err := transformers.FromWorkflowModel(workflowModel)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
}
closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier)

workflow, err := util.TransformWorkflowWithClosure(ctx, m.storageClient, workflowModel)
if err != nil {
logger.Debugf(ctx, "Failed to get workflow with id %+v with err %v", launchPlan.Spec.WorkflowId, err)
return nil, nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure

name := util.GetExecutionName(request)
workflowExecutionID := core.WorkflowExecutionIdentifier{
Expand Down
152 changes: 139 additions & 13 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,24 +361,21 @@ func (m *NodeExecutionManager) GetNodeExecution(
return nodeExecution, nil
}

func (m *NodeExecutionManager) listNodeExecutions(
ctx context.Context, identifierFilters []common.InlineFilter,
requestFilters string, limit uint32, requestToken string, sortBy *admin.Sort, mapFilters []common.MapFilter) (
*admin.NodeExecutionList, error) {

func (m *NodeExecutionManager) listNodeExecutionsModels(ctx context.Context, identifierFilters []common.InlineFilter,
requestFilters string, limit uint32, requestToken string, sortBy *admin.Sort, mapFilters []common.MapFilter) (repoInterfaces.NodeExecutionCollectionOutput, error) {
filters, err := util.AddRequestFilters(requestFilters, common.NodeExecution, identifierFilters)
if err != nil {
return nil, err
return repoInterfaces.NodeExecutionCollectionOutput{}, err
}

sortParameter, err := common.NewSortParameter(sortBy, models.NodeExecutionColumns)
if err != nil {
return nil, err
return repoInterfaces.NodeExecutionCollectionOutput{}, err
}

offset, err := validation.ValidateToken(requestToken)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
return repoInterfaces.NodeExecutionCollectionOutput{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"invalid pagination token %s for ListNodeExecutions", requestToken)
}
listInput := repoInterfaces.ListResourceInput{
Expand All @@ -391,14 +388,30 @@ func (m *NodeExecutionManager) listNodeExecutions(
listInput.MapFilters = mapFilters
output, err := m.db.NodeExecutionRepo().List(ctx, listInput)
if err != nil {
logger.Debugf(ctx, "Failed to list node executions for request with err %v", err)
return nil, err
logger.Errorf(ctx, "failed to list node executions for request with err %v", err)
return repoInterfaces.NodeExecutionCollectionOutput{}, err
}

var token string
if len(output.NodeExecutions) == int(limit) {
token = strconv.Itoa(offset + len(output.NodeExecutions))
output.Token = strconv.Itoa(offset + len(output.NodeExecutions))
}
return output, nil
}

func (m *NodeExecutionManager) listNodeExecutions(
ctx context.Context,
identifierFilters []common.InlineFilter,
requestFilters string,
limit uint32,
requestToken string,
sortBy *admin.Sort,
mapFilters []common.MapFilter,
) (*admin.NodeExecutionList, error) {
output, err := m.listNodeExecutionsModels(ctx, identifierFilters, requestFilters, limit, requestToken, sortBy, mapFilters)
if err != nil {
return nil, err
}

nodeExecutionList, err := m.transformNodeExecutionModelList(ctx, output.NodeExecutions)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution models for request with err: %v", err)
Expand All @@ -407,7 +420,7 @@ func (m *NodeExecutionManager) listNodeExecutions(

return &admin.NodeExecutionList{
NodeExecutions: nodeExecutionList,
Token: token,
Token: output.Token,
}, nil
}

Expand Down Expand Up @@ -546,6 +559,119 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return response, nil
}

func (m *NodeExecutionManager) GetWorkflowNodeExecutions(ctx context.Context, request admin.WorkflowNodeExecutionsGetRequest) (*admin.WorkflowNodeExecutionsGetResponse, error) {
err := validation.ValidateWorkflowExecutionIdentifier(request.GetExecutionId())
if err != nil {
logger.Debugf(ctx, "can't get node execution data with invalid identifier [%+v]: %v", request.GetExecutionId(), err)
}

ctx = getExecutionContext(ctx, request.ExecutionId)

identifierFilters, err := util.GetWorkflowExecutionIdentifierFilters(ctx, *request.ExecutionId)
if err != nil {
return nil, err
}
var mapFilters []common.MapFilter
var parentNodeExecution *models.NodeExecution
if request.ParentNodeId != "" {
logger.Debugf(ctx, "fetching node execution for requested parent node id '%s'", request.ParentNodeId)
parentNodeExecution, err = util.GetNodeExecutionModel(ctx, m.db, &core.NodeExecutionIdentifier{
ExecutionId: request.ExecutionId,
NodeId: request.ParentNodeId,
})
if err != nil {
logger.Errorf(ctx, "failed to fetch node execution for requested parent node id '%s': %v",
request.ParentNodeId, err)
return nil, err
}
parentIDFilter, err := common.NewSingleValueFilter(
common.NodeExecution, common.Equal, shared.ParentID, parentNodeExecution.ID)
if err != nil {
return nil, err
}
identifierFilters = append(identifierFilters, parentIDFilter)
} else {
mapFilters = []common.MapFilter{
isParent,
}
}

logger.Debugf(ctx, "fetching node executions")
output, err := m.listNodeExecutionsModels(
ctx, identifierFilters,
"",
10000,
"",
nil,
mapFilters)
if err != nil {
logger.Errorf(ctx, "failed to fetch node executions: %v", err)
return &admin.WorkflowNodeExecutionsGetResponse{}, err
}
if len(output.NodeExecutions) == 0 {
logger.Debugf(ctx, "fetch empty node executions")
return &admin.WorkflowNodeExecutionsGetResponse{}, nil
}

// all node executions should have same or no ParentID so let's just pick the first one
childNodeExecution := output.NodeExecutions[0]
dynamicParentID := childNodeExecution.ParentID
var workflowClosure *admin.WorkflowClosure
if dynamicParentID == nil {
logger.Debugf(ctx, "first node execution has no dynamic parent id")
executionModel := childNodeExecution.LaunchedExecution
workflow, err := util.GetWorkflowByID(ctx, m.db, m.storageClient, executionModel.WorkflowID)
if err != nil {
logger.Errorf(ctx, "failed to fetch workflow by node execution workflow id '%s': %v",
executionModel.WorkflowID, err)
return nil, err
}
workflowClosure = workflow.Closure
} else {
logger.Debugf(ctx, "first node execution has dynamic parent id")
// fetch dynamic workflow
remoteReference := ""
if parentNodeExecution != nil {
remoteReference = parentNodeExecution.DynamicWorkflowRemoteClosureReference
requestedID := (*parentNodeExecution).ID
if *dynamicParentID != requestedID {
logger.Debugf(ctx, "dynamic parent id '%d' is different from requested id '%d', fetching dynamic parent node execution",
*dynamicParentID, requestedID)
parentModel, err := m.db.NodeExecutionRepo().GetByID(ctx, *dynamicParentID)
if err != nil {
return nil, err
}
remoteReference = parentModel.DynamicWorkflowRemoteClosureReference
} else {
logger.Debugf(ctx, "dynamic parent id '%d' equals requested id '%d'", *dynamicParentID, requestedID)
}
} else {
logger.Warnf(ctx, "parent node execution model is nil")
}
if remoteReference != "" {
logger.Debugf(ctx, "fetching dynamic workflow closure")
workflowClosure, err = util.FetchAndGetWorkflowClosure(ctx, m.storageClient, remoteReference)
if err != nil {
logger.Errorf(ctx, "failed to fetch dynamic workflow closure: %v", err)
return nil, err
}
} else {
logger.Warnf(ctx, "empty dynamic workflow reference")
}
}

nodeExecutionList, err := m.transformNodeExecutionModelList(ctx, output.NodeExecutions)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution models for request with err: %v", err)
return nil, err
}

return &admin.WorkflowNodeExecutionsGetResponse{
Closure: workflowClosure,
NodeExecutions: nodeExecutionList,
}, nil
}

func NewNodeExecutionManager(db repoInterfaces.Repository, config runtimeInterfaces.Configuration,
storagePrefix []string, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface,
eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher,
Expand Down
39 changes: 30 additions & 9 deletions flyteadmin/pkg/manager/impl/util/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,26 +70,47 @@ func FetchAndGetWorkflowClosure(ctx context.Context,
return closure, nil
}

func TransformWorkflowWithClosure(ctx context.Context,
store *storage.DataStore,
model models.Workflow,
) (*admin.Workflow, error) {
workflow, err := transformers.FromWorkflowModel(model)
if err != nil {
return nil, err
}
closure, err := FetchAndGetWorkflowClosure(ctx, store, model.RemoteClosureIdentifier)
if err != nil {
return nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure
return &workflow, nil
}

func GetWorkflow(
ctx context.Context,
repo repoInterfaces.Repository,
store *storage.DataStore,
identifier core.Identifier) (*admin.Workflow, error) {
identifier core.Identifier,
) (*admin.Workflow, error) {
workflowModel, err := GetWorkflowModel(ctx, repo, identifier)
if err != nil {
return nil, err
}
workflow, err := transformers.FromWorkflowModel(workflowModel)
if err != nil {
return nil, err
}
closure, err := FetchAndGetWorkflowClosure(ctx, store, workflowModel.RemoteClosureIdentifier)
return TransformWorkflowWithClosure(ctx, store, workflowModel)
}

func GetWorkflowByID(
ctx context.Context,
repo repoInterfaces.Repository,
store *storage.DataStore,
id uint,
) (*admin.Workflow, error) {
workflowModel, err := repo.WorkflowRepo().GetByID(ctx, id)
if err != nil {
return nil, err
}
closure.CreatedAt = workflow.Closure.CreatedAt
workflow.Closure = closure
return &workflow, nil
return TransformWorkflowWithClosure(ctx, store, workflowModel)
}

func GetLaunchPlanModel(
Expand Down
14 changes: 7 additions & 7 deletions flyteadmin/pkg/manager/impl/util/single_task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func generateBindings(outputs core.VariableMap, nodeID string) []*core.Binding {
}

func CreateOrGetWorkflowModel(
ctx context.Context, request admin.ExecutionCreateRequest, db repositoryInterfaces.Repository,
ctx context.Context, db repositoryInterfaces.Repository,
workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, taskIdentifier *core.Identifier,
task *admin.Task) (*models.Workflow, error) {
task *admin.Task) (models.Workflow, error) {
workflowIdentifier := core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: taskIdentifier.Project,
Expand All @@ -87,7 +87,7 @@ func CreateOrGetWorkflowModel(

if err != nil {
if ferr, ok := err.(errors.FlyteAdminError); !ok || ferr.Code() != codes.NotFound {
return nil, err
return models.Workflow{}, err
}
// If we got this far, there is no existing workflow. Create a skeleton one now.
workflowSpec := admin.WorkflowSpec{
Expand Down Expand Up @@ -124,7 +124,7 @@ func CreateOrGetWorkflowModel(
// In the case of race conditions, if the workflow already exists we can safely ignore the corresponding
// error.
if ferr, ok := err.(errors.FlyteAdminError); !ok || ferr.Code() != codes.AlreadyExists {
return nil, err
return models.Workflow{}, err
}
}
// Now, set the newly created skeleton workflow to 'SYSTEM_GENERATED'.
Expand All @@ -139,7 +139,7 @@ func CreateOrGetWorkflowModel(
})
if err != nil {
logger.Warningf(ctx, "Failed to set skeleton workflow state to system-generated: %v", err)
return nil, err
return models.Workflow{}, err
}
workflowModel, err = db.WorkflowRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: workflowIdentifier.Project,
Expand All @@ -150,11 +150,11 @@ func CreateOrGetWorkflowModel(
if err != nil {
// This is unexpected - at this point we've successfully just created the skeleton workflow.
logger.Warningf(ctx, "Failed to fetch newly created workflow model from db store: %v", err)
return nil, err
return models.Workflow{}, err
}
}

return &workflowModel, nil
return workflowModel, nil
}

func CreateOrGetLaunchPlan(ctx context.Context,
Expand Down
Loading

0 comments on commit ccb3171

Please sign in to comment.