
[BUG] Spark Task tolerations not applied with PodTemplate #4378

Closed

tionichm opened this issue Nov 7, 2023 · 7 comments
Labels: bug (Something isn't working), waiting for reporter (Used for when we need input from the bug reporter)
tionichm commented Nov 7, 2023

Describe the bug

We're running flyte-binary on EKS. Everything works at this point, including Spark tasks. Recently it became a requirement to move all Flyte-related workloads to a specific node group using a label and taint combination.

Tolerations for Spark tasks were added in PR #4183. I've spent some time trying to implement exactly what is shown in that PR, with no success.

Note that the node selector portion of this is done using Spark configs rather than the PodTemplate, and it works as expected.

I can confirm that the following tasks serialize as expected:

"""Simple tasks that can be used to debug flyte."""
import os

from flytekit import PodTemplate, Resources, task
from flytekitplugins.spark import Spark
from kubernetes.client import V1Container, V1PodSpec, V1Toleration

from .utils import SparkTaskHelper

task_helper = SparkTaskHelper(
    client=os.environ.get("CLIENT", "client"),
)


@task(
    requests=Resources(cpu="0.1", mem="128Mi"),
    limits=Resources(cpu="0.2", mem="256Mi"),
    container_image=task_helper.container_image,
    pod_template=PodTemplate(
        primary_container_name="primary",
        pod_spec=V1PodSpec(
            containers=[
                V1Container(
                    name="primary",
                    image="hello-world",
                ),
            ],
            node_selector={"app": "flyte"},
            tolerations=[
                V1Toleration(
                    key="app",
                    operator="Equal",
                    value="flyte",
                    effect="NoSchedule",
                ),
            ],
        ),
    ),
)
def hello_world_no_spark() -> None:
    """Print "hello world"."""
    print("hello world")


@task(
    task_config=Spark(
        spark_conf=task_helper.spark_confs,
        executor_path=task_helper.ex_path,
        applications_path=task_helper.app_path,
    ),
    container_image=task_helper.container_image,
    pod_template=PodTemplate(
        primary_container_name="primary",
        pod_spec=V1PodSpec(
            containers=[
                V1Container(
                    name="primary",
                    image="hello-world",
                ),
            ],
            node_selector={"app": "flyte"},
            tolerations=[
                V1Toleration(
                    key="app",
                    operator="Equal",
                    value="flyte",
                    effect="NoSchedule",
                ),
            ],
        ),
    ),
)
def hello_world_spark() -> None:
    """Print "hello world"."""
    print("hello world")

This yields

{"config":{"primary_container_name":"primary"},"id":{"resourceType":1,"project":"flint","domain":"development","name":"flint_tasks.test.hello_world.hello_world_no_spark","version":"5.2.0-5dca779"},"type":"python-task","metadata":{"tags":{},"runtime":{"type":1,"version":"1.10.0","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{}},"outputs":{"variables":{}}},"k8sPod":{"metadata":{"labels":{},"annotations":{}},"podSpec":{"fields":{"containers":{"listValue":{"values":[{"structValue":{"fields":{"env":{"listValue":{"values":[]}},"image":{"stringValue":"275184421095.dkr.ecr.af-south-1.amazonaws.com/flint-spark:5.2.0-5dca779-flint"},"name":{"stringValue":"primary"},"command":{"listValue":{"values":[]}},"resources":{"structValue":{"fields":{"limits":{"structValue":{"fields":{"memory":{"stringValue":"256Mi"},"cpu":{"stringValue":"0.2"}}}},"requests":{"structValue":{"fields":{"cpu":{"stringValue":"0.1"},"memory":{"stringValue":"128Mi"}}}}}}},"args":{"listValue":{"values":[{"stringValue":"pyflyte-fast-execute"},{"stringValue":"--additional-distribution"},{"stringValue":*****},{"stringValue":"--dest-dir"},{"stringValue":"/root"},{"stringValue":"--"},{"stringValue":"pyflyte-execute"},{"stringValue":"--inputs"},{"stringValue":"{{.input}}"},{"stringValue":"--output-prefix"},{"stringValue":"{{.outputPrefix}}"},{"stringValue":"--raw-output-data-prefix"},{"stringValue":"{{.rawOutputDataPrefix}}"},{"stringValue":"--checkpoint-path"},{"stringValue":"{{.checkpointOutputPrefix}}"},{"stringValue":"--prev-checkpoint"},{"stringValue":"{{.prevCheckpointPrefix}}"},{"stringValue":"--resolver"},{"stringValue":"flytekit.core.python_auto_container.default_task_resolver"},{"stringValue":"--"},{"stringValue":"task-module"},{"stringValue":"flint_tasks.test.hello_world"},{"stringValue":"task-name"},{"stringValue":"hello_world_no_spark"}]}}}}}]}},"tolerations":{"listValue":{"values":[{"structValue":{"fields":{"effect":{"stringValue":"NoSchedule"},"operator":{"stringValue":"Equal"},"
value":{"stringValue":"flyte"},"key":{"stringValue":"app"}}}}]}},"nodeSelector":{"structValue":{"fields":{"app":{"stringValue":"flyte"}}}}}}}}

for hello_world_no_spark and

{"config":{"primary_container_name":"primary"},"id":{"resourceType":1,"project":"flint","domain":"development","name":"flint_tasks.test.hello_world.hello_world_spark","version":"5.2.0-dd4654f"},"type":"spark","metadata":{"tags":{},"runtime":{"type":1,"version":"1.10.0","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{}},"outputs":{"variables":{}}},"custom":{"fields":{"sparkConf":{"structValue":{"fields":{"spark.kubernetes.executor.request.cores":{"stringValue":"1"},"spark.executor.instances":{"stringValue":"4"},"spark.kubernetes.driver.limit.cores":{"stringValue":"1"},"spark.sql.hive.metastore.jars.path":{"stringValue":"file:///opt/spark/jars/hive/*.jar"},"spark.sql.hive.metastore.version":{"stringValue":"3.1.3"},"spark.hive.metastore.uris":{"stringValue":"thrift://poc-flint-metastore-thrift.flyte:9083"},"spark.sql.catalogImplementation":{"stringValue":"hive"},"spark.kubernetes.driver.request.cores":{"stringValue":"1"},"spark.executor.memory":{"stringValue":"2g"},"spark.kubernetes.executor.limit.cores":{"stringValue":"4"},"spark.sql.hive.metastore.jars":{"stringValue":"path"}}}},"databricksConf":{"structValue":{"fields":{}}},"mainApplicationFile":{"stringValue":"local:///usr/local/bin/entrypoint.py"},"executorPath":{"stringValue":"python"}}},"k8sPod":{"metadata":{"labels":{},"annotations":{}},"podSpec":{"fields":{"nodeSelector":{"structValue":{"fields":{"app":{"stringValue":"flyte"}}}},"containers":{"listValue":{"values":[{"structValue":{"fields":{"args":{"listValue":{"values":[{"stringValue":"pyflyte-fast-execute"},{"stringValue":"--additional-distribution"},{"stringValue":"s3://dev-flyte-s3-storage/flint/development/4LLRY7MSZOQFAUROC42QEWSXOY======/fast07b0f2c937e3074c9a67215953b107e2.tar.gz"},{"stringValue":"--dest-dir"},{"stringValue":"/root"},{"stringValue":"--"},{"stringValue":"pyflyte-execute"},{"stringValue":"--inputs"},{"stringValue":"{{.input}}"},{"stringValue":"--output-prefix"},{"stringValue":"{{.outputPrefix}}"},{"stringValue":"--raw
-output-data-prefix"},{"stringValue":"{{.rawOutputDataPrefix}}"},{"stringValue":"--checkpoint-path"},{"stringValue":"{{.checkpointOutputPrefix}}"},{"stringValue":"--prev-checkpoint"},{"stringValue":"{{.prevCheckpointPrefix}}"},{"stringValue":"--resolver"},{"stringValue":"flytekit.core.python_auto_container.default_task_resolver"},{"stringValue":"--"},{"stringValue":"task-module"},{"stringValue":"flint_tasks.test.hello_world"},{"stringValue":"task-name"},{"stringValue":"hello_world_spark"}]}},"env":{"listValue":{"values":[]}},"image":{"stringValue":"275184421095.dkr.ecr.af-south-1.amazonaws.com/flint-spark:5.2.0-dd4654f-flint"},"name":{"stringValue":"primary"},"command":{"listValue":{"values":[]}}}}}]}},"tolerations":{"listValue":{"values":[{"structValue":{"fields":{"value":{"stringValue":"flyte"},"key":{"stringValue":"app"},"effect":{"stringValue":"NoSchedule"},"operator":{"stringValue":"Equal"}}}}]}}}}}}

for hello_world_spark

On creation, the manifests look like this:

hello_world_no_spark

apiVersion: v1
kind: Pod
...
  nodeSelector:
    app: flyte
...
  tolerations:
  - effect: NoSchedule
    key: app
    operator: Equal
    value: flyte
  - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
...

hello_world_spark

apiVersion: v1
kind: Pod
...
  nodeSelector:
    app: flyte
...
  tolerations:
  - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
...

Expected behavior

Spark tasks that are registered with tolerations via PodTemplate are expected to have the same tolerations applied to the driver and executor pods when the task is executed.

Additional context to reproduce

  1. Install flyte binary on a k8s cluster with the Spark plugin enabled. (Flyte release 1.10, Spark operator 1.1.27)
  2. Create a base image that will be used for spark tasks using a combination of the Apache Spark docker image helper and the normal flyte docker image. (Tested and works). (Flytekit 1.10)
  3. Taint a node with app=flyte:NoSchedule
  4. Label the same node with app=flyte.
  5. Set node selector for spark jobs via spark configs: spark.kubernetes.node.selector.app: "flyte"
  6. Create a Spark task using the following:
# tolerations are in the PodTemplate here
@task(
    task_config=Spark(...),
    container_image="...",
    pod_template=PodTemplate(...),
)
def my_spark_task(...):
    ...
  7. Register the task - this should work without issue.
  8. Inspect the task in the Flyte console UI - you should see the toleration in the task structure. (See the first screenshot below.)
  9. Launch the task.
  10. The driver will attempt to schedule onto the node with the app=flyte label.
  11. Describe the pod. You will see that the toleration has not been passed into the driver pod. (See the second screenshot.)
  12. The task will remain in limbo.
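The node preparation in steps 3-4 and the verification in step 11 can be sketched with kubectl; `<node-name>`, `<namespace>`, and `<driver-pod>` are placeholders for your environment, not values from this issue:

```shell
# Steps 3-4: taint and label the target node
kubectl taint nodes <node-name> app=flyte:NoSchedule
kubectl label nodes <node-name> app=flyte

# Step 11: after launching the task, check whether the toleration
# actually reached the Spark driver pod
kubectl -n <namespace> get pod <driver-pod> \
  -o jsonpath='{.spec.tolerations}'
```

If the webhook issue described later in this thread is present, the jsonpath output will only contain the default not-ready/unreachable tolerations, not the app=flyte one.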

Screenshots

(Two screenshots attached: image001, image002)

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
tionichm added the bug (Something isn't working) and untriaged (This issue has not yet been looked at by the Maintainers) labels Nov 7, 2023
pingsutw (Member) commented Nov 7, 2023

FYI @andrewwdye. I'll test it later today.

andrewwdye (Contributor) commented:
I just reran the steps from the test plan at #4183 and can confirm that the pod template toleration is added to the pod

tionichm closed this as completed Nov 8, 2023
tionichm reopened this Nov 8, 2023
tionichm (Author) commented Nov 8, 2023

> I just reran the steps from the test plan at #4183 and can confirm that the pod template toleration is added to the pod

What do you think is happening on my side?

@andrewwdye @pingsutw I will add some code on top to help reproduce my issue.

eapolinario (Contributor) commented:
@tionichm , how are you deploying Flyte? Are you using the official helm charts? Can you double-check which version you're using? Also, can you confirm which version of Spark you're using?

eapolinario removed the untriaged label and added the waiting for reporter label Nov 9, 2023
tionichm (Author) commented Nov 21, 2023

@eapolinario Sorry for only getting back to you now. I'll share what information I have, sure.

My setup:

  • flyte-binary v1.10.0
  • Spark operator 1.1.27
  • Spark 3.5.0 in jobs

Values file:

configuration:
  database:
    password: *****
    host: *****
    dbname: flyte
    username: postgres
    port: 5432
  storage:
    metadataContainer: *****
    userDataContainer: *****
    provider: s3 
    providerConfig:
      s3:
        region: af-south-1
        authType: iam
  auth:
    enabled: false
  inline:
    cluster_resources:
      customData:
      - production:
        - defaultIamRole:
            value: *****
      - staging:
        - defaultIamRole:
            value: *****
      - development:
        - defaultIamRole:
            value: *****
    flyteadmin:
      roleNameKey: "iam.amazonaws.com/role"
    plugins:
      k8s:
        inject-finalizer: true
        default-env-vars:
          - AWS_METADATA_SERVICE_TIMEOUT: 5
          - AWS_METADATA_SERVICE_NUM_ATTEMPTS: 20
      spark:
        spark-config-default:
          - spark.hadoop.fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
          - spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2"
          - spark.hadoop.fs.s3a.acl.default: BucketOwnerFullControl
          - spark.hadoop.fs.s3n.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
          - spark.hadoop.fs.AbstractFileSystem.s3n.impl: org.apache.hadoop.fs.s3a.S3A
          - spark.hadoop.fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
          - spark.hadoop.fs.AbstractFileSystem.s3.impl: org.apache.hadoop.fs.s3a.S3A
          - spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
          - spark.hadoop.fs.AbstractFileSystem.s3a.impl: org.apache.hadoop.fs.s3a.S3A
          - spark.hadoop.fs.s3a.multipart.threshold: "536870912"
          - spark.task.maxfailures: "8"
          - spark.kubernetes.allocation.batch.size: "50"
          - spark.kubernetes.node.selector.app: "flyte"
    storage:
      cache:
        max_size_mbs: 10
        target_gc_percent: 100
    tasks:
      task-plugins:
        enabled-plugins:
          - container
          - sidecar
          - K8S-ARRAY
          - spark
        default-for-task-types:
          - container: container
          - container_array: K8S-ARRAY
          - spark: spark
deployment:
  extraPodSpec:
    nodeSelector:
      app: flyte
    tolerations:
    - key: "app"
      operator: "Equal"
      value: "flyte"
      effect: "NoSchedule"
clusterResourceTemplates:
  inline:
    001_namespace.yaml: |
      apiVersion: v1
      kind: Namespace
      metadata:
        name: '{{ namespace }}'
    # 002_serviceaccount.yaml: |
    #   apiVersion: v1
    #   kind: ServiceAccount
    #   metadata:
    #     name: flyte-runner
    #     namespace: '{{ namespace }}'
    #     annotations:
    #       eks.amazonaws.com/role-arn: '{{ defaultIamRole }}'
    010_spark_role.yaml: |
      apiVersion: rbac.authorization.k8s.io/v1
      kind: Role
      metadata:
        name: flyte-spark-role
        namespace: '{{ namespace }}'
      rules:
      - apiGroups:
        - ""
        resources:
        - '*'
        verbs:
        - '*'
    011_spark_service_account.yaml: |
      apiVersion: v1
      kind: ServiceAccount
      metadata:
        name: spark
        namespace: '{{ namespace }}'
        annotations:
          eks.amazonaws.com/role-arn: '{{ defaultIamRole }}'
    012_spark_role_binding.yaml: |
      apiVersion: rbac.authorization.k8s.io/v1
      kind: RoleBinding
      metadata:
        name: flyte-spark-role-binding
        namespace: '{{ namespace }}'
      roleRef:
        apiGroup: rbac.authorization.k8s.io
        kind: Role
        name: flyte-spark-role
      subjects:
      - kind: ServiceAccount
        name: spark
        namespace: '{{ namespace }}'

ingress:
  create: true
  commonAnnotations:
    kubernetes.io/ingress.class: nginx
  httpAnnotations:
    nginx.ingress.kubernetes.io/app-root: /console
  grpcAnnotations:
    nginx.ingress.kubernetes.io/backend-protocol: GRPC
  host: *****
  tls:
  - hosts:
      - *****
    secretName: flyte.deploy-tls-secret
rbac:
  extraRules:
    - apiGroups:
      - ""
      resources:
      - '*'
      verbs:
      - "*"
    - apiGroups:
      - ""
      resources:
      - serviceaccounts
      verbs:
      - '*'
    - apiGroups:
      - rbac.authorization.k8s.io
      resources:
      - '*'
      verbs:
      - '*'
    - apiGroups:
      - sparkoperator.k8s.io
      resources:
      - sparkapplications
      verbs:
      - "*"
serviceAccount:
  create: true
  name: flyte-sa
  annotations:
    eks.amazonaws.com/role-arn: *****

tionichm (Author) commented Dec 11, 2023

Found the problem.

Our Spark operator deployment was missing the mutating admission webhook, which is a hard requirement for PodTemplate fields (including tolerations) to be applied to the driver and executor pods.

We enabled it in the Helm chart and set the webhook port to 443.
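For reference, with the spark-on-k8s-operator Helm chart this fix looks roughly like the following; the release/namespace names are placeholders, and value keys can differ between chart versions, so treat this as a sketch rather than a verified command for your deployment:

```shell
# Enable the mutating admission webhook so that pod template fields
# (tolerations, nodeSelector, etc.) get patched onto the Spark
# driver and executor pods the operator creates
helm upgrade spark-operator spark-operator/spark-operator \
  --namespace spark-operator \
  --set webhook.enable=true \
  --set webhook.port=443
```

Without the webhook, the operator still creates the pods, but only the settings expressible via spark.kubernetes.* conf keys (such as the node selector used in this issue) make it through, which matches the behavior reported above.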

hamersaw (Contributor) commented:
@tionichm this is great to hear! Thanks so much for updating this ticket, it really helps future users debug problems. Is it reasonable to close this now?
