diff --git a/.github/labeler.yml b/.github/labeler.yml index e76dad43902..717c996eef1 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -129,8 +129,7 @@ '.dockerignore', 'bin/docker-image-tool.sh', 'docker/**/*', - 'integration-tests/kyuubi-kubernetes-it/**/*', - 'tools/spark-block-cleaner/**/*' + 'integration-tests/kyuubi-kubernetes-it/**/*' ] "module:metrics": @@ -164,8 +163,7 @@ - changed-files: - any-glob-to-any-file: [ 'externals/kyuubi-spark-sql-engine/**/*', - 'extensions/spark/**/*', - 'tools/spark-block-cleaner/**/*' + 'extensions/spark/**/*' ] "module:extensions": diff --git a/.github/workflows/license.yml b/.github/workflows/license.yml index cc1ab623630..0aef73441ba 100644 --- a/.github/workflows/license.yml +++ b/.github/workflows/license.yml @@ -44,7 +44,7 @@ jobs: check-latest: false - run: >- build/mvn org.apache.rat:apache-rat-plugin:check - -Ptpcds -Pspark-block-cleaner -Pkubernetes-it + -Ptpcds -Pkubernetes-it -Pspark-3.1 -Pspark-3.2 -Pspark-3.3 -Pspark-3.4 -Pspark-3.5 - name: Upload rat report if: failure() diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml index 5b8b6a7048d..0c3dd1e6082 100644 --- a/.github/workflows/style.yml +++ b/.github/workflows/style.yml @@ -34,7 +34,7 @@ jobs: strategy: matrix: profiles: - - '-Pflink-provided,hive-provided,spark-provided,spark-block-cleaner,spark-3.5,spark-3.4,spark-3.3,spark-3.2,tpcds,kubernetes-it' + - '-Pflink-provided,hive-provided,spark-provided,spark-3.5,spark-3.4,spark-3.3,spark-3.2,tpcds,kubernetes-it' steps: - uses: actions/checkout@v4 @@ -65,7 +65,7 @@ jobs: if: steps.modules-check.conclusion == 'success' && steps.modules-check.outcome == 'failure' run: | MVN_OPT="-DskipTests -Dorg.slf4j.simpleLogger.defaultLogLevel=warn -Dmaven.javadoc.skip=true -Drat.skip=true -Dscalastyle.skip=true -Dspotless.check.skip" - build/mvn clean install ${MVN_OPT} -Pflink-provided,hive-provided,spark-provided,spark-block-cleaner,spark-3.2,tpcds + build/mvn clean install ${MVN_OPT} -Pflink-provided,hive-provided,spark-provided,spark-3.2,tpcds build/mvn clean install ${MVN_OPT} -pl extensions/spark/kyuubi-extension-spark-3-1 -Pspark-3.1 build/mvn clean install ${MVN_OPT} -pl extensions/spark/kyuubi-extension-spark-3-3,extensions/spark/kyuubi-spark-connector-hive -Pspark-3.3 build/mvn clean install ${MVN_OPT} -pl extensions/spark/kyuubi-extension-spark-3-4 -Pspark-3.4 diff --git a/build/dist b/build/dist index 2ea702b61af..cde515dc64a 100755 --- a/build/dist +++ b/build/dist @@ -330,14 +330,6 @@ for jar in $(ls "$DISTDIR/jars/"); do fi done -# Copy kyuubi tools -if [[ -f "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner_${SCALA_VERSION}-${VERSION}.jar" ]]; then - mkdir -p "$DISTDIR/tools/spark-block-cleaner/kubernetes" - mkdir -p "$DISTDIR/tools/spark-block-cleaner/jars" - cp -r "$KYUUBI_HOME"/tools/spark-block-cleaner/kubernetes/* "$DISTDIR/tools/spark-block-cleaner/kubernetes/" - cp "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/tools/spark-block-cleaner/jars/" -fi - # Copy Kyuubi Spark extension SPARK_EXTENSION_VERSIONS=('3-1' '3-2' '3-3' '3-4' '3-5') # shellcheck disable=SC2068 diff --git a/dev/reformat b/dev/reformat index 7ad26ae2e17..fe05408ccf4 100755 --- a/dev/reformat +++ b/dev/reformat @@ -20,7 +20,7 @@ set -x KYUUBI_HOME="$(cd "`dirname "$0"`/.."; pwd)" -PROFILES="-Pflink-provided,hive-provided,spark-provided,spark-block-cleaner,spark-3.5,spark-3.4,spark-3.3,spark-3.2,spark-3.1,tpcds,kubernetes-it" +PROFILES="-Pflink-provided,hive-provided,spark-provided,spark-3.5,spark-3.4,spark-3.3,spark-3.2,spark-3.1,tpcds,kubernetes-it" # python style checks rely on `black` in path if ! command -v black &> /dev/null diff --git a/docs/tools/spark_block_cleaner.md b/docs/tools/spark_block_cleaner.md index 4a1f20ff884..12e6cbee57d 100644 --- a/docs/tools/spark_block_cleaner.md +++ b/docs/tools/spark_block_cleaner.md @@ -17,119 +17,5 @@ # Kubernetes Tools Spark Block Cleaner -## Requirements - -You'd better have cognition upon the following things when you want to use spark-block-cleaner. - -* Read this article -* An active Kubernetes cluster -* [Kubectl](https://kubernetes.io/docs/reference/kubectl/overview/) -* [Docker](https://www.docker.com/) - -## Scenes - -When you're using Spark On Kubernetes with Client mode and don't use `emptyDir` for Spark `local-dir` type, you may face the same scenario that executor pods deleted without clean all the Block files. It may cause disk overflow. - -Therefore, we chose to use Spark Block Cleaner to clear the block files accumulated by Spark. - -## Principle - -When deploying Spark Block Cleaner, we will configure volumes for the destination folder. Spark Block Cleaner will perceive the folder by the parameter `CACHE_DIRS`. - -Spark Block Cleaner will clear the perceived folder in a fixed loop(which can be configured by `SCHEDULE_INTERVAL`). And Spark Block Cleaner will select folder start with `blockmgr` and `spark` for deletion using the logic Spark uses to create those folders. - -Before deleting those files, Spark Block Cleaner will determine whether it is a recently modified file(depending on whether the file has not been acted on within the specified time which configured by `FILE_EXPIRED_TIME`). Only delete files those beyond that time interval. - -And Spark Block Cleaner will check the disk utilization after clean, if the remaining space is less than the specified value(control by `FREE_SPACE_THRESHOLD`), will trigger deep clean(which file expired time control by `DEEP_CLEAN_FILE_EXPIRED_TIME`). - -## Usage - -Before you start using Spark Block Cleaner, you should build its docker images. - -### Build Block Cleaner Docker Image - -In the `KYUUBI_HOME` directory, you can use the following cmd to build docker image. - -```shell -docker build ./tools/spark-block-cleaner/kubernetes/docker -``` - -### Modify spark-block-cleaner.yml - -You need to modify the `${KYUUBI_HOME}/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml` to fit your current environment. - -In Kyuubi tools, we recommend using `DaemonSet` to start, and we offer default yaml file in daemonSet way. - -Base file structure: - -```yaml -apiVersion -kind -metadata - name - namespace -spec - select - template - metadata - spce - containers - - image - - volumeMounts - - env - volumes -``` - -You can use affect the performance of Spark Block Cleaner through configure parameters in containers env part of `spark-block-cleaner.yml`. - -```yaml -env: - - name: CACHE_DIRS - value: /data/data1,/data/data2 - - name: FILE_EXPIRED_TIME - value: 604800 - - name: DEEP_CLEAN_FILE_EXPIRED_TIME - value: 432000 - - name: FREE_SPACE_THRESHOLD - value: 60 - - name: SCHEDULE_INTERVAL - value: 3600 -``` - -The most important thing, configure volumeMounts and volumes corresponding to Spark local-dirs. - -For example, Spark use /spark/shuffle1 as local-dir, you can configure like: - -```yaml -volumes: - - name: block-files-dir-1 - hostPath: - path: /spark/shuffle1 -``` - -```yaml -volumeMounts: - - name: block-files-dir-1 - mountPath: /data/data1 -``` - -```yaml -env: - - name: CACHE_DIRS - value: /data/data1 -``` - -### Start daemonSet - -After you finishing modifying the above, you can use the following command `kubectl apply -f ${KYUUBI_HOME}/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml` to start daemonSet. - -## Related parameters - -| Name | Default | unit | Meaning | -|------------------------------|-------------------------|---------|-----------------------------------------------------------------------------------------------------------------------| -| CACHE_DIRS | /data/data1,/data/data2 | | The target dirs in container path which will clean block files. | -| FILE_EXPIRED_TIME | 604800 | seconds | Cleaner will clean the block files which current time - last modified time more than the fileExpiredTime. | -| DEEP_CLEAN_FILE_EXPIRED_TIME | 432000 | seconds | Deep clean will clean the block files which current time - last modified time more than the deepCleanFileExpiredTime. | -| FREE_SPACE_THRESHOLD | 60 | % | After first clean, if free Space low than threshold trigger deep clean. | -| SCHEDULE_INTERVAL | 3600 | seconds | Cleaner sleep between cleaning. | - +**Note**: +This tool has been removed since Kyuubi 1.9.0. diff --git a/pom.xml b/pom.xml index f3b3d57644c..c666058d6d4 100644 --- a/pom.xml +++ b/pom.xml @@ -2376,13 +2376,6 @@ - - spark-block-cleaner - - tools/spark-block-cleaner - - - spotless-python diff --git a/tools/spark-block-cleaner/kubernetes/docker/Dockerfile b/tools/spark-block-cleaner/kubernetes/docker/Dockerfile deleted file mode 100644 index 95a7b2cf8aa..00000000000 --- a/tools/spark-block-cleaner/kubernetes/docker/Dockerfile +++ /dev/null @@ -1,34 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -FROM eclipse-temurin:8-jdk-focal - -RUN apt-get update && \ - apt install -y tini && \ - rm -rf /var/cache/apt/* && \ - mkdir /data && \ - mkdir -p /opt/block-cleaner && \ - mkdir -p /log/cleanerLog - -COPY jars /opt/block-cleaner -COPY tools/spark-block-cleaner/jars /opt/block-cleaner -COPY tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh /opt/entrypoint.sh - -RUN chmod +x /opt/entrypoint.sh - -ENV CLEANER_CLASSPATH /opt/block-cleaner/* - -ENTRYPOINT ["/opt/entrypoint.sh"] diff --git a/tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh b/tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh deleted file mode 100755 index 953a80334a5..00000000000 --- a/tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# entrypoint for spark-block-cleaner - -# shellcheck disable=SC2046 -exec /usr/bin/tini -s -- java -cp "${CLASS_PATH}:${CLEANER_CLASSPATH}" \ - org.apache.kyuubi.tools.KubernetesSparkBlockCleaner diff --git a/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml b/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml deleted file mode 100644 index 408ee18aa00..00000000000 --- a/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml +++ /dev/null @@ -1,75 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -apiVersion: apps/v1 -# A DaemonSet ensures that all (or some) Nodes run a copy of a Pod. -kind: DaemonSet -metadata: - name: kyuubi-kubernetes-spark-block-cleaner - # NameSpace help assigned daemonSet to the designated cluster resource - namespace: default -spec: - selector: - matchLabels: - name: block-cleaner - template: - metadata: - labels: - name: block-cleaner - spec: - containers: - # Container image which build by Dockerfile - # TODO official Image - - image: - name: cleaner - volumeMounts: - - name: block-files-dir-1 - mountPath: /data/data1 - - name: block-files-dir-2 - mountPath: /data/data2 - - name: cleaner-log - mountPath: /log/cleanerLog - env: - # Set env to manager cleaner running - # the target dirs which in container - - name: CACHE_DIRS - value: /data/data1,/data/data2 - # Cleaner will clean More distant block files, seconds - - name: FILE_EXPIRED_TIME - value: 604800 - # Deep clean fileExpiredTime, seconds - - name: DEEP_CLEAN_FILE_EXPIRED_TIME - value: 432000 - # After first clean, if free Space low than threshold - # trigger deep clean - - name: FREE_SPACE_THRESHOLD - value: 60 - # Cleaner clean sleep times after cleaning, seconds - - name: SCHEDULE_INTERVAL - value: 3600 - volumes: - # Directory on the host which store block dirs - - name: block-files-dir-1 - hostPath: - path: /blockFilesDirs/data1 - - name: block-files-dir-2 - hostPath: - path: /blockFilesDirs/data2 - # Directory on the host which you want to store clean log - - name: cleaner-log - hostPath: - path: /logDir diff --git a/tools/spark-block-cleaner/pom.xml b/tools/spark-block-cleaner/pom.xml deleted file mode 100644 index 9c777f12177..00000000000 --- a/tools/spark-block-cleaner/pom.xml +++ /dev/null @@ -1,53 +0,0 @@ - - - - 4.0.0 - - org.apache.kyuubi - kyuubi-parent - 1.9.0-SNAPSHOT - ../../pom.xml - - - spark-block-cleaner_${scala.binary.version} - jar - Kyuubi Project Spark Block Cleaner - https://kyuubi.apache.org/ - - - - org.apache.kyuubi - kyuubi-common_${scala.binary.version} - ${project.version} - - - - org.apache.kyuubi - kyuubi-common_${scala.binary.version} - ${project.version} - test-jar - test - - - - - target/scala-${scala.binary.version}/classes - target/scala-${scala.binary.version}/test-classes - - diff --git a/tools/spark-block-cleaner/src/main/resources/log4j-block-cleaner.properties b/tools/spark-block-cleaner/src/main/resources/log4j-block-cleaner.properties deleted file mode 100644 index 2649bc49b95..00000000000 --- a/tools/spark-block-cleaner/src/main/resources/log4j-block-cleaner.properties +++ /dev/null @@ -1,33 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the console -log4j.rootCategory=INFO, console, logFile - -### console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.out -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %d{yyyy} %p %c{2}: %m%n - -### logFile -log4j.appender.logFile=org.apache.log4j.RollingFileAppender -log4j.appender.logFile.File=/logs/spark-block-cleaner-log/cleaner-log.out -log4j.appender.logFile.MaxFileSize=20MB -log4j.appender.logFile.MaxBackupIndex=5 -log4j.appender.logFile.layout=org.apache.log4j.PatternLayout -log4j.appender.logFile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %p %c{2}: %m%n diff --git a/tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala b/tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala deleted file mode 100644 index 6d4c1050b43..00000000000 --- a/tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.tools - -import java.io.File -import java.nio.file.{Files, Paths} -import java.util.concurrent.{CountDownLatch, Executors} - -import scala.util.control.NonFatal - -import org.apache.log4j.PropertyConfigurator - -import org.apache.kyuubi.Logging - -/* - * Spark storage shuffle data as the following structure. - * - * local-dir1/ - * blockmgr-uuid/ - * hash-sub-dir/ - * shuffle-data - * shuffle-index - * - * local-dir2/ - * blockmgr-uuid/ - * hash-sub-dir/ - * shuffle-data - * shuffle-index - * - * ... - */ -object KubernetesSparkBlockCleaner extends Logging { - import KubernetesSparkBlockCleanerConstants._ - - private val envMap = System.getenv() - - PropertyConfigurator.configure( - Thread.currentThread().getContextClassLoader.getResource("log4j-block-cleaner.properties")) - - private val freeSpaceThreshold = envMap.getOrDefault(FREE_SPACE_THRESHOLD_KEY, "60").toInt - private val fileExpiredTime = envMap.getOrDefault(FILE_EXPIRED_TIME_KEY, "604800").toLong * 1000 - private val scheduleInterval = envMap.getOrDefault(SCHEDULE_INTERVAL, "3600").toLong * 1000 - private val deepCleanFileExpiredTime = - envMap.getOrDefault(DEEP_CLEAN_FILE_EXPIRED_TIME_KEY, "432000").toLong * 1000 - private val cacheDirs = - if (envMap.containsKey(CACHE_DIRS_KEY)) { - envMap.get(CACHE_DIRS_KEY).split(",").filter(!_.equals("")) - } else { - throw new IllegalArgumentException(s"the env $CACHE_DIRS_KEY must be set") - } - private val isTesting = envMap.getOrDefault("kyuubi.testing", "false").toBoolean - checkConfiguration() - - /** - * one thread clean one dir - */ - private val threadPool = Executors.newFixedThreadPool(cacheDirs.length) - - private def checkConfiguration(): Unit = { - require(fileExpiredTime > 0, s"the env $FILE_EXPIRED_TIME_KEY should be greater than 0") - require( - deepCleanFileExpiredTime > 0, - s"the env $DEEP_CLEAN_FILE_EXPIRED_TIME_KEY should be greater than 0") - require(scheduleInterval > 0, s"the env $SCHEDULE_INTERVAL should be greater than 0") - require( - freeSpaceThreshold > 0 && freeSpaceThreshold < 100, - s"the env $FREE_SPACE_THRESHOLD_KEY should between 0 and 100") - require(cacheDirs.nonEmpty, s"the env $CACHE_DIRS_KEY must be set") - cacheDirs.foreach { dir => - val path = Paths.get(dir) - require(Files.exists(path), s"the input cache dir: $dir does not exists") - require(Files.isDirectory(path), s"the input cache dir: $dir should be a directory") - } - - info(s"finish initializing configuration, " + - s"use $CACHE_DIRS_KEY: ${cacheDirs.mkString(",")}, " + - s"$FILE_EXPIRED_TIME_KEY: $fileExpiredTime, " + - s"$FREE_SPACE_THRESHOLD_KEY: $freeSpaceThreshold, " + - s"$SCHEDULE_INTERVAL: $scheduleInterval, " + - s"$DEEP_CLEAN_FILE_EXPIRED_TIME_KEY: $deepCleanFileExpiredTime") - } - - private def doClean(dir: File, time: Long) { - // clean blockManager shuffle file - dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("blockmgr")) - .foreach { blockManagerDir => - info(s"start check blockManager dir ${blockManagerDir.getCanonicalPath}") - // check blockManager directory - val released = blockManagerDir.listFiles.filter(_.isDirectory).map { subDir => - debug(s"start check sub dir ${subDir.getCanonicalPath}") - // check sub directory - subDir.listFiles.map(file => checkAndDeleteFile(file, time)).sum - } - // delete empty blockManager directory and all empty sub directory - if (blockManagerDir.listFiles().forall(subDir => - subDir.isDirectory && subDir.listFiles().isEmpty)) { - blockManagerDir.listFiles().foreach(checkAndDeleteFile(_, time, true)) - checkAndDeleteFile(blockManagerDir, time, true) - } - info(s"finished clean blockManager dir ${blockManagerDir.getCanonicalPath}, " + - s"released space: ${released.sum / 1024 / 1024} MB") - } - - // clean spark cache file - dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("spark")) - .foreach { cacheDir => - info(s"start check cache dir ${cacheDir.getCanonicalPath}") - val released = cacheDir.listFiles.map(file => checkAndDeleteFile(file, time)) - // delete empty spark cache file - checkAndDeleteFile(cacheDir, time, true) - info(s"finished clean cache dir ${cacheDir.getCanonicalPath}, " + - s"released space: ${released.sum / 1024 / 1024} MB") - } - } - - private def checkAndDeleteFile(file: File, time: Long, isDir: Boolean = false): Long = { - debug(s"check file ${file.getName}") - val shouldDeleteFile = - if (isDir) { - file.listFiles.isEmpty && (System.currentTimeMillis() - file.lastModified() > time) - } else { - System.currentTimeMillis() - file.lastModified() > time - } - val length = if (isDir) 0 else file.length() - if (shouldDeleteFile) { - if (file.delete()) { - debug(s"delete file ${file.getAbsolutePath} success") - return length - } else { - warn(s"delete file ${file.getAbsolutePath} fail") - } - } - 0L - } - - import scala.sys.process._ - - private def needToDeepClean(dir: String): Boolean = { - try { - val used = (s"df $dir" #| s"grep $dir").!! - .split(" ").filter(_.endsWith("%")) { - 0 - }.replace("%", "") - info(s"$dir now used $used% space") - - used.toInt > (100 - freeSpaceThreshold) - } catch { - case NonFatal(e) => - error(s"An error occurs when querying the disk $dir capacity, " + - s"return true to make sure the disk space will not overruns: ${e.getMessage}") - true - } - } - - private def doCleanJob(dir: String): Unit = { - val startTime = System.currentTimeMillis() - val path = Paths.get(dir) - info(s"start clean job for $dir") - doClean(path.toFile, fileExpiredTime) - // re check if the disk has enough space - if (needToDeepClean(dir)) { - info(s"start deep clean job for $dir") - doClean(path.toFile, deepCleanFileExpiredTime) - if (needToDeepClean(dir)) { - warn(s"after deep clean $dir, used space still higher than $freeSpaceThreshold") - } - } - val finishedTime = System.currentTimeMillis() - info(s"clean job $dir finished, elapsed time: ${(finishedTime - startTime) / 1000} s") - } - - def main(args: Array[String]): Unit = { - do { - info(s"start all clean job") - val startTime = System.currentTimeMillis() - val hasFinished = new CountDownLatch(cacheDirs.length) - cacheDirs.foreach { dir => - threadPool.execute(() => { - try { - doCleanJob(dir) - } catch { - case NonFatal(e) => - error(s"failed to clean dir: $dir", e) - } finally { - hasFinished.countDown() - } - }) - } - hasFinished.await() - - val usedTime = System.currentTimeMillis() - startTime - info(s"finished to clean all dir, elapsed time ${usedTime / 1000} s") - if (usedTime > scheduleInterval) { - warn(s"clean job elapsed time $usedTime which is greater than $scheduleInterval") - } else { - Thread.sleep(scheduleInterval - usedTime) - } - } while (!isTesting) - } -} - -object KubernetesSparkBlockCleanerConstants { - val CACHE_DIRS_KEY = "CACHE_DIRS" - val FILE_EXPIRED_TIME_KEY = "FILE_EXPIRED_TIME" - val FREE_SPACE_THRESHOLD_KEY = "FREE_SPACE_THRESHOLD" - val SCHEDULE_INTERVAL = "SCHEDULE_INTERVAL" - val DEEP_CLEAN_FILE_EXPIRED_TIME_KEY = "DEEP_CLEAN_FILE_EXPIRED_TIME" -} diff --git a/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala b/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala deleted file mode 100644 index ae4651fe28f..00000000000 --- a/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.tools - -import java.io.File -import java.nio.file.Files -import java.util.{Map => JMap} -import java.util.UUID - -import org.apache.kyuubi.{KyuubiFunSuite, Utils} -import org.apache.kyuubi.util.reflect.ReflectUtils._ - -class KubernetesSparkBlockCleanerSuite extends KyuubiFunSuite { - import KubernetesSparkBlockCleanerConstants._ - - private val rootDir = Utils.createTempDir() - private val cacheDir = Seq("1", "2").map(rootDir.resolve) - private val block1 = new File(cacheDir.head.toFile, s"blockmgr-${UUID.randomUUID.toString}") - private val block2 = new File(cacheDir.head.toFile, s"blockmgr-${UUID.randomUUID.toString}") - - // do not remove - private val subDir1 = new File(block1, "01") - // do not remove - private val data11 = new File(subDir1, "shuffle_0_0_0") - // remove - private val data12 = new File(subDir1, "shuffle_0_0_1") - - // remove - private val subDir2 = new File(block2, "02") - // remove - private val data21 = new File(subDir1, "shuffle_0_1_0") - - private def deleteRecursive(path: File): Unit = { - path.listFiles.foreach { f => - if (f.isDirectory) { - deleteRecursive(f) - } else { - f.delete() - } - } - path.delete() - } - - override def beforeAll(): Unit = { - super.beforeAll() - cacheDir.foreach(Files.createDirectories(_)) - - // create some dir - Files.createDirectories(block1.toPath) - // hash sub dir - Files.createDirectory(subDir1.toPath) - data11.createNewFile() - data11.setLastModified(System.currentTimeMillis() - 10) - data12.createNewFile() - Files.write(data12.toPath, "111".getBytes()) - data12.setLastModified(System.currentTimeMillis() - 10000000) - - Files.createDirectories(block2.toPath) - Files.createDirectory(subDir2.toPath) - subDir2.setLastModified(System.currentTimeMillis() - 10000000) - data21.createNewFile() - data21.setLastModified(System.currentTimeMillis() - 10000000) - } - - override def afterAll(): Unit = { - deleteRecursive(block1) - deleteRecursive(block2) - - super.afterAll() - } - - private def updateEnv(name: String, value: String): Unit = { - getField[JMap[String, String]](System.getenv, "m").put(name, value) - } - - test("test clean") { - updateEnv(CACHE_DIRS_KEY, cacheDir.mkString(",")) - updateEnv(FILE_EXPIRED_TIME_KEY, "600") - updateEnv(SCHEDULE_INTERVAL, "1") - updateEnv("kyuubi.testing", "true") - - KubernetesSparkBlockCleaner.main(Array.empty) - - assert(block1.exists()) - assert(subDir1.exists()) - assert(data11.exists()) - assert(!data12.exists()) - - assert(block2.exists()) - assert(!subDir2.exists()) - assert(!data21.exists()) - } -}