Skip to content

Commit

Permalink
[fix][test] Fix quiet time implementation in BrokerTestUtil.receiveMe…
Browse files Browse the repository at this point in the history
…ssages (#23876)
  • Loading branch information
lhotari authored Jan 23, 2025
1 parent 87fb442 commit 52e8730
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -239,33 +241,46 @@ public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boole
public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
Duration quietTimeout,
Stream<Consumer<T>> consumers) {
long quietTimeoutNanos = quietTimeout.toNanos();
AtomicLong lastMessageReceivedNanos = new AtomicLong(System.nanoTime());
FutureUtil.waitForAll(consumers
.map(consumer -> receiveMessagesAsync(consumer, quietTimeout, messageHandler)).toList()).join();
.map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, quietTimeoutNanos, messageHandler,
lastMessageReceivedNanos)).toList()).join();
}

// asynchronously receive messages from a consumer and handle them using the provided message handler
// the benefit is that multiple consumers can be concurrently consumed without the need to have multiple threads
// this is useful in tests where multiple consumers are needed to test the functionality
private static <T> CompletableFuture<Void> receiveMessagesAsync(Consumer<T> consumer, Duration quietTimeout,
BiFunction<Consumer<T>, Message<T>, Boolean>
messageHandler) {
CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
return receiveFuture
.orTimeout(quietTimeout.toMillis(), TimeUnit.MILLISECONDS)
private static <T> CompletableFuture<Void> receiveMessagesAsync(Consumer<T> consumer,
long quietTimeoutNanos,
long receiveTimeoutNanos,
BiFunction<Consumer<T>, Message<T>, Boolean>
messageHandler,
AtomicLong lastMessageReceivedNanos) {
return consumer.receiveAsync()
.orTimeout(receiveTimeoutNanos, TimeUnit.NANOSECONDS)
.handle((msg, t) -> {
long currentNanos = System.nanoTime();
if (t != null) {
if (t instanceof TimeoutException) {
// cancel the receive future so that Pulsar client can clean up the resources
receiveFuture.cancel(false);
return false;
long sinceLastMessageReceivedNanos = currentNanos - lastMessageReceivedNanos.get();
if (sinceLastMessageReceivedNanos > quietTimeoutNanos) {
return Pair.of(false, 0L);
} else {
return Pair.of(true, quietTimeoutNanos - sinceLastMessageReceivedNanos);
}
} else {
throw FutureUtil.wrapToCompletionException(t);
}
}
return messageHandler.apply(consumer, msg);
}).thenComposeAsync(receiveMore -> {
lastMessageReceivedNanos.set(currentNanos);
return Pair.of(messageHandler.apply(consumer, msg), quietTimeoutNanos);
}).thenComposeAsync(receiveMoreAndNextTimeout -> {
boolean receiveMore = receiveMoreAndNextTimeout.getLeft();
if (receiveMore) {
return receiveMessagesAsync(consumer, quietTimeout, messageHandler);
Long nextReceiveTimeoutNanos = receiveMoreAndNextTimeout.getRight();
return receiveMessagesAsync(consumer, quietTimeoutNanos, nextReceiveTimeoutNanos,
messageHandler, lastMessageReceivedNanos);
} else {
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.Test;

@Slf4j
public class BrokerTestUtilTest {
@Test
public void testReceiveMessagesQuietTime() throws Exception {
// Mock consumers
Consumer<Integer> consumer1 = mock(Consumer.class);
Consumer<Integer> consumer2 = mock(Consumer.class);

long consumer1DelayMs = 300L;
long consumer2DelayMs = 400L;
long quietTimeMs = 500L;

// Define behavior for receiveAsync with delay
AtomicBoolean consumer1FutureContinueSupplying = new AtomicBoolean(true);
when(consumer1.receiveAsync()).thenAnswer(invocation -> {
if (consumer1FutureContinueSupplying.get()) {
CompletableFuture<Message> messageCompletableFuture =
CompletableFuture.supplyAsync(() -> mock(Message.class),
CompletableFuture.delayedExecutor(consumer1DelayMs, TimeUnit.MILLISECONDS));
consumer1FutureContinueSupplying.set(false);
// continue supplying while the future is cancelled or timed out
FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, () -> {
consumer1FutureContinueSupplying.set(true);
});
return messageCompletableFuture;
} else {
return new CompletableFuture<>();
}
});
AtomicBoolean consumer2FutureContinueSupplying = new AtomicBoolean(true);
when(consumer2.receiveAsync()).thenAnswer(invocation -> {
if (consumer2FutureContinueSupplying.get()) {
CompletableFuture<Message> messageCompletableFuture =
CompletableFuture.supplyAsync(() -> mock(Message.class),
CompletableFuture.delayedExecutor(consumer2DelayMs, TimeUnit.MILLISECONDS));
consumer2FutureContinueSupplying.set(false);
// continue supplying while the future is cancelled or timed out
FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, () -> {
consumer2FutureContinueSupplying.set(true);
});
return messageCompletableFuture;
} else {
return new CompletableFuture<>();
}
});

// Atomic variables to track message handling
AtomicInteger messageCount = new AtomicInteger(0);

// Message handler
BiFunction<Consumer<Integer>, Message<Integer>, Boolean> messageHandler = (consumer, msg) -> {
messageCount.incrementAndGet();
return true;
};

// Track start time
long startTime = System.nanoTime();

// Call receiveMessages method
BrokerTestUtil.receiveMessages(messageHandler, Duration.ofMillis(quietTimeMs), consumer1, consumer2);

// Track end time
long endTime = System.nanoTime();

// Verify that messages were attempted to be received
verify(consumer1, times(3)).receiveAsync();
verify(consumer2, times(2)).receiveAsync();

// Verify that the message handler was called
assertEquals(messageCount.get(), 2);

// Verify the time spent is as expected (within a reasonable margin)
long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
assertThat(durationMillis).isBetween(consumer2DelayMs + quietTimeMs,
consumer2DelayMs + quietTimeMs + (quietTimeMs / 2));
}
}

0 comments on commit 52e8730

Please sign in to comment.