
Commit 76a1c83

KAFKA-19340 Move DelayedRemoteFetch to the storage module (#19876)

Move DelayedRemoteFetch to the storage module and rewrite it in Java.

Reviewers: Mickael Maison <[email protected]>, Kamal Chandraprakash <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 4768585 commit 76a1c83

File tree

17 files changed (+781, -776)


checkstyle/import-control-storage.xml

Lines changed: 4 additions & 1 deletion
@@ -84,6 +84,10 @@
       </subpackage>
     </subpackage>
   </subpackage>
+
+  <subpackage name="purgatory">
+    <allow pkg="org.apache.kafka.server.storage.log" />
+  </subpackage>
 </subpackage>

 <subpackage name="storage.internals">
@@ -164,7 +168,6 @@
     <allow pkg="org.apache.kafka.server.log.remote.storage" />
     <allow pkg="scala.jdk.javaapi" />
     <allow pkg="org.apache.kafka.test" />
-
   </subpackage>

 </import-control>
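
For readers unfamiliar with Kafka's checkstyle setup: import-control-storage.xml whitelists, per subpackage, the packages that classes there may import. The new purgatory rule lets the relocated class see the server-side fetch types (FetchParams, FetchPartitionData) that live in org.apache.kafka.server.storage.log. A hypothetical class (the name is illustrative, not from this commit) showing what the rule does and does not permit:

    // Hypothetical file used only to illustrate the rule; any real class under
    // storage/src/main/java/org/apache/kafka/server/purgatory/ is governed by it.
    package org.apache.kafka.server.purgatory;

    import org.apache.kafka.server.storage.log.FetchParams;  // allowed by the new rule

    // import kafka.server.ReplicaManager;  // a core import like this would fail checkstyle

    public final class ImportControlExample {
        private final FetchParams params;

        public ImportControlExample(FetchParams params) {
            this.params = params;
        }

        public FetchParams params() {
            return params;
        }
    }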

core/src/main/java/kafka/server/share/DelayedShareFetch.java

Lines changed: 2 additions & 3 deletions
@@ -30,7 +30,6 @@
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.raft.errors.NotLeaderException;
-import org.apache.kafka.server.LogReadResult;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.server.purgatory.DelayedOperation;
 import org.apache.kafka.server.share.SharePartitionKey;
@@ -46,6 +45,7 @@
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
+import org.apache.kafka.storage.internals.log.LogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

@@ -852,13 +852,12 @@ private void completeRemoteStorageShareFetchRequest() {
         if (remoteFetch.remoteFetchResult().isDone()) {
             RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
             if (remoteLogReadResult.error().isPresent()) {
-                Throwable error = remoteLogReadResult.error().get();
                 // If there is any error for the remote fetch topic partition, we populate the error accordingly.
                 shareFetchPartitionData.add(
                     new ShareFetchPartitionData(
                         remoteFetch.topicIdPartition(),
                         partitionsAcquired.get(remoteFetch.topicIdPartition()),
-                        ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+                        new LogReadResult(Errors.forException(remoteLogReadResult.error().get())).toFetchPartitionData(false)
                     )
                 );
             } else {
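
This is the first call site of the replacement for the deleted ReplicaManager.createLogReadResult(Throwable) helper: the failure is mapped straight to a wire-protocol error code and wrapped in what appears to be a new error-only LogReadResult constructor. A minimal sketch of the pattern, assuming the single-argument constructor this diff relies on:

    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.storage.internals.log.LogReadResult;

    final class LogReadResultErrors {
        // Sketch: build the error-only LogReadResult used above. Errors.forException
        // maps known Kafka exceptions to their protocol error codes and falls back
        // to UNKNOWN_SERVER_ERROR for unrecognized exception types.
        static LogReadResult failedRead(Throwable error) {
            return new LogReadResult(Errors.forException(error));
        }
    }

From there, toFetchPartitionData(false) converts the result into the FetchPartitionData handed back to the share-fetch caller, as before; only the error representation changed.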

core/src/main/java/kafka/server/share/PendingRemoteFetches.java

Lines changed: 1 addition & 1 deletion
@@ -17,8 +17,8 @@
 package kafka.server.share;

 import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.server.LogReadResult;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

core/src/main/scala/kafka/server/DelayedFetch.scala

Lines changed: 1 addition & 11 deletions
@@ -24,25 +24,15 @@ import java.util.concurrent.TimeUnit
 import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.purgatory.DelayedOperation
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
-import org.apache.kafka.storage.internals.log.LogOffsetMetadata
+import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata}

 import scala.collection._
 import scala.jdk.CollectionConverters._

-case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) {
-
-  override def toString: String = {
-    "[startOffsetMetadata: " + startOffsetMetadata +
-      ", fetchInfo: " + fetchInfo +
-      "]"
-  }
-}
-
 /**
  * A delayed fetch operation that can be created by the replica manager and watched
  * in the fetch operation purgatory
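
With the case class removed from core, the new import points at a FetchPartitionStatus in org.apache.kafka.storage.internals.log, and ReplicaManager.scala (below) now instantiates it with `new`, so it is presumably a plain Java class. That file is not shown on this page; a plausible Java equivalent of the removed Scala case class, with assumed accessor names, would be:

    // Inferred sketch only; the real class ships in the storage module under
    // org.apache.kafka.storage.internals.log. Accessor names are assumptions.
    package org.apache.kafka.storage.internals.log;

    import org.apache.kafka.common.requests.FetchRequest;

    public class FetchPartitionStatus {
        private final LogOffsetMetadata startOffsetMetadata;
        private final FetchRequest.PartitionData fetchInfo;

        public FetchPartitionStatus(LogOffsetMetadata startOffsetMetadata,
                                    FetchRequest.PartitionData fetchInfo) {
            this.startOffsetMetadata = startOffsetMetadata;
            this.fetchInfo = fetchInfo;
        }

        public LogOffsetMetadata startOffsetMetadata() { return startOffsetMetadata; }

        public FetchRequest.PartitionData fetchInfo() { return fetchInfo; }

        @Override
        public String toString() {
            // Mirrors the toString of the removed Scala case class.
            return "[startOffsetMetadata: " + startOffsetMetadata + ", fetchInfo: " + fetchInfo + "]";
        }
    }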

core/src/main/scala/kafka/server/DelayedRemoteFetch.scala

Lines changed: 0 additions & 146 deletions
This file was deleted.
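
The 146-line Scala implementation is deleted; per the commit message and the new import in ReplicaManager.scala below, its replacement is a Java DelayedRemoteFetch under org.apache.kafka.server.purgatory in the storage module. That file is not shown on this page, so the following does not reproduce it; it only illustrates the dependency-inversion pattern the call site reveals: the operation now reaches back into the broker through injected functions (a partition lookup and a response consumer) rather than holding a ReplicaManager reference, which is what frees it from a compile-time dependency on core. All names in the sketch are hypothetical.

    import java.util.Map;
    import java.util.function.Consumer;
    import java.util.function.Function;

    // Hypothetical illustration of the pattern, not the real class: the delayed
    // operation depends on core behaviour only through functional interfaces.
    final class RemoteFetchSketch<K, P, R> {
        private final Function<K, P> partitionLookup;        // e.g. tp -> getPartitionOrException(tp)
        private final Consumer<Map<K, R>> responseCallback;  // completes the client fetch

        RemoteFetchSketch(Function<K, P> partitionLookup, Consumer<Map<K, R>> responseCallback) {
            this.partitionLookup = partitionLookup;
            this.responseCallback = responseCallback;
        }

        // On tryComplete, the operation can consult partition state via the lookup...
        P partition(K key) {
            return partitionLookup.apply(key);
        }

        // ...and on completion it hands the per-partition results to the callback.
        void complete(Map<K, R> results) {
            responseCallback.accept(results);
        }
    }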

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 26 additions & 30 deletions
@@ -56,16 +56,16 @@ import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
 import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, LogReadResult, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats

 import java.io.File
@@ -188,19 +188,7 @@ object ReplicaManager {
       -1L,
       -1L,
       OptionalLong.empty(),
-      Optional.of(e))
-  }
-
-  def createLogReadResult(e: Throwable): LogReadResult = {
-    new LogReadResult(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-      Optional.empty(),
-      UnifiedLog.UNKNOWN_OFFSET,
-      UnifiedLog.UNKNOWN_OFFSET,
-      UnifiedLog.UNKNOWN_OFFSET,
-      UnifiedLog.UNKNOWN_OFFSET,
-      -1L,
-      OptionalLong.empty(),
-      Optional.of(e))
+      Errors.forException(e));
   }

   private[server] def isListOffsetsTimestampUnsupported(timestamp: JLong, version: Short): Boolean = {
@@ -1639,7 +1627,7 @@
   private def processRemoteFetches(remoteFetchInfos: util.LinkedHashMap[TopicIdPartition, RemoteStorageFetchInfo],
                                    params: FetchParams,
                                    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
-                                   logReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                                   logReadResults: util.LinkedHashMap[TopicIdPartition, LogReadResult],
                                    fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
     val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
     val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
@@ -1651,8 +1639,15 @@
     }

     val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
-    val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
-      fetchPartitionStatus, params, logReadResults, this, responseCallback)
+    val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks,
+      remoteFetchResults,
+      remoteFetchInfos,
+      remoteFetchMaxWaitMs,
+      fetchPartitionStatus.toMap.asJava,
+      params,
+      logReadResults,
+      tp => getPartitionOrException(tp),
+      response => responseCallback(response.asScala.toSeq))

     // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
     val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
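
Note what changed in the constructor call beyond formatting: the Scala fetchPartitionStatus sequence is converted with toMap.asJava, logReadResults is now the Java LinkedHashMap built during readFromLog, the old `this` (ReplicaManager) argument is replaced by the narrower `tp => getPartitionOrException(tp)` lookup, and the Scala responseCallback is wrapped so the Java class can complete with a Java collection (response.asScala.toSeq). These adapters are exactly what the dependency-inversion sketch above illustrates: the purgatory class can live in the storage module with no compile-time dependency on core.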
@@ -1681,7 +1676,7 @@

     var hasDivergingEpoch = false
     var hasPreferredReadReplica = false
-    val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
+    val logReadResultMap = new util.LinkedHashMap[TopicIdPartition, LogReadResult]

     logReadResults.foreach { case (topicIdPartition, logReadResult) =>
       brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
@@ -1717,14 +1712,15 @@
       // construct the fetch results from the read results
       val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)]
       fetchInfos.foreach { case (topicIdPartition, partitionData) =>
-        logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
+        val logReadResult = logReadResultMap.get(topicIdPartition)
+        if (logReadResult != null) {
           val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
-          fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
-        })
+          fetchPartitionStatus += (topicIdPartition -> new FetchPartitionStatus(logOffsetMetadata, partitionData))
+        }
       }

       if (!remoteFetchInfos.isEmpty) {
-        processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResults, fetchPartitionStatus.toSeq)
+        processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus.toSeq)
       } else {
         // If there is not enough data to respond and there is no remote data, we will let the fetch request
         // wait for new data.
@@ -1812,7 +1808,7 @@
           -1L,
           OptionalLong.of(offsetSnapshot.lastStableOffset.messageOffset),
           if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(),
-          Optional.empty())
+          Errors.NONE)
       } else {
         log = partition.localLogWithEpochOrThrow(fetchInfo.currentLeaderEpoch, params.fetchOnlyLeader())

@@ -1836,7 +1832,7 @@
           fetchTimeMs,
           OptionalLong.of(readInfo.lastStableOffset),
           if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(),
-          Optional.empty()
+          Errors.NONE
         )
       }
     } catch {
@@ -1849,7 +1845,7 @@
               _: ReplicaNotAvailableException |
               _: KafkaStorageException |
               _: InconsistentTopicIdException) =>
-        createLogReadResult(e)
+        new LogReadResult(Errors.forException(e))
       case e: OffsetOutOfRangeException =>
         handleOffsetOutOfRangeError(tp, params, fetchInfo, adjustedMaxBytes, minOneMessage, log, fetchTimeMs, e)
       case e: Throwable =>
@@ -1868,7 +1864,7 @@
           UnifiedLog.UNKNOWN_OFFSET,
           -1L,
           OptionalLong.empty(),
-          Optional.of(e)
+          Errors.forException(e)
         )
       }
     }
@@ -1949,10 +1945,10 @@
           fetchInfo.logStartOffset,
           fetchTimeMs,
           OptionalLong.of(log.lastStableOffset),
-          Optional.empty[Throwable]())
+          Errors.NONE)
       }
     } else {
-      createLogReadResult(exception)
+      new LogReadResult(Errors.forException(exception))
     }
   }
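
One pattern recurs throughout this file: every Optional.empty() / Optional.of(e) / Optional.empty[Throwable]() error slot becomes Errors.NONE or Errors.forException(e), meaning LogReadResult now carries a wire-protocol error code instead of a raw Throwable. Errors.forException is the long-standing mapping in org.apache.kafka.common.protocol.Errors; a small self-contained demo of its behavior:

    import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
    import org.apache.kafka.common.protocol.Errors;

    public final class ErrorsMappingDemo {
        public static void main(String[] args) {
            // Known Kafka exceptions map to their wire-protocol error codes...
            Errors notLeader = Errors.forException(new NotLeaderOrFollowerException("leadership moved"));
            System.out.println(notLeader); // NOT_LEADER_OR_FOLLOWER

            // ...while unrecognized exception types fall back to UNKNOWN_SERVER_ERROR.
            System.out.println(Errors.forException(new IllegalStateException("boom"))); // UNKNOWN_SERVER_ERROR

            // The absence of an error is Errors.NONE (code 0), which the happy
            // paths above now pass to LogReadResult instead of Optional.empty().
            System.out.println(Errors.NONE.code()); // 0
        }
    }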
