-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: Pulsar pool messages support in failure handlers #2584
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add a change to run ack/nack tests with poolMessages to true.
@@ -42,6 +42,7 @@ public PulsarIgnore(String channel) { | |||
public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Metadata metadata) { | |||
log.messageFailureIgnored(channel, reason.getMessage()); | |||
log.messageFailureFullCause(reason); | |||
message.unwrap().release(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would not work as the ack needs to access messageid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ozangunalp
the release method:
public void release() {
if (this.poolMessage) {
ReferenceCountUtil.safeRelease(this.payload);
this.recycle();
}
}
works only on payload, not the messageId. MessageId is not a ByteBuf, but a standard Java class (not a direct memory).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok your are right, the recycle method sets the messageId to null. I'll fix that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@michalcukierman I just pushed a change to the branch with some tests. The final change only calls the release on continue and failstop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @ozangunalp and sorry for incomplete PR, I believe it’s OK now. We’ll probably enable pooling, so if anything comes out, we will let you know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ozangunalp It looks like the message release introduced by @michalcukierman is required to avoid a memory leak. While this change resolves the direct memory issue, it breaks DLQ functionality due to a bug in Pulsar. I have created an issue to track this problem.
Added messages release in failure handler, What do you think @ozangunalp @marekczajkowski?