Skip to content

Conversation

@brandboat
Copy link
Member

@brandboat brandboat commented Oct 24, 2025

This patch wraps ShareFetchUtils static method processFetchResponse
with MockedStatic to improve test isolation and also fixes some
incorrect test results.

@github-actions github-actions bot added core Kafka Broker KIP-932 Queues for Kafka labels Oct 24, 2025
@brandboat brandboat changed the title KAFKA-19345: Use ShareFetchUtils mock for DelayedShareFetchTest tests (WIP) KAFKA-19345: Use ShareFetchUtils mock for DelayedShareFetchTest tests Oct 24, 2025
@brandboat brandboat changed the title (WIP) KAFKA-19345: Use ShareFetchUtils mock for DelayedShareFetchTest tests KAFKA-19345: Use ShareFetchUtils mock for DelayedShareFetchTest tests Oct 24, 2025
assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId));

delayedShareFetch.lock().unlock();
Mockito.verify(exceptionHandler, never()).accept(any(), any());
Copy link
Member Author

@brandboat brandboat Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exceptionHandler shouldn't be triggered in this test case.
The reason it throw error simply due to incorrect test mock up,

when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
the above code snippet override the return value of replicaManager.getPartitionOrException(tp0.topicPartition()) defined in
mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
, which make isMinBytesSatisfied raise NPE , and this is unexpected.

assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
Mockito.verify(exceptionHandler, never()).accept(any(), any());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@brandboat
Copy link
Member Author

UserQuotaTest#testThrottledRequest(String) with groupProtocol=classic is a known flaky test (KAFKA-8459). It’s unrelated to this PR and passes locally.

@brandboat
Copy link
Member Author

Gentle ping , @AndrewJSchofield, @apoorvmittal10, @adixitconfluent — sorry for the late implementation 🙏
I’ve also fixed a few incorrect tests in this patch. Would appreciate your review when you have a chance. Thanks!

@apoorvmittal10
Copy link
Contributor

@adixitconfluent Can you please review the PR.

Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @brandboat . I've left some comments.


// We are testing the case when the share partition is getting fetched for the first time, so for the first time
// the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
// the fetchOffsetMetadata will return empty. Post the isMinBytesSatisfied call, the fetchOffsetMetadata will be
Copy link
Contributor

@adixitconfluent adixitconfluent Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment that makes more sense is Post the first readFromLog call...

Comment on lines -227 to -230
doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();

PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Set.of(tp0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why this piece of code has been moved below - basically a cut and paste?

Comment on lines -2152 to -2159
private void mockTopicIdPartitionFetchBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, LogOffsetMetadata hwmOffsetMetadata) {
LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class),
hwmOffsetMetadata, mock(LogOffsetMetadata.class));
Partition partition = mock(Partition.class);
when(partition.fetchOffsetSnapshot(any(), anyBoolean())).thenReturn(endOffsetSnapshot);
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removal of this function doesn't make sense to me. I agree that the line when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition); is problematic. We can pass the mock Partition object into this function. Example -

caller function -

Partition p0 = mock(Partition.class);
mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata, p0);
when(p0.isLeader()).thenReturn(true);
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);

function mockTopicIdPartitionFetchBytes -

private void mockTopicIdPartitionFetchBytes(LogOffsetMetadata hwmOffsetMetadata, Partition partition) {
        LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class),
            hwmOffsetMetadata, mock(LogOffsetMetadata.class));
        when(partition.fetchOffsetSnapshot(any(), anyBoolean())).thenReturn(endOffsetSnapshot);
}


assertFalse(delayedShareFetch.isCompleted());
try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need mocking of processFetchResponse since it is not being called in this test

when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);

BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For safety, should we be mocking ShareFetchUtils.processFetchResponse for the test testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved core Kafka Broker KIP-932 Queues for Kafka

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants