Skip to content

Commit

Permalink
fix+refactor: fix error+make code more readable
Browse files Browse the repository at this point in the history
fix protobuf number mismatch

pass K8sPod instead of annotation and label separately

Signed-off-by: machichima <[email protected]>
  • Loading branch information
machichima committed Dec 6, 2024
1 parent d847a63 commit 241dc46
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 15 deletions.
4 changes: 2 additions & 2 deletions flyteidl/gen/pb-go/flyteidl/plugins/spark.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 5 additions & 13 deletions flyteplugins/go/tasks/plugins/k8s/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ func serviceAccountName(metadata pluginsCore.TaskExecutionMetadata) string {
return name
}

func createSparkPodSpec(taskCtx pluginsCore.TaskExecutionContext, podSpec *v1.PodSpec, container *v1.Container, podAnnotations map[string]string, podLabels map[string]string) *sparkOp.SparkPodSpec {
func createSparkPodSpec(taskCtx pluginsCore.TaskExecutionContext, podSpec *v1.PodSpec, container *v1.Container, k8sPod core.K8SPod) *sparkOp.SparkPodSpec {
// TODO: check whether merge annotations/labels together or other ways?
annotations := utils.UnionMaps(config.GetK8sPluginConfig().DefaultAnnotations, utils.CopyMap(taskCtx.TaskExecutionMetadata().GetAnnotations()), podAnnotations)
labels := utils.UnionMaps(config.GetK8sPluginConfig().DefaultLabels, utils.CopyMap(taskCtx.TaskExecutionMetadata().GetLabels()), podLabels)
annotations := utils.UnionMaps(config.GetK8sPluginConfig().DefaultAnnotations, utils.CopyMap(taskCtx.TaskExecutionMetadata().GetAnnotations()), k8sPod.Metadata.Annotations)
labels := utils.UnionMaps(config.GetK8sPluginConfig().DefaultLabels, utils.CopyMap(taskCtx.TaskExecutionMetadata().GetLabels()), k8sPod.Metadata.Labels)

sparkEnv := make([]v1.EnvVar, 0)
for _, envVar := range container.Env {
Expand Down Expand Up @@ -183,22 +183,18 @@ func createDriverSpec(ctx context.Context, taskCtx pluginsCore.TaskExecutionCont

// TODO: Validate whether the following function is correct
// If DriverPod exist in sparkJob and is primary, use it instead
var podAnnotations map[string]string
var podLabels map[string]string
if sparkJob.DriverPod != nil {
podSpec, err = unmarshalK8sPod(podSpec, sparkJob.DriverPod, primaryContainerName)
if err != nil {
return nil, err
}
podAnnotations = sparkJob.DriverPod.Metadata.Annotations
podLabels = sparkJob.DriverPod.Metadata.Labels
}

primaryContainer, err := flytek8s.GetContainer(podSpec, primaryContainerName)
if err != nil {
return nil, err
}
sparkPodSpec := createSparkPodSpec(nonInterruptibleTaskCtx, podSpec, primaryContainer, podAnnotations, podLabels)
sparkPodSpec := createSparkPodSpec(nonInterruptibleTaskCtx, podSpec, primaryContainer, *sparkJob.DriverPod)
serviceAccountName := serviceAccountName(nonInterruptibleTaskCtx.TaskExecutionMetadata())
spec := driverSpec{
&sparkOp.DriverSpec{
Expand Down Expand Up @@ -260,22 +256,18 @@ func createExecutorSpec(ctx context.Context, taskCtx pluginsCore.TaskExecutionCo

// TODO: Validate whether the following function is correct
// If DriverPod exist in sparkJob and is primary, use it instead
var podAnnotations map[string]string
var podLabels map[string]string
if sparkJob.ExecutorPod != nil {
podSpec, err = unmarshalK8sPod(podSpec, sparkJob.ExecutorPod, primaryContainerName)
if err != nil {
return nil, err
}
podAnnotations = sparkJob.ExecutorPod.Metadata.Annotations
podLabels = sparkJob.ExecutorPod.Metadata.Labels
}

primaryContainer, err := flytek8s.GetContainer(podSpec, primaryContainerName)
if err != nil {
return nil, err
}
sparkPodSpec := createSparkPodSpec(taskCtx, podSpec, primaryContainer, podAnnotations, podLabels)
sparkPodSpec := createSparkPodSpec(taskCtx, podSpec, primaryContainer, *sparkJob.ExecutorPod)
serviceAccountName := serviceAccountName(taskCtx.TaskExecutionMetadata())
spec := executorSpec{
primaryContainer,
Expand Down

0 comments on commit 241dc46

Please sign in to comment.