diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 611686fcf9..8243f65227 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -86,7 +86,16 @@ class LocalDeviceMonitor(
       .groupBy(_.deviceInfo)
       .foreach { case (deviceInfo: DeviceInfo, diskInfos: List[DiskInfo]) =>
         val deviceLabel = Map("device" -> deviceInfo.name)
-        def usage: DeviceMonitor.DiskUsageInfo = DeviceMonitor.getDiskUsageInfos(diskInfos.head)
+        // `usage` is re-evaluated on every gauge read; if reading the disk usage throws,
+        // the failure is logged and an all-zero DiskUsageInfo is reported instead.
+        def usage: DeviceMonitor.DiskUsageInfo =
+          try {
+            DeviceMonitor.getDiskUsageInfos(diskInfos.head)
+          } catch {
+            case t: Throwable =>
+              logError("Device monitor get usage infos failed.", t)
+              DeviceMonitor.DiskUsageInfo(0L, 0L, 0L, 0)
+          }
         workerSource.addGauge(WorkerSource.DEVICE_OS_TOTAL_CAPACITY, deviceLabel) { () =>
           usage.totalSpace
         }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 00e0a28878..ce5d0d94c0 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -727,15 +727,20 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
     val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
     while (retryTimes < conf.workerCheckFileCleanMaxRetries) {
       val localCleaned =
-        !disksSnapshot().filter(_.status != DiskStatus.IO_HANG).exists { diskInfo =>
-          diskInfo.dirs.exists {
-            case workingDir if workingDir.exists() =>
-              // Don't check appDirs that store information in the fileInfos
-              workingDir.listFiles().exists(appDir => !appIds.contains(appDir.getName))
-            case _ =>
-              false
+        !disksSnapshot()
+          // Skip disks whose status is IO_HANG or READ_OR_WRITE_FAILURE (both checks must pass).
+          .filter(diskInfo =>
+            diskInfo.status != DiskStatus.IO_HANG && diskInfo.status != DiskStatus.READ_OR_WRITE_FAILURE)
+          .exists {
+            diskInfo =>
+              diskInfo.dirs.exists {
+                case workingDir if workingDir.exists() =>
+                  // Don't check appDirs that store information in the fileInfos
+                  workingDir.listFiles().exists(appDir => !appIds.contains(appDir.getName))
+                case _ =>
+                  false
+              }
           }
-        }
 
       val dfsCleaned = hadoopFs match {
         case dfs: FileSystem =>
@@ -853,33 +858,46 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   }
 
   def updateDiskInfos(): Unit = this.synchronized {
-    disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo =>
-      val totalUsage = diskInfo.dirs.map { dir =>
-        val writers = workingDirWriters.get(dir)
-        if (writers != null) {
-          writers.synchronized {
-            writers.values.asScala.map(_.getDiskFileInfo.getFileLength).sum
+    disksSnapshot()
+      // Only refresh disks whose status is neither IO_HANG nor READ_OR_WRITE_FAILURE.
+      .filter(diskInfo =>
+        diskInfo.status != DiskStatus.IO_HANG && diskInfo.status != DiskStatus.READ_OR_WRITE_FAILURE)
+      .foreach {
+        diskInfo =>
+          val totalUsage = diskInfo.dirs.map { dir =>
+            val writers = workingDirWriters.get(dir)
+            if (writers != null) {
+              writers.synchronized {
+                writers.values.asScala.map(_.getDiskFileInfo.getFileLength).sum
+              }
+            } else {
+              0
+            }
+          }.sum
+
+          // A failure while refreshing one disk is logged so that the remaining disks are
+          // still processed; the flush/fetch time updates below run either way.
+          try {
+            val (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace) =
+              getFileSystemReportedSpace(diskInfo.mountPoint)
+            val workingDirUsableSpace =
+              Math.min(diskInfo.configuredUsableSpace - totalUsage, fileSystemReportedUsableSpace)
+            val minimumReserveSize =
+              DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, diskReserveRatio)
+            val usableSpace = Math.max(workingDirUsableSpace - minimumReserveSize, 0)
+            logDebug(
+              s"Update diskInfo:${diskInfo.mountPoint} workingDirUsableSpace:$workingDirUsableSpace fileMeta:$fileSystemReportedUsableSpace " +
+                s"conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage totalSpace:$fileSystemReportedTotalSpace " +
+                s"minimumReserveSize:$minimumReserveSize usableSpace:$usableSpace")
+            diskInfo.setUsableSpace(usableSpace)
+            diskInfo.setTotalSpace(fileSystemReportedTotalSpace)
+          } catch {
+            case t: Throwable =>
+              logError(s"Update diskInfo:${diskInfo.mountPoint} failed.", t)
           }
-        } else {
-          0
-        }
-      }.sum
-
-      val (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace) =
-        getFileSystemReportedSpace(diskInfo.mountPoint)
-      val workingDirUsableSpace =
-        Math.min(diskInfo.configuredUsableSpace - totalUsage, fileSystemReportedUsableSpace)
-      val minimumReserveSize =
-        DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, diskReserveRatio)
-      val usableSpace = Math.max(workingDirUsableSpace - minimumReserveSize, 0)
-      logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace filemeta:$fileSystemReportedUsableSpace" +
-        s"conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage totalSpace:$fileSystemReportedTotalSpace" +
-        s"minimumReserveSize:$minimumReserveSize usableSpace:$usableSpace")
-      diskInfo.setUsableSpace(usableSpace)
-      diskInfo.setTotalSpace(fileSystemReportedTotalSpace)
-      diskInfo.updateFlushTime()
-      diskInfo.updateFetchTime()
-    }
+          diskInfo.updateFlushTime()
+          diskInfo.updateFetchTime()
+      }
     logInfo(s"Updated diskInfos:\n${disksSnapshot().mkString("\n")}")
   }
 