diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 04aadb4b06af4..45f527959cbe1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -102,10 +102,10 @@ private[spark] object Config extends Logging { val KUBERNETES_ALLOCATION_BATCH_DELAY = ConfigBuilder("spark.kubernetes.allocation.batch.delay") - .doc("Number of seconds to wait between each round of executor allocation.") - .longConf - .checkValue(value => value > 0, "Allocation batch delay should be a positive integer") - .createWithDefault(1) + .doc("Time to wait between each round of executor allocation.") + .timeConf(TimeUnit.MILLISECONDS) + .checkValue(value => value > 0, "Allocation batch delay must be a positive time value.") + .createWithDefaultString("1s") val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS = ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index e79c987852db2..9de4b16c30d3c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -217,7 +217,7 @@ private[spark] class KubernetesClusterSchedulerBackend( .watch(new ExecutorPodsWatcher())) allocatorExecutor.scheduleWithFixedDelay( - allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS) + allocatorRunnable, 0L, podAllocationInterval, TimeUnit.MILLISECONDS) if (!Utils.isDynamicAllocationEnabled(conf)) { doRequestTotalExecutors(initialExecutors) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index 13c09033a50ee..b2f26f205a329 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -46,7 +46,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn private val NAMESPACE = "test-namespace" private val SPARK_DRIVER_HOST = "localhost" private val SPARK_DRIVER_PORT = 7077 - private val POD_ALLOCATION_INTERVAL = 60L + private val POD_ALLOCATION_INTERVAL = "1m" private val DRIVER_URL = RpcEndpointAddress( SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString private val FIRST_EXECUTOR_POD = new PodBuilder() @@ -144,7 +144,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn .set(KUBERNETES_NAMESPACE, NAMESPACE) .set("spark.driver.host", SPARK_DRIVER_HOST) .set("spark.driver.port", SPARK_DRIVER_PORT.toString) - .set(KUBERNETES_ALLOCATION_BATCH_DELAY, POD_ALLOCATION_INTERVAL) + .set(KUBERNETES_ALLOCATION_BATCH_DELAY.key, POD_ALLOCATION_INTERVAL) executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]]) allocatorRunnable = ArgumentCaptor.forClass(classOf[Runnable]) requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable]) @@ -162,8 +162,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn when(allocatorExecutor.scheduleWithFixedDelay( allocatorRunnable.capture(), mockitoEq(0L), - mockitoEq(POD_ALLOCATION_INTERVAL), - mockitoEq(TimeUnit.SECONDS))).thenReturn(null) + mockitoEq(TimeUnit.MINUTES.toMillis(1)), + mockitoEq(TimeUnit.MILLISECONDS))).thenReturn(null) // Creating Futures in Scala backed by a Java executor service resolves to running // ExecutorService#execute (as opposed to submit) doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture())