Skip to content

Commit

Permalink
[fix][client] Fix messages in the batch container timed out unexpecte…
Browse files Browse the repository at this point in the history
…dly (apache#21889)
  • Loading branch information
BewareMyPower authored Jan 16, 2024
1 parent 4ebbd2f commit b56e40c
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,32 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
* Different with {@link org.apache.pulsar.client.api.SimpleProducerConsumerTest}, this class can visit the variables
* of {@link ConsumerImpl} which are modified `protected`.
* of {@link ConsumerImpl} or {@link ProducerImpl} which have protected or default access modifiers.
*/
@Test(groups = "broker-api")
@Slf4j
@Test(groups = "broker-impl")
public class ProducerConsumerInternalTest extends ProducerConsumerBase {

@BeforeClass(alwaysRun = true)
Expand Down Expand Up @@ -144,4 +154,36 @@ public void testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError(
consumer.close();
admin.topics().delete(topicName, false);
}

@DataProvider(name = "containerBuilder")
public Object[][] containerBuilderProvider() {
return new Object[][] {
{ BatcherBuilder.DEFAULT },
{ BatcherBuilder.KEY_BASED }
};
}

@Test(timeOut = 30000, dataProvider = "containerBuilder")
public void testSendTimerCheckForBatchContainer(BatcherBuilder batcherBuilder) throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
@Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.batcherBuilder(batcherBuilder)
.sendTimeout(1, TimeUnit.SECONDS)
.batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
.batchingMaxMessages(1000)
.create();

log.info("Before sendAsync msg-0: {}", System.nanoTime());
CompletableFuture<MessageId> future = producer.sendAsync("msg-0".getBytes());
future.thenAccept(msgId -> log.info("msg-0 done: {} (msgId: {})", System.nanoTime(), msgId));
future.get(); // t: the current time point

((ProducerImpl<byte[]>) producer).triggerSendTimer(); // t+1000ms && t+2000ms: run() will be called again

Thread.sleep(1950); // t+2050ms: the batch timer is expired, which happens after run() is called
log.info("Before sendAsync msg-1: {}", System.nanoTime());
future = producer.sendAsync("msg-1".getBytes());
future.thenAccept(msgId -> log.info("msg-1 done: {} (msgId: {})", System.nanoTime(), msgId));
future.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
// allocate a new buffer that can hold the entire batch without needing costly reallocations
protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
protected int maxMessagesNum = INITIAL_MESSAGES_NUM;
private volatile long firstAddedTimestamp = 0L;

@Override
public boolean haveEnoughSpace(MessageImpl<?> msg) {
Expand Down Expand Up @@ -127,4 +128,19 @@ public boolean hasSameTxn(MessageImpl<?> msg) {
return currentTxnidMostBits == msg.getMessageBuilder().getTxnidMostBits()
&& currentTxnidLeastBits == msg.getMessageBuilder().getTxnidLeastBits();
}

@Override
public long getFirstAddedTimestamp() {
return firstAddedTimestamp;
}

protected void tryUpdateTimestamp() {
if (numMessagesInBatch == 1) {
firstAddedTimestamp = System.nanoTime();
}
}

protected void clearTimestamp() {
firstAddedTimestamp = 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,11 @@ public interface BatchMessageContainerBase extends BatchMessageContainer {
* @return belong to the same txn or not
*/
boolean hasSameTxn(MessageImpl<?> msg);

/**
* Get the timestamp in nanoseconds when the 1st message is added into the batch container.
*
* @return the timestamp in nanoseconds or 0L if the batch container is empty
*/
long getFirstAddedTimestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
previousCallback = callback;
currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
messages.add(msg);
tryUpdateTimestamp();

if (lowestSequenceId == -1L) {
lowestSequenceId = msg.getSequenceId();
Expand Down Expand Up @@ -203,6 +204,7 @@ void updateMaxBatchSize(int uncompressedSize) {

@Override
public void clear() {
clearTimestamp();
messages = new ArrayList<>(maxMessagesNum);
firstCallback = null;
previousCallback = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
numMessagesInBatch++;
currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
}
tryUpdateTimestamp();
return isBatchFull();
}

@Override
public void clear() {
clearTimestamp();
numMessagesInBatch = 0;
currentBatchSizeBytes = 0;
batches.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1983,6 +1983,11 @@ String getHandlerName() {
return producerName;
}

@VisibleForTesting
void triggerSendTimer() throws Exception {
run(sendTimeout);
}

/**
* Process sendTimeout events.
*/
Expand All @@ -2001,7 +2006,8 @@ public void run(Timeout timeout) throws Exception {
}

OpSendMsg firstMsg = pendingMessages.peek();
if (firstMsg == null && (batchMessageContainer == null || batchMessageContainer.isEmpty())) {
if (firstMsg == null && (batchMessageContainer == null || batchMessageContainer.isEmpty()
|| batchMessageContainer.getFirstAddedTimestamp() == 0L)) {
// If there are no pending messages, reset the timeout to the configured value.
timeToWaitMs = conf.getSendTimeoutMs();
} else {
Expand All @@ -2011,7 +2017,7 @@ public void run(Timeout timeout) throws Exception {
} else {
// Because we don't flush batch messages while disconnected, we consider them "createdAt" when
// they would have otherwise been flushed.
createdAt = lastBatchSendNanoTime
createdAt = batchMessageContainer.getFirstAddedTimestamp()
+ TimeUnit.MICROSECONDS.toNanos(conf.getBatchingMaxPublishDelayMicros());
}
// If there is at least one message, calculate the diff between the message timeout and the elapsed
Expand Down

0 comments on commit b56e40c

Please sign in to comment.