diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index ebdc000044052..bbdad278cbdf0 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -30,6 +30,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata} +import java.util import scala.collection._ import scala.jdk.CollectionConverters._ @@ -39,7 +40,7 @@ import scala.jdk.CollectionConverters._ */ class DelayedFetch( params: FetchParams, - fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], + fetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus], replicaManager: ReplicaManager, quota: ReplicaQuota, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit @@ -66,8 +67,7 @@ class DelayedFetch( */ override def tryComplete(): Boolean = { var accumulatedSize = 0 - fetchPartitionStatus.foreach { - case (topicIdPartition, fetchStatus) => + fetchPartitionStatus.forEach { (topicIdPartition, fetchStatus) => val fetchOffset = fetchStatus.startOffsetMetadata val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch try { @@ -154,9 +154,9 @@ class DelayedFetch( * Upon completion, read whatever data is available and pass to the complete callback */ override def onComplete(): Unit = { - val fetchInfos = fetchPartitionStatus.map { case (tp, status) => + val fetchInfos = fetchPartitionStatus.asScala.map { case (tp, status) => tp -> status.fetchInfo - } + }.toSeq val logReadResults = replicaManager.readFromLog( params, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1d9d076a317df..e033a5e396255 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1628,7 +1628,7 @@ class ReplicaManager(val config: KafkaConfig, params: FetchParams, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, logReadResults: util.LinkedHashMap[TopicIdPartition, LogReadResult], - fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = { + fetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]): Unit = { val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]] val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]] @@ -1643,7 +1643,7 @@ class ReplicaManager(val config: KafkaConfig, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs, - fetchPartitionStatus.toMap.asJava, + fetchPartitionStatus, params, logReadResults, tp => getPartitionOrException(tp), @@ -1710,17 +1710,17 @@ class ReplicaManager(val config: KafkaConfig, responseCallback(fetchPartitionData) } else { // construct the fetch results from the read results - val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)] + val fetchPartitionStatus = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] fetchInfos.foreach { case (topicIdPartition, partitionData) => val logReadResult = logReadResultMap.get(topicIdPartition) if (logReadResult != null) { val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata - fetchPartitionStatus += (topicIdPartition -> new FetchPartitionStatus(logOffsetMetadata, partitionData)) + fetchPartitionStatus.put(topicIdPartition, new FetchPartitionStatus(logOffsetMetadata, partitionData)) } } if (!remoteFetchInfos.isEmpty) { - processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus.toSeq) + processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus) } else { // If there is not enough data to respond and there is no remote data, we will let the fetch request // wait for new data. @@ -1733,7 +1733,7 @@ class ReplicaManager(val config: KafkaConfig, ) // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList + val delayedFetchKeys = fetchPartitionStatus.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList // try to complete the request immediately, otherwise put it into the purgatory; // this is because while the delayed fetch operation is being created, new requests diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index fa3b8465d651f..c232967c01b2d 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -34,6 +34,8 @@ import org.junit.jupiter.params.provider.ValueSource import org.mockito.ArgumentMatchers.{any, anyInt} import org.mockito.Mockito.{mock, when} +import java.util + class DelayedFetchTest { private val maxBytes = 1024 private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) @@ -59,7 +61,7 @@ class DelayedFetchTest { val delayedFetch = new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, fetchStatus), replicaManager = replicaManager, quota = replicaQuota, responseCallback = callback @@ -105,7 +107,7 @@ class DelayedFetchTest { val delayedFetch = new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, fetchStatus), replicaManager = replicaManager, quota = replicaQuota, responseCallback = callback @@ -145,7 +147,7 @@ class DelayedFetchTest { val delayedFetch = new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, fetchStatus), replicaManager = replicaManager, quota = replicaQuota, responseCallback = callback @@ -196,7 +198,7 @@ class DelayedFetchTest { val delayedFetch = new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, fetchStatus), replicaManager = replicaManager, quota = replicaQuota, responseCallback = callback @@ -267,4 +269,9 @@ class DelayedFetchTest { error) } + def createFetchPartitionStatusMap(tpId: TopicIdPartition, status: FetchPartitionStatus): util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] = { + val map = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] + map.put(tpId, status) + map + } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 307afad4f5f45..359f0fdfdf56d 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -40,6 +40,7 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong} import org.mockito.Mockito.{mock, when} import org.mockito.{AdditionalMatchers, ArgumentMatchers} +import java.util import scala.jdk.CollectionConverters._ class ReplicaManagerQuotasTest { @@ -186,7 +187,7 @@ class ReplicaManagerQuotasTest { new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(tp -> fetchPartitionStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(tp, fetchPartitionStatus), replicaManager = replicaManager, quota = null, responseCallback = null @@ -237,7 +238,7 @@ class ReplicaManagerQuotasTest { new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(tidp -> fetchPartitionStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(tidp, fetchPartitionStatus), replicaManager = replicaManager, quota = null, responseCallback = null @@ -341,4 +342,9 @@ class ReplicaManagerQuotasTest { quota } + def createFetchPartitionStatusMap(tpId: TopicIdPartition, status: FetchPartitionStatus): util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] = { + val map = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] + map.put(tpId, status) + map + } }