-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker]Fix repeatedly acquire pending reads quota #23869
base: master
Are you sure you want to change the base?
[fix][broker]Fix repeatedly acquire pending reads quota #23869
Conversation
LedgerHandle currentLedger = ml.currentLedger; | ||
LedgerHandle spyCurrentLedger = Mockito.spy(currentLedger); | ||
ml.currentLedger = spyCurrentLedger; | ||
Answer answer = invocation -> { | ||
long firstEntry = (long) invocation.getArguments()[0]; | ||
log.info("reading entry: {}", firstEntry); | ||
if (firstEntry == start1) { | ||
// Wait 3s to make | ||
firstReadingStarted.countDown(); | ||
readCompleteSignal1.await(); | ||
Object res = invocation.callRealMethod(); | ||
return res; | ||
} else if(secondReadEntries.contains(firstEntry)) { | ||
final CompletableFuture res = new CompletableFuture<>(); | ||
threadFactory.newThread(() -> { | ||
try { | ||
readCompleteSignal2.await(); | ||
CompletableFuture<LedgerEntries> future = | ||
(CompletableFuture<LedgerEntries>) invocation.callRealMethod(); | ||
future.thenAccept(v -> { | ||
res.complete(v); | ||
}).exceptionally(ex -> { | ||
res.completeExceptionally(ex); | ||
return null; | ||
}); | ||
} catch (Throwable ex) { | ||
res.completeExceptionally(ex); | ||
} | ||
}).start(); | ||
return res; | ||
} else { | ||
return invocation.callRealMethod(); | ||
} | ||
}; | ||
doAnswer(answer).when(spyCurrentLedger).readAsync(anyLong(), anyLong()); | ||
doAnswer(answer).when(spyCurrentLedger).readUnconfirmedAsync(anyLong(), anyLong()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the PulsarMockBookKeeper read handle interceptor (added in #23875) could be used here to avoid the use of Mockito?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, but the current mock is compatible with both the mocked Bookie client and the real Bookie client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true, but this test uses PulsarMockBookKeeper
. The problem with Mockito is that it requires injecting changes deeply into the implementation. Incorrect use of Mockito could result in thread safety issues and other issues, causing flaky tests.
btw. When there are a large number of operations, Mockito Spies that record invocations should only be used when the invocations are validated. We have a helper for spies that don't record invocations (that could cause OOME issues in tests):
pulsar/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
Lines 99 to 112 in 52e8730
/** | |
* Create a Mockito spy that is stub-only which does not record method invocations, | |
* thus saving memory but disallowing verification of invocations. | |
* | |
* @param object to spy on | |
* @return a spy of the real object | |
* @param <T> type of object | |
*/ | |
public static <T> T spyWithoutRecordingInvocations(T object) { | |
return Mockito.mock((Class<T>) object.getClass(), Mockito.withSettings() | |
.spiedInstance(object) | |
.defaultAnswer(Mockito.CALLS_REAL_METHODS) | |
.stubOnly()); | |
} |
I agree that this issue exists, however there are broader issues in the Sharing some context about the issues I have found and what I have currently in progress: I have reported issues #23482, #23504, #23505 and #23506 . I have already changes to address these issues in an experimental branch, pending the submission of individual PRs. Addressing the lack of caching for replay queue messages requires broader changes to the the broker cache and those will be covered with a new PIP. Some earlier details shared in this comment: #23524 (comment) . Due to issues #23482 and #23506, I don't think that this PR would resolve problems alone. Regarding broker OOMEs, issue #23504 would also need to be resolved. It's possible that |
Agree with you, let us fix them one by one, which is easier to review, and we should add tests for each case. |
I think we should use AsyncTokenBucket or Guava.RateLimiter.tryAcquire for this rate limiter. IMHO, the current logic, requiring acquiredPermit to release is error-prone, when the caller forgets to release it. Instead, I think we better use token bucket-based one, which can automatically fill the bucket. |
@heesung-sn I agree, the current solution is problematic. However, a token bucket isn't most optimal for this use case. I have a redesigned solution the |
Motivation
Background
managedLedgerMaxReadsInFlightSize
50~59
50~69
Request-2
will wait forRequest-1
, and only send a real request that reads60~69
afterRequest-1
is finished.Issue
Request-2
above requests repeatedly50~70
when creating.61~70
afterRequest-1
is finished.Request-2
acquire20
quota.Request-2
acquire40
quota.testPreciseLimitation
to reproduce the issueModifications
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x