Skip to content

Conversation

@DL1231
Copy link
Contributor

@DL1231 DL1231 commented Jun 2, 2025

Move DelayedRemoteFetch to the storage module and rewrite it to java.

Reviewers: Mickael Maison [email protected], Kamal
Chandraprakash [email protected], Chia-Ping Tsai
[email protected]

@github-actions github-actions bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka labels Jun 2, 2025
@DL1231 DL1231 marked this pull request as ready for review June 4, 2025 03:36
@DL1231 DL1231 marked this pull request as draft June 4, 2025 03:39
@DL1231 DL1231 marked this pull request as ready for review June 4, 2025 07:08
@DL1231
Copy link
Contributor Author

DL1231 commented Jun 5, 2025

@frankvicky @apoorvmittal10 PTAL when you get a chance, thanks in advance.

@github-actions
Copy link

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

# Conflicts:
#	core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@DL1231
Copy link
Contributor Author

DL1231 commented Jun 13, 2025

I compared the performance of this patch with the current version in catch-up read scenarios with tiered storage enabled.
Use command:

sh kafka-consumer-perf-test.sh --topic topic-test --group group-test --broker-list 127.0.0.1:9092 --messages 140000 --timeout 9223372036854775807 --show-detailed-stats --reporting-interval 2000

This patch(with trunk code)

time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2025-06-13 14:52:08:086, 0, 95.4208, 12.3076, 1516, 195.5372, 1749797520793, -1749797513040, 0.0000, 0.0000
2025-06-13 14:52:10:087, 0, 468.3552, 186.3740, 7441, 2961.0195, 0, 2001, 186.3740, 2961.0195
2025-06-13 14:52:12:099, 0, 861.4311, 195.3658, 13686, 3103.8767, 0, 2012, 195.3658, 3103.8767
2025-06-13 14:52:14:133, 0, 1190.6834, 161.8743, 18917, 2571.7797, 0, 2034, 161.8743, 2571.7797
2025-06-13 14:52:16:133, 0, 1556.6940, 183.0053, 24732, 2907.5000, 0, 2000, 183.0053, 2907.5000
2025-06-13 14:52:18:161, 0, 1917.4805, 177.9026, 30464, 2826.4300, 0, 2028, 177.9026, 2826.4300
2025-06-13 14:52:20:266, 0, 2283.4282, 173.8469, 36278, 2761.9952, 0, 2105, 173.8469, 2761.9952
2025-06-13 14:52:22:269, 0, 2663.4750, 189.7388, 42316, 3014.4783, 0, 2003, 189.7388, 3014.4783
2025-06-13 14:52:24:409, 0, 3023.0656, 168.0330, 48029, 2669.6262, 0, 2140, 168.0330, 2669.6262
2025-06-13 14:52:26:412, 0, 3397.3217, 186.8478, 53975, 2968.5472, 0, 2003, 186.8478, 2968.5472
2025-06-13 14:52:28:441, 0, 3739.0995, 168.4464, 59405, 2676.1952, 0, 2029, 168.4464, 2676.1952
2025-06-13 14:52:30:442, 0, 4113.2927, 187.0031, 65350, 2971.0145, 0, 2001, 187.0031, 2971.0145
2025-06-13 14:52:32:450, 0, 4476.2192, 180.7403, 71116, 2871.5139, 0, 2008, 180.7403, 2871.5139
2025-06-13 14:52:34:519, 0, 4844.3069, 177.9061, 76964, 2826.4862, 0, 2069, 177.9061, 2826.4862
2025-06-13 14:52:36:531, 0, 5189.5466, 171.5903, 82449, 2726.1431, 0, 2012, 171.5903, 2726.1431
2025-06-13 14:52:38:551, 0, 5516.0923, 161.6563, 87637, 2568.3168, 0, 2020, 161.6563, 2568.3168
2025-06-13 14:52:40:552, 0, 5913.3224, 198.5158, 93948, 3153.9230, 0, 2001, 198.5158, 3153.9230
2025-06-13 14:52:42:619, 0, 6263.9122, 169.6128, 99518, 2694.7267, 0, 2067, 169.6128, 2694.7267
2025-06-13 14:52:44:621, 0, 6633.5735, 184.6460, 105391, 2933.5664, 0, 2002, 184.6460, 2933.5664
2025-06-13 14:52:46:701, 0, 6970.8195, 162.1375, 110749, 2575.9615, 0, 2080, 162.1375, 2575.9615
2025-06-13 14:52:48:835, 0, 7368.8049, 186.4974, 117072, 2962.9803, 0, 2134, 186.4974, 2962.9803
2025-06-13 14:52:50:835, 0, 7749.4812, 190.3381, 123120, 3024.0000, 0, 2000, 190.3381, 3024.0000
2025-06-13 14:52:52:891, 0, 8092.7067, 166.9385, 128573, 2652.2374, 0, 2056, 166.9385, 2652.2374
2025-06-13 14:52:54:922, 0, 8470.2358, 185.8834, 134571, 2953.2250, 0, 2031, 185.8834, 2953.2250

Current 4.0.0 release

time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2025-06-13 15:23:12:911, 0, 277.3247, 138.6623, 4406, 2203.0000, 1749799391344, -1749799389344, 0.0000, 0.0000
2025-06-13 15:23:14:913, 0, 717.2928, 219.7643, 11396, 3491.5085, 0, 2002, 219.7643, 3491.5085
2025-06-13 15:23:17:053, 0, 1065.6796, 162.7976, 16931, 2586.4486, 0, 2140, 162.7976, 2586.4486
2025-06-13 15:23:19:056, 0, 1424.7036, 179.2432, 22635, 2847.7284, 0, 2003, 179.2432, 2847.7284
2025-06-13 15:23:21:225, 0, 1803.9322, 174.8403, 28660, 2777.7778, 0, 2169, 174.8403, 2777.7778
2025-06-13 15:23:23:227, 0, 2174.6635, 185.1805, 34550, 2942.0579, 0, 2002, 185.1805, 2942.0579
2025-06-13 15:23:25:344, 0, 2537.4641, 171.3749, 40314, 2722.7208, 0, 2117, 171.3749, 2722.7208
2025-06-13 15:23:27:368, 0, 2919.2104, 188.6098, 46379, 2996.5415, 0, 2024, 188.6098, 2996.5415
2025-06-13 15:23:29:535, 0, 3282.3887, 167.5949, 52149, 2662.6673, 0, 2167, 167.5949, 2662.6673
2025-06-13 15:23:31:535, 0, 3650.3506, 183.9809, 57995, 2923.0000, 0, 2000, 183.9809, 2923.0000
2025-06-13 15:23:33:537, 0, 4011.2000, 180.2444, 63728, 2863.6364, 0, 2002, 180.2444, 2863.6364
2025-06-13 15:23:35:538, 0, 4380.9872, 184.8012, 69603, 2936.0320, 0, 2001, 184.8012, 2936.0320
2025-06-13 15:23:37:542, 0, 4736.3605, 177.3320, 75249, 2817.3653, 0, 2004, 177.3320, 2817.3653
2025-06-13 15:23:39:542, 0, 5079.7119, 171.6757, 80704, 2727.5000, 0, 2000, 171.6757, 2727.5000
2025-06-13 15:23:41:627, 0, 5449.3732, 177.2956, 86577, 2816.7866, 0, 2085, 177.2956, 2816.7866
2025-06-13 15:23:43:627, 0, 5795.6200, 173.1234, 92078, 2750.5000, 0, 2000, 173.1234, 2750.5000
2025-06-13 15:23:45:677, 0, 6168.5543, 181.9192, 98003, 2890.2439, 0, 2050, 181.9192, 2890.2439
2025-06-13 15:23:47:797, 0, 6538.6562, 174.5764, 103883, 2773.5849, 0, 2120, 174.5764, 2773.5849
2025-06-13 15:23:49:869, 0, 6888.3018, 168.7479, 109438, 2680.9846, 0, 2072, 168.7479, 2680.9846
2025-06-13 15:23:51:992, 0, 7226.8696, 159.4761, 114817, 2533.6788, 0, 2123, 159.4761, 2533.6788
2025-06-13 15:23:54:065, 0, 7656.4522, 207.2275, 121642, 3292.3300, 0, 2073, 207.2275, 3292.3300
2025-06-13 15:23:56:144, 0, 8008.1749, 169.1788, 127230, 2687.8307, 0, 2079, 169.1788, 2687.8307
2025-06-13 15:23:58:146, 0, 8352.2186, 171.8500, 132696, 2730.2697, 0, 2002, 171.8500, 2730.2697
2025-06-13 15:24:00:170, 0, 8660.6369, 152.3806, 137596, 2420.9486, 0, 2024, 152.3806, 2420.9486

Summary:

  • The overall throughput has increased slightly(The first row of data is excluded), this patch 186.9MB/s, current release 185.8MB/s.

@apoorvmittal10
Copy link
Contributor

@frankvicky @apoorvmittal10 PTAL when you get a chance, thanks in advance.

Thanks for the PR, I am a bot occupied with queues as of now. It's on my list to review, will take some time.

@apoorvmittal10 apoorvmittal10 removed the triage PRs from the community label Jun 13, 2025
DL1231 added 3 commits July 22, 2025 09:13
# Conflicts:
#	core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
#	core/src/main/scala/kafka/server/ReplicaManager.scala
#	core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@DL1231
Copy link
Contributor Author

DL1231 commented Jul 22, 2025

@showuon @kamalcph, PTAL when you get a chance, thanks.

@chia7712
Copy link
Member

chia7712 commented Aug 4, 2025

> Compilation failed; see the compiler output below.
  /home/chia7712/project/kafka/server/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java:165: error: error has private access in RemoteLogReadResult
                      if (remoteFetchResult.get().error.isPresent()) {
                                                 ^
  /home/chia7712/project/kafka/server/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java:167: error: error has private access in RemoteLogReadResult
                              new LogReadResult(remoteFetchResult.get().error.get()).toFetchPartitionData(false));
                                                                       ^
  /home/chia7712/project/kafka/server/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java:169: error: fetchDataInfo has private access in RemoteLogReadResult
                          FetchDataInfo info = remoteFetchResult.get().fetchDataInfo.get();
                                                                      ^
  3 errors

@DL1231 please fix the build error

@DL1231 DL1231 requested a review from chia7712 August 15, 2025 02:54
Copy link
Member

@mimaison mimaison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. I made a first quick pass and left a few minor suggestions.

@DL1231 DL1231 changed the title KAFKA-19340: Move DelayedRemoteFetch to the server module KAFKA-19340: Move DelayedRemoteFetch to the storage module Sep 30, 2025
@chia7712
Copy link
Member

@DL1231 would you mind fixing the conflicts? I will take a look asap

# Conflicts:
#	core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@kamalcph kamalcph self-requested a review October 14, 2025 05:11
# Conflicts:
#	core/src/main/scala/kafka/server/ReplicaManager.scala
#	core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Copy link
Member

@mimaison mimaison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates. I left a few comments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like some of the constructors are not used anymore, so they can be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

@mimaison mimaison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DL1231 thanks for this patch. it is great overall, and I have just left a few comments

@kamalcph
Copy link
Contributor

I'll review this PR by tomorrow.

Copy link
Contributor

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM, left few comments to address. PTAL.

*/
@Override
public void onComplete() {
Map<TopicIdPartition, FetchPartitionData> fetchPartitionData = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we retain the response order similar to request order?

# Conflicts:
#	core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
remoteFetchMaxWaitMs,
fetchPartitionStatus.toMap.asJava,
params,
logReadResults.toMap.asJava,
Copy link
Contributor

@kamalcph kamalcph Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also maintain the order of elements in logReadResults?

Copy link
Contributor

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the PR!

@kamalcph
Copy link
Contributor

@mimaison @chia7712

Call for review. Do you want to take another look on this PR?

Copy link
Member

@mimaison mimaison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM and I left a follow-up comment

remoteFetchResults,
remoteFetchInfos,
remoteFetchMaxWaitMs,
fetchPartitionStatus.toMap.asJava,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could initialize fetchPartitionStatus as a Map type to reduce unnecessary collection conversions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #20769 to address this comment. PTAL.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't saw #20768 was also opened. We can merge either one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the FIFO policy and review #20768 together

@chia7712 chia7712 merged commit 76a1c83 into apache:trunk Oct 24, 2025
27 checks passed
remoteFetch.topicIdPartition(),
partitionsAcquired.get(remoteFetch.topicIdPartition()),
ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
new LogReadResult(Errors.forException(remoteLogReadResult.error().get())).toFetchPartitionData(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we check that we didn't lose a useful error message by doing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. I checked the usage of LogReadResult and only found calls to LogReadResult.error(), so in the previous implementation, no code would utilize the error message from the Exception.
If anything is inappropriate, please feel free to correct me.

joshua2519 pushed a commit to joshua2519/kafka that referenced this pull request Oct 27, 2025
Move DelayedRemoteFetch to the storage module and rewrite it to java.

Reviewers: Mickael Maison <[email protected]>, Kamal
 Chandraprakash <[email protected]>, Chia-Ping Tsai
 <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved core Kafka Broker KIP-932 Queues for Kafka storage Pull requests that target the storage module

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants