Skip to content

Commit

Permalink
Passthrough unique node ID in task execution ID for generating log te…
Browse files Browse the repository at this point in the history
…mplate vars

Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb committed Nov 8, 2023
1 parent bec7bbb commit 24da09b
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 61 deletions.
5 changes: 3 additions & 2 deletions flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
v1 "k8s.io/api/core/v1"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/flyteorg/flyte/flytestdlib/logger"
)
Expand All @@ -18,7 +19,7 @@ type logPlugin struct {
}

// Internal
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID *core.TaskExecutionIdentifier, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
if logPlugin == nil {
return nil, nil
}
Expand Down Expand Up @@ -53,7 +54,7 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas
PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339),
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
TaskExecutionIdentifier: taskExecID,
TaskExecutionID: taskExecID,
ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme,
},
)
Expand Down
4 changes: 4 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type TaskExecutionID interface {

// GetID returns the underlying idl task identifier.
GetID() core.TaskExecutionIdentifier

// GetUniqueNodeID returns the fully-qualified Node ID that is unique within a
// given workflow execution.
GetUniqueNodeID() string
}

// TaskExecutionMetadata represents any execution information for a Task. It is used to communicate meta information about the
Expand Down
3 changes: 2 additions & 1 deletion flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"regexp"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
)

//go:generate enumer --type=TemplateScheme --trimprefix=TemplateScheme -json -yaml
Expand Down Expand Up @@ -42,7 +43,7 @@ type Input struct {
PodUnixStartTime int64
PodUnixFinishTime int64
PodUID string
TaskExecutionIdentifier *core.TaskExecutionIdentifier
TaskExecutionID pluginsCore.TaskExecutionID
ExtraTemplateVarsByScheme *TemplateVarsByScheme
}

Expand Down
96 changes: 48 additions & 48 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,55 +114,55 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.Pod...)
}
case TemplateSchemeTaskExecution:
if input.TaskExecutionIdentifier != nil {
vars = append(vars, TemplateVar{
taskExecutionIdentifier := input.TaskExecutionID.GetID()
vars = append(
vars,
TemplateVar{
defaultRegexes.NodeID,
input.TaskExecutionID.GetUniqueNodeID(),
},
TemplateVar{
defaultRegexes.TaskRetryAttempt,
strconv.FormatUint(uint64(input.TaskExecutionIdentifier.RetryAttempt), 10),
})
if input.TaskExecutionIdentifier.TaskId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.TaskID,
input.TaskExecutionIdentifier.TaskId.Name,
},
TemplateVar{
defaultRegexes.TaskVersion,
input.TaskExecutionIdentifier.TaskId.Version,
},
TemplateVar{
defaultRegexes.TaskProject,
input.TaskExecutionIdentifier.TaskId.Project,
},
TemplateVar{
defaultRegexes.TaskDomain,
input.TaskExecutionIdentifier.TaskId.Domain,
},
)
}
if input.TaskExecutionIdentifier.NodeExecutionId != nil {
vars = append(vars, TemplateVar{
defaultRegexes.NodeID,
input.TaskExecutionIdentifier.NodeExecutionId.NodeId,
})
if input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.ExecutionName,
input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId.Name,
},
TemplateVar{
defaultRegexes.ExecutionProject,
input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId.Project,
},
TemplateVar{
defaultRegexes.ExecutionDomain,
input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId.Domain,
},
)
}
}
strconv.FormatUint(uint64(taskExecutionIdentifier.RetryAttempt), 10),
},
)
if taskExecutionIdentifier.TaskId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.TaskID,
taskExecutionIdentifier.TaskId.Name,
},
TemplateVar{
defaultRegexes.TaskVersion,
taskExecutionIdentifier.TaskId.Version,
},
TemplateVar{
defaultRegexes.TaskProject,
taskExecutionIdentifier.TaskId.Project,
},
TemplateVar{
defaultRegexes.TaskDomain,
taskExecutionIdentifier.TaskId.Domain,
},
)
}
if taskExecutionIdentifier.NodeExecutionId != nil && taskExecutionIdentifier.NodeExecutionId.ExecutionId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.ExecutionName,
taskExecutionIdentifier.NodeExecutionId.ExecutionId.Name,
},
TemplateVar{
defaultRegexes.ExecutionProject,
taskExecutionIdentifier.NodeExecutionId.ExecutionId.Project,
},
TemplateVar{
defaultRegexes.ExecutionDomain,
taskExecutionIdentifier.NodeExecutionId.ExecutionId.Domain,
},
)
}
if gotExtraTemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.TaskExecution...)
Expand Down
6 changes: 3 additions & 3 deletions flyteplugins/go/tasks/plugins/k8s/pod/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin
ReportedAt: &reportedAt,
}

taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID().GetID()
taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID()
if pod.Status.Phase != v1.PodPending && pod.Status.Phase != v1.PodUnknown {
taskLogs, err := logs.GetLogsForContainerInPod(ctx, logPlugin, &taskExecID, pod, 0, logSuffix, extraLogTemplateVarsByScheme)
taskLogs, err := logs.GetLogsForContainerInPod(ctx, logPlugin, taskExecID, pod, 0, logSuffix, extraLogTemplateVarsByScheme)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin
} else {
// if the primary container annotation exists, we use the status of the specified container
phaseInfo = flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info)
if phaseInfo.Phase() == pluginsCore.PhasePermanentFailure && phaseInfo.Err() != nil &&
if phaseInfo.Phase() == pluginsCore.PhasePermanentFailure && phaseInfo.Err() != nil &&
phaseInfo.Err().GetCode() == flytek8s.PrimaryContainerNotFound {
// if the primary container status is not found ensure that the primary container exists.
// note: it should be impossible for the primary container to not exist at this point.
Expand Down
23 changes: 16 additions & 7 deletions flytepropeller/pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@ var (
const IDMaxLength = 50

type taskExecutionID struct {
execName string
id *core.TaskExecutionIdentifier
execName string
id *core.TaskExecutionIdentifier
uniqueNodeId string
}

func (te taskExecutionID) GetID() core.TaskExecutionIdentifier {
return *te.id
}

func (te taskExecutionID) GetUniqueNodeID() string {
return te.uniqueNodeId
}

func (te taskExecutionID) GetGeneratedName() string {
return te.execName
}
Expand Down Expand Up @@ -291,11 +296,15 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx interfaces.N
NodeExecutionContext: nCtx,
tm: taskExecutionMetadata{
NodeExecutionMetadata: nCtx.NodeExecutionMetadata(),
taskExecID: taskExecutionID{execName: uniqueID, id: id},
o: nCtx.Node(),
maxAttempts: maxAttempts,
platformResources: convertTaskResourcesToRequirements(nCtx.ExecutionContext().GetExecutionConfig().TaskResources),
environmentVariables: nCtx.ExecutionContext().GetExecutionConfig().EnvironmentVariables,
taskExecID: taskExecutionID{
execName: uniqueID,
id: id,
uniqueNodeId: currentNodeUniqueID,
},
o: nCtx.Node(),
maxAttempts: maxAttempts,
platformResources: convertTaskResourcesToRequirements(nCtx.ExecutionContext().GetExecutionConfig().TaskResources),
environmentVariables: nCtx.ExecutionContext().GetExecutionConfig().EnvironmentVariables,
},
rm: resourcemanager.GetTaskResourceManager(
t.resourceManager, resourceNamespacePrefix, id),
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestToTaskExecutionEvent(t *testing.T) {
generatedName := "generated_name"
tID.OnGetGeneratedName().Return(generatedName)
tID.OnGetID().Return(*id)
tID.OnGetUniqueNodeID("unique-node-id")

tMeta := &pluginMocks.TaskExecutionMetadata{}
tMeta.OnGetTaskExecutionID().Return(tID)
Expand Down Expand Up @@ -261,6 +262,7 @@ func TestToTaskExecutionEventWithParent(t *testing.T) {
generatedName := "generated_name"
tID.OnGetGeneratedName().Return(generatedName)
tID.OnGetID().Return(*id)
tID.OnGetUniqueNodeID("unique-node-id")

tMeta := &pluginMocks.TaskExecutionMetadata{}
tMeta.OnGetTaskExecutionID().Return(tID)
Expand Down

0 comments on commit 24da09b

Please sign in to comment.