Skip to content

Commit

Permalink
Add pod template support for init containers (#5750)
Browse files Browse the repository at this point in the history
* Add pod template support for init containers

Signed-off-by: Jason Parraga <[email protected]>

* Update docs

Signed-off-by: Jason Parraga <[email protected]>

* fix comment

Signed-off-by: Jason Parraga <[email protected]>

---------

Signed-off-by: Jason Parraga <[email protected]>
  • Loading branch information
Sovietaced authored Oct 24, 2024
1 parent 761f122 commit ffd72a0
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 25 deletions.
3 changes: 3 additions & 0 deletions docs/deployment/configuration/general.rst
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ as the base container configuration for all primary containers. If both containe
names exist in the default PodTemplate, Flyte first applies the default
configuration, followed by the primary configuration.

Note: Init containers can be configured with similar granularity using "default-init"
and "primary-init" init container names.

The ``containers`` field is required in each k8s PodSpec. If no default
configuration is desired, specifying a container with a name other than "default"
or "primary" (for example, "noop") is considered best practice. Since Flyte only
Expand Down
16 changes: 9 additions & 7 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,15 @@ func AddCoPilotToContainer(ctx context.Context, cfg config.FlyteCoPilotConfig, c
return nil
}

func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilotPod *v1.PodSpec, iFace *core.TypedInterface, taskExecMetadata core2.TaskExecutionMetadata, inputPaths io.InputFilePaths, outputPaths io.OutputFilePaths, pilot *core.DataLoadingConfig) error {
func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilotPod *v1.PodSpec, iFace *core.TypedInterface, taskExecMetadata core2.TaskExecutionMetadata, inputPaths io.InputFilePaths, outputPaths io.OutputFilePaths, pilot *core.DataLoadingConfig) (string, error) {
if pilot == nil || !pilot.Enabled {
return nil
return "", nil
}

logger.Infof(ctx, "CoPilot Enabled for task [%s]", taskExecMetadata.GetTaskExecutionID().GetID().TaskId.Name)
shareProcessNamespaceEnabled := true
coPilotPod.ShareProcessNamespace = &shareProcessNamespaceEnabled
primaryInitContainerName := ""
if iFace != nil {
if iFace.Inputs != nil && len(iFace.Inputs.Variables) > 0 {
inPath := cfg.DefaultInputDataPath
Expand All @@ -231,13 +232,14 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
// Lets add the Inputs init container
args, err := DownloadCommandArgs(inputPaths.GetInputPath(), outputPaths.GetOutputPrefixPath(), inPath, format, iFace.Inputs)
if err != nil {
return err
return primaryInitContainerName, err
}
downloader, err := FlyteCoPilotContainer(flyteInitContainerName, cfg, args, inputsVolumeMount)
if err != nil {
return err
return primaryInitContainerName, err
}
coPilotPod.InitContainers = append(coPilotPod.InitContainers, downloader)
primaryInitContainerName = downloader.Name
}

if iFace.Outputs != nil && len(iFace.Outputs.Variables) > 0 {
Expand All @@ -260,15 +262,15 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
// Lets add the Inputs init container
args, err := SidecarCommandArgs(outPath, outputPaths.GetOutputPrefixPath(), outputPaths.GetRawOutputPrefix(), cfg.StartTimeout.Duration, iFace)
if err != nil {
return err
return primaryInitContainerName, err
}
sidecar, err := FlyteCoPilotContainer(flyteSidecarContainerName, cfg, args, outputsVolumeMount)
if err != nil {
return err
return primaryInitContainerName, err
}
coPilotPod.Containers = append(coPilotPod.Containers, sidecar)
}
}

return nil
return primaryInitContainerName, nil
}
24 changes: 18 additions & 6 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/copilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,9 @@ func TestAddCoPilotToPod(t *testing.T) {
InputPath: "in",
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)
assert.NoError(t, err)
assert.Equal(t, "test-downloader", primaryInitContainerName)
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, iface, &pod)
})
Expand All @@ -545,7 +547,9 @@ func TestAddCoPilotToPod(t *testing.T) {
InputPath: "in",
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, nil, taskMetadata, inputPaths, opath, pilot))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, nil, taskMetadata, inputPaths, opath, pilot)
assert.NoError(t, err)
assert.Empty(t, primaryInitContainerName)
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, nil, &pod)
})
Expand All @@ -565,7 +569,9 @@ func TestAddCoPilotToPod(t *testing.T) {
InputPath: "in",
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)
assert.NoError(t, err)
assert.Equal(t, "test-downloader", primaryInitContainerName)
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, iface, &pod)
})
Expand All @@ -584,7 +590,9 @@ func TestAddCoPilotToPod(t *testing.T) {
InputPath: "in",
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)
assert.NoError(t, err)
assert.Empty(t, primaryInitContainerName)
assertPodHasSNPS(t, &pod)
assertPodHasCoPilot(t, cfg, pilot, iface, &pod)
})
Expand All @@ -603,11 +611,15 @@ func TestAddCoPilotToPod(t *testing.T) {
InputPath: "in",
OutputPath: "out",
}
assert.NoError(t, AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, &pod, iface, taskMetadata, inputPaths, opath, pilot)
assert.NoError(t, err)
assert.Empty(t, primaryInitContainerName)
assert.Len(t, pod.Volumes, 0)
})

