From c6b50e33ee07e1b87ad467eade131aed0fdf6fd6 Mon Sep 17 00:00:00 2001 From: mingji Date: Tue, 4 Jul 2023 22:18:32 +0800 Subject: [PATCH 01/29] [CELEBORN-764] Fix celeborn on HDFS might clean using app directories. --- README.md | 6 ++--- .../apache/celeborn/client/ShuffleClient.java | 14 +++------- .../apache/celeborn/common/CelebornConf.scala | 6 ++--- .../apache/celeborn/common/util/Utils.scala | 18 +++++++++++++ docs/configuration/client.md | 1 + docs/configuration/master.md | 1 + docs/configuration/worker.md | 2 +- .../clustermeta/AbstractMetaManager.java | 6 +++++ .../service/deploy/master/Master.scala | 25 +++++++++++++++++ .../worker/storage/StorageManager.scala | 27 ++----------------- 10 files changed, 63 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index d6fbd88a4d4..e12bbdbbce9 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ celeborn.worker.commitFiles.threads 128 celeborn.master.slot.assign.policy roundrobin celeborn.rpc.askTimeout 240s celeborn.worker.flusher.hdfs.buffer.size 4m -celeborn.worker.storage.hdfs.dir hdfs:///celeborn +celeborn.storage.hdfs.dir hdfs:///celeborn celeborn.worker.replicate.fastFail.duration 240s # If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false @@ -175,7 +175,7 @@ celeborn.worker.commitFiles.threads 128 celeborn.master.slot.assign.policy roundrobin celeborn.rpc.askTimeout 240s celeborn.worker.flusher.hdfs.buffer.size 4m -celeborn.worker.storage.hdfs.dir hdfs:///celeborn +celeborn.storage.hdfs.dir hdfs:///celeborn celeborn.worker.replicate.fastFail.duration 240s # If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false @@ -259,7 +259,7 @@ spark.celeborn.client.push.replicate.enabled true spark.sql.adaptive.localShuffleReader.enabled false # If Celeborn is using HDFS -spark.celeborn.worker.storage.hdfs.dir hdfs:///celeborn +spark.celeborn.storage.hdfs.dir hdfs:///celeborn # we recommend enabling aqe support to gain 
better performance spark.sql.adaptive.enabled true diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index 389d0f6eac3..459f07f4f3a 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +29,7 @@ import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.common.rpc.RpcEndpointRef; -import org.apache.celeborn.common.util.CelebornHadoopUtils$; +import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.common.write.PushState; /** @@ -85,16 +84,9 @@ public static FileSystem getHdfsFs(CelebornConf conf) { if (null == hdfsFs) { synchronized (ShuffleClient.class) { if (null == hdfsFs) { - Configuration hdfsConfiguration = CelebornHadoopUtils$.MODULE$.newConfiguration(conf); - // enable fs cache to avoid too many fs instances - hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false"); - hdfsConfiguration.set("fs.viewfs.impl.disable.cache", "false"); - logger.info( - "Celeborn client will ignore cluster" - + " settings about fs.hdfs/viewfs.impl.disable.cache and set it to false"); try { - hdfsFs = FileSystem.get(hdfsConfiguration); - } catch (IOException e) { + hdfsFs = Utils.getHadoopFS(conf); + } catch (Exception e) { System.err.println("Celeborn initialize hdfs failed."); e.printStackTrace(System.err); } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index b24b7c03a25..533def51e9b 100644 --- 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1955,9 +1955,9 @@ object CelebornConf extends Logging { .createWithDefault(16) val HDFS_DIR: OptionalConfigEntry[String] = - buildConf("celeborn.worker.storage.hdfs.dir") - .withAlternative("celeborn.storage.hdfs.dir") - .categories("worker") + buildConf("celeborn.storage.hdfs.dir") + .withAlternative("celeborn.worker.storage.hdfs.dir") + .categories("worker", "master", "client") .version("0.2.0") .doc("HDFS dir configuration for Celeborn to access HDFS.") .stringConf diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index a4eb900e070..2c6f7d8b2f8 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -38,6 +38,8 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.google.protobuf.{ByteString, GeneratedMessageV3} import io.netty.channel.unix.Errors.NativeIoException import org.apache.commons.lang3.SystemUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} import org.roaringbitmap.RoaringBitmap import org.apache.celeborn.common.CelebornConf @@ -1072,4 +1074,20 @@ object Utils extends Logging { } labelPart(0).trim -> labelPart(1).trim } + + def getHadoopFS(conf: CelebornConf): FileSystem = { + val path = new Path(conf.hdfsDir) + val scheme = path.toUri.getScheme + val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme) + val hdfsConfiguration = new Configuration() + hdfsConfiguration.set("dfs.replication", "2") + hdfsConfiguration.set(disableCacheName, "false") + for (elem <- CelebornHadoopUtils.newConfiguration(conf).iterator().asScala) { + hdfsConfiguration.set(elem.getKey, elem.getValue) + } + logInfo("Celeborn will ignore cluster settings " + + 
disableCacheName + " and set it to false") + path.getFileSystem(hdfsConfiguration) + } + } diff --git a/docs/configuration/client.md b/docs/configuration/client.md index d88c1b64beb..c8ea8cbd688 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -97,4 +97,5 @@ license: | | celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. | 0.3.0 | | celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 | +| celeborn.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 | diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 8842b6c8c18..bc1b24f7f5b 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -34,4 +34,5 @@ license: | | celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 | | celeborn.master.userResourceConsumption.update.interval | 30s | Time length for a window about compute user resource consumption. | 0.3.0 | | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 | +| celeborn.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. 
| 0.2.0 | diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 29e3b99afc2..2f0856bead7 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -23,6 +23,7 @@ license: | | celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 | | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 | +| celeborn.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 | | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 | | celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 | | celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | @@ -92,7 +93,6 @@ license: | | celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.3.0 | | celeborn.worker.storage.dirs | <undefined> | Directory list to store shuffle data. It's recommended to configure one directory on each disk. Storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and should be 8 or more flush threads on the same disk partition if you are using SSD. 
For example: `dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=]` | 0.2.0 | | celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved space for each disk. | 0.3.0 | -| celeborn.worker.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 | | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's working dir path name. | 0.3.0 | | celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to close | 0.2.0 | | celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file writer to create if its creation was failed. | 0.2.0 | diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index d0ef66612ce..794f3250193 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -138,6 +138,12 @@ public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, l partitionTotalFileCount.add(fileCount); } + public Set getActiveAppIds() { + return registeredShuffle.stream() + .map(key -> Utils.splitShuffleKey(key)._1) + .collect(Collectors.toSet()); + } + public void updateAppLostMeta(String appId) { registeredShuffle.stream() .filter(shuffle -> shuffle.startsWith(appId)) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index cb5b2216cc7..7ba9a3d1e17 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -25,6 +25,8 @@ import java.util.concurrent.{ConcurrentHashMap, 
ScheduledFuture, TimeUnit} import scala.collection.JavaConverters._ import scala.util.Random +import org.apache.hadoop.fs.{FileSystem, Path} + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.identity.UserIdentifier @@ -141,6 +143,7 @@ private[celeborn] class Master( // init and register master metrics val resourceConsumptionSource = new ResourceConsumptionSource(conf) private val masterSource = new MasterSource(conf) + private var hadoopFs: FileSystem = _ masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () => statusSystem.registeredShuffle.size } @@ -653,11 +656,33 @@ private[celeborn] class Master( override def run(): Unit = { statusSystem.handleAppLost(appId, requestId) logInfo(s"Removed application $appId") + // only leader can clean hdfs dirs + if (conf.hasHDFSStorage && !conf.hdfsDir.isEmpty) { + cleanExpiredAppDirsOnHDFS() + } context.reply(ApplicationLostResponse(StatusCode.SUCCESS)) } }) } + private def cleanExpiredAppDirsOnHDFS(): Unit = { + val activeAppIds = statusSystem.getActiveAppIds + if (hadoopFs == null) { + hadoopFs = Utils.getHadoopFS(conf) + } + val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir) + if (hadoopFs.exists(hdfsWorkPath)) { + val iter = hadoopFs.listStatusIterator(hdfsWorkPath) + while (iter.hasNext) { + val fileStatus = iter.next() + if (!activeAppIds.contains(fileStatus.getPath.getName)) { + logDebug(s"Clean hdfs dir ${fileStatus.getPath.toString}") + hadoopFs.delete(fileStatus.getPath, true) + } + } + } + } + private def handleHeartbeatFromApplication( context: RpcCallContext, appId: String, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index d2099df2abf..9af051987b1 100644 --- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -120,22 +120,12 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs deviceMonitor.startCheck() val hdfsDir = conf.hdfsDir - if (!hdfsDir.isEmpty && conf.hasHDFSStorage) { - logInfo(s"Initialize HDFS support with path ${hdfsDir}") - } val hdfsPermission = new FsPermission("755") val hdfsWriters = JavaUtils.newConcurrentHashMap[String, FileWriter]() val (hdfsFlusher, _totalHdfsFlusherThread) = if (!hdfsDir.isEmpty && conf.hasHDFSStorage) { - val path = new Path(hdfsDir) - val scheme = path.toUri.getScheme - val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme) - val hdfsConfiguration = CelebornHadoopUtils.newConfiguration(conf) - hdfsConfiguration.set("dfs.replication", "2") - hdfsConfiguration.set(disableCacheName, "false") - logInfo("Celeborn will ignore cluster settings " + - disableCacheName + " and set it to false") - StorageManager.hadoopFs = path.getFileSystem(hdfsConfiguration) + logInfo(s"Initialize HDFS support with path ${hdfsDir}") + StorageManager.hadoopFs = Utils.getHadoopFS(conf) ( Some(new HdfsFlusher( workerSource, @@ -527,19 +517,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs case _ => // do nothing } } - - if (hadoopFs != null) { - val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir) - if (hadoopFs.exists(hdfsWorkPath)) { - val iter = hadoopFs.listStatusIterator(hdfsWorkPath) - while (iter.hasNext) { - val fileStatus = iter.next() - if (!appIds.contains(fileStatus.getPath.getName)) { - hadoopFs.delete(fileStatus.getPath, true) - } - } - } - } } private def deleteDirectory(dir: File, threadPool: ThreadPoolExecutor): Unit = { From c407201c895a76290b2f91d999834565b5ebcd68 Mon Sep 17 00:00:00 2001 From: Ethan Feng Date: Wed, 5 Jul 2023 13:43:15 +0800 Subject: 
[PATCH 02/29] Update common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala Update common/src/main/scala/org/apache/celeborn/common/util/Utils.scala Co-authored-by: Cheng Pan --- .../main/scala/org/apache/celeborn/common/CelebornConf.scala | 2 +- .../main/scala/org/apache/celeborn/common/util/Utils.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 533def51e9b..7bfabf682e1 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1959,7 +1959,7 @@ object CelebornConf extends Logging { .withAlternative("celeborn.worker.storage.hdfs.dir") .categories("worker", "master", "client") .version("0.2.0") - .doc("HDFS dir configuration for Celeborn to access HDFS.") + .doc("HDFS base directory for Celeborn to store shuffle data.") .stringConf .createOptional diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 2c6f7d8b2f8..e54999b620e 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -1085,8 +1085,8 @@ object Utils extends Logging { for (elem <- CelebornHadoopUtils.newConfiguration(conf).iterator().asScala) { hdfsConfiguration.set(elem.getKey, elem.getValue) } - logInfo("Celeborn will ignore cluster settings " + - disableCacheName + " and set it to false") + logInfo("Celeborn will ignore cluster settings $disableCacheName and " + "set it to false") path.getFileSystem(hdfsConfiguration) } From d4e359597aa2c7154d167b7496096a64d66f45c3 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 13:54:06 +0800 Subject: [PATCH 03/29] update --- .../org/apache/celeborn/common/CelebornConf.scala | 1 - 
.../celeborn/common/util/CelebornHadoopUtils.scala | 13 ++++++++++++- .../org/apache/celeborn/common/util/Utils.scala | 13 +------------ docs/configuration/client.md | 2 +- docs/configuration/master.md | 2 +- docs/configuration/worker.md | 2 +- 6 files changed, 16 insertions(+), 17 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 7bfabf682e1..d2b1cf2c4fc 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1956,7 +1956,6 @@ object CelebornConf extends Logging { val HDFS_DIR: OptionalConfigEntry[String] = buildConf("celeborn.storage.hdfs.dir") - .withAlternative("celeborn.worker.storage.hdfs.dir") .categories("worker", "master", "client") .version("0.2.0") .doc("HDFS base directory for Celeborn to store shuffle data.") diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index 39b70ff1892..3f73926f388 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -18,12 +18,23 @@ package org.apache.celeborn.common.util import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.internal.Logging -object CelebornHadoopUtils { +object CelebornHadoopUtils extends Logging { private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = { val hadoopConf = new Configuration() + if (!conf.hdfsDir.isEmpty) { + val path = new Path(conf.hdfsDir) + val scheme = path.toUri.getScheme + val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme) + hadoopConf.set("dfs.replication", "2") + 
hadoopConf.set(disableCacheName, "false") + logInfo(s"Celeborn will ignore cluster settings $disableCacheName and " + + "set it to false") + } appendSparkHadoopConfigs(conf, hadoopConf) hadoopConf } diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index e54999b620e..e3c14ff9393 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -1076,18 +1076,7 @@ object Utils extends Logging { } def getHadoopFS(conf: CelebornConf): FileSystem = { - val path = new Path(conf.hdfsDir) - val scheme = path.toUri.getScheme - val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme) - val hdfsConfiguration = new Configuration() - hdfsConfiguration.set("dfs.replication", "2") - hdfsConfiguration.set(disableCacheName, "false") - for (elem <- CelebornHadoopUtils.newConfiguration(conf).iterator().asScala) { - hdfsConfiguration.set(elem.getKey, elem.getValue) - } - logInfo("Celeborn will ignore cluster settings $disableCacheName and " - "set it to false") - path.getFileSystem(hdfsConfiguration) + new Path(conf.hdfsDir).getFileSystem(CelebornHadoopUtils.newConfiguration(conf)) } } diff --git a/docs/configuration/client.md b/docs/configuration/client.md index c8ea8cbd688..15885c8a61f 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -97,5 +97,5 @@ license: | | celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. | 0.3.0 | | celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. 
If the port is omitted, 9097 will be used. | 0.2.0 | | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 | -| celeborn.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 | +| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | diff --git a/docs/configuration/master.md b/docs/configuration/master.md index bc1b24f7f5b..6152a5b633f 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -34,5 +34,5 @@ license: | | celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 | | celeborn.master.userResourceConsumption.update.interval | 30s | Time length for a window about compute user resource consumption. | 0.3.0 | | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 | -| celeborn.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 | +| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 2f0856bead7..887788ceff2 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -23,7 +23,7 @@ license: | | celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. 
| 0.2.0 | | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 | -| celeborn.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 | +| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 | | celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 | | celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | From 113f7df7a1fe360206b77b24903c226e9d25ac3b Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 14:42:55 +0800 Subject: [PATCH 04/29] update. --- .../apache/celeborn/common/util/CelebornHadoopUtils.scala | 6 +++++- .../main/scala/org/apache/celeborn/common/util/Utils.scala | 6 ------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index 3f73926f388..3e81bf5bb6f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -18,7 +18,7 @@ package org.apache.celeborn.common.util import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging @@ -45,4 +45,8 @@ object CelebornHadoopUtils extends Logging { hadoopConf.set(key.substring("celeborn.hadoop.".length), value) } } + + def getHadoopFS(conf: CelebornConf): FileSystem = { + new 
Path(conf.hdfsDir).getFileSystem(CelebornHadoopUtils.newConfiguration(conf)) + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index e3c14ff9393..f08169a5bc6 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -38,8 +38,6 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.google.protobuf.{ByteString, GeneratedMessageV3} import io.netty.channel.unix.Errors.NativeIoException import org.apache.commons.lang3.SystemUtils -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} import org.roaringbitmap.RoaringBitmap import org.apache.celeborn.common.CelebornConf @@ -1075,8 +1073,4 @@ object Utils extends Logging { labelPart(0).trim -> labelPart(1).trim } - def getHadoopFS(conf: CelebornConf): FileSystem = { - new Path(conf.hdfsDir).getFileSystem(CelebornHadoopUtils.newConfiguration(conf)) - } - } From cd37620fa36aad1c8e68c9206923350c9374a49f Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 5 Jul 2023 14:43:39 +0800 Subject: [PATCH 05/29] Update common/src/main/scala/org/apache/celeborn/common/util/Utils.scala --- .../src/main/scala/org/apache/celeborn/common/util/Utils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index f08169a5bc6..a4eb900e070 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -1072,5 +1072,4 @@ object Utils extends Logging { } labelPart(0).trim -> labelPart(1).trim } - } From b21b0564d2c4d696c750a325b1118abf9ba30680 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 14:44:09 +0800 Subject: [PATCH 06/29] update. 
--- .../java/org/apache/celeborn/client/ShuffleClient.java | 4 ++-- .../org/apache/celeborn/service/deploy/master/Master.scala | 7 ++----- .../service/deploy/worker/storage/StorageManager.scala | 3 +-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index 459f07f4f3a..a36b53fa504 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; +import org.apache.celeborn.common.util.CelebornHadoopUtils; import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +30,6 @@ import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.common.rpc.RpcEndpointRef; -import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.common.write.PushState; /** @@ -85,7 +85,7 @@ public static FileSystem getHdfsFs(CelebornConf conf) { synchronized (ShuffleClient.class) { if (null == hdfsFs) { try { - hdfsFs = Utils.getHadoopFS(conf); + hdfsFs = CelebornHadoopUtils.getHadoopFS(conf); } catch (Exception e) { System.err.println("Celeborn initialize hdfs failed."); e.printStackTrace(System.err); diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 7ba9a3d1e17..6a5f89d8172 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -21,12 +21,9 @@ import java.io.IOException import java.net.BindException import java.util import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, 
TimeUnit} - import scala.collection.JavaConverters._ import scala.util.Random - import org.apache.hadoop.fs.{FileSystem, Path} - import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.identity.UserIdentifier @@ -39,7 +36,7 @@ import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode} import org.apache.celeborn.common.protocol.message.ControlMessages._ import org.apache.celeborn.common.quota.{QuotaManager, ResourceConsumption} import org.apache.celeborn.common.rpc._ -import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, ThreadUtils, Utils} +import org.apache.celeborn.common.util.{CelebornHadoopUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils} import org.apache.celeborn.server.common.{HttpService, Service} import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler} @@ -668,7 +665,7 @@ private[celeborn] class Master( private def cleanExpiredAppDirsOnHDFS(): Unit = { val activeAppIds = statusSystem.getActiveAppIds if (hadoopFs == null) { - hadoopFs = Utils.getHadoopFS(conf) + hadoopFs = CelebornHadoopUtils.getHadoopFS(conf) } val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir) if (hadoopFs.exists(hdfsWorkPath)) { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 9af051987b1..774b8f54d09 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -28,7 +28,6 @@ import java.util.function.{BiConsumer, IntUnaryOperator} import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import 
org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.permission.FsPermission import org.iq80.leveldb.DB @@ -125,7 +124,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val (hdfsFlusher, _totalHdfsFlusherThread) = if (!hdfsDir.isEmpty && conf.hasHDFSStorage) { logInfo(s"Initialize HDFS support with path ${hdfsDir}") - StorageManager.hadoopFs = Utils.getHadoopFS(conf) + StorageManager.hadoopFs = CelebornHadoopUtils.getHadoopFS(conf) ( Some(new HdfsFlusher( workerSource, From dd6f55d85164b5909401b05270c626cad7fe4bc5 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 14:48:22 +0800 Subject: [PATCH 07/29] update. --- .../main/java/org/apache/celeborn/client/ShuffleClient.java | 2 +- .../org/apache/celeborn/service/deploy/master/Master.scala | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index a36b53fa504..6d30ceb94b1 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; -import org.apache.celeborn.common.util.CelebornHadoopUtils; import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +29,7 @@ import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.common.rpc.RpcEndpointRef; +import org.apache.celeborn.common.util.CelebornHadoopUtils; import org.apache.celeborn.common.write.PushState; /** diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 6a5f89d8172..071cfce50c9 
100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -21,9 +21,12 @@ import java.io.IOException import java.net.BindException import java.util import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} + import scala.collection.JavaConverters._ import scala.util.Random + import org.apache.hadoop.fs.{FileSystem, Path} + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.identity.UserIdentifier From 84ceda8bf6c0049a64656cfa41a97e35f949cfd4 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 14:58:26 +0800 Subject: [PATCH 08/29] address comments. --- .../apache/celeborn/common/util/CelebornHadoopUtils.scala | 3 +-- .../apache/celeborn/service/deploy/master/Master.scala | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index 3e81bf5bb6f..dac0e31b226 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -32,8 +32,7 @@ object CelebornHadoopUtils extends Logging { val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme) hadoopConf.set("dfs.replication", "2") hadoopConf.set(disableCacheName, "false") - logInfo(s"Celeborn will ignore cluster settings $disableCacheName and " + - "set it to false") + logInfo(s"Celeborn overrides some HDFS settings defined in Hadoop configuration files, including overriding '$disableCacheName=false' and 'dfs.replication=2'. 
It can be overridden again in the Celeborn configuration with the additional prefix 'celeborn.hadoop.'") } appendSparkHadoopConfigs(conf, hadoopConf) hadoopConf diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 071cfce50c9..7bca93ee1d4 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -401,6 +401,10 @@ private[celeborn] class Master( } } } + // only leader can clean hdfs dirs + if (conf.hasHDFSStorage && !conf.hdfsDir.isEmpty) { + cleanExpiredAppDirsOnHDFS() + } } private def handleHeartbeatFromWorker( @@ -656,10 +660,6 @@ private[celeborn] class Master( override def run(): Unit = { statusSystem.handleAppLost(appId, requestId) logInfo(s"Removed application $appId") - // only leader can clean hdfs dirs - if (conf.hasHDFSStorage && !conf.hdfsDir.isEmpty) { - cleanExpiredAppDirsOnHDFS() - } context.reply(ApplicationLostResponse(StatusCode.SUCCESS)) } }) From acbcdc82d888ac7095b38f74199b66dbb4801ce7 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 5 Jul 2023 14:59:43 +0800 Subject: [PATCH 09/29] Update common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala --- .../org/apache/celeborn/common/util/CelebornHadoopUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index dac0e31b226..0eb0934c84d 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -32,7 +32,7 @@ object CelebornHadoopUtils extends Logging { val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme) 
hadoopConf.set("dfs.replication", "2") hadoopConf.set(disableCacheName, "false") - logInfo(s"Celeborn overrides some HDFS settings defined in Hadoop configuration files, including overriding '$disableCacheName=false' and 'dfs.replication=2'. It can be overridden again in the Celeborn configuration with the additional prefix 'celeborn.hadoop.'") + logInfo(s"Celeborn overrides some HDFS settings defined in Hadoop configuration files, including '$disableCacheName=false' and 'dfs.replication=2'. It can be overridden again in the Celeborn configuration with the additional prefix 'celeborn.hadoop.', e.g. 'celeborn.hadoop.dfs.replication=3'") } appendSparkHadoopConfigs(conf, hadoopConf) hadoopConf From 59b99c36d7962f902d020bfe2b9e34e95b3c5ad3 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 5 Jul 2023 15:00:18 +0800 Subject: [PATCH 10/29] Update common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala --- .../org/apache/celeborn/common/util/CelebornHadoopUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index 0eb0934c84d..2dbd4abef0e 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -32,7 +32,7 @@ object CelebornHadoopUtils extends Logging { val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme) hadoopConf.set("dfs.replication", "2") hadoopConf.set(disableCacheName, "false") - logInfo(s"Celeborn overrides some HDFS settings defined in Hadoop configuration files, including '$disableCacheName=false' and 'dfs.replication=2'. It can be overridden again in the Celeborn configuration with the additional prefix 'celeborn.hadoop.', e.g. 
'celeborn.hadoop.dfs.replication=3'") + logInfo(s"Celeborn overrides some HDFS settings defined in Hadoop configuration files, including '$disableCacheName=false' and 'dfs.replication=2'. It can be overridden again in Celeborn configuration with the additional prefix 'celeborn.hadoop.', e.g. 'celeborn.hadoop.dfs.replication=3'") } appendSparkHadoopConfigs(conf, hadoopConf) hadoopConf From 393bb53df72d6a4079193fa4ce131a9ff710fc4e Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 15:26:59 +0800 Subject: [PATCH 11/29] Add new configs to avoid checking frequently. --- common/src/main/proto/TransportMessages.proto | 1 + .../apache/celeborn/common/CelebornConf.scala | 14 +++++- .../protocol/message/ControlMessages.scala | 8 ++++ docs/configuration/master.md | 1 + .../clustermeta/AbstractMetaManager.java | 6 --- .../service/deploy/master/Master.scala | 45 ++++++++++++++----- .../worker/storage/StorageManager.scala | 2 +- 7 files changed, 56 insertions(+), 21 deletions(-) diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 37d320d0b95..89fc385fa3a 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -68,6 +68,7 @@ enum MessageType { PARTITION_SPLIT = 47; REGISTER_MAP_PARTITION_TASK = 48; HEARTBEAT_FROM_APPLICATION_RESPONSE = 49; + CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT = 50; } message PbStorageInfo { diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index d2b1cf2c4fc..092edd28476 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -499,8 +499,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def masterSlotAssignPolicy: SlotsAssignPolicy = SlotsAssignPolicy.valueOf(get(MASTER_SLOT_ASSIGN_POLICY)) - def 
hasHDFSStorage: Boolean = get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) - + def hasHDFSStorage: Boolean = + get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && get(HDFS_DIR).isDefined def masterSlotAssignLoadAwareDiskGroupNum: Int = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM) def masterSlotAssignLoadAwareDiskGroupGradient: Double = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT) @@ -692,6 +692,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS) def clientRpcMaxParallelism: Int = get(CLIENT_RPC_MAX_PARALLELISM) def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT) + def hdfsRemnantDirsTimeoutMS: Long = get(HDFS_REMNANET_DIRS_TIMEOUT) def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL) def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS) def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT) @@ -1509,6 +1510,15 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("300s") + val HDFS_REMNANET_DIRS_TIMEOUT: ConfigEntry[Long] = + buildConf("celeborn.master.hdfs.remnantDirs.timeout") + .withAlternative("celeborn.application.heartbeat.timeout") + .categories("master") + .version("0.3.0") + .doc("Application heartbeat timeout.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1h") + val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn.master.heartbeat.worker.timeout") .withAlternative("celeborn.worker.heartbeat.timeout") diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 2c31d29146b..32806edff77 100644 --- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -67,6 +67,8 @@ object ControlMessages extends Logging { case object CheckForApplicationTimeOut extends Message + case object CheckForHDFSRemanetDirsTimeout extends Message + case object RemoveExpiredShuffle extends Message /** @@ -427,6 +429,9 @@ object ControlMessages extends Logging { case CheckForApplicationTimeOut => new TransportMessage(MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null) + case CheckForHDFSRemanetDirsTimeout => + new TransportMessage(MessageType.CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT, null) + case RemoveExpiredShuffle => new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null) @@ -1074,6 +1079,9 @@ object ControlMessages extends Logging { case CHECK_FOR_APPLICATION_TIMEOUT_VALUE => CheckForApplicationTimeOut + case CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT_VALUE => + CheckForHDFSRemanetDirsTimeout + case WORKER_LOST_VALUE => PbWorkerLost.parseFrom(message.getPayload) diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 6152a5b633f..5c235b99205 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -22,6 +22,7 @@ license: | | celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.3.0 | +| celeborn.master.hdfs.remnantDirs.timeout | 1h | Application heartbeat timeout. | 0.3.0 | | celeborn.master.heartbeat.application.timeout | 300s | Application heartbeat timeout. | 0.3.0 | | celeborn.master.heartbeat.worker.timeout | 120s | Worker heartbeat timeout. 
| 0.3.0 | | celeborn.master.host | <localhost> | Hostname for master to bind. | 0.2.0 | diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 794f3250193..d0ef66612ce 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -138,12 +138,6 @@ public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, l partitionTotalFileCount.add(fileCount); } - public Set getActiveAppIds() { - return registeredShuffle.stream() - .map(key -> Utils.splitShuffleKey(key)._1) - .collect(Collectors.toSet()); - } - public void updateAppLostMeta(String appId) { registeredShuffle.stream() .filter(shuffle -> shuffle.startsWith(appId)) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 7bca93ee1d4..dc17ba46c6b 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -92,11 +92,13 @@ private[celeborn] class Master( ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _ private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _ + private var checkForHDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _ private val nonEagerHandler = ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64) // Config constants private val workerHeartbeatTimeoutMs = conf.workerHeartbeatTimeout private val appHeartbeatTimeoutMs = conf.appHeartbeatTimeoutMs + private val hdfsRemnantDirsTimeoutMS = conf.hdfsRemnantDirsTimeoutMS 
private val quotaManager = QuotaManager.instantiate(conf) private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval @@ -184,6 +186,19 @@ private[celeborn] class Master( 0, appHeartbeatTimeoutMs / 2, TimeUnit.MILLISECONDS) + + if (conf.hasHDFSStorage) { + checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleAtFixedRate( + new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + self.send(CheckForHDFSRemanetDirsTimeout) + } + }, + hdfsRemnantDirsTimeoutMS, + hdfsRemnantDirsTimeoutMS, + TimeUnit.MILLISECONDS) + } + } override def onStop(): Unit = { @@ -194,6 +209,9 @@ private[celeborn] class Master( if (checkForApplicationTimeOutTask != null) { checkForApplicationTimeOutTask.cancel(true) } + if (checkForHDFSRemnantDirsTimeOutTask != null) { + checkForHDFSRemnantDirsTimeOutTask.cancel(true) + } forwardMessageThread.shutdownNow() logInfo("Celeborn Master is stopped.") } @@ -218,6 +236,8 @@ private[celeborn] class Master( executeWithLeaderChecker(null, timeoutDeadWorkers()) case CheckForApplicationTimeOut => executeWithLeaderChecker(null, timeoutDeadApplications()) + case CheckForHDFSRemanetDirsTimeout => + executeWithLeaderChecker(null, cleanExpiredAppDirsOnHDFS()) case pb: PbWorkerLost => val host = pb.getHost val rpcPort = pb.getRpcPort @@ -401,10 +421,6 @@ private[celeborn] class Master( } } } - // only leader can clean hdfs dirs - if (conf.hasHDFSStorage && !conf.hdfsDir.isEmpty) { - cleanExpiredAppDirsOnHDFS() - } } private def handleHeartbeatFromWorker( @@ -660,24 +676,29 @@ private[celeborn] class Master( override def run(): Unit = { statusSystem.handleAppLost(appId, requestId) logInfo(s"Removed application $appId") + cleanExpiredAppDirsOnHDFS(appId) context.reply(ApplicationLostResponse(StatusCode.SUCCESS)) } }) } - private def cleanExpiredAppDirsOnHDFS(): Unit = { - val activeAppIds = statusSystem.getActiveAppIds + private def cleanExpiredAppDirsOnHDFS(dir: String = ""): Unit = { if (hadoopFs == null) { 
hadoopFs = CelebornHadoopUtils.getHadoopFS(conf) } val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir) if (hadoopFs.exists(hdfsWorkPath)) { - val iter = hadoopFs.listStatusIterator(hdfsWorkPath) - while (iter.hasNext) { - val fileStatus = iter.next() - if (!activeAppIds.contains(fileStatus.getPath.getName)) { - logDebug(s"Clean hdfs dir ${fileStatus.getPath.toString}") - hadoopFs.delete(fileStatus.getPath, true) + if (!dir.isEmpty) { + // delete specific app dir on application lost + hadoopFs.delete(new Path(hdfsWorkPath, dir), true) + } else { + val iter = hadoopFs.listStatusIterator(hdfsWorkPath) + while (iter.hasNext) { + val fileStatus = iter.next() + if (!statusSystem.appHeartbeatTime.contains(fileStatus.getPath.getName)) { + logDebug(s"Clean hdfs dir ${fileStatus.getPath.toString}") + hadoopFs.delete(fileStatus.getPath, true) + } } } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 774b8f54d09..f81d5e9c002 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -122,7 +122,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val hdfsPermission = new FsPermission("755") val hdfsWriters = JavaUtils.newConcurrentHashMap[String, FileWriter]() val (hdfsFlusher, _totalHdfsFlusherThread) = - if (!hdfsDir.isEmpty && conf.hasHDFSStorage) { + if (conf.hasHDFSStorage) { logInfo(s"Initialize HDFS support with path ${hdfsDir}") StorageManager.hadoopFs = CelebornHadoopUtils.getHadoopFS(conf) ( From fc2d0d746f01ab33c07671ea1272f6db5044f147 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 15:39:22 +0800 Subject: [PATCH 12/29] fix ut. 
--- .../java/org/apache/celeborn/client/ShuffleClient.java | 2 +- .../apache/celeborn/client/read/DfsPartitionReader.java | 8 ++++---- .../java/org/apache/celeborn/common/meta/FileInfo.java | 2 +- .../apache/celeborn/common/util/CelebornHadoopUtils.scala | 8 +++++++- .../org/apache/celeborn/common/CelebornConfSuite.scala | 1 + .../org/apache/celeborn/common/util/UtilsSuite.scala | 2 +- .../apache/celeborn/service/deploy/master/Master.scala | 6 ++++-- .../deploy/worker/storage/PartitionFilesSorter.java | 4 ++-- .../service/deploy/worker/storage/StorageManager.scala | 4 ++-- 9 files changed, 23 insertions(+), 14 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index 6d30ceb94b1..4fc119ca14f 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -87,7 +87,7 @@ public static FileSystem getHdfsFs(CelebornConf conf) { try { hdfsFs = CelebornHadoopUtils.getHadoopFS(conf); } catch (Exception e) { - System.err.println("Celeborn initialize hdfs failed."); + System.err.println("Celeborn initialize HDFS failed."); e.printStackTrace(System.err); } } diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java index ff06febca75..316f11ef167 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java @@ -84,7 +84,7 @@ public DfsPartitionReader( // Parse this message to ensure sort is done. 
} catch (IOException | InterruptedException e) { throw new IOException( - "read shuffle file from hdfs failed, filePath: " + "read shuffle file from HDFS failed, filePath: " + location.getStorageInfo().getFilePath(), e); } @@ -121,7 +121,7 @@ public DfsPartitionReader( hdfsInputStream.readFully(offset, buffer); } catch (IOException e) { logger.warn( - "read hdfs {} failed will retry, error detail {}", + "read HDFS {} failed will retry, error detail {}", location.getStorageInfo().getFilePath(), e); try { @@ -135,7 +135,7 @@ public DfsPartitionReader( hdfsInputStream.readFully(offset, buffer); } catch (IOException ex) { logger.warn( - "retry read hdfs {} failed, error detail {} ", + "retry read HDFS {} failed, error detail {} ", location.getStorageInfo().getFilePath(), e); exception.set(ex); @@ -238,7 +238,7 @@ public void close() { try { hdfsInputStream.close(); } catch (IOException e) { - logger.warn("close hdfs input stream failed.", e); + logger.warn("close HDFS input stream failed.", e); } if (results.size() > 0) { results.forEach(ReferenceCounted::release); diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java index cffd7cd10f3..426f9285094 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java @@ -169,7 +169,7 @@ public void deleteAllFiles(FileSystem hdfsFs) { } catch (Exception e) { // ignore delete exceptions because some other workers might be deleting the directory logger.debug( - "delete hdfs file {},{},{},{} failed {}", + "delete HDFS file {},{},{},{} failed {}", getHdfsPath(), getHdfsWriterSuccessPath(), getHdfsIndexPath(), diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index 2dbd4abef0e..5892542c27c 100644 --- 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -17,6 +17,8 @@ package org.apache.celeborn.common.util +import java.util.concurrent.atomic.AtomicBoolean + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -24,6 +26,7 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging object CelebornHadoopUtils extends Logging { + var logFlag = new AtomicBoolean(false) private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = { val hadoopConf = new Configuration() if (!conf.hdfsDir.isEmpty) { @@ -32,7 +35,10 @@ object CelebornHadoopUtils extends Logging { val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme) hadoopConf.set("dfs.replication", "2") hadoopConf.set(disableCacheName, "false") - logInfo(s"Celeborn overrides some HDFS settings defined in Hadoop configuration files, including '$disableCacheName=false' and 'dfs.replication=2'. It can be overridden again in Celeborn configuration with the additional prefix 'celeborn.hadoop.', e.g. 'celeborn.hadoop.dfs.replication=3'") + if (logFlag.compareAndSet(false, true)) { + logInfo( + s"Celeborn overrides some HDFS settings defined in Hadoop configuration files, including '$disableCacheName=false' and 'dfs.replication=2'. It can be overridden again in Celeborn configuration with the additional prefix 'celeborn.hadoop.', e.g. 
'celeborn.hadoop.dfs.replication=3'") + } } appendSparkHadoopConfigs(conf, hadoopConf) hadoopConf diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala index 4c5ce3b558b..2bfff2186a2 100644 --- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala @@ -202,6 +202,7 @@ class CelebornConfSuite extends CelebornFunSuite { test("Test commit file threads") { val conf = new CelebornConf() conf.set("celeborn.storage.activeTypes", "HDFS") + conf.set("celeborn.storage.hdfs.dir", "hdfds:///xxx") assert(conf.workerCommitThreads === 128) conf.set("celeborn.storage.activeTypes", "SDD,HDD") diff --git a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala index cca2439d085..e3e812a0db0 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala @@ -105,7 +105,7 @@ class UtilsSuite extends CelebornFunSuite { assert(mapperEnd == mapperEndTrans) } - test("validate hdfs compatible fs path") { + test("validate HDFS compatible fs path") { val hdfsPath = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x" val simpleHdfsPath = "hdfs:///xxxx/xx-xx/x-x-x" val sortedHdfsPath = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x.sorted" diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index dc17ba46c6b..6bbab61ea73 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -676,7 +676,9 @@ private[celeborn] class Master( override def run(): Unit = { statusSystem.handleAppLost(appId, requestId) 
logInfo(s"Removed application $appId") - cleanExpiredAppDirsOnHDFS(appId) + if (conf.hasHDFSStorage) { + cleanExpiredAppDirsOnHDFS(appId) + } context.reply(ApplicationLostResponse(StatusCode.SUCCESS)) } }) @@ -696,7 +698,7 @@ private[celeborn] class Master( while (iter.hasNext) { val fileStatus = iter.next() if (!statusSystem.appHeartbeatTime.contains(fileStatus.getPath.getName)) { - logDebug(s"Clean hdfs dir ${fileStatus.getPath.toString}") + logDebug(s"Clean HDFS dir ${fileStatus.getPath.toString}") hadoopFs.delete(fileStatus.getPath, true) } } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index f7963c5d64a..1d88069a820 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -193,8 +193,8 @@ public FileInfo getSortedFileInfo( throw new IOException( "Sort scheduler thread is interrupted means worker is shutting down.", e); } catch (IOException e) { - logger.error("File sorter access hdfs failed.", e); - throw new IOException("File sorter access hdfs failed.", e); + logger.error("File sorter access HDFS failed.", e); + throw new IOException("File sorter access HDFS failed.", e); } } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index f81d5e9c002..2178f117ecd 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -473,7 +473,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs new Path(new Path(hdfsDir, 
conf.workerWorkingDir), s"$appId/$shuffleId"), true) } catch { - case e: Exception => logWarning("Clean expired hdfs shuffle failed.", e) + case e: Exception => logWarning("Clean expired HDFS shuffle failed.", e) } } } @@ -570,7 +570,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val hdfsCleaned = hadoopFs match { case hdfs: FileSystem => val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir) - // hdfs path not exist when first time initialize + // HDFS path not exist when first time initialize if (hdfs.exists(hdfsWorkPath)) { !hdfs.listFiles(hdfsWorkPath, false).hasNext } else { From f23aca456aabfead7079aacb98f8cb92ec075192 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 15:46:04 +0800 Subject: [PATCH 13/29] update spelling and docs. --- .../scala/org/apache/celeborn/common/CelebornConf.scala | 7 +++---- docs/configuration/master.md | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 092edd28476..12842cdd884 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -692,7 +692,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS) def clientRpcMaxParallelism: Int = get(CLIENT_RPC_MAX_PARALLELISM) def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT) - def hdfsRemnantDirsTimeoutMS: Long = get(HDFS_REMNANET_DIRS_TIMEOUT) + def hdfsRemnantDirsTimeoutMS: Long = get(HDFS_REMNANTDIRS_TIMEOUT) def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL) def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS) def clientExcludedWorkerExpireTimeout: Long = 
get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT) @@ -1510,12 +1510,11 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("300s") - val HDFS_REMNANET_DIRS_TIMEOUT: ConfigEntry[Long] = + val HDFS_REMNANTDIRS_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn.master.hdfs.remnantDirs.timeout") - .withAlternative("celeborn.application.heartbeat.timeout") .categories("master") .version("0.3.0") - .doc("Application heartbeat timeout.") + .doc("Timeout before HDFS remnant shuffle dirs are deleted.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1h") diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 5c235b99205..9a3d41df6bb 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -22,7 +22,7 @@ license: | | celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.3.0 | -| celeborn.master.hdfs.remnantDirs.timeout | 1h | Application heartbeat timeout. | 0.3.0 | +| celeborn.master.hdfs.remnantDirs.timeout | 1h | Timeout before HDFS remnant shuffle dirs are deleted. | 0.3.0 | | celeborn.master.heartbeat.application.timeout | 300s | Application heartbeat timeout. | 0.3.0 | | celeborn.master.heartbeat.worker.timeout | 120s | Worker heartbeat timeout. | 0.3.0 | | celeborn.master.host | <localhost> | Hostname for master to bind. | 0.2.0 | From ad21966d799ad9ed77d63a28b20cc8ce91adb487 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 15:51:29 +0800 Subject: [PATCH 14/29] fix ut. 
--- .../scala/org/apache/celeborn/common/CelebornConfSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala index 2bfff2186a2..f229518d9f4 100644 --- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala @@ -190,9 +190,11 @@ class CelebornConfSuite extends CelebornFunSuite { test("Test empty working dir") { val conf = new CelebornConf() conf.set("celeborn.storage.activeTypes", "HDFS") + conf.set("celeborn.storage.hdfs.dir", "hdfds:///xxx") assert(conf.workerBaseDirs.isEmpty) conf.set("celeborn.storage.activeTypes", "SDD,HDD,HDFS") + conf.set("celeborn.storage.hdfs.dir", "hdfds:///xxx") assert(conf.workerBaseDirs.isEmpty) conf.set("celeborn.storage.activeTypes", "SDD,HDD") From 760b9244e6efa41dfc3a03923281fb7dbb93e663 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 17:41:43 +0800 Subject: [PATCH 15/29] test on cluster. 
--- .../org/apache/celeborn/common/util/CelebornHadoopUtils.scala | 2 +- .../org/apache/celeborn/service/deploy/master/Master.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index 5892542c27c..a2e5811617f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -52,6 +52,6 @@ object CelebornHadoopUtils extends Logging { } def getHadoopFS(conf: CelebornConf): FileSystem = { - new Path(conf.hdfsDir).getFileSystem(CelebornHadoopUtils.newConfiguration(conf)) + new Path(conf.hdfsDir).getFileSystem(newConfiguration(conf)) } } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 6bbab61ea73..5c779dd3bd0 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -697,7 +697,7 @@ private[celeborn] class Master( val iter = hadoopFs.listStatusIterator(hdfsWorkPath) while (iter.hasNext) { val fileStatus = iter.next() - if (!statusSystem.appHeartbeatTime.contains(fileStatus.getPath.getName)) { + if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { logDebug(s"Clean HDFS dir ${fileStatus.getPath.toString}") hadoopFs.delete(fileStatus.getPath, true) } From 8df0d4bc5931fede867dbe10836ccbf2f37865e7 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 19:26:39 +0800 Subject: [PATCH 16/29] address comments. 
--- .../common/protocol/message/ControlMessages.scala | 2 +- .../apache/celeborn/common/CelebornConfSuite.scala | 6 +++--- .../celeborn/service/deploy/master/Master.scala | 11 ++++++++--- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 32806edff77..b7cb2915b3f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -67,7 +67,7 @@ object ControlMessages extends Logging { case object CheckForApplicationTimeOut extends Message - case object CheckForHDFSRemanetDirsTimeout extends Message + case object CheckForHDFSRemnantDirsTimeout extends Message case object RemoveExpiredShuffle extends Message diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala index f229518d9f4..3105d25d744 100644 --- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala @@ -190,11 +190,11 @@ class CelebornConfSuite extends CelebornFunSuite { test("Test empty working dir") { val conf = new CelebornConf() conf.set("celeborn.storage.activeTypes", "HDFS") - conf.set("celeborn.storage.hdfs.dir", "hdfds:///xxx") + conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx") assert(conf.workerBaseDirs.isEmpty) conf.set("celeborn.storage.activeTypes", "SDD,HDD,HDFS") - conf.set("celeborn.storage.hdfs.dir", "hdfds:///xxx") + conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx") assert(conf.workerBaseDirs.isEmpty) conf.set("celeborn.storage.activeTypes", "SDD,HDD") @@ -204,7 +204,7 @@ class CelebornConfSuite extends CelebornFunSuite { test("Test commit file threads") { val conf 
= new CelebornConf() conf.set("celeborn.storage.activeTypes", "HDFS") - conf.set("celeborn.storage.hdfs.dir", "hdfds:///xxx") + conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx") assert(conf.workerCommitThreads === 128) conf.set("celeborn.storage.activeTypes", "SDD,HDD") diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 5c779dd3bd0..29036707fab 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -23,6 +23,7 @@ import java.util import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.util.Random import org.apache.hadoop.fs.{FileSystem, Path} @@ -191,7 +192,7 @@ private[celeborn] class Master( checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleAtFixedRate( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { - self.send(CheckForHDFSRemanetDirsTimeout) + self.send(CheckForHDFSRemnantDirsTimeout) } }, hdfsRemnantDirsTimeoutMS, @@ -236,7 +237,7 @@ private[celeborn] class Master( executeWithLeaderChecker(null, timeoutDeadWorkers()) case CheckForApplicationTimeOut => executeWithLeaderChecker(null, timeoutDeadApplications()) - case CheckForHDFSRemanetDirsTimeout => + case CheckForHDFSRemnantDirsTimeout => executeWithLeaderChecker(null, cleanExpiredAppDirsOnHDFS()) case pb: PbWorkerLost => val host = pb.getHost @@ -694,14 +695,18 @@ private[celeborn] class Master( // delete specific app dir on application lost hadoopFs.delete(new Path(hdfsWorkPath, dir), true) } else { + val startTime = System.currentTimeMillis() + val cleanDirs = new mutable.ListBuffer[String] val iter = hadoopFs.listStatusIterator(hdfsWorkPath) while (iter.hasNext) { val fileStatus = iter.next() if 
(!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { - logDebug(s"Clean HDFS dir ${fileStatus.getPath.toString}") + cleanDirs.append(fileStatus.getPath.toString) hadoopFs.delete(fileStatus.getPath, true) } } + logInfo( + s"Clean all remnant HDFS dirs using ${System.currentTimeMillis() - startTime} ms, cleaned ${cleanDirs.mkString(",")}") } } } From af59f07c4c8e53a179d9efdc973ad2b7887cea3f Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 19:31:18 +0800 Subject: [PATCH 17/29] update. --- .../celeborn/common/protocol/message/ControlMessages.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index b7cb2915b3f..895f6f49422 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -429,7 +429,7 @@ object ControlMessages extends Logging { case CheckForApplicationTimeOut => new TransportMessage(MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null) - case CheckForHDFSRemanetDirsTimeout => + case CheckForHDFSRemnantDirsTimeout => new TransportMessage(MessageType.CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT, null) case RemoveExpiredShuffle => @@ -1080,7 +1080,7 @@ object ControlMessages extends Logging { CheckForApplicationTimeOut case CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT_VALUE => - CheckForHDFSRemanetDirsTimeout + CheckForHDFSRemnantDirsTimeout case WORKER_LOST_VALUE => PbWorkerLost.parseFrom(message.getPayload) From 22b69896af959d9094b50a941be618ec7470db66 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 19:43:12 +0800 Subject: [PATCH 18/29] refine. 
--- .../protocol/message/ControlMessages.scala | 1 - .../celeborn/service/deploy/master/Master.scala | 16 ++++++++-------- .../deploy/worker/storage/StorageManager.scala | 8 +++++--- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 895f6f49422..5afe4478dd5 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -19,7 +19,6 @@ package org.apache.celeborn.common.protocol.message import java.util import java.util.UUID -import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 29036707fab..30fa65de49f 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -100,6 +100,7 @@ private[celeborn] class Master( private val workerHeartbeatTimeoutMs = conf.workerHeartbeatTimeout private val appHeartbeatTimeoutMs = conf.appHeartbeatTimeoutMs private val hdfsRemnantDirsTimeoutMS = conf.hdfsRemnantDirsTimeoutMS + private val hasHDFSStorage = conf.hasHDFSStorage private val quotaManager = QuotaManager.instantiate(conf) private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval @@ -188,7 +189,7 @@ private[celeborn] class Master( appHeartbeatTimeoutMs / 2, TimeUnit.MILLISECONDS) - if (conf.hasHDFSStorage) { + if (hasHDFSStorage) { checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleAtFixedRate( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { @@ -568,7 +569,7 @@ 
private[celeborn] class Master( val slots = masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") { statusSystem.workers.synchronized { - if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !conf.hasHDFSStorage) { + if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !hasHDFSStorage) { SlotsAllocator.offerSlotsLoadAware( availableWorkers, requestSlots.partitionIdList, @@ -677,7 +678,7 @@ private[celeborn] class Master( override def run(): Unit = { statusSystem.handleAppLost(appId, requestId) logInfo(s"Removed application $appId") - if (conf.hasHDFSStorage) { + if (hasHDFSStorage) { cleanExpiredAppDirsOnHDFS(appId) } context.reply(ApplicationLostResponse(StatusCode.SUCCESS)) @@ -695,18 +696,17 @@ private[celeborn] class Master( // delete specific app dir on application lost hadoopFs.delete(new Path(hdfsWorkPath, dir), true) } else { - val startTime = System.currentTimeMillis() - val cleanDirs = new mutable.ListBuffer[String] val iter = hadoopFs.listStatusIterator(hdfsWorkPath) while (iter.hasNext) { + val startTime = System.currentTimeMillis() val fileStatus = iter.next() if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { - cleanDirs.append(fileStatus.getPath.toString) + logInfo( + s"Clean HDFS dir ${fileStatus.getPath.toString} using ${Utils.msDurationToString( + System.currentTimeMillis() - startTime)}") hadoopFs.delete(fileStatus.getPath, true) } } - logInfo( - s"Clean all remnant HDFS dirs using ${System.currentTimeMillis() - startTime} ms, cleaned ${cleanDirs.mkString(",")}") } } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 2178f117ecd..37fe0581d75 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -51,6 +51,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val workingDirWriters = JavaUtils.newConcurrentHashMap[File, ConcurrentHashMap[String, FileWriter]]() + val hasHDFSStorage = conf.hasHDFSStorage + // (deviceName -> deviceInfo) and (mount point -> diskInfo) val (deviceInfos, diskInfos) = { val workingDirInfos = @@ -58,7 +60,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs (new File(workdir, conf.workerWorkingDir), maxSpace, flusherThread, storageType) } - if (workingDirInfos.size <= 0 && !conf.hasHDFSStorage) { + if (workingDirInfos.size <= 0 && !hasHDFSStorage) { throw new IOException("Empty working directory configuration!") } @@ -122,7 +124,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val hdfsPermission = new FsPermission("755") val hdfsWriters = JavaUtils.newConcurrentHashMap[String, FileWriter]() val (hdfsFlusher, _totalHdfsFlusherThread) = - if (conf.hasHDFSStorage) { + if (hasHDFSStorage) { logInfo(s"Initialize HDFS support with path ${hdfsDir}") StorageManager.hadoopFs = CelebornHadoopUtils.getHadoopFS(conf) ( @@ -254,7 +256,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs partitionType: PartitionType, rangeReadFilter: Boolean, userIdentifier: UserIdentifier): FileWriter = { - if (healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage) { + if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage) { throw new IOException("No available working dirs!") } From 9e47b1a7e47fde1d4d47029db40018d965b0df7c Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 5 Jul 2023 19:50:56 +0800 Subject: [PATCH 19/29] Update master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala --- .../org/apache/celeborn/service/deploy/master/Master.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 30fa65de49f..4e21d6d3ce1 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -702,8 +702,8 @@ private[celeborn] class Master( val fileStatus = iter.next() if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { logInfo( - s"Clean HDFS dir ${fileStatus.getPath.toString} using ${Utils.msDurationToString( - System.currentTimeMillis() - startTime)}") + s"Clean HDFS dir ${fileStatus.getPath.toString} costs + ${Utils.msDurationToString(System.currentTimeMillis() - startTime)}") hadoopFs.delete(fileStatus.getPath, true) } } From ff715b8805819a0c23fa406adf801df558ada1ed Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 5 Jul 2023 19:51:41 +0800 Subject: [PATCH 20/29] Update master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala --- .../org/apache/celeborn/service/deploy/master/Master.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 4e21d6d3ce1..477892dcf35 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -702,8 +702,8 @@ private[celeborn] class Master( val fileStatus = iter.next() if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { logInfo( - s"Clean HDFS dir ${fileStatus.getPath.toString} costs - ${Utils.msDurationToString(System.currentTimeMillis() - startTime)}") + s"Clean HDFS dir ${fileStatus.getPath.toString} costs " + + Utils.msDurationToString(System.currentTimeMillis() - startTime)) 
hadoopFs.delete(fileStatus.getPath, true) } } From 59d0869d1d84d502fa92573a48f7a56c479d4bc0 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 5 Jul 2023 19:52:34 +0800 Subject: [PATCH 21/29] Update master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala --- .../org/apache/celeborn/service/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 477892dcf35..387748bd478 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -701,10 +701,10 @@ private[celeborn] class Master( val startTime = System.currentTimeMillis() val fileStatus = iter.next() if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { + hadoopFs.delete(fileStatus.getPath, true) logInfo( s"Clean HDFS dir ${fileStatus.getPath.toString} costs " + Utils.msDurationToString(System.currentTimeMillis() - startTime)) - hadoopFs.delete(fileStatus.getPath, true) } } } From 2999a49f6d749e388247bfc02414be543c17b142 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 5 Jul 2023 19:54:04 +0800 Subject: [PATCH 22/29] Update master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala --- .../org/apache/celeborn/service/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 387748bd478..9500d4f8182 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -703,7 +703,7 @@ private[celeborn] class Master( if 
(!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { hadoopFs.delete(fileStatus.getPath, true) logInfo( - s"Clean HDFS dir ${fileStatus.getPath.toString} costs " + + s"Clean HDFS dir ${fileStatus.getPath} costs " + Utils.msDurationToString(System.currentTimeMillis() - startTime)) } } From 1d793b7bf2e839b046261ba131f5f53c3fa572cc Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 19:59:23 +0800 Subject: [PATCH 23/29] refine. --- .../apache/celeborn/common/util/CelebornHadoopUtils.scala | 7 +++++-- .../org/apache/celeborn/service/deploy/master/Master.scala | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index a2e5811617f..cd44ff9c299 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -29,7 +29,7 @@ object CelebornHadoopUtils extends Logging { var logFlag = new AtomicBoolean(false) private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = { val hadoopConf = new Configuration() - if (!conf.hdfsDir.isEmpty) { + if (conf.hdfsDir.nonEmpty) { val path = new Path(conf.hdfsDir) val scheme = path.toUri.getScheme val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme) @@ -37,7 +37,10 @@ object CelebornHadoopUtils extends Logging { hadoopConf.set(disableCacheName, "false") if (logFlag.compareAndSet(false, true)) { logInfo( - s"Celeborn overrides some HDFS settings defined in Hadoop configuration files, including '$disableCacheName=false' and 'dfs.replication=2'. It can be overridden again in Celeborn configuration with the additional prefix 'celeborn.hadoop.', e.g. 
'celeborn.hadoop.dfs.replication=3'") + s"""Celeborn overrides some HDFS settings defined in Hadoop configuration files, + including '$disableCacheName=false' and 'dfs.replication=2'. + It can be overridden again in Celeborn configuration with the additional + prefix 'celeborn.hadoop.', e.g. 'celeborn.hadoop.dfs.replication=3'""") } } appendSparkHadoopConfigs(conf, hadoopConf) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 30fa65de49f..08e091c1df7 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -702,7 +702,7 @@ private[celeborn] class Master( val fileStatus = iter.next() if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { logInfo( - s"Clean HDFS dir ${fileStatus.getPath.toString} using ${Utils.msDurationToString( + s"Clean HDFS dir ${fileStatus.getPath} costs ${Utils.msDurationToString( System.currentTimeMillis() - startTime)}") hadoopFs.delete(fileStatus.getPath, true) } From 2fbcc5770dcd7302d663a0cbf98c8c91ea262164 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 20:10:20 +0800 Subject: [PATCH 24/29] refine. 
--- .../apache/celeborn/common/util/CelebornHadoopUtils.scala | 8 ++++---- .../apache/celeborn/service/deploy/master/Master.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index cd44ff9c299..75143662aee 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -37,10 +37,10 @@ object CelebornHadoopUtils extends Logging { hadoopConf.set(disableCacheName, "false") if (logFlag.compareAndSet(false, true)) { logInfo( - s"""Celeborn overrides some HDFS settings defined in Hadoop configuration files, - including '$disableCacheName=false' and 'dfs.replication=2'. - It can be overridden again in Celeborn configuration with the additional - prefix 'celeborn.hadoop.', e.g. 'celeborn.hadoop.dfs.replication=3'""") + "Celeborn overrides some HDFS settings defined in Hadoop configuration files, " + + s"including '$disableCacheName=false' and 'dfs.replication=2'. " + + "It can be overridden again in Celeborn configuration with the additional " + + "prefix 'celeborn.hadoop.', e.g. 
'celeborn.hadoop.dfs.replication=3'") } } appendSparkHadoopConfigs(conf, hadoopConf) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 9500d4f8182..99b288068f9 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -694,14 +694,14 @@ private[celeborn] class Master( if (hadoopFs.exists(hdfsWorkPath)) { if (!dir.isEmpty) { // delete specific app dir on application lost - hadoopFs.delete(new Path(hdfsWorkPath, dir), true) + Utils.tryLogNonFatalError(hadoopFs.delete(new Path(hdfsWorkPath, dir), true)) } else { val iter = hadoopFs.listStatusIterator(hdfsWorkPath) while (iter.hasNext) { val startTime = System.currentTimeMillis() val fileStatus = iter.next() if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { - hadoopFs.delete(fileStatus.getPath, true) + Utils.tryLogNonFatalError(hadoopFs.delete(new Path(hdfsWorkPath, dir), true)) logInfo( s"Clean HDFS dir ${fileStatus.getPath} costs " + Utils.msDurationToString(System.currentTimeMillis() - startTime)) From 94991215a0ab34e27792611fea41faeb4f06daf1 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 20:42:59 +0800 Subject: [PATCH 25/29] refine. 
--- common/src/main/proto/TransportMessages.proto | 2 +- .../apache/celeborn/common/CelebornConf.scala | 8 +++---- .../protocol/message/ControlMessages.scala | 10 ++++---- .../common/util/CelebornHadoopUtils.scala | 4 ++-- .../apache/celeborn/common/util/Utils.scala | 10 ++++++++ docs/configuration/master.md | 2 +- .../service/deploy/master/Master.scala | 24 ++++++++++--------- 7 files changed, 36 insertions(+), 24 deletions(-) diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 89fc385fa3a..f168941b7a1 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -68,7 +68,7 @@ enum MessageType { PARTITION_SPLIT = 47; REGISTER_MAP_PARTITION_TASK = 48; HEARTBEAT_FROM_APPLICATION_RESPONSE = 49; - CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT = 50; + CHECK_FOR_HDFS_EXPIRE_DIRS_TIMEOUT = 50; } message PbStorageInfo { diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 12842cdd884..57dffd4a8ef 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -692,7 +692,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS) def clientRpcMaxParallelism: Int = get(CLIENT_RPC_MAX_PARALLELISM) def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT) - def hdfsRemnantDirsTimeoutMS: Long = get(HDFS_REMNANTDIRS_TIMEOUT) + def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT) def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL) def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS) def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT) @@ -1510,11 
+1510,11 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("300s") - val HDFS_REMNANTDIRS_TIMEOUT: ConfigEntry[Long] = - buildConf("celeborn.master.hdfs.remnantDirs.timeout") + val HDFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] = + buildConf("celeborn.master.hdfs.expireDirs.timeout") .categories("master") .version("0.3.0") - .doc("Timeout before HDFS remnant shuffle dirs are deleted.") + .doc("The timeout for expired dirs to be deleted on HDFS.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1h") diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 5afe4478dd5..f6669fcc04a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -66,7 +66,7 @@ object ControlMessages extends Logging { case object CheckForApplicationTimeOut extends Message - case object CheckForHDFSRemnantDirsTimeout extends Message + case object CheckForHDFSExpireDirsTimeout extends Message case object RemoveExpiredShuffle extends Message @@ -428,8 +428,8 @@ object ControlMessages extends Logging { case CheckForApplicationTimeOut => new TransportMessage(MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null) - case CheckForHDFSRemnantDirsTimeout => - new TransportMessage(MessageType.CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT, null) + case CheckForHDFSExpireDirsTimeout => + new TransportMessage(MessageType.CHECK_FOR_HDFS_EXPIRE_DIRS_TIMEOUT, null) case RemoveExpiredShuffle => new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null) @@ -1078,8 +1078,8 @@ object ControlMessages extends Logging { case CHECK_FOR_APPLICATION_TIMEOUT_VALUE => CheckForApplicationTimeOut - case CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT_VALUE => - CheckForHDFSRemnantDirsTimeout + case 
CHECK_FOR_HDFS_EXPIRE_DIRS_TIMEOUT_VALUE => + CheckForHDFSExpireDirsTimeout case WORKER_LOST_VALUE => PbWorkerLost.parseFrom(message.getPayload) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index 75143662aee..bf6d43772e2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -26,7 +26,7 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging object CelebornHadoopUtils extends Logging { - var logFlag = new AtomicBoolean(false) + private var logPrinted = new AtomicBoolean(false) private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = { val hadoopConf = new Configuration() if (conf.hdfsDir.nonEmpty) { @@ -35,7 +35,7 @@ object CelebornHadoopUtils extends Logging { val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme) hadoopConf.set("dfs.replication", "2") hadoopConf.set(disableCacheName, "false") - if (logFlag.compareAndSet(false, true)) { + if (logPrinted.compareAndSet(false, true)) { logInfo( "Celeborn overrides some HDFS settings defined in Hadoop configuration files, " + s"including '$disableCacheName=false' and 'dfs.replication=2'. 
" + diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index a4eb900e070..00d7964e868 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -38,6 +38,7 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.google.protobuf.{ByteString, GeneratedMessageV3} import io.netty.channel.unix.Errors.NativeIoException import org.apache.commons.lang3.SystemUtils +import org.apache.hadoop.fs.Path import org.roaringbitmap.RoaringBitmap import org.apache.celeborn.common.CelebornConf @@ -211,6 +212,15 @@ object Utils extends Logging { } } + def tryLogDeleteHadoopFSError(block: => Unit, path: Path): Unit = { + try { + block + } catch { + case e: IOException => + logError(s"Delete HDFS File ${path} failed", e) + } + } + def tryOrExit(block: => Unit): Unit = { try { block diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 9a3d41df6bb..695b3d84a70 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -22,7 +22,7 @@ license: | | celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.3.0 | -| celeborn.master.hdfs.remnantDirs.timeout | 1h | Timeout before HDFS remnant shuffle dirs are deleted. | 0.3.0 | +| celeborn.master.hdfs.expireDirs.timeout | 1h | The timeout for expired dirs to be deleted on HDFS. | 0.3.0 | | celeborn.master.heartbeat.application.timeout | 300s | Application heartbeat timeout. 
| 0.3.0 | | celeborn.master.heartbeat.worker.timeout | 120s | Worker heartbeat timeout. | 0.3.0 | | celeborn.master.host | <localhost> | Hostname for master to bind. | 0.2.0 | diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 99b288068f9..a28f37bf13c 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -99,7 +99,7 @@ private[celeborn] class Master( // Config constants private val workerHeartbeatTimeoutMs = conf.workerHeartbeatTimeout private val appHeartbeatTimeoutMs = conf.appHeartbeatTimeoutMs - private val hdfsRemnantDirsTimeoutMS = conf.hdfsRemnantDirsTimeoutMS + private val hdfsExpireDirsTimeoutMS = conf.hdfsExpireDirsTimeoutMS private val hasHDFSStorage = conf.hasHDFSStorage private val quotaManager = QuotaManager.instantiate(conf) @@ -193,11 +193,11 @@ private[celeborn] class Master( checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleAtFixedRate( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { - self.send(CheckForHDFSRemnantDirsTimeout) + self.send(CheckForHDFSExpireDirsTimeout) } }, - hdfsRemnantDirsTimeoutMS, - hdfsRemnantDirsTimeoutMS, + hdfsExpireDirsTimeoutMS, + hdfsExpireDirsTimeoutMS, TimeUnit.MILLISECONDS) } @@ -238,8 +238,8 @@ private[celeborn] class Master( executeWithLeaderChecker(null, timeoutDeadWorkers()) case CheckForApplicationTimeOut => executeWithLeaderChecker(null, timeoutDeadApplications()) - case CheckForHDFSRemnantDirsTimeout => - executeWithLeaderChecker(null, cleanExpiredAppDirsOnHDFS()) + case CheckForHDFSExpireDirsTimeout => + executeWithLeaderChecker(null, checkAndCleanExpiredAppDirsOnHDFS()) case pb: PbWorkerLost => val host = pb.getHost val rpcPort = pb.getRpcPort @@ -679,29 +679,31 @@ private[celeborn] class Master( statusSystem.handleAppLost(appId, 
requestId) logInfo(s"Removed application $appId") if (hasHDFSStorage) { - cleanExpiredAppDirsOnHDFS(appId) + checkAndCleanExpiredAppDirsOnHDFS(appId) } context.reply(ApplicationLostResponse(StatusCode.SUCCESS)) } }) } - private def cleanExpiredAppDirsOnHDFS(dir: String = ""): Unit = { + private def checkAndCleanExpiredAppDirsOnHDFS(dir: String = ""): Unit = { if (hadoopFs == null) { hadoopFs = CelebornHadoopUtils.getHadoopFS(conf) } val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir) if (hadoopFs.exists(hdfsWorkPath)) { if (!dir.isEmpty) { + val dirToDelete = new Path(hdfsWorkPath, dir) // delete specific app dir on application lost - Utils.tryLogNonFatalError(hadoopFs.delete(new Path(hdfsWorkPath, dir), true)) + Utils.tryLogDeleteHadoopFSError(hadoopFs.delete(dirToDelete, true), dirToDelete) } else { val iter = hadoopFs.listStatusIterator(hdfsWorkPath) - while (iter.hasNext) { + while (iter.hasNext && isMasterActive == 1) { val startTime = System.currentTimeMillis() val fileStatus = iter.next() + val dirToDelete = fileStatus.getPath if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { - Utils.tryLogNonFatalError(hadoopFs.delete(new Path(hdfsWorkPath, dir), true)) + Utils.tryLogDeleteHadoopFSError(hadoopFs.delete(dirToDelete, true), dirToDelete) logInfo( s"Clean HDFS dir ${fileStatus.getPath} costs " + Utils.msDurationToString(System.currentTimeMillis() - startTime)) From 1643cb6b0bbe01ebbdd61fef6e461a348faf59f4 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 20:46:34 +0800 Subject: [PATCH 26/29] refine.
--- .../main/java/org/apache/celeborn/client/ShuffleClient.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index 4fc119ca14f..b77c827805e 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -87,8 +87,7 @@ public static FileSystem getHdfsFs(CelebornConf conf) { try { hdfsFs = CelebornHadoopUtils.getHadoopFS(conf); } catch (Exception e) { - System.err.println("Celeborn initialize HDFS failed."); - e.printStackTrace(System.err); + logger.error("Celeborn initialize HDFS failed.", e); } } } From 93689d9eff2fe62f2133bcf6aa7e826f00b69555 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 20:59:21 +0800 Subject: [PATCH 27/29] refine. --- common/src/main/proto/TransportMessages.proto | 2 +- .../protocol/message/ControlMessages.scala | 10 +++++----- .../org/apache/celeborn/common/util/Utils.scala | 2 +- .../celeborn/service/deploy/master/Master.scala | 17 +++++++++-------- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index f168941b7a1..50dcdc7a1d6 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -68,7 +68,7 @@ enum MessageType { PARTITION_SPLIT = 47; REGISTER_MAP_PARTITION_TASK = 48; HEARTBEAT_FROM_APPLICATION_RESPONSE = 49; - CHECK_FOR_HDFS_EXPIRE_DIRS_TIMEOUT = 50; + CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT = 50; } message PbStorageInfo { diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index f6669fcc04a..a0ba251a873 100644 --- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -66,7 +66,7 @@ object ControlMessages extends Logging { case object CheckForApplicationTimeOut extends Message - case object CheckForHDFSExpireDirsTimeout extends Message + case object CheckForHDFSExpiredDirsTimeout extends Message case object RemoveExpiredShuffle extends Message @@ -428,8 +428,8 @@ object ControlMessages extends Logging { case CheckForApplicationTimeOut => new TransportMessage(MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null) - case CheckForHDFSExpireDirsTimeout => - new TransportMessage(MessageType.CHECK_FOR_HDFS_EXPIRE_DIRS_TIMEOUT, null) + case CheckForHDFSExpiredDirsTimeout => + new TransportMessage(MessageType.CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT, null) case RemoveExpiredShuffle => new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null) @@ -1078,8 +1078,8 @@ object ControlMessages extends Logging { case CHECK_FOR_APPLICATION_TIMEOUT_VALUE => CheckForApplicationTimeOut - case CHECK_FOR_HDFS_EXPIRE_DIRS_TIMEOUT_VALUE => - CheckForHDFSExpireDirsTimeout + case CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT_VALUE => + CheckForHDFSExpiredDirsTimeout case WORKER_LOST_VALUE => PbWorkerLost.parseFrom(message.getPayload) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 00d7964e868..0d310290ba7 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -212,7 +212,7 @@ object Utils extends Logging { } } - def tryLogDeleteHadoopFSError(block: => Unit, path: Path): Unit = { + def tryLogDeleteHDFSPathError(block: => Unit, path: Path): Unit = { try { block } catch { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index a28f37bf13c..0d06ed24b7c 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -193,7 +193,7 @@ private[celeborn] class Master( checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleAtFixedRate( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { - self.send(CheckForHDFSExpireDirsTimeout) + self.send(CheckForHDFSExpiredDirsTimeout) } }, hdfsExpireDirsTimeoutMS, @@ -238,7 +238,7 @@ private[celeborn] class Master( executeWithLeaderChecker(null, timeoutDeadWorkers()) case CheckForApplicationTimeOut => executeWithLeaderChecker(null, timeoutDeadApplications()) - case CheckForHDFSExpireDirsTimeout => + case CheckForHDFSExpiredDirsTimeout => executeWithLeaderChecker(null, checkAndCleanExpiredAppDirsOnHDFS()) case pb: PbWorkerLost => val host = pb.getHost @@ -686,24 +686,25 @@ private[celeborn] class Master( }) } - private def checkAndCleanExpiredAppDirsOnHDFS(dir: String = ""): Unit = { + private def checkAndCleanExpiredAppDirsOnHDFS(expiredDir: String = ""): Unit = { if (hadoopFs == null) { hadoopFs = CelebornHadoopUtils.getHadoopFS(conf) } val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir) if (hadoopFs.exists(hdfsWorkPath)) { - if (!dir.isEmpty) { - val dirToDelete = new Path(hdfsWorkPath, dir) + if (!expiredDir.isEmpty) { + val dirToDelete = new Path(hdfsWorkPath, expiredDir) // delete specific app dir on application lost - Utils.tryLogDeleteHadoopFSError(hadoopFs.delete(dirToDelete, true), dirToDelete) + Utils.tryLogDeleteHDFSPathError(hadoopFs.delete(dirToDelete, true), dirToDelete) } else { val iter = hadoopFs.listStatusIterator(hdfsWorkPath) while (iter.hasNext && isMasterActive == 1) { val startTime = System.currentTimeMillis() val fileStatus = iter.next() - val dirToDelete = new Path(hdfsWorkPath, dir) if 
(!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { - Utils.tryLogDeleteHadoopFSError(hadoopFs.delete(dirToDelete, true), dirToDelete) + Utils.tryLogDeleteHDFSPathError( + hadoopFs.delete(fileStatus.getPath, true), + fileStatus.getPath) logInfo( s"Clean HDFS dir ${fileStatus.getPath} costs " + Utils.msDurationToString(System.currentTimeMillis() - startTime)) From 063bc9cae07634dae8586c85f46a59e027b25b03 Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 Jul 2023 21:05:54 +0800 Subject: [PATCH 28/29] refine. --- .../celeborn/common/util/CelebornHadoopUtils.scala | 14 ++++++++++++++ .../org/apache/celeborn/common/util/Utils.scala | 9 --------- .../celeborn/service/deploy/master/Master.scala | 9 ++------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index bf6d43772e2..166c2a234e7 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -17,6 +17,7 @@ package org.apache.celeborn.common.util +import java.io.IOException import java.util.concurrent.atomic.AtomicBoolean import org.apache.hadoop.conf.Configuration @@ -57,4 +58,17 @@ object CelebornHadoopUtils extends Logging { def getHadoopFS(conf: CelebornConf): FileSystem = { new Path(conf.hdfsDir).getFileSystem(newConfiguration(conf)) } + + def deleteHDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive: Boolean): Unit = { + try { + val startTime = System.currentTimeMillis() + hadoopFs.delete(path, recursive) + logInfo( + s"Delete HDFS ${path}(recursive=$recursive) costs " + + Utils.msDurationToString(System.currentTimeMillis() - startTime)) + } catch { + case e: IOException => + logError(s"Failed to delete HDFS ${path}(recursive=$recursive) due to: ", e) + } + } } diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 0d310290ba7..8617f547d47 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -212,15 +212,6 @@ object Utils extends Logging { } } - def tryLogDeleteHDFSPathError(block: => Unit, path: Path): Unit = { - try { - block - } catch { - case e: IOException => - logError(s"Delete HDFS File ${path} failed", e) - } - } - def tryOrExit(block: => Unit): Unit = { try { block diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 0d06ed24b7c..3cec8dd3b68 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -695,19 +695,14 @@ private[celeborn] class Master( if (!expiredDir.isEmpty) { val dirToDelete = new Path(hdfsWorkPath, expiredDir) // delete specific app dir on application lost - Utils.tryLogDeleteHDFSPathError(hadoopFs.delete(dirToDelete, true), dirToDelete) + CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, dirToDelete, true) } else { val iter = hadoopFs.listStatusIterator(hdfsWorkPath) while (iter.hasNext && isMasterActive == 1) { val startTime = System.currentTimeMillis() val fileStatus = iter.next() if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { - Utils.tryLogDeleteHDFSPathError( - hadoopFs.delete(fileStatus.getPath, true), - fileStatus.getPath) - logInfo( - s"Clean HDFS dir ${fileStatus.getPath} costs " + - Utils.msDurationToString(System.currentTimeMillis() - startTime)) + CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, fileStatus.getPath, true) } } } From e6ff705e2a5f07e842ee11af9bff3f6e4bff3c5d Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 5 
Jul 2023 21:08:42 +0800 Subject: [PATCH 29/29] refine. --- .../scala/org/apache/celeborn/service/deploy/master/Master.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 3cec8dd3b68..ca5016d5402 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -699,7 +699,6 @@ private[celeborn] class Master( } else { val iter = hadoopFs.listStatusIterator(hdfsWorkPath) while (iter.hasNext && isMasterActive == 1) { - val startTime = System.currentTimeMillis() val fileStatus = iter.next() if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) { CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, fileStatus.getPath, true)