diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java index f05f735635746..240d8d2304768 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java @@ -20,22 +20,32 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; + +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.client.api.BatcherBuilder; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** * Different with {@link org.apache.pulsar.client.api.SimpleProducerConsumerTest}, this class can visit the variables - * of {@link ConsumerImpl} which are modified `protected`. + * of {@link ConsumerImpl} or {@link ProducerImpl} which have protected or default access modifiers. */ -@Test(groups = "broker-api") +@Slf4j +@Test(groups = "broker-impl") public class ProducerConsumerInternalTest extends ProducerConsumerBase { @BeforeClass(alwaysRun = true) @@ -144,4 +154,36 @@ public void testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError( consumer.close(); admin.topics().delete(topicName, false); } + + @DataProvider(name = "containerBuilder") + public Object[][] containerBuilderProvider() { + return new Object[][] { + { BatcherBuilder.DEFAULT }, + { BatcherBuilder.KEY_BASED } + }; + } + + @Test(timeOut = 30000, dataProvider = "containerBuilder") + public void testSendTimerCheckForBatchContainer(BatcherBuilder batcherBuilder) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_"); + @Cleanup Producer producer = pulsarClient.newProducer().topic(topicName) + .batcherBuilder(batcherBuilder) + .sendTimeout(1, TimeUnit.SECONDS) + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) + .batchingMaxMessages(1000) + .create(); + + log.info("Before sendAsync msg-0: {}", System.nanoTime()); + CompletableFuture future = producer.sendAsync("msg-0".getBytes()); + future.thenAccept(msgId -> log.info("msg-0 done: {} (msgId: {})", System.nanoTime(), msgId)); + future.get(); // t: the current time point + + ((ProducerImpl) producer).triggerSendTimer(); // t+1000ms && t+2000ms: run() will be called again + + Thread.sleep(1950); // t+2050ms: the batch timer is expired, which happens after run() is called + log.info("Before sendAsync msg-1: {}", System.nanoTime()); + future = producer.sendAsync("msg-1".getBytes()); + future.thenAccept(msgId -> log.info("msg-1 done: {} (msgId: {})", System.nanoTime(), msgId)); + future.get(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java index e81365d3886cc..8c17d8fcb253c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java @@ -53,6 +53,7 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta // allocate a new buffer that can hold the entire batch without needing costly reallocations protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE; protected int maxMessagesNum = INITIAL_MESSAGES_NUM; + private volatile long firstAddedTimestamp = 0L; @Override public boolean haveEnoughSpace(MessageImpl msg) { @@ -127,4 +128,19 @@ public boolean hasSameTxn(MessageImpl msg) { return currentTxnidMostBits == msg.getMessageBuilder().getTxnidMostBits() && currentTxnidLeastBits == msg.getMessageBuilder().getTxnidLeastBits(); } + + @Override + public long getFirstAddedTimestamp() { + return firstAddedTimestamp; + } + + protected void tryUpdateTimestamp() { + if (numMessagesInBatch == 1) { + firstAddedTimestamp = System.nanoTime(); + } + } + + protected void clearTimestamp() { + firstAddedTimestamp = 0L; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java index 8fb4e9f2ce543..ddbe1bc255779 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java @@ -82,4 +82,11 @@ public interface BatchMessageContainerBase extends BatchMessageContainer { * @return belong to the same txn or not */ boolean hasSameTxn(MessageImpl msg); + + /** + * Get the timestamp in nanoseconds when the 1st message is added into the batch container. + * + * @return the timestamp in nanoseconds or 0L if the batch container is empty + */ + long getFirstAddedTimestamp(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index dfcbc42bcc6b8..bf8c1f9de8201 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -127,6 +127,7 @@ public boolean add(MessageImpl msg, SendCallback callback) { previousCallback = callback; currentBatchSizeBytes += msg.getDataBuffer().readableBytes(); messages.add(msg); + tryUpdateTimestamp(); if (lowestSequenceId == -1L) { lowestSequenceId = msg.getSequenceId(); @@ -203,6 +204,7 @@ void updateMaxBatchSize(int uncompressedSize) { @Override public void clear() { + clearTimestamp(); messages = new ArrayList<>(maxMessagesNum); firstCallback = null; previousCallback = null; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java index e2ce9e2d0bd70..1592d3cae6cb5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java @@ -57,11 +57,13 @@ public boolean add(MessageImpl msg, SendCallback callback) { numMessagesInBatch++; currentBatchSizeBytes += msg.getDataBuffer().readableBytes(); } + tryUpdateTimestamp(); return isBatchFull(); } @Override public void clear() { + clearTimestamp(); numMessagesInBatch = 0; currentBatchSizeBytes = 0; batches.clear(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 2763da524cd58..4908d10f330b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1983,6 +1983,11 @@ String getHandlerName() { return producerName; } + @VisibleForTesting + void triggerSendTimer() throws Exception { + run(sendTimeout); + } + /** * Process sendTimeout events. */ @@ -2001,7 +2006,8 @@ public void run(Timeout timeout) throws Exception { } OpSendMsg firstMsg = pendingMessages.peek(); - if (firstMsg == null && (batchMessageContainer == null || batchMessageContainer.isEmpty())) { + if (firstMsg == null && (batchMessageContainer == null || batchMessageContainer.isEmpty() + || batchMessageContainer.getFirstAddedTimestamp() == 0L)) { // If there are no pending messages, reset the timeout to the configured value. timeToWaitMs = conf.getSendTimeoutMs(); } else { @@ -2011,7 +2017,7 @@ public void run(Timeout timeout) throws Exception { } else { // Because we don't flush batch messages while disconnected, we consider them "createdAt" when // they would have otherwise been flushed. - createdAt = lastBatchSendNanoTime + createdAt = batchMessageContainer.getFirstAddedTimestamp() + TimeUnit.MICROSECONDS.toNanos(conf.getBatchingMaxPublishDelayMicros()); } // If there is at least one message, calculate the diff between the message timeout and the elapsed