t.Run("nil", func(t *testing.T) {
assert.NoError(t, AddCoPilotToPod(ctx, cfg, nil, nil, taskMetadata, inputPaths, opath, nil))
primaryInitContainerName, err := AddCoPilotToPod(ctx, cfg, nil, nil, taskMetadata, inputPaths, opath, nil)
assert.NoError(t, err)
assert.Empty(t, primaryInitContainerName)
})
}
64 changes: 58 additions & 6 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ const PrimaryContainerNotFound = "PrimaryContainerNotFound"
const SIGKILL = 137

const defaultContainerTemplateName = "default"
const defaultInitContainerTemplateName = "default-init"
const primaryContainerTemplateName = "primary"
const primaryInitContainerTemplateName = "primary-init"
const PrimaryContainerKey = "primary_container_name"

// AddRequiredNodeSelectorRequirements adds the provided v1.NodeSelectorRequirement
Expand Down Expand Up @@ -387,14 +389,17 @@ func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecut
dataLoadingConfig = pod.GetDataConfig()
}

primaryInitContainerName := ""

if dataLoadingConfig != nil {
if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot,
primaryContainer, taskTemplate.Interface, dataLoadingConfig); err != nil {
return nil, nil, err
}

if err := AddCoPilotToPod(ctx, config.GetK8sPluginConfig().CoPilot, podSpec, taskTemplate.GetInterface(),
tCtx.TaskExecutionMetadata(), tCtx.InputReader(), tCtx.OutputWriter(), dataLoadingConfig); err != nil {
primaryInitContainerName, err = AddCoPilotToPod(ctx, config.GetK8sPluginConfig().CoPilot, podSpec, taskTemplate.GetInterface(),
tCtx.TaskExecutionMetadata(), tCtx.InputReader(), tCtx.OutputWriter(), dataLoadingConfig)
if err != nil {
return nil, nil, err
}
}
Expand All @@ -406,7 +411,7 @@ func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecut
}

// merge PodSpec and ObjectMeta with configuration pod template (if exists)
podSpec, objectMeta, err = MergeWithBasePodTemplate(ctx, tCtx, podSpec, objectMeta, primaryContainerName)
podSpec, objectMeta, err = MergeWithBasePodTemplate(ctx, tCtx, podSpec, objectMeta, primaryContainerName, primaryInitContainerName)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -495,7 +500,7 @@ func getBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutionConte
// MergeWithBasePodTemplate attempts to merge the provided PodSpec and ObjectMeta with the configuration PodTemplate for
// this task.
func MergeWithBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutionContext,
podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, primaryContainerName string) (*v1.PodSpec, *metav1.ObjectMeta, error) {
podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, primaryContainerName string, primaryInitContainerName string) (*v1.PodSpec, *metav1.ObjectMeta, error) {

// attempt to retrieve base PodTemplate
podTemplate, err := getBasePodTemplate(ctx, tCtx, DefaultPodTemplateStore)
Expand All @@ -507,7 +512,7 @@ func MergeWithBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutio
}

