Commit 997e599

attilapiros authored and dongjoon-hyun committed
[SPARK-43221][CORE] Host local block fetching should use a block status of a block stored on disk
Thanks to yorksity, who reported this error and even provided a PR for it. This solution is very different from apache#40883, as `BlockManagerMasterEndpoint#getLocationsAndStatus()` needed some refactoring.

### What changes were proposed in this pull request?

This PR fixes an error which can manifest in the following exception:

```
25/02/20 09:58:31 ERROR util.Utils: [Executor task launch worker for task 61.0 in stage 67.0 (TID 9391)]: Exception encountered
java.lang.ArrayIndexOutOfBoundsException: 0
  at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:185) ~[spark-core_2.12-3.3.2.3.3.7190.5-2.jar:3.3.2.3.3.7190.5-2]
  at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23) ~[scala-library-2.12.15.jar:?]
  at scala.collection.immutable.List.foreach(List.scala:431) ~[scala-library-2.12.15.jar:?]
  at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:171) ~[spark-core_2.12-3.3.2.3.3.7190.5-2.jar:3.3.2.3.3.7190.5-2]
```

The PR changes `BlockManagerMasterEndpoint#getLocationsAndStatus()`. That function returns an optional `BlockLocationsAndStatus`, which consists of three parts:

- `locations`: all the locations where the block can be found (as a sequence of block manager IDs)
- `status`: one block status
- `localDirs`: optional directory paths which can be used to read the block if it is found on the disk of an executor running on the same host

A block (an RDD block, a shuffle block, or a torrent block) can be stored in many executors with different storage levels: disk or memory. This PR changes how the block status and the block manager ID for the `localDirs` are found, to guarantee they belong together.

### Why are the changes needed?

Before this PR, `BlockManagerMasterEndpoint#getLocationsAndStatus()` searched for the block status (`status`) and the `localDirs` separately.
The block status was simply taken from the very first location where the block could be found. So it could easily happen that this block status represented an in-memory block (whose disk size is 0, as it is stored in memory), while `localDirs` was filled out based on a host-local replica that was stored on disk. This situation can be very frequent, but it only causes problems (exceptions like the one above) when encryption is on (`spark.io.encryption.enabled=true`), as for a non-encrypted block the whole file containing the block is read, see https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1244

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Host local block fetching was already covered by some existing unit tests, but a new unit test is provided for this exact case: "SPARK-43221: Host local block fetching should use a block status with disk size".

The number of block managers and the order of the blocks were chosen after some experimentation, as the block status order depends on a `HashSet`, see:

```
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
```

This test was also executed with the old code to validate that it reproduces the issue:

```
BlockManagerSuite:
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
- SPARK-43221: Host local block fetching should use a block status with disk size *** FAILED ***
  0 was not greater than 0 The block size must be greater than 0 for a nonempty block! (BlockManagerSuite.scala:491)
Run completed in 6 seconds, 705 milliseconds.
Total number of tests run: 1
Suites: completed 1, aborted 0
Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50122 from attilapiros/SPARK-43221.
Authored-by: attilapiros <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
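The fixed lookup order described above can be modeled with a small, self-contained sketch. Note that `Loc`, `pick`, and everything else below are invented for illustration only and are not Spark's real API; the point is that the status and the location are always picked from the *same* replica, preferring the same-host external shuffle service, then a same-host on-disk replica, and only then any replica at all.

```scala
// Hypothetical miniature of the new lookup priority; not Spark's real types.
case class Loc(host: String, port: Int, onDisk: Boolean)

object LookupOrderSketch {
  val externalShuffleServicePort = 7337

  // Return the single replica both `status` and `localDirs` should be derived
  // from, so the two can never describe different replicas.
  def pick(all: Seq[Loc], requesterHost: String, essEnabled: Boolean): Option[Loc] = {
    val hostLocal = all.filter(_.host == requesterHost)
    val viaEss =
      if (essEnabled) hostLocal.find(_.port == externalShuffleServicePort) else None
    viaEss
      .orElse(hostLocal.find(_.onDisk)) // same host, persisted on disk
      .orElse(all.headOption)           // fall back to any replica
  }

  def main(args: Array[String]): Unit = {
    val mem  = Loc("hostA", 5000, onDisk = false) // in-memory replica, listed first
    val disk = Loc("hostA", 5001, onDisk = true)  // same-host on-disk replica
    // With the fix, the on-disk same-host replica wins even though the
    // in-memory one comes first in the location list (ESS disabled here).
    assert(pick(Seq(mem, disk), "hostA", essEnabled = false).contains(disk))
    println("ok")
  }
}
```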
1 parent 4b9b246 commit 997e599

File tree

2 files changed: +63 −24 lines


core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

+43 −24
```diff
@@ -862,31 +862,50 @@ class BlockManagerMasterEndpoint(
   private def getLocationsAndStatus(
       blockId: BlockId,
       requesterHost: String): Option[BlockLocationsAndStatus] = {
-    val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
-    val status = locations.headOption.flatMap { bmId =>
-      if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
-        blockStatusByShuffleService.get(bmId).flatMap(m => m.get(blockId))
-      } else {
-        blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
+    val allLocations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
+    val hostLocalLocations = allLocations.filter(bmId => bmId.host == requesterHost)
+
+    val blockStatusWithBlockManagerId: Option[(BlockStatus, BlockManagerId)] =
+      (if (externalShuffleServiceRddFetchEnabled) {
+        // if fetching RDD is enabled from the external shuffle service then first try to find
+        // the block in the external shuffle service of the same host
+        val location = hostLocalLocations.find(_.port == externalShuffleServicePort)
+        location
+          .flatMap(blockStatusByShuffleService.get(_).flatMap(_.get(blockId)))
+          .zip(location)
+      } else {
+        None
+      })
+      .orElse {
+        // if the block is not found via the external shuffle service trying to find it in the
+        // executors running on the same host and persisted on the disk
+        // using flatMap on iterators makes the transformation lazy
+        hostLocalLocations.iterator
+          .flatMap { bmId =>
+            blockManagerInfo.get(bmId).flatMap { blockInfo =>
+              blockInfo.getStatus(blockId).map((_, bmId))
+            }
+          }
+          .find(_._1.storageLevel.useDisk)
+      }
+      .orElse {
+        // if the block cannot be found in the same host search it in all the executors
+        val location = allLocations.headOption
+        location.flatMap(blockManagerInfo.get(_)).flatMap(_.getStatus(blockId)).zip(location)
+      }
+    logDebug(s"Identified block: $blockStatusWithBlockManagerId")
+    blockStatusWithBlockManagerId
+      .map { case (blockStatus: BlockStatus, bmId: BlockManagerId) =>
+        if (bmId.host == requesterHost && blockStatus.storageLevel.useDisk) {
+          BlockLocationsAndStatus(
+            allLocations,
+            blockStatus,
+            Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)))
+        } else {
+          BlockLocationsAndStatus(allLocations, blockStatus, None)
+        }
       }
-    }
-
-    if (locations.nonEmpty && status.isDefined) {
-      val localDirs = locations.find { loc =>
-        // When the external shuffle service running on the same host is found among the block
-        // locations then the block must be persisted on the disk. In this case the executorId
-        // can be used to access this block even when the original executor is already stopped.
-        loc.host == requesterHost &&
-          (loc.port == externalShuffleServicePort ||
-            blockManagerInfo
-              .get(loc)
-              .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
-              .getOrElse(false))
-      }.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
-      Some(BlockLocationsAndStatus(locations, status.get, localDirs))
-    } else {
-      None
-    }
+      .orElse(None)
   }

   private def getLocationsMultipleBlockIds(
```
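The mismatch the old code above allowed can be reproduced in isolation with a small sketch. The `MiniStatus`/`MiniLoc` types and `oldLookup` below are hypothetical stand-ins, not Spark's real API: they model how taking `status` from the *first* replica while deriving `localDirs` from whichever same-host replica is on disk lets the two describe different replicas.

```scala
// Hypothetical miniature of the pre-fix lookup; all names are illustrative.
case class MiniStatus(useDisk: Boolean, diskSize: Long)
case class MiniLoc(host: String, status: MiniStatus)

object MismatchSketch {
  // Old behaviour: status and the localDirs source are found independently.
  def oldLookup(all: Seq[MiniLoc], requesterHost: String): Option[(MiniStatus, Option[MiniLoc])] = {
    val status = all.headOption.map(_.status) // status of the FIRST replica
    val diskLoc = all.find(l => l.host == requesterHost && l.status.useDisk) // localDirs source
    status.map((_, diskLoc))
  }

  def main(args: Array[String]): Unit = {
    val inMemory = MiniLoc("hostA", MiniStatus(useDisk = false, diskSize = 0L))
    val onDisk   = MiniLoc("hostA", MiniStatus(useDisk = true, diskSize = 100L))
    val result = oldLookup(Seq(inMemory, onDisk), "hostA")
    // The returned status claims diskSize 0 while localDirs would point at the
    // on-disk replica: with spark.io.encryption.enabled=true the reader then
    // reads 0 bytes and TorrentBroadcast fails as in the reported exception.
    assert(result.exists { case (status, diskLoc) =>
      status.diskSize == 0L && diskLoc.exists(_.status.useDisk)
    })
    println("mismatch reproduced")
  }
}
```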

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

+20
```diff
@@ -474,6 +474,26 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     assert(!BlockManagerId("notADriverIdentifier", "XXX", 1).isDriver)
   }

+  test("SPARK-43221: Host local block fetching should use a block status with disk size") {
+    conf.set(IO_ENCRYPTION_ENABLED, true)
+    conf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
+    val store1 = makeBlockManager(2000, "exec1")
+    val store2 = makeBlockManager(2000, "exec2")
+    val store3 = makeBlockManager(2000, "exec3")
+    val store4 = makeBlockManager(2000, "exec4")
+    val value = new Array[Byte](100)
+    val broadcastId = BroadcastBlockId(0)
+    store1.putSingle(broadcastId, value, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store2.putSingle(broadcastId, value, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store3.putSingle(broadcastId, value, StorageLevel.DISK_ONLY, tellMaster = true)
+    store4.getRemoteBytes(broadcastId) match {
+      case Some(block) =>
+        assert(block.size > 0, "The block size must be greater than 0 for a nonempty block!")
+      case None =>
+        assert(false, "Block not found!")
+    }
+  }
+
   test("master + 1 manager interaction") {
     val store = makeBlockManager(20000)
     val a1 = new Array[Byte](4000)
```
