diff --git a/README.md b/README.md
index d6fbd88a4d4..e12bbdbbce9 100644
--- a/README.md
+++ b/README.md
@@ -128,7 +128,7 @@ celeborn.worker.commitFiles.threads 128
 celeborn.master.slot.assign.policy roundrobin
 celeborn.rpc.askTimeout 240s
 celeborn.worker.flusher.hdfs.buffer.size 4m
-celeborn.worker.storage.hdfs.dir hdfs:///celeborn
+celeborn.storage.hdfs.dir hdfs:///celeborn
 celeborn.worker.replicate.fastFail.duration 240s
 
 # If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false
@@ -175,7 +175,7 @@ celeborn.worker.commitFiles.threads 128
 celeborn.master.slot.assign.policy roundrobin
 celeborn.rpc.askTimeout 240s
 celeborn.worker.flusher.hdfs.buffer.size 4m
-celeborn.worker.storage.hdfs.dir hdfs:///celeborn
+celeborn.storage.hdfs.dir hdfs:///celeborn
 celeborn.worker.replicate.fastFail.duration 240s
 
 # If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false
@@ -259,7 +259,7 @@ spark.celeborn.client.push.replicate.enabled true
 spark.sql.adaptive.localShuffleReader.enabled false
 
 # If Celeborn is using HDFS
-spark.celeborn.worker.storage.hdfs.dir hdfs:///celeborn
+spark.celeborn.storage.hdfs.dir hdfs:///celeborn
 
 # we recommend enabling aqe support to gain better performance
 spark.sql.adaptive.enabled true
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index 389d0f6eac3..b77c827805e 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +29,7 @@
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.common.rpc.RpcEndpointRef;
-import org.apache.celeborn.common.util.CelebornHadoopUtils$;
+import org.apache.celeborn.common.util.CelebornHadoopUtils;
 import org.apache.celeborn.common.write.PushState;
 
 /**
@@ -85,18 +84,10 @@ public static FileSystem getHdfsFs(CelebornConf conf) {
     if (null == hdfsFs) {
       synchronized (ShuffleClient.class) {
         if (null == hdfsFs) {
-          Configuration hdfsConfiguration = CelebornHadoopUtils$.MODULE$.newConfiguration(conf);
-          // enable fs cache to avoid too many fs instances
-          hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false");
-          hdfsConfiguration.set("fs.viewfs.impl.disable.cache", "false");
-          logger.info(
-              "Celeborn client will ignore cluster"
-                  + " settings about fs.hdfs/viewfs.impl.disable.cache and set it to false");
           try {
-            hdfsFs = FileSystem.get(hdfsConfiguration);
-          } catch (IOException e) {
-            System.err.println("Celeborn initialize hdfs failed.");
-            e.printStackTrace(System.err);
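+            // CelebornHadoopUtils.getHadoopFS builds the FileSystem from celeborn.storage.hdfs.dir,
+            // so the client shares the same HDFS initialization as the master and worker.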
+            hdfsFs = CelebornHadoopUtils.getHadoopFS(conf);
+          } catch (Exception e) {
+            logger.error("Celeborn failed to initialize HDFS.", e);
           }
         }
       }
diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index ff06febca75..316f11ef167 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -84,7 +84,7 @@ public DfsPartitionReader(
       // Parse this message to ensure sort is done.
     } catch (IOException | InterruptedException e) {
       throw new IOException(
-          "read shuffle file from hdfs failed, filePath: "
+          "read shuffle file from HDFS failed, filePath: "
               + location.getStorageInfo().getFilePath(),
           e);
     }
@@ -121,7 +121,7 @@ public DfsPartitionReader(
             hdfsInputStream.readFully(offset, buffer);
           } catch (IOException e) {
             logger.warn(
-                "read hdfs {} failed will retry, error detail {}",
+                "read HDFS {} failed will retry, error detail {}",
                 location.getStorageInfo().getFilePath(),
                 e);
             try {
@@ -135,7 +135,7 @@ public DfsPartitionReader(
               hdfsInputStream.readFully(offset, buffer);
             } catch (IOException ex) {
               logger.warn(
-                  "retry read hdfs {} failed, error detail {} ",
+                  "retry read HDFS {} failed, error detail {} ",
                   location.getStorageInfo().getFilePath(),
                   e);
               exception.set(ex);
@@ -238,7 +238,7 @@ public void close() {
       try {
         hdfsInputStream.close();
       } catch (IOException e) {
-        logger.warn("close hdfs input stream failed.", e);
+        logger.warn("close HDFS input stream failed.", e);
       }
       if (results.size() > 0) {
         results.forEach(ReferenceCounted::release);
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index cffd7cd10f3..426f9285094 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -169,7 +169,7 @@ public void deleteAllFiles(FileSystem hdfsFs) {
     } catch (Exception e) {
       // ignore delete exceptions because some other workers might be deleting the directory
       logger.debug(
-          "delete hdfs file {},{},{},{} failed {}",
+          "delete HDFS file {},{},{},{} failed {}",
           getHdfsPath(),
           getHdfsWriterSuccessPath(),
           getHdfsIndexPath(),
diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index 37d320d0b95..50dcdc7a1d6 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -68,6 +68,7 @@ enum MessageType {
   PARTITION_SPLIT = 47;
   REGISTER_MAP_PARTITION_TASK = 48;
   HEARTBEAT_FROM_APPLICATION_RESPONSE = 49;
+  CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT = 50;
 }
 
 message PbStorageInfo {
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index b24b7c03a25..57dffd4a8ef 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -499,8 +499,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def masterSlotAssignPolicy: SlotsAssignPolicy =
     SlotsAssignPolicy.valueOf(get(MASTER_SLOT_ASSIGN_POLICY))
 
-  def hasHDFSStorage: Boolean = get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name())
-
+  def hasHDFSStorage: Boolean =
+    get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && get(HDFS_DIR).isDefined
   def masterSlotAssignLoadAwareDiskGroupNum: Int = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM)
   def masterSlotAssignLoadAwareDiskGroupGradient: Double =
     get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT)
@@ -692,6 +692,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)
   def clientRpcMaxParallelism: Int = get(CLIENT_RPC_MAX_PARALLELISM)
   def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
+  def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
   def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
   def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
   def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
@@ -1509,6 +1510,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("300s")
 
+  val HDFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.master.hdfs.expireDirs.timeout")
+      .categories("master")
+      .version("0.3.0")
+      .doc("The timeout for expired dirs to be deleted on HDFS.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1h")
+
   val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.master.heartbeat.worker.timeout")
       .withAlternative("celeborn.worker.heartbeat.timeout")
@@ -1955,11 +1964,10 @@ object CelebornConf extends Logging {
       .createWithDefault(16)
 
   val HDFS_DIR: OptionalConfigEntry[String] =
-    buildConf("celeborn.worker.storage.hdfs.dir")
-      .withAlternative("celeborn.storage.hdfs.dir")
-      .categories("worker")
+    buildConf("celeborn.storage.hdfs.dir")
+      .categories("worker", "master", "client")
       .version("0.2.0")
-      .doc("HDFS dir configuration for Celeborn to access HDFS.")
+      .doc("HDFS base directory for Celeborn to store shuffle data.")
       .stringConf
       .createOptional
 
diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 2c31d29146b..a0ba251a873 100644
--- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -19,7 +19,6 @@ package org.apache.celeborn.common.protocol.message
 
 import java.util
 import java.util.UUID
-import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 
@@ -67,6 +66,8 @@ object ControlMessages extends Logging {
 
   case object CheckForApplicationTimeOut extends Message
 
+  case object CheckForHDFSExpiredDirsTimeout extends Message
+
   case object RemoveExpiredShuffle extends Message
 
   /**
@@ -427,6 +428,9 @@ object ControlMessages extends Logging {
     case CheckForApplicationTimeOut =>
       new TransportMessage(MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null)
 
+    case CheckForHDFSExpiredDirsTimeout =>
+      new TransportMessage(MessageType.CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT, null)
+
     case RemoveExpiredShuffle =>
       new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null)
 
@@ -1074,6 +1078,9 @@ object ControlMessages extends Logging {
       case CHECK_FOR_APPLICATION_TIMEOUT_VALUE =>
         CheckForApplicationTimeOut
 
+      case CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT_VALUE =>
+        CheckForHDFSExpiredDirsTimeout
+
       case WORKER_LOST_VALUE =>
         PbWorkerLost.parseFrom(message.getPayload)
 
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 39b70ff1892..166c2a234e7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -17,13 +17,33 @@
 package org.apache.celeborn.common.util
 
+import java.io.IOException
+import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
 
-object CelebornHadoopUtils {
+object CelebornHadoopUtils extends Logging {
+  private var logPrinted = new AtomicBoolean(false)
   private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = {
     val hadoopConf = new Configuration()
+    if (conf.hdfsDir.nonEmpty) {
+      val path = new Path(conf.hdfsDir)
+      val scheme = path.toUri.getScheme
+      val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme)
+      hadoopConf.set("dfs.replication", "2")
+      hadoopConf.set(disableCacheName, "false")
+      if (logPrinted.compareAndSet(false, true)) {
+        logInfo(
+          "Celeborn overrides some HDFS settings defined in Hadoop configuration files, " +
+            s"including '$disableCacheName=false' and 'dfs.replication=2'. " +
+            "It can be overridden again in Celeborn configuration with the additional " +
+            "prefix 'celeborn.hadoop.', e.g. 'celeborn.hadoop.dfs.replication=3'")
+      }
+    }
     appendSparkHadoopConfigs(conf, hadoopConf)
     hadoopConf
   }
@@ -34,4 +54,21 @@ object CelebornHadoopUtils {
       hadoopConf.set(key.substring("celeborn.hadoop.".length), value)
     }
   }
+
+  def getHadoopFS(conf: CelebornConf): FileSystem = {
+    new Path(conf.hdfsDir).getFileSystem(newConfiguration(conf))
+  }
+
+  def deleteHDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive: Boolean): Unit = {
+    try {
+      val startTime = System.currentTimeMillis()
+      hadoopFs.delete(path, recursive)
+      logInfo(
+        s"Delete HDFS ${path}(recursive=$recursive) costs " +
+          Utils.msDurationToString(System.currentTimeMillis() - startTime))
+    } catch {
+      case e: IOException =>
+        logError(s"Failed to delete HDFS ${path}(recursive=$recursive) due to: ", e)
+    }
+  }
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index a4eb900e070..8617f547d47 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -38,6 +38,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.google.protobuf.{ByteString, GeneratedMessageV3}
 import io.netty.channel.unix.Errors.NativeIoException
 import org.apache.commons.lang3.SystemUtils
+import org.apache.hadoop.fs.Path
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.celeborn.common.CelebornConf
diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 4c5ce3b558b..3105d25d744 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -190,9 +190,11 @@ class CelebornConfSuite extends CelebornFunSuite {
   test("Test empty working dir") {
     val conf = new CelebornConf()
     conf.set("celeborn.storage.activeTypes", "HDFS")
+    conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
     assert(conf.workerBaseDirs.isEmpty)
 
     conf.set("celeborn.storage.activeTypes", "SDD,HDD,HDFS")
+    conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
     assert(conf.workerBaseDirs.isEmpty)
 
     conf.set("celeborn.storage.activeTypes", "SDD,HDD")
@@ -202,6 +204,7 @@ class CelebornConfSuite extends CelebornFunSuite {
   test("Test commit file threads") {
     val conf = new CelebornConf()
     conf.set("celeborn.storage.activeTypes", "HDFS")
+    conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
     assert(conf.workerCommitThreads === 128)
 
"SDD,HDD") diff --git a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala index cca2439d085..e3e812a0db0 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala @@ -105,7 +105,7 @@ class UtilsSuite extends CelebornFunSuite { assert(mapperEnd == mapperEndTrans) } - test("validate hdfs compatible fs path") { + test("validate HDFS compatible fs path") { val hdfsPath = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x" val simpleHdfsPath = "hdfs:///xxxx/xx-xx/x-x-x" val sortedHdfsPath = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x.sorted" diff --git a/docs/configuration/client.md b/docs/configuration/client.md index d88c1b64beb..15885c8a61f 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -97,4 +97,5 @@ license: | | celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. | 0.3.0 | | celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 | +| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 8842b6c8c18..695b3d84a70 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -22,6 +22,7 @@ license: | | celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.3.0 | +| celeborn.master.hdfs.expireDirs.timeout | 1h | The timeout for a expire dirs to be deleted on HDFS. | 0.3.0 | | celeborn.master.heartbeat.application.timeout | 300s | Application heartbeat timeout. | 0.3.0 | | celeborn.master.heartbeat.worker.timeout | 120s | Worker heartbeat timeout. | 0.3.0 | | celeborn.master.host | <localhost> | Hostname for master to bind. | 0.2.0 | @@ -34,4 +35,5 @@ license: | | celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 | | celeborn.master.userResourceConsumption.update.interval | 30s | Time length for a window about compute user resource consumption. | 0.3.0 | | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 | +| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. 
+| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
 
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 29e3b99afc2..887788ceff2 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -23,6 +23,7 @@ license: |
 | celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 |
 | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
 | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 |
+| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
 | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 |
 | celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 |
 | celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
@@ -92,7 +93,6 @@ license: |
 | celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.3.0 |
 | celeborn.worker.storage.dirs | <undefined> | Directory list to store shuffle data. It's recommended to configure one directory on each disk. Storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and should be 8 or more flush threads on the same disk partition if you are using SSD. For example: `dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=]` | 0.2.0 |
 | celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved space for each disk. | 0.3.0 |
-| celeborn.worker.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 |
 | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's working dir path name. | 0.3.0 |
 | celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to close | 0.2.0 |
 | celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file writer to create if its creation was failed. | 0.2.0 |
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index cb5b2216cc7..ca5016d5402 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -23,8 +23,11 @@ import java.util
 import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.Random
 
+import org.apache.hadoop.fs.{FileSystem, Path}
+
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.client.MasterClient
 import org.apache.celeborn.common.identity.UserIdentifier
@@ -37,7 +40,7 @@ import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode}
 import org.apache.celeborn.common.protocol.message.ControlMessages._
 import org.apache.celeborn.common.quota.{QuotaManager, ResourceConsumption}
 import org.apache.celeborn.common.rpc._
-import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornHadoopUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.server.common.{HttpService, Service}
 import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
 import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler}
@@ -90,11 +93,14 @@ private[celeborn] class Master(
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
   private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
   private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
+  private var checkForHDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
   private val nonEagerHandler = ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64)
 
   // Config constants
   private val workerHeartbeatTimeoutMs = conf.workerHeartbeatTimeout
   private val appHeartbeatTimeoutMs = conf.appHeartbeatTimeoutMs
+  private val hdfsExpireDirsTimeoutMS = conf.hdfsExpireDirsTimeoutMS
+  private val hasHDFSStorage = conf.hasHDFSStorage
 
   private val quotaManager = QuotaManager.instantiate(conf)
   private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
@@ -141,6 +147,7 @@ private[celeborn] class Master(
   // init and register master metrics
   val resourceConsumptionSource = new ResourceConsumptionSource(conf)
   private val masterSource = new MasterSource(conf)
+  private var hadoopFs: FileSystem = _
   masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
     statusSystem.registeredShuffle.size
   }
@@ -181,6 +188,19 @@ private[celeborn] class Master(
       0,
       appHeartbeatTimeoutMs / 2,
       TimeUnit.MILLISECONDS)
+
+    if (hasHDFSStorage) {
+      checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
+        new Runnable {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            self.send(CheckForHDFSExpiredDirsTimeout)
+          }
+        },
+        hdfsExpireDirsTimeoutMS,
+        hdfsExpireDirsTimeoutMS,
+        TimeUnit.MILLISECONDS)
+    }
+
   }
 
   override def onStop(): Unit = {
@@ -191,6 +211,9 @@ private[celeborn] class Master(
     if (checkForApplicationTimeOutTask != null) {
      checkForApplicationTimeOutTask.cancel(true)
     }
+    if (checkForHDFSRemnantDirsTimeOutTask != null) {
+      checkForHDFSRemnantDirsTimeOutTask.cancel(true)
+    }
     forwardMessageThread.shutdownNow()
     logInfo("Celeborn Master is stopped.")
   }
@@ -215,6 +238,8 @@ private[celeborn] class Master(
       executeWithLeaderChecker(null, timeoutDeadWorkers())
     case CheckForApplicationTimeOut =>
       executeWithLeaderChecker(null, timeoutDeadApplications())
+    case CheckForHDFSExpiredDirsTimeout =>
+      executeWithLeaderChecker(null, checkAndCleanExpiredAppDirsOnHDFS())
     case pb: PbWorkerLost =>
       val host = pb.getHost
       val rpcPort = pb.getRpcPort
@@ -544,7 +569,7 @@ private[celeborn] class Master(
     val slots =
       masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") {
         statusSystem.workers.synchronized {
-          if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !conf.hasHDFSStorage) {
+          if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !hasHDFSStorage) {
             SlotsAllocator.offerSlotsLoadAware(
               availableWorkers,
               requestSlots.partitionIdList,
@@ -653,11 +678,36 @@ private[celeborn] class Master(
       override def run(): Unit = {
         statusSystem.handleAppLost(appId, requestId)
         logInfo(s"Removed application $appId")
+        if (hasHDFSStorage) {
+          checkAndCleanExpiredAppDirsOnHDFS(appId)
+        }
         context.reply(ApplicationLostResponse(StatusCode.SUCCESS))
       }
     })
   }
 
+  private def checkAndCleanExpiredAppDirsOnHDFS(expiredDir: String = ""): Unit = {
+    if (hadoopFs == null) {
+      hadoopFs = CelebornHadoopUtils.getHadoopFS(conf)
+    }
+    val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir)
+    if (hadoopFs.exists(hdfsWorkPath)) {
+      if (!expiredDir.isEmpty) {
+        val dirToDelete = new Path(hdfsWorkPath, expiredDir)
+        // delete specific app dir on application lost
+        CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, dirToDelete, true)
+      } else {
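+        // otherwise scan every app dir under the HDFS working path and delete the ones
+        // whose application no longer has a heartbeat registered in the status system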
+        val iter = hadoopFs.listStatusIterator(hdfsWorkPath)
+        while (iter.hasNext && isMasterActive == 1) {
+          val fileStatus = iter.next()
+          if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) {
+            CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, fileStatus.getPath, true)
+          }
+        }
+      }
+    }
+  }
+
   private def handleHeartbeatFromApplication(
       context: RpcCallContext,
       appId: String,
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index f7963c5d64a..1d88069a820 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -193,8 +193,8 @@ public FileInfo getSortedFileInfo(
         throw new IOException(
             "Sort scheduler thread is interrupted means worker is shutting down.", e);
       } catch (IOException e) {
-        logger.error("File sorter access hdfs failed.", e);
-        throw new IOException("File sorter access hdfs failed.", e);
+        logger.error("File sorter access HDFS failed.", e);
+        throw new IOException("File sorter access HDFS failed.", e);
       }
     }
   }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index d2099df2abf..37fe0581d75 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -28,7 +28,6 @@ import java.util.function.{BiConsumer, IntUnaryOperator}
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.iq80.leveldb.DB
@@ -52,6 +51,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   val workingDirWriters =
     JavaUtils.newConcurrentHashMap[File, ConcurrentHashMap[String, FileWriter]]()
 
+  val hasHDFSStorage = conf.hasHDFSStorage
+
   // (deviceName -> deviceInfo) and (mount point -> diskInfo)
   val (deviceInfos, diskInfos) = {
     val workingDirInfos =
@@ -59,7 +60,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
         (new File(workdir, conf.workerWorkingDir), maxSpace, flusherThread, storageType)
       }
 
-    if (workingDirInfos.size <= 0 && !conf.hasHDFSStorage) {
+    if (workingDirInfos.size <= 0 && !hasHDFSStorage) {
       throw new IOException("Empty working directory configuration!")
     }
 
@@ -120,22 +121,12 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   deviceMonitor.startCheck()
 
   val hdfsDir = conf.hdfsDir
-  if (!hdfsDir.isEmpty && conf.hasHDFSStorage) {
-    logInfo(s"Initialize HDFS support with path ${hdfsDir}")
-  }
   val hdfsPermission = new FsPermission("755")
   val hdfsWriters = JavaUtils.newConcurrentHashMap[String, FileWriter]()
   val (hdfsFlusher, _totalHdfsFlusherThread) =
-    if (!hdfsDir.isEmpty && conf.hasHDFSStorage) {
-      val path = new Path(hdfsDir)
-      val scheme = path.toUri.getScheme
-      val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme)
-      val hdfsConfiguration = CelebornHadoopUtils.newConfiguration(conf)
-      hdfsConfiguration.set("dfs.replication", "2")
-      hdfsConfiguration.set(disableCacheName, "false")
-      logInfo("Celeborn will ignore cluster settings " +
-        disableCacheName + " and set it to false")
-      StorageManager.hadoopFs = path.getFileSystem(hdfsConfiguration)
+    if (hasHDFSStorage) {
+      logInfo(s"Initialize HDFS support with path ${hdfsDir}")
+      StorageManager.hadoopFs = CelebornHadoopUtils.getHadoopFS(conf)
       (
         Some(new HdfsFlusher(
           workerSource,
@@ -265,7 +256,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       partitionType: PartitionType,
       rangeReadFilter: Boolean,
       userIdentifier: UserIdentifier): FileWriter = {
-    if (healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage) {
+    if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage) {
       throw new IOException("No available working dirs!")
     }
 
@@ -484,7 +475,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
           new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId"),
           true)
       } catch {
-        case e: Exception => logWarning("Clean expired hdfs shuffle failed.", e)
+        case e: Exception => logWarning("Clean expired HDFS shuffle failed.", e)
       }
     }
   }
@@ -527,19 +518,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
         case _ => // do nothing
       }
     }
-
-    if (hadoopFs != null) {
-      val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
-      if (hadoopFs.exists(hdfsWorkPath)) {
-        val iter = hadoopFs.listStatusIterator(hdfsWorkPath)
-        while (iter.hasNext) {
-          val fileStatus = iter.next()
-          if (!appIds.contains(fileStatus.getPath.getName)) {
-            hadoopFs.delete(fileStatus.getPath, true)
-          }
-        }
-      }
-    }
   }
 
   private def deleteDirectory(dir: File, threadPool: ThreadPoolExecutor): Unit = {
@@ -594,7 +572,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
     val hdfsCleaned = hadoopFs match {
       case hdfs: FileSystem =>
         val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
-        // hdfs path not exist when first time initialize
+        // HDFS path does not exist on first initialization
         if (hdfs.exists(hdfsWorkPath)) {
           !hdfs.listFiles(hdfsWorkPath, false).hasNext
         } else {