[CELEBORN-764] Fix celeborn on HDFS might clean using app directories. #1678
Conversation
Codecov Report
@@ Coverage Diff @@
##             main    #1678      +/-   ##
==========================================
+ Coverage   46.22%   46.27%   +0.05%
==========================================
  Files         161      161
  Lines        9957     9990      +33
  Branches      920      924       +4
==========================================
+ Hits         4602     4622      +20
- Misses       5051     5061      +10
- Partials      304      307       +3

... and 4 files with indirect coverage changes
@@ -534,7 +534,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       val iter = hadoopFs.listStatusIterator(hdfsWorkPath)
       while (iter.hasNext) {
         val fileStatus = iter.next()
-        if (!appIds.contains(fileStatus.getPath.getName)) {
+        if (!appIds.contains(fileStatus.getPath.getName)
getModificationTime will not reflect changes in nested directories. For example, if I have the path /tmp/test/ and upload a new file into /tmp/test, the modification time of /tmp will not change.

IMO, the HDFS directory does not belong to the worker; maybe we should let the Master clean HDFS. cc @pan3793 @RexXiong @AngersZhuuuu
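The guard being discussed can be sketched with plain collections (the class and method names below are illustrative, not Celeborn's actual API): a cleanup pass only treats an app directory as expired when its name is not an active appId *and* its modification time is past a threshold. The caveat above still applies on HDFS, where a directory's modification time does not track nested writes.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the expiry guard: a directory must be both
// unknown (not an active appId) and older than the expiry window.
public class ExpiredDirs {
    public static List<String> expired(
            Map<String, Long> dirMtimes,   // dir name -> modification time (ms)
            Set<String> activeAppIds,
            long nowMs,
            long expireMs) {
        return dirMtimes.entrySet().stream()
            .filter(e -> !activeAppIds.contains(e.getKey()))   // never touch active apps
            .filter(e -> nowMs - e.getValue() > expireMs)      // only delete old dirs
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
    }
}
```

With an active "app-1", an old unknown "app-2", and a recently written unknown "app-3", only "app-2" qualifies for deletion.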
We'd better update the docs for the HDFS path configuration to mention this change and remind users to keep the configuration identical on the master and worker side.

Updated.
@@ -1072,4 +1074,20 @@ object Utils extends Logging {
     }
     labelPart(0).trim -> labelPart(1).trim
   }

+  def getHadoopFS(conf: CelebornConf): FileSystem = {
How about moving it to CelebornHadoopUtils? And we should use CelebornHadoopUtils#newConfiguration instead of new Configuration().
An empty configuration is needed here so that the Celeborn conf can override pre-defined settings, so it cannot be moved to CelebornHadoopUtils.
I see, the configuration priority here is:
1. CelebornConf
2. hardcoded
3. core-site.xml, hdfs-site.xml

It changes the current behavior but looks reasonable. I'm wondering if we can change CelebornHadoopUtils#newConfiguration and pass a Map to achieve this.
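The layering above can be illustrated with plain maps (the helper name is hypothetical). The key point is that the base configuration is empty rather than loaded from the site files, so CelebornConf entries always win over hardcoded defaults:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the priority: empty base (no core-site.xml/hdfs-site.xml),
// then hardcoded defaults, then CelebornConf overrides on top.
public class LayeredConf {
    public static Map<String, String> resolve(
            Map<String, String> hardcodedDefaults,
            Map<String, String> celebornOverrides) {
        Map<String, String> conf = new HashMap<>(); // empty base: site files not consulted
        conf.putAll(hardcodedDefaults);
        conf.putAll(celebornOverrides);             // highest priority
        return conf;
    }
}
```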
Also, we'd better document this behavior somewhere.
Comments addressed.
       .withAlternative("celeborn.storage.hdfs.dir")
       .categories("worker")
     buildConf("celeborn.storage.hdfs.dir")
       .withAlternative("celeborn.worker.storage.hdfs.dir")
Seems we don't need this alternative; it was called celeborn.storage.hdfs.dir in 0.2.
…scala Update common/src/main/scala/org/apache/celeborn/common/util/Utils.scala Co-authored-by: Cheng Pan <[email protected]>
@@ -138,6 +138,12 @@ public void updateAppHeartbeatMeta(String appId, long time, long totalWritten, l
     partitionTotalFileCount.add(fileCount);
   }

+  public Set<String> getActiveAppIds() {
Better to use the appHeartbeatTime map to get active appIds; otherwise the Master may hit corner cases.
IMO, we can directly use the appHeartbeatTime keySet.
> IMO, we can directly use the appHeartbeatTime keySet.

+1, one application may have many shuffleKeys.
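The suggestion above can be sketched as follows. The class and field names mirror the discussion but are illustrative, not the actual StatusSystem code: active appIds come straight from the heartbeat map's key set, so they are counted per application rather than per shuffleKey.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: derive active appIds from the appHeartbeatTime
// keySet instead of iterating shuffleKeys.
public class ActiveApps {
    private final Map<String, Long> appHeartbeatTime = new ConcurrentHashMap<>();

    public void heartbeat(String appId, long time) {
        appHeartbeatTime.put(appId, time);
    }

    public Set<String> getActiveAppIds() {
        return appHeartbeatTime.keySet();
    }
}
```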
@@ -653,11 +656,33 @@ private[celeborn] class Master(
   override def run(): Unit = {
     statusSystem.handleAppLost(appId, requestId)
     logInfo(s"Removed application $appId")
+    // only leader can clean hdfs dirs
+    if (conf.hasHDFSStorage && !conf.hdfsDir.isEmpty) {
+      cleanExpiredAppDirsOnHDFS()
This may cost a lot, as applications may be lost frequently in a big cluster. We'd better not clean expired app dirs in handleApplicationLost; instead, we should do it in timeoutDeadApplications after handleApplicationLost. And if that still costs a lot, I suggest we cache the list and refresh it every x (e.g. 3) minutes.
Sounds reasonable. I'll move the clean logic to timeoutDeadApplications. I think listing directories won't cost a lot.
> This may cost a lot, as applications may be lost frequently in a big cluster; we'd better not clean expired app dirs in handleApplicationLost, and instead do it in timeoutDeadApplications after handleApplicationLost.

I also think it's not a good idea to call cleanExpiredAppDirsOnHDFS when an app is lost. IMO we can just use forwardMessageThread.scheduleAtFixedRate to check, like checkForWorkerTimeOutTask and checkForApplicationTimeOutTask. Also, we need to change forwardMessageThread from a single thread to multiple threads.
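A minimal sketch of the scheduleAtFixedRate approach suggested above, assuming a dedicated scheduler (names are illustrative, not the actual Master code): the cleanup runs on a fixed cadence, and the pool has more than one thread so a slow HDFS listing cannot block other scheduled work.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative periodic-cleanup scheduler, analogous to
// checkForApplicationTimeOutTask: the task fires every intervalMs.
public class PeriodicCleaner {
    // more than one thread, so the cleanup cannot starve other tasks
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    public ScheduledFuture<?> schedule(Runnable cleanupTask, long intervalMs) {
        return scheduler.scheduleAtFixedRate(
            cleanupTask, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
```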
Also, we need to ensure that the leader has replayed all raft logs before cleanup.
LGTM, except that we should log on deletion.
@@ -1509,6 +1510,14 @@ object CelebornConf extends Logging {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("300s")

+  val HDFS_REMNANTDIRS_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.master.hdfs.remnantDirs.timeout")
The storage namespace should be used, and we'd better emphasize in the name that it applies at the app level:

-    buildConf("celeborn.master.hdfs.remnantDirs.timeout")
+    buildConf("celeborn.master.storage.hdfs.appDirs.expiredDuration")
Or celeborn.master.storage.hdfs.appRemnantDirs.expiredDuration
Check the DingTalk group; I explained why I chose this name.
@AngersZhuuuu do you have suggestion for its name?
        val startTime = System.currentTimeMillis()
        val fileStatus = iter.next()
        if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) {
          hadoopFs.delete(fileStatus.getPath, true)
We should try-catch the IOException and log the directory that failed to delete, in case HDFS is abnormal or there are permission-denied issues.
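A sketch of the requested hardening, with java.nio.file standing in for hadoopFs.delete (the real Celeborn code deletes recursively on HDFS; the class name here is hypothetical): catching IOException per directory means one permission error or HDFS hiccup does not abort the whole cleanup pass, and the failed path gets logged.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative per-directory error handling: failures are logged and
// swallowed so the cleanup loop can continue with the next directory.
public class SafeDelete {
    public static boolean deleteQuietly(Path path) {
        try {
            return Files.deleteIfExists(path);
        } catch (IOException e) {
            System.err.println("Failed to delete expired app dir " + path + ": " + e);
            return false;
        }
    }
}
```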
Updated.
# Conflicts:
#	master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
### What changes were proposed in this pull request?
Make the Celeborn leader clean expired app dirs on HDFS when an application is lost.

### Why are the changes needed?
If Celeborn is working on HDFS, the storage manager starts and cleans expired app directories, and a newly created worker will try to delete any unknown app directories. This can cause in-use app directories to be deleted unexpectedly.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT and cluster.

Closes #1678 from FMX/CELEBORN-764.

Lead-authored-by: mingji <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Ethan Feng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit d0ecf83)
Signed-off-by: zky.zhoukeyong <[email protected]>