Skip to content

Commit

Permalink
Increase timeout for asyncapi test (#3205)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado authored Sep 5, 2023
1 parent aef69c7 commit 3a67eb1
Showing 1 changed file with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.Converter;
Expand Down Expand Up @@ -64,6 +65,7 @@ class AsyncAPIIT extends AbstractCallbackStateIT {

private final static Logger logger = LoggerFactory.getLogger(AsyncAPIIT.class);

@Override
@BeforeEach
void setup() {
kafkaClient = new KafkaTypedTestClient<>(kafkaBootstrapServers, ByteArraySerializer.class, ByteArrayDeserializer.class);
Expand All @@ -74,6 +76,7 @@ void setup() {
marshaller = new ByteArrayCloudEventMarshaller(objectMapper);
}

@Override
@AfterEach
void cleanUp() {
if (kafkaClient != null) {
Expand All @@ -82,23 +85,14 @@ void cleanUp() {
}

@Test
void testConsumer() throws IOException {
final String flowId = "asyncEventConsumer";
String id = startProcess(flowId);
kafkaClient.produce(marshaller.marshall(buildCloudEvent(id, "wait", marshaller)), "wait");
waitForFinish(flowId, id, Duration.ofSeconds(10));
}

@Test
@Order(1)
void testPublisher() throws InterruptedException {
String id = startProcessNoCheck("asyncEventPublisher");
logger.debug("Process instance id is " + id);
Converter<byte[], CloudEvent> converter = new ByteArrayCloudEventUnmarshallerFactory(objectMapper).unmarshaller(Map.class).cloudEvent();
final CountDownLatch countDownLatch = new CountDownLatch(1);
kafkaClient.consume("wait", v -> {
try {
CloudEvent event = converter.convert(v);
logger.debug("Cloud event is {}", event);
if (id.equals(event.getExtension(CloudEventExtensionConstants.PROCESS_INSTANCE_ID))) {
countDownLatch.countDown();
}
Expand All @@ -109,4 +103,13 @@ void testPublisher() throws InterruptedException {
countDownLatch.await(10, TimeUnit.SECONDS);
assertThat(countDownLatch.getCount()).isZero();
}

@Test
@Order(2)
void testConsumer() throws IOException {
final String flowId = "asyncEventConsumer";
String id = startProcess(flowId);
kafkaClient.produce(marshaller.marshall(buildCloudEvent(id, "wait", marshaller)), "wait");
waitForFinish(flowId, id, Duration.ofSeconds(10));
}
}

0 comments on commit 3a67eb1

Please sign in to comment.