[CELEBORN-764] Fix Celeborn on HDFS possibly cleaning app directories still in use. #1678

Closed: wants to merge 31 commits.
Changes shown below are from 22 of the 31 commits.

Commits
c6b50e3  [CELEBORN-764] Fix celeborn on HDFS might clean using app directories. (FMX, Jul 4, 2023)
c407201  Update common/src/main/scala/org/apache/celeborn/common/CelebornConf.… (FMX, Jul 5, 2023)
d4e3595  update (FMX, Jul 5, 2023)
113f7df  update. (FMX, Jul 5, 2023)
cd37620  Update common/src/main/scala/org/apache/celeborn/common/util/Utils.scala (pan3793, Jul 5, 2023)
b21b056  update. (FMX, Jul 5, 2023)
344787e  Merge remote-tracking branch 'origin/CELEBORN-764' into CELEBORN-764 (FMX, Jul 5, 2023)
dd6f55d  update. (FMX, Jul 5, 2023)
84ceda8  address comments. (FMX, Jul 5, 2023)
acbcdc8  Update common/src/main/scala/org/apache/celeborn/common/util/Celeborn… (pan3793, Jul 5, 2023)
59b99c3  Update common/src/main/scala/org/apache/celeborn/common/util/Celeborn… (pan3793, Jul 5, 2023)
393bb53  Add new configs to avoid checking frequently. (FMX, Jul 5, 2023)
fc2d0d7  fix ut. (FMX, Jul 5, 2023)
f23aca4  update spelling and docs. (FMX, Jul 5, 2023)
ad21966  fix ut. (FMX, Jul 5, 2023)
760b924  test on cluster. (FMX, Jul 5, 2023)
8df0d4b  address comments. (FMX, Jul 5, 2023)
af59f07  update. (FMX, Jul 5, 2023)
22b6989  refine. (FMX, Jul 5, 2023)
9e47b1a  Update master/src/main/scala/org/apache/celeborn/service/deploy/maste… (pan3793, Jul 5, 2023)
ff715b8  Update master/src/main/scala/org/apache/celeborn/service/deploy/maste… (pan3793, Jul 5, 2023)
59d0869  Update master/src/main/scala/org/apache/celeborn/service/deploy/maste… (pan3793, Jul 5, 2023)
2999a49  Update master/src/main/scala/org/apache/celeborn/service/deploy/maste… (pan3793, Jul 5, 2023)
1d793b7  refine. (FMX, Jul 5, 2023)
d18430d  Merge remote-tracking branch 'origin/CELEBORN-764' into CELEBORN-764 (FMX, Jul 5, 2023)
2fbcc57  refine. (FMX, Jul 5, 2023)
9499121  refine. (FMX, Jul 5, 2023)
1643cb6  refine. (FMX, Jul 5, 2023)
93689d9  refine. (FMX, Jul 5, 2023)
063bc9c  refine. (FMX, Jul 5, 2023)
e6ff705  refine. (FMX, Jul 5, 2023)
6 changes: 3 additions & 3 deletions README.md
@@ -128,7 +128,7 @@ celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
- celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
+ celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn
celeborn.worker.replicate.fastFail.duration 240s

# If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false
@@ -175,7 +175,7 @@ celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
- celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
+ celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn
celeborn.worker.replicate.fastFail.duration 240s

# If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false
@@ -259,7 +259,7 @@ spark.celeborn.client.push.replicate.enabled true
spark.sql.adaptive.localShuffleReader.enabled false

# If Celeborn is using HDFS
- spark.celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
+ spark.celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn

# we recommend enabling aqe support to gain better performance
spark.sql.adaptive.enabled true
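The same renamed key can also be set programmatically. A minimal sketch, assuming only the standard SparkConf.set(String, String) API; <namenode> is kept as the placeholder from the README above:

    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
    // Renamed in this PR: spark.celeborn.worker.storage.hdfs.dir -> spark.celeborn.storage.hdfs.dir
    sparkConf.set("spark.celeborn.storage.hdfs.dir", "hdfs://<namenode>/celeborn")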
16 changes: 4 additions & 12 deletions client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

- import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +29,7 @@
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
- import org.apache.celeborn.common.util.CelebornHadoopUtils$;
+ import org.apache.celeborn.common.util.CelebornHadoopUtils;
import org.apache.celeborn.common.write.PushState;

/**
@@ -85,17 +84,10 @@ public static FileSystem getHdfsFs(CelebornConf conf) {
if (null == hdfsFs) {
synchronized (ShuffleClient.class) {
if (null == hdfsFs) {
- Configuration hdfsConfiguration = CelebornHadoopUtils$.MODULE$.newConfiguration(conf);
- // enable fs cache to avoid too many fs instances
- hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false");
- hdfsConfiguration.set("fs.viewfs.impl.disable.cache", "false");
- logger.info(
- "Celeborn client will ignore cluster"
- + " settings about fs.hdfs/viewfs.impl.disable.cache and set it to false");
try {
- hdfsFs = FileSystem.get(hdfsConfiguration);
- } catch (IOException e) {
- System.err.println("Celeborn initialize hdfs failed.");
+ hdfsFs = CelebornHadoopUtils.getHadoopFS(conf);
+ } catch (Exception e) {
+ System.err.println("Celeborn initialize HDFS failed.");
e.printStackTrace(System.err);
}
}
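The hunk above reduces getHdfsFs to a double-checked lazy singleton that delegates all Hadoop configuration handling to the new CelebornHadoopUtils.getHadoopFS helper. A minimal Scala sketch of the resulting pattern; HdfsFsHolder is a hypothetical wrapper object (in the PR this logic lives in ShuffleClient.java):

    import org.apache.hadoop.fs.FileSystem
    import org.apache.celeborn.common.CelebornConf
    import org.apache.celeborn.common.util.CelebornHadoopUtils

    object HdfsFsHolder {
      @volatile private var hdfsFs: FileSystem = _

      // Double-checked locking: at most one thread constructs the FileSystem;
      // subsequent callers return the cached instance without taking the lock.
      def getHdfsFs(conf: CelebornConf): FileSystem = {
        if (hdfsFs == null) {
          synchronized {
            if (hdfsFs == null) {
              try {
                hdfsFs = CelebornHadoopUtils.getHadoopFS(conf)
              } catch {
                case e: Exception =>
                  System.err.println("Celeborn initialize HDFS failed.")
                  e.printStackTrace(System.err)
              }
            }
          }
        }
        hdfsFs
      }
    }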
@@ -84,7 +84,7 @@ public DfsPartitionReader(
// Parse this message to ensure sort is done.
} catch (IOException | InterruptedException e) {
throw new IOException(
"read shuffle file from hdfs failed, filePath: "
"read shuffle file from HDFS failed, filePath: "
+ location.getStorageInfo().getFilePath(),
e);
}
@@ -121,7 +121,7 @@ public DfsPartitionReader(
hdfsInputStream.readFully(offset, buffer);
} catch (IOException e) {
logger.warn(
"read hdfs {} failed will retry, error detail {}",
"read HDFS {} failed will retry, error detail {}",
location.getStorageInfo().getFilePath(),
e);
try {
@@ -135,7 +135,7 @@ public DfsPartitionReader(
hdfsInputStream.readFully(offset, buffer);
} catch (IOException ex) {
logger.warn(
"retry read hdfs {} failed, error detail {} ",
"retry read HDFS {} failed, error detail {} ",
location.getStorageInfo().getFilePath(),
e);
exception.set(ex);
@@ -238,7 +238,7 @@ public void close() {
try {
hdfsInputStream.close();
} catch (IOException e) {
logger.warn("close hdfs input stream failed.", e);
logger.warn("close HDFS input stream failed.", e);
}
if (results.size() > 0) {
results.forEach(ReferenceCounted::release);
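The two hunks above implement a read-then-retry-once policy: a failed readFully logs a warning, the stream is reopened, and a second failure is recorded for the fetch thread instead of retrying forever. A hedged Scala sketch of that shape; readChunk, reopenStream, and the exception holder are hypothetical stand-ins for the reader's internals:

    import java.io.IOException
    import java.util.concurrent.atomic.AtomicReference

    class RetryingReader(
        readChunk: (Long, Array[Byte]) => Unit, // stand-in for hdfsInputStream.readFully
        reopenStream: () => Unit) {             // stand-in for reopening the HDFS input stream

      val exception = new AtomicReference[IOException]()

      def readWithOneRetry(offset: Long, buffer: Array[Byte]): Unit =
        try readChunk(offset, buffer)
        catch {
          case _: IOException =>
            // First failure: warn and retry once on a fresh stream.
            reopenStream()
            try readChunk(offset, buffer)
            catch {
              case ex: IOException =>
                // Second failure: surface the error to the fetch thread.
                exception.set(ex)
            }
        }
    }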
@@ -169,7 +169,7 @@ public void deleteAllFiles(FileSystem hdfsFs) {
} catch (Exception e) {
// ignore delete exceptions because some other workers might be deleting the directory
logger.debug(
"delete hdfs file {},{},{},{} failed {}",
"delete HDFS file {},{},{},{} failed {}",
getHdfsPath(),
getHdfsWriterSuccessPath(),
getHdfsIndexPath(),
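The comment in the hunk above captures the design choice: several workers may race to delete the same application directory, so a failed delete is logged at debug level and otherwise ignored. A small Scala sketch of that best-effort idiom (bestEffortDelete is a hypothetical helper, not code from the PR):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Best-effort cleanup: a sibling worker may have already removed the path,
    // so delete failures are swallowed rather than propagated.
    def bestEffortDelete(fs: FileSystem, paths: Seq[Path]): Unit =
      paths.foreach { path =>
        try fs.delete(path, true)
        catch { case _: Exception => () } // ignore: another worker may be deleting it
      }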
1 change: 1 addition & 0 deletions common/src/main/proto/TransportMessages.proto
@@ -68,6 +68,7 @@ enum MessageType {
PARTITION_SPLIT = 47;
REGISTER_MAP_PARTITION_TASK = 48;
HEARTBEAT_FROM_APPLICATION_RESPONSE = 49;
+ CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT = 50;
}

message PbStorageInfo {
@@ -499,8 +499,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def masterSlotAssignPolicy: SlotsAssignPolicy =
SlotsAssignPolicy.valueOf(get(MASTER_SLOT_ASSIGN_POLICY))

- def hasHDFSStorage: Boolean = get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name())
+ def hasHDFSStorage: Boolean =
+ get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && get(HDFS_DIR).isDefined
def masterSlotAssignLoadAwareDiskGroupNum: Int = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM)
def masterSlotAssignLoadAwareDiskGroupGradient: Double =
get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT)
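The tightened predicate above means that enabling the HDFS storage type without configuring a directory no longer counts as having HDFS storage. A hedged sketch of the new behavior, using the same string-setter style as CelebornConfSuite later in this diff:

    import org.apache.celeborn.common.CelebornConf

    val conf = new CelebornConf()
    conf.set("celeborn.storage.activeTypes", "HDFS")
    // HDFS type alone is no longer enough: celeborn.storage.hdfs.dir is unset.
    assert(!conf.hasHDFSStorage)

    conf.set("celeborn.storage.hdfs.dir", "hdfs:///celeborn")
    // With both the storage type and the base directory set, the check passes.
    assert(conf.hasHDFSStorage)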
@@ -692,6 +692,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)
def clientRpcMaxParallelism: Int = get(CLIENT_RPC_MAX_PARALLELISM)
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
+ def hdfsRemnantDirsTimeoutMS: Long = get(HDFS_REMNANTDIRS_TIMEOUT)
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
@@ -1509,6 +1510,14 @@
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("300s")

+ val HDFS_REMNANTDIRS_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.master.hdfs.remnantDirs.timeout")
+ .categories("master")
+ .version("0.3.0")
+ .doc("Timeout before HDFS remnant shuffle dirs are deleted.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("1h")

Review thread on the config name:

pan3793 (Member), Jul 5, 2023: The storage namespace should be used, and we'd better emphasize in the name that it applies at the app level. Suggested change: buildConf("celeborn.master.hdfs.remnantDirs.timeout") -> buildConf("celeborn.master.storage.hdfs.appDirs.expiredDuration"), or celeborn.master.storage.hdfs.appRemnantDirs.expiredDuration.

FMX (Contributor, PR author), Jul 5, 2023: Check the DingTalk group; I explained there why I chose this name.

pan3793 (Member), Jul 5, 2023: @AngersZhuuuu do you have a suggestion for its name?

val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.heartbeat.worker.timeout")
.withAlternative("celeborn.worker.heartbeat.timeout")
@@ -1955,11 +1964,10 @@
.createWithDefault(16)

val HDFS_DIR: OptionalConfigEntry[String] =
- buildConf("celeborn.worker.storage.hdfs.dir")
- .withAlternative("celeborn.storage.hdfs.dir")
- .categories("worker")
+ buildConf("celeborn.storage.hdfs.dir")
+ .categories("worker", "master", "client")
.version("0.2.0")
- .doc("HDFS dir configuration for Celeborn to access HDFS.")
+ .doc("HDFS base directory for Celeborn to store shuffle data.")
.stringConf
.createOptional

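Note that the rename drops the old key entirely rather than aliasing it: the withAlternative fallback is removed, so only the new name is recognized. A hedged sketch of the migration, with behavior inferred from the hunk above:

    import org.apache.celeborn.common.CelebornConf

    val conf = new CelebornConf()
    // Old worker-scoped key: no longer defined, and no withAlternative fallback remains.
    conf.set("celeborn.worker.storage.hdfs.dir", "hdfs:///celeborn")
    // New cluster-wide key (categories: worker, master, client) is the one that takes effect.
    conf.set("celeborn.storage.hdfs.dir", "hdfs:///celeborn")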
@@ -19,7 +19,6 @@ package org.apache.celeborn.common.protocol.message

import java.util
import java.util.UUID
- import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

@@ -67,6 +66,8 @@ object ControlMessages extends Logging {

case object CheckForApplicationTimeOut extends Message

+ case object CheckForHDFSRemnantDirsTimeout extends Message

case object RemoveExpiredShuffle extends Message

/**
@@ -427,6 +428,9 @@ object ControlMessages extends Logging {
case CheckForApplicationTimeOut =>
new TransportMessage(MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null)

+ case CheckForHDFSRemnantDirsTimeout =>
+ new TransportMessage(MessageType.CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT, null)

case RemoveExpiredShuffle =>
new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null)

@@ -1074,6 +1078,9 @@ object ControlMessages extends Logging {
case CHECK_FOR_APPLICATION_TIMEOUT_VALUE =>
CheckForApplicationTimeOut

+ case CHECK_FOR_HDFS_REMNANT_DIRS_TIMEOUT_VALUE =>
+ CheckForHDFSRemnantDirsTimeout

case WORKER_LOST_VALUE =>
PbWorkerLost.parseFrom(message.getPayload)

@@ -17,13 +17,29 @@

package org.apache.celeborn.common.util

+ import java.util.concurrent.atomic.AtomicBoolean

import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.celeborn.common.CelebornConf
+ import org.apache.celeborn.common.internal.Logging

- object CelebornHadoopUtils {
+ object CelebornHadoopUtils extends Logging {
+ var logFlag = new AtomicBoolean(false)
private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = {
val hadoopConf = new Configuration()
+ if (!conf.hdfsDir.isEmpty) {
+ val path = new Path(conf.hdfsDir)
+ val scheme = path.toUri.getScheme
+ val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme)
+ hadoopConf.set("dfs.replication", "2")
+ hadoopConf.set(disableCacheName, "false")
+ if (logFlag.compareAndSet(false, true)) {
+ logInfo(
+ s"Celeborn overrides some HDFS settings defined in Hadoop configuration files, including '$disableCacheName=false' and 'dfs.replication=2'. It can be overridden again in Celeborn configuration with the additional prefix 'celeborn.hadoop.', e.g. 'celeborn.hadoop.dfs.replication=3'")
+ }
+ }
appendSparkHadoopConfigs(conf, hadoopConf)
hadoopConf
}
@@ -34,4 +50,8 @@ object CelebornHadoopUtils {
hadoopConf.set(key.substring("celeborn.hadoop.".length), value)
}
}

+ def getHadoopFS(conf: CelebornConf): FileSystem = {
+ new Path(conf.hdfsDir).getFileSystem(newConfiguration(conf))
+ }
}
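A hedged usage sketch of the new helper: the FileSystem is resolved from the scheme of celeborn.storage.hdfs.dir (via Path.getFileSystem) rather than from fs.defaultFS, and newConfiguration re-enables the FileSystem cache for that scheme so repeated calls reuse one instance. The hdfs://nameservice1 URI is a placeholder:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.celeborn.common.CelebornConf
    import org.apache.celeborn.common.util.CelebornHadoopUtils

    val conf = new CelebornConf()
    conf.set("celeborn.storage.hdfs.dir", "hdfs://nameservice1/celeborn")

    // Resolves the FileSystem for the hdfs:// scheme of the configured base dir.
    val fs: FileSystem = CelebornHadoopUtils.getHadoopFS(conf)
    println(fs.exists(new Path(conf.hdfsDir)))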
@@ -190,9 +190,11 @@ class CelebornConfSuite extends CelebornFunSuite {
test("Test empty working dir") {
val conf = new CelebornConf()
conf.set("celeborn.storage.activeTypes", "HDFS")
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerBaseDirs.isEmpty)

conf.set("celeborn.storage.activeTypes", "SDD,HDD,HDFS")
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerBaseDirs.isEmpty)

conf.set("celeborn.storage.activeTypes", "SDD,HDD")
@@ -202,6 +204,7 @@ class CelebornConfSuite extends CelebornFunSuite {
test("Test commit file threads") {
val conf = new CelebornConf()
conf.set("celeborn.storage.activeTypes", "HDFS")
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerCommitThreads === 128)

conf.set("celeborn.storage.activeTypes", "SDD,HDD")
@@ -105,7 +105,7 @@ class UtilsSuite extends CelebornFunSuite {
assert(mapperEnd == mapperEndTrans)
}

test("validate hdfs compatible fs path") {
test("validate HDFS compatible fs path") {
val hdfsPath = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x"
val simpleHdfsPath = "hdfs:///xxxx/xx-xx/x-x-x"
val sortedHdfsPath = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x.sorted"
1 change: 1 addition & 0 deletions docs/configuration/client.md
@@ -97,4 +97,5 @@ license: |
| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. | 0.3.0 |
| celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
+ | celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->
2 changes: 2 additions & 0 deletions docs/configuration/master.md
@@ -22,6 +22,7 @@ license: |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 |
| celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.3.0 |
+ | celeborn.master.hdfs.remnantDirs.timeout | 1h | Timeout before HDFS remnant shuffle dirs are deleted. | 0.3.0 |
| celeborn.master.heartbeat.application.timeout | 300s | Application heartbeat timeout. | 0.3.0 |
| celeborn.master.heartbeat.worker.timeout | 120s | Worker heartbeat timeout. | 0.3.0 |
| celeborn.master.host | <localhost> | Hostname for master to bind. | 0.2.0 |
@@ -34,4 +35,5 @@ license: |
| celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 |
| celeborn.master.userResourceConsumption.update.interval | 30s | Time length for a window about compute user resource consumption. | 0.3.0 |
| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 |
+ | celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->
2 changes: 1 addition & 1 deletion docs/configuration/worker.md
@@ -23,6 +23,7 @@ license: |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 |
+ | celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 |
| celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 |
| celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
@@ -92,7 +93,6 @@ license: |
| celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.3.0 |
| celeborn.worker.storage.dirs | <undefined> | Directory list to store shuffle data. It's recommended to configure one directory on each disk. Storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and should be 8 or more flush threads on the same disk partition if you are using SSD. For example: `dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=]` | 0.2.0 |
| celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved space for each disk. | 0.3.0 |
- | celeborn.worker.storage.hdfs.dir | <undefined> | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's working dir path name. | 0.3.0 |
| celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to close | 0.2.0 |
| celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file writer to create if its creation was failed. | 0.2.0 |