Skip to content

Commit

Permalink
only support cache eviction for single task
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt committed Jan 8, 2024
1 parent 662355c commit 114a45e
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 1,257 deletions.
197 changes: 29 additions & 168 deletions flyteadmin/pkg/manager/impl/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package impl

import (
"context"
"strings"
"time"

"github.com/golang/protobuf/proto"
Expand All @@ -20,7 +19,6 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytestdlib/catalog/datacatalog"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
Expand All @@ -30,11 +28,7 @@ var (
_ interfaces.CacheInterface = &CacheManager{}
)

// sortByCreatedAtAsc is the sort parameter handed to the paginated node execution and task execution
// listings in this file; it sorts on the shared.CreatedAt key in ascending direction.
var sortByCreatedAtAsc = &admin.Sort{Key: shared.CreatedAt, Direction: admin.Sort_ASCENDING}

const (
// nodeExecutionLimit is the limit argument passed to each util.ListNodeExecutionsForWorkflow call
// made while paging through node executions.
nodeExecutionLimit = 10000
// taskExecutionLimit is the limit argument passed to each util.ListTaskExecutions call made while
// paging through task executions.
taskExecutionLimit = 10000
// reservationHeartbeat is a fixed 10 second duration.
// NOTE(review): its use is not visible in this part of the file - presumably the heartbeat
// interval supplied when acquiring cache reservations; confirm against the call site.
reservationHeartbeat = 10 * time.Second
)

Expand All @@ -59,39 +53,23 @@ func (m *CacheManager) EvictTaskExecutionCache(ctx context.Context, req service.
}

Check warning on line 53 in flyteadmin/pkg/manager/impl/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/cache_manager.go#L51-L53

Added lines #L51 - L53 were not covered by tests

ctx = getTaskExecutionContext(ctx, req.TaskExecutionId)
var evictionErrors []*core.CacheEvictionError

// Sanity check to ensure referenced task execution exists although only encapsulated node execution is strictly required
_, err := util.GetTaskExecutionModel(ctx, m.db, req.TaskExecutionId)
taskExecutionModel, err := util.GetTaskExecutionModel(ctx, m.db, req.TaskExecutionId)
if err != nil {
logger.Debugf(ctx, "Failed to get task execution model for task execution ID %+v: %v", req.TaskExecutionId, err)
return nil, err
}

logger.Debugf(ctx, "Starting to evict cache for execution %+v of task %+v", req.TaskExecutionId, req.TaskExecutionId.TaskId)

nodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, req.TaskExecutionId.NodeExecutionId)
if err != nil {
logger.Debugf(ctx, "Failed to get node execution model for node execution ID %+v: %v", req.TaskExecutionId.NodeExecutionId, err)
return nil, err
}

evictionErrors := m.evictNodeExecutionCache(ctx, *nodeExecutionModel, nil)

logger.Debugf(ctx, "Finished evicting cache for execution %+v of task %+v", req.TaskExecutionId, req.TaskExecutionId.TaskId)
return &service.EvictCacheResponse{
Errors: &core.CacheEvictionErrorList{Errors: evictionErrors},
}, nil
}

