[CELEBORN-764] Fix celeborn on HDFS might clean using app directories. #1678

Closed · wants to merge 31 commits

Commits (31; the diff below shows changes from 4 commits)
c6b50e3
[CELEBORN-764] Fix celeborn on HDFS might clean using app directories.
FMX Jul 4, 2023
c407201
Update common/src/main/scala/org/apache/celeborn/common/CelebornConf.…
FMX Jul 5, 2023
d4e3595
update
FMX Jul 5, 2023
113f7df
update.
FMX Jul 5, 2023
cd37620
Update common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
pan3793 Jul 5, 2023
b21b056
update.
FMX Jul 5, 2023
344787e
Merge remote-tracking branch 'origin/CELEBORN-764' into CELEBORN-764
FMX Jul 5, 2023
dd6f55d
update.
FMX Jul 5, 2023
84ceda8
address comments.
FMX Jul 5, 2023
acbcdc8
Update common/src/main/scala/org/apache/celeborn/common/util/Celeborn…
pan3793 Jul 5, 2023
59b99c3
Update common/src/main/scala/org/apache/celeborn/common/util/Celeborn…
pan3793 Jul 5, 2023
393bb53
Add new configs to avoid checking frequently.
FMX Jul 5, 2023
fc2d0d7
fix ut.
FMX Jul 5, 2023
f23aca4
update spelling and docs.
FMX Jul 5, 2023
ad21966
fix ut.
FMX Jul 5, 2023
760b924
test on cluster.
FMX Jul 5, 2023
8df0d4b
address comments.
FMX Jul 5, 2023
af59f07
update.
FMX Jul 5, 2023
22b6989
refine.
FMX Jul 5, 2023
9e47b1a
Update master/src/main/scala/org/apache/celeborn/service/deploy/maste…
pan3793 Jul 5, 2023
ff715b8
Update master/src/main/scala/org/apache/celeborn/service/deploy/maste…
pan3793 Jul 5, 2023
59d0869
Update master/src/main/scala/org/apache/celeborn/service/deploy/maste…
pan3793 Jul 5, 2023
2999a49
Update master/src/main/scala/org/apache/celeborn/service/deploy/maste…
pan3793 Jul 5, 2023
1d793b7
refine.
FMX Jul 5, 2023
d18430d
Merge remote-tracking branch 'origin/CELEBORN-764' into CELEBORN-764
FMX Jul 5, 2023
2fbcc57
refine.
FMX Jul 5, 2023
9499121
refine.
FMX Jul 5, 2023
1643cb6
refine.
FMX Jul 5, 2023
93689d9
refine.
FMX Jul 5, 2023
063bc9c
refine.
FMX Jul 5, 2023
e6ff705
refine.
FMX Jul 5, 2023
6 changes: 3 additions & 3 deletions README.md
@@ -128,7 +128,7 @@ celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
- celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
+ celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn
celeborn.worker.replicate.fastFail.duration 240s

# If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false
@@ -175,7 +175,7 @@ celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
- celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
+ celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn
celeborn.worker.replicate.fastFail.duration 240s

# If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false
@@ -259,7 +259,7 @@ spark.celeborn.client.push.replicate.enabled true
spark.sql.adaptive.localShuffleReader.enabled false

# If Celeborn is using HDFS
- spark.celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
+ spark.celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn

# we recommend enabling aqe support to gain better performance
spark.sql.adaptive.enabled true
14 changes: 3 additions & 11 deletions client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

- import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +29,7 @@
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
- import org.apache.celeborn.common.util.CelebornHadoopUtils$;
+ import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.write.PushState;

/**
@@ -85,16 +84,9 @@ public static FileSystem getHdfsFs(CelebornConf conf) {
if (null == hdfsFs) {
synchronized (ShuffleClient.class) {
if (null == hdfsFs) {
- Configuration hdfsConfiguration = CelebornHadoopUtils$.MODULE$.newConfiguration(conf);
- // enable fs cache to avoid too many fs instances
- hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false");
- hdfsConfiguration.set("fs.viewfs.impl.disable.cache", "false");
- logger.info(
-     "Celeborn client will ignore cluster"
-         + " settings about fs.hdfs/viewfs.impl.disable.cache and set it to false");
try {
- hdfsFs = FileSystem.get(hdfsConfiguration);
- } catch (IOException e) {
+ hdfsFs = Utils.getHadoopFS(conf);
+ } catch (Exception e) {
System.err.println("Celeborn initialize hdfs failed.");
e.printStackTrace(System.err);
}
CelebornConf.scala
@@ -1955,11 +1955,10 @@ object CelebornConf extends Logging {
.createWithDefault(16)

val HDFS_DIR: OptionalConfigEntry[String] =
- buildConf("celeborn.worker.storage.hdfs.dir")
-   .withAlternative("celeborn.storage.hdfs.dir")
-   .categories("worker")
+ buildConf("celeborn.storage.hdfs.dir")
+   .categories("worker", "master", "client")
.version("0.2.0")
- .doc("HDFS dir configuration for Celeborn to access HDFS.")
+ .doc("HDFS base directory for Celeborn to store shuffle data.")
.stringConf
.createOptional

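For context, a minimal sketch (not code from this change) of how a component resolves the HDFS working path from this entry. It assumes the existing CelebornConf accessors hdfsDir, hasHDFSStorage, and workerWorkingDir that the worker and master code in this PR uses; resolveHdfsWorkPath is a hypothetical helper:

import org.apache.hadoop.fs.Path

import org.apache.celeborn.common.CelebornConf

object HdfsPathSketch {
  // Returns the HDFS work path only when HDFS storage is enabled and
  // celeborn.storage.hdfs.dir is set; conf.hdfsDir is empty when the entry is unset.
  def resolveHdfsWorkPath(conf: CelebornConf): Option[Path] = {
    if (conf.hasHDFSStorage && !conf.hdfsDir.isEmpty) {
      Some(new Path(conf.hdfsDir, conf.workerWorkingDir))
    } else {
      None
    }
  }
}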
CelebornHadoopUtils.scala
@@ -18,12 +18,23 @@
package org.apache.celeborn.common.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging

- object CelebornHadoopUtils {
+ object CelebornHadoopUtils extends Logging {
private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = {
val hadoopConf = new Configuration()
if (!conf.hdfsDir.isEmpty) {
val path = new Path(conf.hdfsDir)
val scheme = path.toUri.getScheme
val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme)
hadoopConf.set("dfs.replication", "2")
hadoopConf.set(disableCacheName, "false")
logInfo(s"Celeborn will ignore cluster settings $disableCacheName and " +
"set it to false")
}
appendSparkHadoopConfigs(conf, hadoopConf)
hadoopConf
}
@@ -34,4 +45,8 @@ object CelebornHadoopUtils {
hadoopConf.set(key.substring("celeborn.hadoop.".length), value)
}
}

def getHadoopFS(conf: CelebornConf): FileSystem = {
new Path(conf.hdfsDir).getFileSystem(CelebornHadoopUtils.newConfiguration(conf))
}
}
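A minimal usage sketch of the new helper, assuming a standalone caller that only probes the configured HDFS directory; CheckHdfsDir is a hypothetical object, not part of this PR:

import org.apache.hadoop.fs.Path

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.CelebornHadoopUtils

object CheckHdfsDir {
  def main(args: Array[String]): Unit = {
    val conf = new CelebornConf()
    // getHadoopFS builds its Configuration via newConfiguration(conf), which, when
    // celeborn.storage.hdfs.dir is set, sets fs.<scheme>.impl.disable.cache to false
    // (so FileSystem instances are cached) and dfs.replication to 2, as shown above.
    val fs = CelebornHadoopUtils.getHadoopFS(conf)
    val base = new Path(conf.hdfsDir)
    println(s"HDFS dir $base exists: ${fs.exists(base)}")
  }
}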
Utils.scala
@@ -1072,4 +1072,5 @@ object Utils extends Logging {
}
labelPart(0).trim -> labelPart(1).trim
}

}
1 change: 1 addition & 0 deletions docs/configuration/client.md
@@ -97,4 +97,5 @@ license: |
| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. | 0.3.0 |
| celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
+| celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->
1 change: 1 addition & 0 deletions docs/configuration/master.md
@@ -34,4 +34,5 @@ license: |
| celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 |
| celeborn.master.userResourceConsumption.update.interval | 30s | Time length for a window about compute user resource consumption. | 0.3.0 |
| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 |
+| celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->
2 changes: 1 addition & 1 deletion docs/configuration/worker.md
@@ -23,6 +23,7 @@ license: |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 |
+| celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 |
| celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 |
| celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
@@ -92,7 +93,6 @@ license: |
| celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.3.0 |
| celeborn.worker.storage.dirs | &lt;undefined&gt; | Directory list to store shuffle data. It's recommended to configure one directory on each disk. Storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and should be 8 or more flush threads on the same disk partition if you are using SSD. For example: `dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=]` | 0.2.0 |
| celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved space for each disk. | 0.3.0 |
-| celeborn.worker.storage.hdfs.dir | &lt;undefined&gt; | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's working dir path name. | 0.3.0 |
| celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to close | 0.2.0 |
| celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file writer to create if its creation was failed. | 0.2.0 |
@@ -138,6 +138,12 @@ public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, l
partitionTotalFileCount.add(fileCount);
}

public Set<String> getActiveAppIds() {
Reviewer comment (Contributor): Better to use the appHeartbeatTime map to get the active appIds; otherwise the master may hit a corner case.

Reviewer comment (Contributor): IMO, we can directly use the appHeartbeatTime keySet.

Reviewer comment (Contributor): +1, one application may have many shuffle keys.

return registeredShuffle.stream()
.map(key -> Utils.splitShuffleKey(key)._1)
.collect(Collectors.toSet());
}

public void updateAppLostMeta(String appId) {
registeredShuffle.stream()
.filter(shuffle -> shuffle.startsWith(appId))
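A minimal sketch of the reviewers' suggestion: derive the active application IDs from the heartbeat map instead of parsing registered shuffle keys. The surrounding class is Java; this sketch uses Scala for brevity and assumes an appHeartbeatTime field keyed by appId, as the discussion implies:

import java.util.{Set => JSet}
import java.util.concurrent.ConcurrentHashMap

object ActiveAppIdsSketch {
  // Assumed field: appId -> last heartbeat timestamp in milliseconds.
  val appHeartbeatTime = new ConcurrentHashMap[String, java.lang.Long]()

  // Suggested alternative: the active app IDs are simply the map's key set, which
  // avoids parsing every registered shuffle key (an app may have many) and sidesteps
  // the corner case the reviewers mention.
  def getActiveAppIds: JSet[String] = appHeartbeatTime.keySet()
}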
Master.scala
@@ -25,6 +25,8 @@ import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.Random

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.identity.UserIdentifier
@@ -141,6 +143,7 @@
// init and register master metrics
val resourceConsumptionSource = new ResourceConsumptionSource(conf)
private val masterSource = new MasterSource(conf)
private var hadoopFs: FileSystem = _
masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
statusSystem.registeredShuffle.size
}
@@ -653,11 +656,33 @@
override def run(): Unit = {
statusSystem.handleAppLost(appId, requestId)
logInfo(s"Removed application $appId")
// only leader can clean hdfs dirs
if (conf.hasHDFSStorage && !conf.hdfsDir.isEmpty) {
cleanExpiredAppDirsOnHDFS()
Reviewer comment (Contributor): This may cost a lot, since applications may be lost frequently in a big cluster. We had better not clean expired app dirs in handleApplicationLost; instead we should do it in timeoutDeadApplications, after handleApplicationLost. And if it is still costly, I suggest we cache the list and refresh it every few (e.g. 3) minutes.

Author reply (Contributor): Sounds reasonable. I'll move the cleanup logic to timeoutDeadApplications. I think listing directories won't cost a lot.

Reviewer comment (Contributor): I also think it's not a good idea to call cleanExpiredAppDirsOnHDFS when an application is lost. IMO we can just use forwardMessageThread.scheduleAtFixedRate to check, like checkForWorkerTimeOutTask and checkForApplicationTimeOutTask. We also need to change forwardMessageThread from a single thread to multiple threads.

Reviewer comment (Contributor): Also, we need to ensure that the leader has replayed all Raft logs before cleanup.

}
context.reply(ApplicationLostResponse(StatusCode.SUCCESS))
}
})
}

private def cleanExpiredAppDirsOnHDFS(): Unit = {
val activeAppIds = statusSystem.getActiveAppIds
if (hadoopFs == null) {
hadoopFs = Utils.getHadoopFS(conf)
}
val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir)
if (hadoopFs.exists(hdfsWorkPath)) {
val iter = hadoopFs.listStatusIterator(hdfsWorkPath)
while (iter.hasNext) {
val fileStatus = iter.next()
if (!activeAppIds.contains(fileStatus.getPath.getName)) {
logDebug(s"Clean hdfs dir ${fileStatus.getPath.toString}")
hadoopFs.delete(fileStatus.getPath, true)
}
}
}
}

private def handleHeartbeatFromApplication(
context: RpcCallContext,
appId: String,
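A self-contained sketch of the reviewers' suggested direction: run the cleanup on a fixed schedule instead of on every ApplicationLost event. The scheduler, interval, and stub below are illustrative assumptions; the PR's later commits add their own config "to avoid checking frequently" (see the commit list), and this wiring is not taken from them:

import java.util.concurrent.{Executors, TimeUnit}

object HdfsCleanupSchedulerSketch {
  // Assumed check interval (30 minutes).
  val checkIntervalMs: Long = 30 * 60 * 1000L

  // Stub standing in for Master.cleanExpiredAppDirsOnHDFS() shown above.
  def cleanExpiredAppDirsOnHDFS(): Unit =
    println("list the HDFS work dir and delete directories of inactive apps")

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Periodic check, like checkForWorkerTimeOutTask / checkForApplicationTimeOutTask;
    // only the leader should run it, and only after its Raft logs are fully replayed.
    scheduler.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = cleanExpiredAppDirsOnHDFS() },
      checkIntervalMs,
      checkIntervalMs,
      TimeUnit.MILLISECONDS)
  }
}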
StorageManager.scala
@@ -120,22 +120,12 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
deviceMonitor.startCheck()

val hdfsDir = conf.hdfsDir
- if (!hdfsDir.isEmpty && conf.hasHDFSStorage) {
-   logInfo(s"Initialize HDFS support with path ${hdfsDir}")
- }
val hdfsPermission = new FsPermission("755")
val hdfsWriters = JavaUtils.newConcurrentHashMap[String, FileWriter]()
val (hdfsFlusher, _totalHdfsFlusherThread) =
if (!hdfsDir.isEmpty && conf.hasHDFSStorage) {
- val path = new Path(hdfsDir)
- val scheme = path.toUri.getScheme
- val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme)
- val hdfsConfiguration = CelebornHadoopUtils.newConfiguration(conf)
- hdfsConfiguration.set("dfs.replication", "2")
- hdfsConfiguration.set(disableCacheName, "false")
- logInfo("Celeborn will ignore cluster settings " +
-   disableCacheName + " and set it to false")
- StorageManager.hadoopFs = path.getFileSystem(hdfsConfiguration)
+ logInfo(s"Initialize HDFS support with path ${hdfsDir}")
+ StorageManager.hadoopFs = Utils.getHadoopFS(conf)
(
Some(new HdfsFlusher(
workerSource,
@@ -527,19 +517,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
case _ => // do nothing
}
}

- if (hadoopFs != null) {
- val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
- if (hadoopFs.exists(hdfsWorkPath)) {
- val iter = hadoopFs.listStatusIterator(hdfsWorkPath)
- while (iter.hasNext) {
- val fileStatus = iter.next()
- if (!appIds.contains(fileStatus.getPath.getName)) {
- hadoopFs.delete(fileStatus.getPath, true)
- }
- }
- }
- }
}

private def deleteDirectory(dir: File, threadPool: ThreadPoolExecutor): Unit = {