// merge podSpec with podTemplate
mergedPodSpec, err := mergePodSpecs(&podTemplate.Template.Spec, podSpec, primaryContainerName)
mergedPodSpec, err := mergePodSpecs(&podTemplate.Template.Spec, podSpec, primaryContainerName, primaryInitContainerName)
if err != nil {
return nil, nil, err
}
Expand All @@ -524,7 +529,7 @@ func MergeWithBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutio
// mergePodSpecs merges the two provided PodSpecs. This process uses the first as the base configuration, where values
// set by the first PodSpec are overwritten by the second in the return value. Additionally, this function applies
// container-level configuration from the basePodSpec.
func mergePodSpecs(basePodSpec *v1.PodSpec, podSpec *v1.PodSpec, primaryContainerName string) (*v1.PodSpec, error) {
func mergePodSpecs(basePodSpec *v1.PodSpec, podSpec *v1.PodSpec, primaryContainerName string, primaryInitContainerName string) (*v1.PodSpec, error) {
if basePodSpec == nil || podSpec == nil {
return nil, errors.New("neither the basePodSpec or the podSpec can be nil")
}
Expand All @@ -539,6 +544,16 @@ func mergePodSpecs(basePodSpec *v1.PodSpec, podSpec *v1.PodSpec, primaryContaine
}
}

// extract defaultInitContainerTemplate and primaryInitContainerTemplate
var defaultInitContainerTemplate, primaryInitContainerTemplate *v1.Container
for i := 0; i < len(basePodSpec.InitContainers); i++ {
if basePodSpec.InitContainers[i].Name == defaultInitContainerTemplateName {
defaultInitContainerTemplate = &basePodSpec.InitContainers[i]
} else if basePodSpec.InitContainers[i].Name == primaryInitContainerTemplateName {
primaryInitContainerTemplate = &basePodSpec.InitContainers[i]
}
}

// merge PodTemplate PodSpec with podSpec
var mergedPodSpec *v1.PodSpec = basePodSpec.DeepCopy()
if err := mergo.Merge(mergedPodSpec, podSpec, mergo.WithOverride, mergo.WithAppendSlice); err != nil {
Expand Down Expand Up @@ -580,6 +595,43 @@ func mergePodSpecs(basePodSpec *v1.PodSpec, podSpec *v1.PodSpec, primaryContaine
}

mergedPodSpec.Containers = mergedContainers

// merge PodTemplate init containers
var mergedInitContainers []v1.Container
for _, initContainer := range podSpec.InitContainers {
// if applicable start with defaultContainerTemplate
var mergedInitContainer *v1.Container
if defaultInitContainerTemplate != nil {
mergedInitContainer = defaultInitContainerTemplate.DeepCopy()
}

// if applicable merge with primaryInitContainerTemplate
if initContainer.Name == primaryInitContainerName && primaryInitContainerTemplate != nil {
if mergedInitContainer == nil {
mergedInitContainer = primaryInitContainerTemplate.DeepCopy()
} else {
err := mergo.Merge(mergedInitContainer, primaryInitContainerTemplate, mergo.WithOverride, mergo.WithAppendSlice)
if err != nil {
return nil, err
}
}
}

// if applicable merge with existing init initContainer
if mergedInitContainer == nil {
mergedInitContainers = append(mergedInitContainers, initContainer)
} else {
err := mergo.Merge(mergedInitContainer, initContainer, mergo.WithOverride, mergo.WithAppendSlice)
if err != nil {
return nil, err
}

mergedInitContainers = append(mergedInitContainers, *mergedInitContainer)
}
}

mergedPodSpec.InitContainers = mergedInitContainers

return mergedPodSpec, nil
}

Expand Down
Loading

0 comments on commit ffd72a0

Please sign in to comment.