[CELEBORN-764] Fix celeborn on HDFS might clean using app directories. #1678
Closed
31 commits
c6b50e3 FMX: [CELEBORN-764] Fix celeborn on HDFS might clean using app directories.
c407201 FMX: Update common/src/main/scala/org/apache/celeborn/common/CelebornConf.…
d4e3595 FMX: update
113f7df FMX: update.
cd37620 pan3793: Update common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b21b056 FMX: update.
344787e FMX: Merge remote-tracking branch 'origin/CELEBORN-764' into CELEBORN-764
dd6f55d FMX: update.
84ceda8 FMX: address comments.
acbcdc8 pan3793: Update common/src/main/scala/org/apache/celeborn/common/util/Celeborn…
59b99c3 pan3793: Update common/src/main/scala/org/apache/celeborn/common/util/Celeborn…
393bb53 FMX: Add new configs to avoid checking frequently.
fc2d0d7 FMX: fix ut.
f23aca4 FMX: update spelling and docs.
ad21966 FMX: fix ut.
760b924 FMX: test on cluster.
8df0d4b FMX: address comments.
af59f07 FMX: update.
22b6989 FMX: refine.
9e47b1a pan3793: Update master/src/main/scala/org/apache/celeborn/service/deploy/maste…
ff715b8 pan3793: Update master/src/main/scala/org/apache/celeborn/service/deploy/maste…
59d0869 pan3793: Update master/src/main/scala/org/apache/celeborn/service/deploy/maste…
2999a49 pan3793: Update master/src/main/scala/org/apache/celeborn/service/deploy/maste…
1d793b7 FMX: refine.
d18430d FMX: Merge remote-tracking branch 'origin/CELEBORN-764' into CELEBORN-764
2fbcc57 FMX: refine.
9499121 FMX: refine.
1643cb6 FMX: refine.
93689d9 FMX: refine.
063bc9c FMX: refine.
e6ff705 FMX: refine.
@@ -25,6 +25,8 @@ import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import org.apache.hadoop.fs.{FileSystem, Path}
+
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.client.MasterClient
 import org.apache.celeborn.common.identity.UserIdentifier
@@ -37,7 +39,7 @@ import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode}
 import org.apache.celeborn.common.protocol.message.ControlMessages._
 import org.apache.celeborn.common.quota.{QuotaManager, ResourceConsumption}
 import org.apache.celeborn.common.rpc._
-import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornHadoopUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.server.common.{HttpService, Service}
 import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
 import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler}
@@ -90,11 +92,13 @@ private[celeborn] class Master(
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
   private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
   private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
+  private var checkForHDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
   private val nonEagerHandler = ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64)
 
   // Config constants
   private val workerHeartbeatTimeoutMs = conf.workerHeartbeatTimeout
   private val appHeartbeatTimeoutMs = conf.appHeartbeatTimeoutMs
+  private val hdfsRemnantDirsTimeoutMS = conf.hdfsRemnantDirsTimeoutMS
 
   private val quotaManager = QuotaManager.instantiate(conf)
   private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
@@ -141,6 +145,7 @@ private[celeborn] class Master(
   // init and register master metrics
   val resourceConsumptionSource = new ResourceConsumptionSource(conf)
   private val masterSource = new MasterSource(conf)
+  private var hadoopFs: FileSystem = _
   masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
     statusSystem.registeredShuffle.size
   }
@@ -181,6 +186,19 @@ private[celeborn] class Master(
       0,
       appHeartbeatTimeoutMs / 2,
       TimeUnit.MILLISECONDS)
+
+    if (conf.hasHDFSStorage) {
+      checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
+        new Runnable {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            self.send(CheckForHDFSRemanetDirsTimeout)
+          }
+        },
+        hdfsRemnantDirsTimeoutMS,
+        hdfsRemnantDirsTimeoutMS,
+        TimeUnit.MILLISECONDS)
+    }
+
   }
 
   override def onStop(): Unit = {
@@ -191,6 +209,9 @@ private[celeborn] class Master(
     if (checkForApplicationTimeOutTask != null) {
       checkForApplicationTimeOutTask.cancel(true)
     }
+    if (checkForHDFSRemnantDirsTimeOutTask != null) {
+      checkForHDFSRemnantDirsTimeOutTask.cancel(true)
+    }
     forwardMessageThread.shutdownNow()
     logInfo("Celeborn Master is stopped.")
   }
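The two hunks above schedule a periodic check and cancel it in onStop. A minimal standalone sketch of that lifecycle follows, using only java.util.concurrent. The class name PeriodicCheck and its parameters are hypothetical and not part of Celeborn. The try/catch inside run() plays the role that Utils.tryLogNonFatalError appears to play in the diff: scheduleAtFixedRate suppresses all later executions once a run throws, so the exception has to be caught inside the task.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical helper, for illustration only: runs `check` every `periodMs`
// milliseconds, with the first run delayed by one full period.
final class PeriodicCheck(periodMs: Long, check: () => Unit) {
  private val executor = Executors.newSingleThreadScheduledExecutor()
  private var task: ScheduledFuture[_] = _

  def start(): Unit = {
    task = executor.scheduleAtFixedRate(
      new Runnable {
        // Catch non-fatal errors here: an exception escaping run() would
        // silently stop every subsequent execution of the task.
        override def run(): Unit =
          try check()
          catch { case NonFatal(e) => println(s"periodic check failed: ${e.getMessage}") }
      },
      periodMs, // initial delay
      periodMs, // period
      TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    // Mirror the null guard in onStop: start() may never have been called.
    if (task != null) {
      task.cancel(true)
    }
    executor.shutdownNow()
  }
}
```

Using the period as the initial delay, as the diff does, means the first sweep happens only after one full timeout has elapsed since the master started.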
@@ -215,6 +236,8 @@ private[celeborn] class Master(
       executeWithLeaderChecker(null, timeoutDeadWorkers())
     case CheckForApplicationTimeOut =>
       executeWithLeaderChecker(null, timeoutDeadApplications())
+    case CheckForHDFSRemanetDirsTimeout =>
+      executeWithLeaderChecker(null, cleanExpiredAppDirsOnHDFS())
     case pb: PbWorkerLost =>
       val host = pb.getHost
       val rpcPort = pb.getRpcPort
@@ -653,11 +676,34 @@ private[celeborn] class Master(
         override def run(): Unit = {
           statusSystem.handleAppLost(appId, requestId)
           logInfo(s"Removed application $appId")
+          cleanExpiredAppDirsOnHDFS(appId)
           context.reply(ApplicationLostResponse(StatusCode.SUCCESS))
         }
       })
   }
 
+  private def cleanExpiredAppDirsOnHDFS(dir: String = ""): Unit = {
+    if (hadoopFs == null) {
+      hadoopFs = CelebornHadoopUtils.getHadoopFS(conf)
+    }
+    val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir)
+    if (hadoopFs.exists(hdfsWorkPath)) {
+      if (!dir.isEmpty) {
+        // delete specific app dir on application lost
+        hadoopFs.delete(new Path(hdfsWorkPath, dir), true)
+      } else {
+        val iter = hadoopFs.listStatusIterator(hdfsWorkPath)
+        while (iter.hasNext) {
+          val fileStatus = iter.next()
+          if (!statusSystem.appHeartbeatTime.contains(fileStatus.getPath.getName)) {
+            logDebug(s"Clean hdfs dir ${fileStatus.getPath.toString}")
+            hadoopFs.delete(fileStatus.getPath, true)
+          }
+        }
+      }
+    }
+  }
+
   private def handleHeartbeatFromApplication(
       context: RpcCallContext,
       appId: String,

Review thread on the logDebug line: pan3793 marked this conversation as resolved.

Review thread on hadoopFs.delete(fileStatus.getPath, true):
we should try catch the
Updated.
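The review thread on the delete call asks for a try/catch, and the updated code is not shown in this view. The sketch below is one possible shape of such a sweep, not the code that was merged. To stay runnable without Hadoop on the classpath it substitutes java.nio.file on a local directory for the Hadoop FileSystem calls; the object name RemnantDirSweep, the sweep method, and the activeApps parameter (standing in for statusSystem.appHeartbeatTime) are all assumptions made for illustration.

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

// Illustrative stand-in for the HDFS sweep; uses the local filesystem.
object RemnantDirSweep {

  // Delete `path` and everything under it, children first.
  private def deleteRecursively(path: Path): Unit = {
    if (Files.isDirectory(path)) {
      val children = Files.list(path)
      try children.iterator().asScala.foreach(deleteRecursively)
      finally children.close()
    }
    Files.delete(path)
  }

  // Remove every child of `workDir` whose name is not an active app id.
  // Returns the names of the directories that were actually removed.
  def sweep(workDir: Path, activeApps: Set[String]): Seq[String] = {
    if (!Files.isDirectory(workDir)) {
      Seq.empty
    } else {
      val listing = Files.list(workDir)
      val candidates =
        try listing.iterator().asScala.toList
        finally listing.close()
      candidates.flatMap { child =>
        val name = child.getFileName.toString
        if (activeApps.contains(name)) {
          None
        } else {
          // One failed delete is logged and skipped so that the remaining
          // remnant directories are still cleaned in the same pass.
          try {
            deleteRecursively(child)
            Some(name)
          } catch {
            case NonFatal(e) =>
              println(s"Failed to clean $child: ${e.getMessage}")
              None
          }
        }
      }
    }
  }
}
```

Catching per directory rather than around the whole loop keeps a single permission or I/O error from aborting the sweep, and the next scheduled run can retry whatever was skipped.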
The storage namespace should be used, and we'd better emphasize in the name that it applies at the app level.
Or
celeborn.master.storage.hdfs.appRemnantDirs.expiredDuration

Check the ding-ding group; I explained there why I chose this name.

@AngersZhuuuu do you have a suggestion for its name?