[CELEBORN-764] Fix Celeborn on HDFS possibly cleaning in-use app directories
### What changes were proposed in this pull request?
Make the Celeborn master leader clean up expired app dirs on HDFS when an application is lost.

### Why are the changes needed?
When Celeborn stores shuffle data on HDFS, each worker's storage manager cleans expired app directories at startup, and a newly created worker will try to delete any app directory it does not recognize.
This can cause directories belonging to running applications to be deleted unexpectedly.
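
Roughly, the new master-side flow looks like the sketch below. The method name `checkAndCleanExpiredAppDirsOnHDFS` and the `activeAppIds` set are illustrative; only the `CheckForHDFSExpiredDirsTimeout` message, the `celeborn.master.hdfs.expireDirs.timeout` config, and the `CelebornHadoopUtils` helpers appear in this diff:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.CelebornHadoopUtils

// On each CheckForHDFSExpiredDirsTimeout tick (scheduled every
// celeborn.master.hdfs.expireDirs.timeout), the leader deletes HDFS app
// directories that no registered application owns.
def checkAndCleanExpiredAppDirsOnHDFS(conf: CelebornConf, activeAppIds: Set[String]): Unit = {
  val hadoopFs: FileSystem = CelebornHadoopUtils.getHadoopFS(conf)
  val baseDir = new Path(conf.hdfsDir)
  if (hadoopFs.exists(baseDir)) {
    hadoopFs.listStatus(baseDir).foreach { status =>
      if (!activeAppIds.contains(status.getPath.getName)) {
        CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, status.getPath, recursive = true)
      }
    }
  }
}
```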

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests and verification on a cluster.

Closes #1678 from FMX/CELEBORN-764.

Lead-authored-by: mingji <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Ethan Feng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
2 people authored and waitinfuture committed Jul 5, 2023
1 parent 4300835 commit d0ecf83
Showing 17 changed files with 145 additions and 66 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -128,7 +128,7 @@ celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
-celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
+celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn
celeborn.worker.replicate.fastFail.duration 240s

# If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false
@@ -175,7 +175,7 @@ celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
-celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
+celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn
celeborn.worker.replicate.fastFail.duration 240s

# If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false
@@ -261,7 +261,7 @@ spark.celeborn.client.push.replicate.enabled true
spark.sql.adaptive.localShuffleReader.enabled false

# If Celeborn is using HDFS
-spark.celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
+spark.celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn

# we recommend enabling aqe support to gain better performance
spark.sql.adaptive.enabled true
17 changes: 4 additions & 13 deletions client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +29,7 @@
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
-import org.apache.celeborn.common.util.CelebornHadoopUtils$;
+import org.apache.celeborn.common.util.CelebornHadoopUtils;
import org.apache.celeborn.common.write.PushState;

/**
@@ -85,18 +84,10 @@ public static FileSystem getHdfsFs(CelebornConf conf) {
if (null == hdfsFs) {
synchronized (ShuffleClient.class) {
if (null == hdfsFs) {
-Configuration hdfsConfiguration = CelebornHadoopUtils$.MODULE$.newConfiguration(conf);
-// enable fs cache to avoid too many fs instances
-hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false");
-hdfsConfiguration.set("fs.viewfs.impl.disable.cache", "false");
-logger.info(
-    "Celeborn client will ignore cluster"
-        + " settings about fs.hdfs/viewfs.impl.disable.cache and set it to false");
 try {
-  hdfsFs = FileSystem.get(hdfsConfiguration);
-} catch (IOException e) {
-  System.err.println("Celeborn initialize hdfs failed.");
-  e.printStackTrace(System.err);
+  hdfsFs = CelebornHadoopUtils.getHadoopFS(conf);
+} catch (Exception e) {
+  logger.error("Celeborn initialize HDFS failed.", e);
}
}
}
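
The surrounding `getHdfsFs` keeps its double-checked locking while delegating creation to the shared helper. A minimal Scala rendering of the same pattern, for illustration only (`HdfsFsHolder` is a hypothetical name, not Celeborn API):

```scala
import org.apache.hadoop.fs.FileSystem

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.CelebornHadoopUtils

object HdfsFsHolder {
  @volatile private var hdfsFs: FileSystem = _

  // Double-checked locking: the volatile read avoids taking the lock on
  // the hot path; the second null check prevents double initialization.
  def get(conf: CelebornConf): FileSystem = {
    if (hdfsFs == null) {
      this.synchronized {
        if (hdfsFs == null) {
          try {
            hdfsFs = CelebornHadoopUtils.getHadoopFS(conf)
          } catch {
            case e: Exception => System.err.println(s"Celeborn initialize HDFS failed: $e")
          }
        }
      }
    }
    hdfsFs
  }
}
```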
DfsPartitionReader.java
@@ -84,7 +84,7 @@ public DfsPartitionReader(
// Parse this message to ensure sort is done.
} catch (IOException | InterruptedException e) {
throw new IOException(
"read shuffle file from hdfs failed, filePath: "
"read shuffle file from HDFS failed, filePath: "
+ location.getStorageInfo().getFilePath(),
e);
}
@@ -121,7 +121,7 @@ public DfsPartitionReader(
hdfsInputStream.readFully(offset, buffer);
} catch (IOException e) {
logger.warn(
"read hdfs {} failed will retry, error detail {}",
"read HDFS {} failed will retry, error detail {}",
location.getStorageInfo().getFilePath(),
e);
try {
@@ -135,7 +135,7 @@ public DfsPartitionReader(
hdfsInputStream.readFully(offset, buffer);
} catch (IOException ex) {
logger.warn(
"retry read hdfs {} failed, error detail {} ",
"retry read HDFS {} failed, error detail {} ",
location.getStorageInfo().getFilePath(),
e);
exception.set(ex);
@@ -238,7 +238,7 @@ public void close() {
try {
hdfsInputStream.close();
} catch (IOException e) {
logger.warn("close hdfs input stream failed.", e);
logger.warn("close HDFS input stream failed.", e);
}
if (results.size() > 0) {
results.forEach(ReferenceCounted::release);
FileInfo.java
@@ -169,7 +169,7 @@ public void deleteAllFiles(FileSystem hdfsFs) {
} catch (Exception e) {
// ignore delete exceptions because some other workers might be deleting the directory
logger.debug(
"delete hdfs file {},{},{},{} failed {}",
"delete HDFS file {},{},{},{} failed {}",
getHdfsPath(),
getHdfsWriterSuccessPath(),
getHdfsIndexPath(),
1 change: 1 addition & 0 deletions common/src/main/proto/TransportMessages.proto
@@ -68,6 +68,7 @@ enum MessageType {
PARTITION_SPLIT = 47;
REGISTER_MAP_PARTITION_TASK = 48;
HEARTBEAT_FROM_APPLICATION_RESPONSE = 49;
+  CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT = 50;
}

message PbStorageInfo {
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -499,8 +499,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def masterSlotAssignPolicy: SlotsAssignPolicy =
SlotsAssignPolicy.valueOf(get(MASTER_SLOT_ASSIGN_POLICY))

-  def hasHDFSStorage: Boolean = get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name())
-
+  def hasHDFSStorage: Boolean =
+    get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && get(HDFS_DIR).isDefined
def masterSlotAssignLoadAwareDiskGroupNum: Int = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM)
def masterSlotAssignLoadAwareDiskGroupGradient: Double =
get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT)
@@ -692,6 +692,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)
def clientRpcMaxParallelism: Int = get(CLIENT_RPC_MAX_PARALLELISM)
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
+  def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
@@ -1510,6 +1511,14 @@
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("300s")

+  val HDFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.master.hdfs.expireDirs.timeout")
+      .categories("master")
+      .version("0.3.0")
+      .doc("The timeout for expired dirs to be deleted on HDFS.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1h")

val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.heartbeat.worker.timeout")
.withAlternative("celeborn.worker.heartbeat.timeout")
@@ -1956,11 +1965,10 @@
.createWithDefault(16)

   val HDFS_DIR: OptionalConfigEntry[String] =
-    buildConf("celeborn.worker.storage.hdfs.dir")
-      .withAlternative("celeborn.storage.hdfs.dir")
-      .categories("worker")
+    buildConf("celeborn.storage.hdfs.dir")
+      .categories("worker", "master", "client")
       .version("0.2.0")
-      .doc("HDFS dir configuration for Celeborn to access HDFS.")
+      .doc("HDFS base directory for Celeborn to store shuffle data.")
       .stringConf
       .createOptional

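
With the stricter `hasHDFSStorage` above, HDFS only counts as active storage when the base dir is configured. A minimal check mirroring the updated test suite further below:

```scala
import org.apache.celeborn.common.CelebornConf

val conf = new CelebornConf()
conf.set("celeborn.storage.activeTypes", "HDFS")
// Without celeborn.storage.hdfs.dir, HDFS storage is treated as inactive.
assert(!conf.hasHDFSStorage)
conf.set("celeborn.storage.hdfs.dir", "hdfs://<namenode>/celeborn")
assert(conf.hasHDFSStorage)
```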
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -19,7 +19,6 @@ package org.apache.celeborn.common.protocol.message

import java.util
import java.util.UUID
-import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

@@ -67,6 +66,8 @@ object ControlMessages extends Logging {

case object CheckForApplicationTimeOut extends Message

+  case object CheckForHDFSExpiredDirsTimeout extends Message

case object RemoveExpiredShuffle extends Message

/**
@@ -427,6 +428,9 @@
case CheckForApplicationTimeOut =>
new TransportMessage(MessageType.CHECK_FOR_APPLICATION_TIMEOUT, null)

+    case CheckForHDFSExpiredDirsTimeout =>
+      new TransportMessage(MessageType.CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT, null)

case RemoveExpiredShuffle =>
new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null)

@@ -1074,6 +1078,9 @@
case CHECK_FOR_APPLICATION_TIMEOUT_VALUE =>
CheckForApplicationTimeOut

+    case CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT_VALUE =>
+      CheckForHDFSExpiredDirsTimeout

case WORKER_LOST_VALUE =>
PbWorkerLost.parseFrom(message.getPayload)

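
A hedged round-trip for the new control message, assuming the match arms above live in `ControlMessages.toTransportMessage` and `ControlMessages.fromTransportMessage`:

```scala
import org.apache.celeborn.common.protocol.message.ControlMessages
import org.apache.celeborn.common.protocol.message.ControlMessages.CheckForHDFSExpiredDirsTimeout

// Serialize the timer message and parse it back; both directions are added in this diff.
val msg = ControlMessages.toTransportMessage(CheckForHDFSExpiredDirsTimeout)
assert(ControlMessages.fromTransportMessage(msg) == CheckForHDFSExpiredDirsTimeout)
```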
common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -17,13 +17,33 @@

package org.apache.celeborn.common.util

+import java.io.IOException
+import java.util.concurrent.atomic.AtomicBoolean

import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging

-object CelebornHadoopUtils {
+object CelebornHadoopUtils extends Logging {
+  private var logPrinted = new AtomicBoolean(false)
   private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = {
     val hadoopConf = new Configuration()
+    if (conf.hdfsDir.nonEmpty) {
+      val path = new Path(conf.hdfsDir)
+      val scheme = path.toUri.getScheme
+      val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme)
+      hadoopConf.set("dfs.replication", "2")
+      hadoopConf.set(disableCacheName, "false")
+      if (logPrinted.compareAndSet(false, true)) {
+        logInfo(
+          "Celeborn overrides some HDFS settings defined in Hadoop configuration files, " +
+            s"including '$disableCacheName=false' and 'dfs.replication=2'. " +
+            "It can be overridden again in Celeborn configuration with the additional " +
+            "prefix 'celeborn.hadoop.', e.g. 'celeborn.hadoop.dfs.replication=3'")
+      }
+    }
     appendSparkHadoopConfigs(conf, hadoopConf)
     hadoopConf
   }
@@ -34,4 +54,21 @@ object CelebornHadoopUtils {
hadoopConf.set(key.substring("celeborn.hadoop.".length), value)
}
}

+  def getHadoopFS(conf: CelebornConf): FileSystem = {
+    new Path(conf.hdfsDir).getFileSystem(newConfiguration(conf))
+  }
+
+  def deleteHDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive: Boolean): Unit = {
+    try {
+      val startTime = System.currentTimeMillis()
+      hadoopFs.delete(path, recursive)
+      logInfo(
+        s"Delete HDFS ${path}(recursive=$recursive) costs " +
+          Utils.msDurationToString(System.currentTimeMillis() - startTime))
+    } catch {
+      case e: IOException =>
+        logError(s"Failed to delete HDFS ${path}(recursive=$recursive) due to: ", e)
+    }
+  }
}
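
A small usage sketch of the two new helpers (path and dir values are illustrative):

```scala
import org.apache.hadoop.fs.Path

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.CelebornHadoopUtils

val conf = new CelebornConf()
conf.set("celeborn.storage.hdfs.dir", "hdfs://<namenode>/celeborn")

// Resolves the FileSystem from the configured base dir, with Celeborn's
// fs-cache and dfs.replication overrides applied.
val fs = CelebornHadoopUtils.getHadoopFS(conf)

// Deletes recursively, logging (rather than throwing) IOExceptions, since
// another worker may be deleting the same directory concurrently.
CelebornHadoopUtils.deleteHDFSPathOrLogError(
  fs, new Path("hdfs://<namenode>/celeborn/app-expired-0001"), recursive = true)
```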
common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -38,6 +38,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.google.protobuf.{ByteString, GeneratedMessageV3}
import io.netty.channel.unix.Errors.NativeIoException
import org.apache.commons.lang3.SystemUtils
+import org.apache.hadoop.fs.Path
import org.roaringbitmap.RoaringBitmap

import org.apache.celeborn.common.CelebornConf
CelebornConfSuite.scala
@@ -190,9 +190,11 @@ class CelebornConfSuite extends CelebornFunSuite {
test("Test empty working dir") {
val conf = new CelebornConf()
conf.set("celeborn.storage.activeTypes", "HDFS")
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerBaseDirs.isEmpty)

conf.set("celeborn.storage.activeTypes", "SDD,HDD,HDFS")
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerBaseDirs.isEmpty)

conf.set("celeborn.storage.activeTypes", "SDD,HDD")
@@ -202,6 +204,7 @@ class CelebornConfSuite extends CelebornFunSuite {
test("Test commit file threads") {
val conf = new CelebornConf()
conf.set("celeborn.storage.activeTypes", "HDFS")
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerCommitThreads === 128)

conf.set("celeborn.storage.activeTypes", "SDD,HDD")
UtilsSuite.scala
@@ -105,7 +105,7 @@ class UtilsSuite extends CelebornFunSuite {
assert(mapperEnd == mapperEndTrans)
}

test("validate hdfs compatible fs path") {
test("validate HDFS compatible fs path") {
val hdfsPath = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x"
val simpleHdfsPath = "hdfs:///xxxx/xx-xx/x-x-x"
val sortedHdfsPath = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x.sorted"
1 change: 1 addition & 0 deletions docs/configuration/client.md
@@ -94,4 +94,5 @@ license: |
| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. | 0.3.0 |
| celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
+| celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->
2 changes: 2 additions & 0 deletions docs/configuration/master.md
@@ -22,6 +22,7 @@ license: |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 |
| celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.3.0 |
+| celeborn.master.hdfs.expireDirs.timeout | 1h | The timeout for expired dirs to be deleted on HDFS. | 0.3.0 |
| celeborn.master.heartbeat.application.timeout | 300s | Application heartbeat timeout. | 0.3.0 |
| celeborn.master.heartbeat.worker.timeout | 120s | Worker heartbeat timeout. | 0.3.0 |
| celeborn.master.host | &lt;localhost&gt; | Hostname for master to bind. | 0.2.0 |
@@ -34,4 +35,5 @@ license: |
| celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 |
| celeborn.master.userResourceConsumption.update.interval | 30s | Time length for a window about compute user resource consumption. | 0.3.0 |
| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 |
+| celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->
2 changes: 1 addition & 1 deletion docs/configuration/worker.md
@@ -23,6 +23,7 @@ license: |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 |
+| celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 |
| celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 |
| celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
@@ -92,7 +93,6 @@ license: |
| celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.3.0 |
| celeborn.worker.storage.dirs | &lt;undefined&gt; | Directory list to store shuffle data. It's recommended to configure one directory on each disk. Storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and should be 8 or more flush threads on the same disk partition if you are using SSD. For example: `dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=]` | 0.2.0 |
| celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved space for each disk. | 0.3.0 |
-| celeborn.worker.storage.hdfs.dir | &lt;undefined&gt; | HDFS dir configuration for Celeborn to access HDFS. | 0.2.0 |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's working dir path name. | 0.3.0 |
| celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to close | 0.2.0 |
| celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file writer to create if its creation was failed. | 0.2.0 |