Skip to content

Commit

Permalink
[CELEBORN-764] Fix celeborn on HDFS might clean using app directories.
Browse files Browse the repository at this point in the history
  • Loading branch information
FMX committed Jul 5, 2023
1 parent de0fd8c commit 998c20d
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 36 deletions.
14 changes: 3 additions & 11 deletions client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,7 +29,7 @@
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.util.CelebornHadoopUtils$;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.write.PushState;

/**
Expand Down Expand Up @@ -85,16 +84,9 @@ public static FileSystem getHdfsFs(CelebornConf conf) {
if (null == hdfsFs) {
synchronized (ShuffleClient.class) {
if (null == hdfsFs) {
Configuration hdfsConfiguration = CelebornHadoopUtils$.MODULE$.newConfiguration(conf);
// enable fs cache to avoid too many fs instances
hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false");
hdfsConfiguration.set("fs.viewfs.impl.disable.cache", "false");
logger.info(
"Celeborn client will ignore cluster"
+ " settings about fs.hdfs/viewfs.impl.disable.cache and set it to false");
try {
hdfsFs = FileSystem.get(hdfsConfiguration);
} catch (IOException e) {
hdfsFs = Utils.getHadoopFS(conf);
} catch (Exception e) {
System.err.println("Celeborn initialize hdfs failed.");
e.printStackTrace(System.err);
}
Expand Down
18 changes: 18 additions & 0 deletions common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.google.protobuf.{ByteString, GeneratedMessageV3}
import io.netty.channel.unix.Errors.NativeIoException
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.roaringbitmap.RoaringBitmap

import org.apache.celeborn.common.CelebornConf
Expand Down Expand Up @@ -1072,4 +1074,20 @@ object Utils extends Logging {
}
labelPart(0).trim -> labelPart(1).trim
}

def getHadoopFS(conf: CelebornConf): FileSystem = {
val path = new Path(conf.hdfsDir)
val scheme = path.toUri.getScheme
val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme)
val hdfsConfiguration = new Configuration()
hdfsConfiguration.set("dfs.replication", "2")
hdfsConfiguration.set(disableCacheName, "false")
for (elem <- CelebornHadoopUtils.newConfiguration(conf).iterator().asScala) {
hdfsConfiguration.set(elem.getKey, elem.getValue)
}
logInfo("Celeborn will ignore cluster settings " +
disableCacheName + " and set it to false")
path.getFileSystem(hdfsConfiguration)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, l
partitionTotalFileCount.add(fileCount);
}

public Set<String> getActiveAppIds() {
return registeredShuffle.stream()
.map(key -> Utils.splitShuffleKey(key)._1)
.collect(Collectors.toSet());
}

public void updateAppLostMeta(String appId) {
registeredShuffle.stream()
.filter(shuffle -> shuffle.startsWith(appId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.Random

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.identity.UserIdentifier
Expand Down Expand Up @@ -141,6 +143,7 @@ private[celeborn] class Master(
// init and register master metrics
val resourceConsumptionSource = new ResourceConsumptionSource(conf)
private val masterSource = new MasterSource(conf)
private var hadoopFs: FileSystem = _
masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
statusSystem.registeredShuffle.size
}
Expand Down Expand Up @@ -653,11 +656,33 @@ private[celeborn] class Master(
override def run(): Unit = {
statusSystem.handleAppLost(appId, requestId)
logInfo(s"Removed application $appId")
// only leader can clean hdfs dirs
if (conf.hasHDFSStorage && !conf.hdfsDir.isEmpty) {
cleanExpiredAppDirsOnHDFS()
}
context.reply(ApplicationLostResponse(StatusCode.SUCCESS))
}
})
}

private def cleanExpiredAppDirsOnHDFS(): Unit = {
val activeAppIds = statusSystem.getActiveAppIds
if (hadoopFs == null) {
hadoopFs = Utils.getHadoopFS(conf)
}
val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir)
if (hadoopFs.exists(hdfsWorkPath)) {
val iter = hadoopFs.listStatusIterator(hdfsWorkPath)
while (iter.hasNext) {
val fileStatus = iter.next()
if (!activeAppIds.contains(fileStatus.getPath.getName)) {
logDebug(s"Clean hdfs dir ${fileStatus.getPath.toString}")
hadoopFs.delete(fileStatus.getPath, true)
}
}
}
}

private def handleHeartbeatFromApplication(
context: RpcCallContext,
appId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,12 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
deviceMonitor.startCheck()

val hdfsDir = conf.hdfsDir
if (!hdfsDir.isEmpty && conf.hasHDFSStorage) {
logInfo(s"Initialize HDFS support with path ${hdfsDir}")
}
val hdfsPermission = new FsPermission("755")
val hdfsWriters = JavaUtils.newConcurrentHashMap[String, FileWriter]()
val (hdfsFlusher, _totalHdfsFlusherThread) =
if (!hdfsDir.isEmpty && conf.hasHDFSStorage) {
val path = new Path(hdfsDir)
val scheme = path.toUri.getScheme
val disableCacheName = String.format("fs.%s.impl.disable.cache", scheme)
val hdfsConfiguration = CelebornHadoopUtils.newConfiguration(conf)
hdfsConfiguration.set("dfs.replication", "2")
hdfsConfiguration.set(disableCacheName, "false")
logInfo("Celeborn will ignore cluster settings " +
disableCacheName + " and set it to false")
StorageManager.hadoopFs = path.getFileSystem(hdfsConfiguration)
logInfo(s"Initialize HDFS support with path ${hdfsDir}")
StorageManager.hadoopFs = Utils.getHadoopFS(conf)
(
Some(new HdfsFlusher(
workerSource,
Expand Down Expand Up @@ -527,19 +517,6 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
case _ => // do nothing
}
}

if (hadoopFs != null) {
val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
if (hadoopFs.exists(hdfsWorkPath)) {
val iter = hadoopFs.listStatusIterator(hdfsWorkPath)
while (iter.hasNext) {
val fileStatus = iter.next()
if (!appIds.contains(fileStatus.getPath.getName)) {
hadoopFs.delete(fileStatus.getPath, true)
}
}
}
}
}

private def deleteDirectory(dir: File, threadPool: ThreadPoolExecutor): Unit = {
Expand Down

0 comments on commit 998c20d

Please sign in to comment.