func (m *CacheManager) evictNodeExecutionCache(ctx context.Context, nodeExecutionModel models.NodeExecution,
evictionErrors []*core.CacheEvictionError) []*core.CacheEvictionError {
if strings.HasSuffix(nodeExecutionModel.NodeID, v1alpha1.StartNodeID) || strings.HasSuffix(nodeExecutionModel.NodeID, v1alpha1.EndNodeID) {
return evictionErrors
}

nodeExecution, err := transformers.FromNodeExecutionModel(nodeExecutionModel, transformers.DefaultExecutionTransformerOptions)
nodeExecution, err := transformers.FromNodeExecutionModel(*nodeExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warnf(ctx, "Failed to transform node execution model %+v: %v",
logger.Debugf(ctx, "Failed to transform node execution model %+v: %v",
nodeExecutionModel.NodeExecutionKey, err)
m.metrics.CacheEvictionFailures.Inc()
evictionErrors = append(evictionErrors, &core.CacheEvictionError{
Expand All @@ -106,83 +84,35 @@ func (m *CacheManager) evictNodeExecutionCache(ctx context.Context, nodeExecutio
Code: core.CacheEvictionError_INTERNAL,
Message: "Internal error",
})
return evictionErrors
}

logger.Debugf(ctx, "Starting to evict cache for node execution %+v", nodeExecution.Id)
t := m.metrics.CacheEvictionTime.Start()
defer t.Stop()

if nodeExecution.Metadata.IsParentNode {
var childNodeExecutions []models.NodeExecution
if nodeExecution.Metadata.IsDynamic {
var err error
childNodeExecutions, err = m.listAllNodeExecutionsForWorkflow(ctx, nodeExecution.Id.ExecutionId, nodeExecution.Id.NodeId)
if err != nil {
logger.Warnf(ctx, "Failed to list child node executions for dynamic node execution %+v: %v",
nodeExecution.Id, err)
m.metrics.CacheEvictionFailures.Inc()
evictionErrors = append(evictionErrors, &core.CacheEvictionError{
NodeExecutionId: nodeExecution.Id,
Source: &core.CacheEvictionError_WorkflowExecutionId{
WorkflowExecutionId: nodeExecution.Id.ExecutionId,
},
Code: core.CacheEvictionError_INTERNAL,
Message: "Failed to list child node executions",
})
return evictionErrors
}
} else {
childNodeExecutions = nodeExecutionModel.ChildNodeExecutions
}
for _, childNodeExecution := range childNodeExecutions {
evictionErrors = m.evictNodeExecutionCache(ctx, childNodeExecution, evictionErrors)
}
return &service.EvictCacheResponse{
Errors: &core.CacheEvictionErrorList{Errors: evictionErrors},
}, nil
}

Check warning on line 90 in flyteadmin/pkg/manager/impl/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/cache_manager.go#L72-L90

Added lines #L72 - L90 were not covered by tests

taskExecutions, err := m.listAllTaskExecutions(ctx, nodeExecution.Id)
taskExecution, err := transformers.FromTaskExecutionModel(*taskExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warnf(ctx, "Failed to list task executions for node execution %+v: %v",
nodeExecution.Id, err)
logger.Debugf(ctx, "Failed to transform task execution model %+v: %v",
taskExecutionModel.TaskExecutionKey, err)
m.metrics.CacheEvictionFailures.Inc()
evictionErrors = append(evictionErrors, &core.CacheEvictionError{
NodeExecutionId: nodeExecution.Id,
Source: &core.CacheEvictionError_WorkflowExecutionId{
WorkflowExecutionId: nodeExecution.Id.ExecutionId,
Source: &core.CacheEvictionError_TaskExecutionId{
TaskExecutionId: taskExecution.Id,
},
Code: core.CacheEvictionError_INTERNAL,
Message: "Failed to list task executions",
Message: "Internal error",
})
return evictionErrors
return &service.EvictCacheResponse{
Errors: &core.CacheEvictionErrorList{Errors: evictionErrors},
}, nil
}

Check warning on line 108 in flyteadmin/pkg/manager/impl/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/cache_manager.go#L94-L108

Added lines #L94 - L108 were not covered by tests

switch md := nodeExecution.GetClosure().GetTargetMetadata().(type) {
case *admin.NodeExecutionClosure_TaskNodeMetadata:
for _, taskExecutionModel := range taskExecutions {
taskExecution, err := transformers.FromTaskExecutionModel(taskExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warnf(ctx, "Failed to transform task execution model %+v: %v",
taskExecutionModel.TaskExecutionKey, err)
m.metrics.CacheEvictionFailures.Inc()
evictionErrors = append(evictionErrors, &core.CacheEvictionError{
NodeExecutionId: nodeExecution.Id,
Source: &core.CacheEvictionError_TaskExecutionId{
TaskExecutionId: taskExecution.Id,
},
Code: core.CacheEvictionError_INTERNAL,
Message: "Internal error",
})
return evictionErrors
}
logger.Debugf(ctx, "Starting to evict cache for execution %+v of task %+v", req.TaskExecutionId, req.TaskExecutionId.TaskId)

evictionErrors = m.evictTaskNodeExecutionCache(ctx, nodeExecutionModel, nodeExecution, taskExecution,
md.TaskNodeMetadata, evictionErrors)
}
case *admin.NodeExecutionClosure_WorkflowNodeMetadata:
evictionErrors = m.evictWorkflowNodeExecutionCache(ctx, nodeExecution, md.WorkflowNodeMetadata, evictionErrors)
default:
logger.Errorf(ctx, "Invalid target metadata type %T for node execution closure %+v for node execution %+v",
md, md, nodeExecution.Id)
metadata, ok := nodeExecution.GetClosure().GetTargetMetadata().(*admin.NodeExecutionClosure_TaskNodeMetadata)
if !ok {
logger.Debugf(ctx, "Node execution %+v did not contain task node metadata, skipping cache eviction",
nodeExecution.Id)
m.metrics.CacheEvictionFailures.Inc()
evictionErrors = append(evictionErrors, &core.CacheEvictionError{
NodeExecutionId: nodeExecution.Id,
Expand All @@ -192,9 +122,16 @@ func (m *CacheManager) evictNodeExecutionCache(ctx context.Context, nodeExecutio
Code: core.CacheEvictionError_INTERNAL,
Message: "Internal error",
})
return &service.EvictCacheResponse{
Errors: &core.CacheEvictionErrorList{Errors: evictionErrors},
}, nil
}

Check warning on line 128 in flyteadmin/pkg/manager/impl/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/cache_manager.go#L114-L128

Added lines #L114 - L128 were not covered by tests

return evictionErrors
evictionErrors = m.evictTaskNodeExecutionCache(ctx, *nodeExecutionModel, nodeExecution, taskExecution, metadata.TaskNodeMetadata, nil)
logger.Debugf(ctx, "Finished evicting cache for execution %+v of task %+v", req.TaskExecutionId, req.TaskExecutionId.TaskId)
return &service.EvictCacheResponse{
Errors: &core.CacheEvictionErrorList{Errors: evictionErrors},
}, nil
}

func (m *CacheManager) evictTaskNodeExecutionCache(ctx context.Context, nodeExecutionModel models.NodeExecution,
Expand Down Expand Up @@ -333,82 +270,6 @@ func (m *CacheManager) evictTaskNodeExecutionCache(ctx context.Context, nodeExec
return evictionErrors
}

// evictWorkflowNodeExecutionCache evicts the cache of every node execution that belongs to the workflow
// execution referenced by the given workflow node metadata.
//
// Parameters:
//   - ctx: request context, forwarded to the listing and eviction calls and used for logging.
//   - nodeExecution: the workflow node execution being processed; only its Id is read here.
//   - workflowNodeMetadata: metadata pointing at the launched workflow execution. When nil, there is
//     nothing to evict and evictionErrors is returned unchanged.
//   - evictionErrors: errors accumulated so far by the caller.
//
// Returns evictionErrors with any errors encountered while listing or evicting child node executions
// appended. The success log line and the CacheEvictionSuccess metric are only emitted when no new
// error was recorded for any child; previously they were emitted unconditionally, so a partially
// failed eviction was still counted as a success.
func (m *CacheManager) evictWorkflowNodeExecutionCache(ctx context.Context, nodeExecution *admin.NodeExecution,
	workflowNodeMetadata *admin.WorkflowNodeMetadata, evictionErrors []*core.CacheEvictionError) []*core.CacheEvictionError {
	if workflowNodeMetadata == nil {
		logger.Debugf(ctx, "Node execution %+v did not contain cached data, skipping cache eviction",
			nodeExecution.Id)
		return evictionErrors
	}

	// An empty unique parent ID is passed so the listing is not narrowed to a specific parent node.
	childNodeExecutions, err := m.listAllNodeExecutionsForWorkflow(ctx, workflowNodeMetadata.GetExecutionId(), "")
	if err != nil {
		logger.Debugf(ctx, "Failed to list child executions for node execution %+v of workflow %+v: %v",
			nodeExecution.Id, workflowNodeMetadata.GetExecutionId(), err)
		m.metrics.CacheEvictionFailures.Inc()
		evictionErrors = append(evictionErrors, &core.CacheEvictionError{
			NodeExecutionId: nodeExecution.Id,
			Source: &core.CacheEvictionError_WorkflowExecutionId{
				WorkflowExecutionId: workflowNodeMetadata.GetExecutionId(),
			},
			Code:    core.CacheEvictionError_INTERNAL,
			Message: "Failed to evict child executions for workflow",
		})
		return evictionErrors
	}

	// Remember how many errors existed before descending, so failures introduced by the children can
	// be told apart from failures the caller had already collected.
	errorsBefore := len(evictionErrors)
	for _, childNodeExecution := range childNodeExecutions {
		evictionErrors = m.evictNodeExecutionCache(ctx, childNodeExecution, evictionErrors)
	}

	if newErrors := len(evictionErrors) - errorsBefore; newErrors > 0 {
		// Failure metrics for the individual children are recorded where each error is produced;
		// here we only refrain from reporting the workflow node as a success.
		logger.Debugf(ctx, "Finished evicting cache for workflow node execution %+v with %d error(s)",
			nodeExecution.Id, newErrors)
		return evictionErrors
	}

	logger.Debugf(ctx, "Successfully evicted cache for workflow node execution %+v", nodeExecution.Id)
	m.metrics.CacheEvictionSuccess.Inc()

	return evictionErrors
}

// listAllNodeExecutionsForWorkflow pages through util.ListNodeExecutionsForWorkflow for the given
// workflow execution and unique parent ID, requesting nodeExecutionLimit entries per call sorted by
// sortByCreatedAtAsc, and returns the concatenation of all pages.
//
// The first listing error aborts the walk and is returned with a nil slice.
func (m *CacheManager) listAllNodeExecutionsForWorkflow(ctx context.Context,
	workflowExecutionID *core.WorkflowExecutionIdentifier, uniqueParentID string) ([]models.NodeExecution, error) {
	var (
		collected []models.NodeExecution
		pageToken string
	)

	for {
		page, nextToken, err := util.ListNodeExecutionsForWorkflow(ctx, m.db, workflowExecutionID,
			uniqueParentID, "", nodeExecutionLimit, pageToken, sortByCreatedAtAsc)
		if err != nil {
			return nil, err
		}
		collected = append(collected, page...)

		// An empty continuation token signals that the last page has been consumed.
		if nextToken == "" {
			return collected, nil
		}
		pageToken = nextToken
	}
}

// listAllTaskExecutions pages through util.ListTaskExecutions for the given node execution,
// requesting taskExecutionLimit entries per call sorted by sortByCreatedAtAsc, and returns the
// concatenation of all pages.
//
// The first listing error aborts the walk and is returned with a nil slice.
func (m *CacheManager) listAllTaskExecutions(ctx context.Context, nodeExecutionID *core.NodeExecutionIdentifier) ([]models.TaskExecution, error) {
	var (
		collected []models.TaskExecution
		pageToken string
	)

	for {
		page, nextToken, err := util.ListTaskExecutions(ctx, m.db, nodeExecutionID, "",
			taskExecutionLimit, pageToken, sortByCreatedAtAsc)
		if err != nil {
			return nil, err
		}
		collected = append(collected, page...)

		// An empty continuation token signals that the last page has been consumed.
		if nextToken == "" {
			return collected, nil
		}
		pageToken = nextToken
	}
}

func NewCacheManager(db repoInterfaces.Repository, config runtimeInterfaces.Configuration, catalogClient catalog.Client,
scope promutils.Scope) interfaces.CacheInterface {
metrics := cacheMetrics{
Expand Down
Loading

0 comments on commit 114a45e

Please sign in to comment.