Skip to content

Commit

Permalink
Override ArrayNode log links with map plugin
Browse files Browse the repository at this point in the history
This PR adds a configuration option to override ArrayNode log links with those defined in the map plugin. The map plugin contains it's own configuration for log links, which may differ from those defined on the PodPlugin. ArrayNode, executing subNodes as regular tasks (ie. using the PodPlugin) means that it uses the default PodPlugin log templates.

Signed-off-by: Andrew Dye <[email protected]>
  • Loading branch information
hamersaw authored and andrewwdye committed Sep 25, 2024
1 parent 289d450 commit c19b179
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 9 deletions.
10 changes: 5 additions & 5 deletions flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (s SubTaskExecutionID) GetLogSuffix() string {
return fmt.Sprintf(" #%d-%d-%d", s.taskRetryAttempt, s.executionIndex, s.subtaskRetryAttempt)
}

var logTemplateRegexes = struct {
var LogTemplateRegexes = struct {
ExecutionIndex *regexp.Regexp
ParentName *regexp.Regexp
RetryAttempt *regexp.Regexp
Expand All @@ -189,17 +189,17 @@ var logTemplateRegexes = struct {

func (s SubTaskExecutionID) TemplateVarsByScheme() []tasklog.TemplateVar {
return []tasklog.TemplateVar{
{Regex: logTemplateRegexes.ParentName, Value: s.parentName},
{Regex: LogTemplateRegexes.ParentName, Value: s.parentName},
{
Regex: logTemplateRegexes.ExecutionIndex,
Regex: LogTemplateRegexes.ExecutionIndex,
Value: strconv.FormatUint(uint64(s.executionIndex), 10),
},
{
Regex: logTemplateRegexes.RetryAttempt,
Regex: LogTemplateRegexes.RetryAttempt,
Value: strconv.FormatUint(s.subtaskRetryAttempt, 10),
},
{
Regex: logTemplateRegexes.ParentRetryAttempt,
Regex: LogTemplateRegexes.ParentRetryAttempt,
Value: strconv.FormatUint(uint64(s.taskRetryAttempt), 10),
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ func TestSubTaskExecutionContext(t *testing.T) {
assert.Equal(t, storage.DataReference("/raw_prefix/5/1"), stCtx.OutputWriter().GetRawOutputPrefix())
assert.Equal(t,
[]tasklog.TemplateVar{
{Regex: logTemplateRegexes.ParentName, Value: "notfound"},
{Regex: logTemplateRegexes.ExecutionIndex, Value: "0"},
{Regex: logTemplateRegexes.RetryAttempt, Value: "1"},
{Regex: logTemplateRegexes.ParentRetryAttempt, Value: "0"},
{Regex: LogTemplateRegexes.ParentName, Value: "notfound"},
{Regex: LogTemplateRegexes.ExecutionIndex, Value: "0"},
{Regex: LogTemplateRegexes.RetryAttempt, Value: "1"},
{Regex: LogTemplateRegexes.ParentRetryAttempt, Value: "0"},
},
stCtx.TaskExecutionMetadata().GetTaskExecutionID().(SubTaskExecutionID).TemplateVarsByScheme(),
)
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ const (
type ArrayNodeConfig struct {
EventVersion int `json:"event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
DefaultParallelismBehavior ParallelismBehavior `json:"default-parallelism-behavior" pflag:",Default parallelism behavior for array nodes"`
UseMapPluginLogs bool `json:"use-map-plugin-logs" pflag:",Override subNode log links with those configured for the map plugin logs"`
}

// GetConfig extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object.
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

127 changes: 127 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,44 @@ import (

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
pluginscore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/encoding"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
mapplugin "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/array/k8s"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/pod"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

type taskExecutionID struct {
pluginscore.TaskExecutionID

generatedName string
id *idlcore.TaskExecutionIdentifier
nodeID string
}

func (t *taskExecutionID) GetGeneratedName() string {
return t.generatedName
}

func (t *taskExecutionID) GetID() idlcore.TaskExecutionIdentifier {
return *t.id
}

func (t *taskExecutionID) GetGeneratedNameWith(minLength, maxLength int) (string, error) {
return "", nil

Check warning on line 44 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L43-L44

Added lines #L43 - L44 were not covered by tests
}

func (t *taskExecutionID) GetUniqueNodeID() string {
return t.nodeID
}

type arrayEventRecorder interface {
interfaces.EventRecorder
process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error
Expand Down Expand Up @@ -83,7 +113,25 @@ func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx inter
})
}

var mapLogPlugin tasklog.Plugin
if config.GetConfig().ArrayNode.UseMapPluginLogs {
mapLogPlugin, err = logs.InitializeLogPlugins(&mapplugin.GetConfig().LogConfig.Config)
if err != nil {
logger.Warnf(ctx, "failed to initialize log plugin with error:%v", err)

Check warning on line 120 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L118-L120

Added lines #L118 - L120 were not covered by tests
}
}

for _, taskExecutionEvent := range e.taskEvents {
if mapLogPlugin != nil && len(taskExecutionEvent.Logs) > 0 {
// override log links for subNode execution with map plugin
logs, err := getPluginLogs(mapLogPlugin, nCtx, index, retryAttempt)
if err != nil {
logger.Warnf(ctx, "failed to compute logs for ArrayNode:%s index:%d retryAttempt:%d with error:%v", nCtx.NodeID(), index, retryAttempt, err)
} else {
taskExecutionEvent.Logs = logs

Check warning on line 131 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L127-L131

Added lines #L127 - L131 were not covered by tests
}
}

for _, log := range taskExecutionEvent.Logs {
log.Name = fmt.Sprintf("%s-%d", log.Name, index)
}
Expand Down Expand Up @@ -213,6 +261,85 @@ func newArrayEventRecorder(eventRecorder interfaces.EventRecorder) arrayEventRec
}
}

func getPluginLogs(logPlugin tasklog.Plugin, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) ([]*idlcore.TaskLog, error) {
subNodeSpec := nCtx.Node().GetArrayNode().GetSubNodeSpec()

// retrieve taskTemplate from subNode
taskID := subNodeSpec.GetTaskID()
executableTask, err := nCtx.ExecutionContext().GetTask(*taskID)
if err != nil {
return nil, err

Check warning on line 271 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L271

Added line #L271 was not covered by tests
}

taskTemplate := executableTask.CoreTask()

// build TaskExecutionID
taskExecutionIdentifier := &idlcore.TaskExecutionIdentifier{
TaskId: taskTemplate.GetId(), // use taskID from subNodeSpec
RetryAttempt: nCtx.CurrentAttempt(),
NodeExecutionId: nCtx.NodeExecutionMetadata().GetNodeExecutionID(), // use node metadata from ArrayNode
}

nodeID := nCtx.NodeID()
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
var err error
nodeID, err = common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID())
if err != nil {
return nil, err

Check warning on line 288 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L288

Added line #L288 was not covered by tests
}
}

length := task.IDMaxLength
if l := pod.DefaultPodPlugin.GetProperties().GeneratedNameMaxLength; l != nil {
length = *l

Check warning on line 294 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L294

Added line #L294 was not covered by tests
}

uniqueID, err := encoding.FixedLengthUniqueIDForParts(length, []string{nCtx.NodeExecutionMetadata().GetOwnerID().Name, nodeID, strconv.Itoa(int(nCtx.CurrentAttempt()))})
if err != nil {
return nil, err

Check warning on line 299 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L299

Added line #L299 was not covered by tests
}

taskExecutionId := &taskExecutionID{
generatedName: uniqueID,
id: taskExecutionIdentifier,
nodeID: nodeID,
}

// compute podName and containerName
stCtx := mapplugin.NewSubTaskExecutionID(taskExecutionId, index, uint64(retryAttempt))

podName := stCtx.GetGeneratedName()
containerName := stCtx.GetGeneratedName()

// initialize map plugin specific LogTemplateVars
extraLogTemplateVars := []tasklog.TemplateVar{
{
Regex: mapplugin.LogTemplateRegexes.ExecutionIndex,
Value: strconv.FormatUint(uint64(index), 10),
},
{
Regex: mapplugin.LogTemplateRegexes.RetryAttempt,
Value: strconv.FormatUint(uint64(retryAttempt), 10),
},
}

logs, err := logPlugin.GetTaskLogs(
tasklog.Input{
PodName: podName,
Namespace: nCtx.NodeExecutionMetadata().GetNamespace(),
ContainerName: containerName,
TaskExecutionID: taskExecutionId,
ExtraTemplateVars: extraLogTemplateVars,
TaskTemplate: taskTemplate,
},
)
if err != nil {
return nil, err

Check warning on line 337 in flytepropeller/pkg/controller/nodes/array/event_recorder.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/event_recorder.go#L337

Added line #L337 was not covered by tests
}

return logs.TaskLogs, nil
}

func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32, nodePhase idlcore.NodeExecution_Phase,
taskPhase idlcore.TaskExecution_Phase, eventRecorder interfaces.EventRecorder, eventConfig *config.EventConfig) error {

Expand Down
79 changes: 79 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/event_recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,19 @@ package array

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
execmocks "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors/mocks"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces/mocks"
)

type bufferedEventRecorder struct {
Expand All @@ -25,3 +35,72 @@ func (b *bufferedEventRecorder) RecordNodeEvent(ctx context.Context, nodeExecuti
func newBufferedEventRecorder() *bufferedEventRecorder {
return &bufferedEventRecorder{}
}

func TestGetPluginLogs(t *testing.T) {
// intitialize log plugin

Check failure on line 40 in flytepropeller/pkg/controller/nodes/array/event_recorder_test.go

View workflow job for this annotation

GitHub Actions / Check for spelling errors

intitialize ==> initialize
logConfig := &logs.LogConfig{
Templates: []tasklog.TemplateLogPlugin{
tasklog.TemplateLogPlugin{
Name: "foo",
DisplayName: "bar",
TemplateURIs: []tasklog.TemplateURI{
"/console/projects/{{.executionProject}}/domains/{{.executionDomain}}/executions/{{.executionName}}/nodeId/{{.nodeID}}/taskId/{{.taskID}}/attempt/{{.taskRetryAttempt}}/mappedIndex/{{.subtaskExecutionIndex}}/mappedAttempt/{{.subtaskRetryAttempt}}/view/logs?duration=all",
},
},
},
}

mapLogPlugin, err := logs.InitializeLogPlugins(logConfig)
assert.Nil(t, err)

// create NodeExecutionContext
nCtx := &mocks.NodeExecutionContext{}
nCtx.OnCurrentAttempt().Return(uint32(0))

executionContext := &execmocks.ExecutionContext{}
executionContext.OnGetEventVersion().Return(1)
executionContext.OnGetParentInfo().Return(nil)
executionContext.OnGetTaskMatch(taskRef).Return(
&v1alpha1.TaskSpec{
TaskTemplate: &idlcore.TaskTemplate{
Id: &idlcore.Identifier{
ResourceType: idlcore.ResourceType_TASK,
Project: "task_project",
Domain: "task_domain",
Name: "task_name",
Version: "task_version",
},
},
},
nil,
)
nCtx.OnExecutionContext().Return(executionContext)

nCtx.OnNode().Return(&arrayNodeSpec)

nodeExecutionMetadata := &mocks.NodeExecutionMetadata{}
nodeExecutionMetadata.OnGetNamespace().Return("node_namespace")
nodeExecutionMetadata.OnGetNodeExecutionID().Return(&idlcore.NodeExecutionIdentifier{
NodeId: "node_id",
ExecutionId: &idlcore.WorkflowExecutionIdentifier{
Project: "node_project",
Domain: "node_domain",
Name: "node_name",
},
})
nodeExecutionMetadata.OnGetOwnerID().Return(types.NamespacedName{
Namespace: "wf_namespace",
Name: "wf_name",
})
nCtx.OnNodeExecutionMetadata().Return(nodeExecutionMetadata)

nCtx.OnNodeID().Return("foo")

// call `getPluginLogs`
logs, err := getPluginLogs(mapLogPlugin, nCtx, 1, 0)
assert.Nil(t, err)

assert.Equal(t, len(logConfig.Templates), len(logs))
assert.Equal(t, "bar", logs[0].Name)
assert.Equal(t, "/console/projects/node_project/domains/node_domain/executions/node_name/nodeId/foo/taskId/task_name/attempt/0/mappedIndex/1/mappedAttempt/0/view/logs?duration=all", logs[0].Uri)
}
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
}
}
}

if err := eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts()); err != nil {
return handler.UnknownTransition, err
}
Expand Down
10 changes: 10 additions & 0 deletions flytepropeller/pkg/controller/workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
nodemocks "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces/mocks"
recoveryMocks "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/recovery/mocks"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
taskconfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/fakeplugins"
wfErrors "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflow/errors"
execStats "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflowstore"
Expand Down Expand Up @@ -227,6 +228,15 @@ func createTaskExecutorErrorInCheck(t assert.TestingT) pluginCore.PluginEntry {
func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) {
ctx := context.Background()
scope := testScope.NewSubScope("12")

taskConfig := taskconfig.GetConfig()
taskConfig.TaskPlugins.DefaultForTaskTypes = map[string]string{
"python-task": "pod",
"container": "pod",
"raw-container": "pod",
"sidecar": "pod",
}

store := createInmemoryDataStore(t, scope.NewSubScope("data_store"))
recorder := StdOutEventRecorder()
_, err := events.ConstructEventSink(ctx, &events.Config{Type: events.EventSinkLog}, scope.NewSubScope("event_sink"))
Expand Down

0 comments on commit c19b179

Please sign in to comment.