diff --git a/testsuite/integration/microprofile/pom.xml b/testsuite/integration/microprofile/pom.xml index 48a8030d9f71..f85c486ab01b 100644 --- a/testsuite/integration/microprofile/pom.xml +++ b/testsuite/integration/microprofile/pom.xml @@ -200,6 +200,14 @@ test + + + io.smallrye.reactive + smallrye-reactive-messaging-in-memory + ${version.io.smallrye.smallrye-reactive-messaging} + test + + io.smallrye.reactive diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/InDepthMetadataBean.java b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/InDepthMetadataBean.java deleted file mode 100644 index 30c27eb2f869..000000000000 --- a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/InDepthMetadataBean.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * JBoss, Home of Professional Open Source. - * Copyright 2021, Red Hat, Inc., and individual contributors - * as indicated by the @author tags. See the copyright.txt file in the - * distribution for a full listing of individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.wildfly.test.integration.microprofile.reactive.messaging.inmemory; - -import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; -import io.smallrye.reactive.messaging.kafka.api.KafkaMetadataUtil; -import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; -import jakarta.enterprise.context.ApplicationScoped; -import org.apache.kafka.common.header.internals.RecordHeader; -import org.eclipse.microprofile.reactive.messaging.Incoming; -import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; -import org.reactivestreams.Publisher; - -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.CountDownLatch; - -/** - * @author Kabir Khan - */ -@ApplicationScoped -public class InDepthMetadataBean { - private final CountDownLatch latch = new CountDownLatch(6); - private final Map> metadatas = Collections.synchronizedMap(new HashMap<>()); - private final Instant timestampEntry5Topic1 = Instant.now().minus(Duration.ofSeconds(10)).truncatedTo(ChronoUnit.SECONDS); - - public CountDownLatch getLatch() { - return latch; - } - - public Map> getMetadatas() { - return metadatas; - } - - public Instant getTimestampEntry5Topic1() { - return timestampEntry5Topic1; - } - - @Outgoing("invm1") - public Publisher source() { - return ReactiveStreams.of(1, 2, 3, 4, 5, 6).buildRs(); - } - - @Incoming("invm1") - @Outgoing("to-inmemory1") - public Message sendToKafka(Integer i) { - Message msg = Message.of(i); - - if (i <= 5) { - // For 6 we don't want any metadata. If we want to tweak what is set in the metadata use another entry - OutgoingKafkaRecordMetadata.OutgoingKafkaRecordMetadataBuilder mb = OutgoingKafkaRecordMetadata.builder() - .withKey("KEY-" + i); - - if (i == 5) { - mb.withHeaders(Collections.singletonList(new RecordHeader("simple", new byte[]{0, 1, 2}))); - mb.withTimestamp(timestampEntry5Topic1); - } - msg = KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, mb.build()); - return msg; - } - return msg; - } - - @Incoming("from-inmemory1") - public CompletionStage receiveFromKafka(Message msg) { - IncomingKafkaRecordMetadata metadata = KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get(); - metadatas.put(msg.getPayload(), metadata); - latch.countDown(); - return msg.ack(); - } -} diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/InMemoryBean.java b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/InMemoryBean.java new file mode 100644 index 000000000000..12d9fbd68dfb --- /dev/null +++ b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/InMemoryBean.java @@ -0,0 +1,53 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2021, Red Hat, Inc., and individual contributors + * as indicated by the @author tags. See the copyright.txt file in the + * distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ + +package org.wildfly.test.integration.microprofile.reactive.messaging.inmemory; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +@ApplicationScoped +public class InMemoryBean { + + private String message; + + @Inject + @Channel("to-inmemory") + Emitter outgoingEmitter; + + @Incoming("from-inmemory") + public void incoming(String message) { + this.message = message; + } + + public void outgoing(String payload) { + outgoingEmitter.send(payload); + } + + public String getMessage() { + return message; + } + +} diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/ReactiveMessagingInMemoryTestCase.java b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/ReactiveMessagingInMemoryTestCase.java new file mode 100644 index 000000000000..734813b8ca3e --- /dev/null +++ b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/ReactiveMessagingInMemoryTestCase.java @@ -0,0 +1,90 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2021, Red Hat, Inc., and individual contributors + * as indicated by the @author tags. See the copyright.txt file in the + * distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ + +package org.wildfly.test.integration.microprofile.reactive.messaging.inmemory; + +import io.smallrye.reactive.messaging.memory.InMemoryConnector; +import jakarta.enterprise.inject.Any; +import jakarta.inject.Inject; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.as.arquillian.api.ServerSetup; +import org.jboss.as.test.shared.CLIServerSetupTask; +import org.jboss.as.test.shared.TimeoutUtil; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.wildfly.test.integration.microprofile.reactive.EnableReactiveExtensionsSetupTask; + +import java.util.List; +import java.util.PropertyPermission; + +import static org.jboss.as.test.shared.integration.ejb.security.PermissionUtils.createPermissionsXmlAsset; + +/** + * Run with + * mvn clean install -DallTests -pl testsuite/integration/microprofile -Dtest=ReactiveMessagingInMemoryUserApiTestCase + */ +@RunWith(Arquillian.class) +@ServerSetup({EnableReactiveExtensionsSetupTask.class}) +public class ReactiveMessagingInMemoryTestCase { + + @Inject + InMemoryBean inMemoryBean; + + @Inject + @Any + InMemoryConnector connector; + + @Deployment + public static WebArchive getDeployment() { + + return ShrinkWrap.create(WebArchive.class, "reactive-messaging-connector-inmemory.war") + .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml") + .addPackage(ReactiveMessagingInMemoryTestCase.class.getPackage()) + .addClasses(EnableReactiveExtensionsSetupTask.class, CLIServerSetupTask.class) + .addAsWebInfResource(ReactiveMessagingInMemoryTestCase.class.getPackage(), "microprofile-config.properties", "classes/META-INF/microprofile-config.properties") + .addClass(TimeoutUtil.class) + .addAsManifestResource(createPermissionsXmlAsset( + new PropertyPermission(TimeoutUtil.FACTOR_SYS_PROP, "read") + ), "permissions.xml"); + } + + @Test + public void testIncomingMessageSendFromInMemoryConnector() throws InterruptedException { + connector.source("from-inmemory").send("IncomingTestMessage"); + Assert.assertEquals("IncomingTestMessage",inMemoryBean.getMessage()); + } + + @Test + public void testOutgoingMessageReceivedByInMemoryConnector() throws InterruptedException { + inMemoryBean.outgoing("OutgoingTestMessage"); + List> messages = connector.sink("to-inmemory").received(); + Assert.assertEquals(1,messages.size()); + Assert.assertEquals("OutgoingTestMessage",messages.get(0).getPayload()); + } + +} diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/ReactiveMessagingInMemoryUserApiTestCase.java b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/ReactiveMessagingInMemoryUserApiTestCase.java deleted file mode 100644 index 34cfc53bc0ab..000000000000 --- a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/ReactiveMessagingInMemoryUserApiTestCase.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * JBoss, Home of Professional Open Source. - * Copyright 2021, Red Hat, Inc., and individual contributors - * as indicated by the @author tags. See the copyright.txt file in the - * distribution for a full listing of individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.wildfly.test.integration.microprofile.reactive.messaging.inmemory; - -import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; -import jakarta.inject.Inject; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.record.TimestampType; -import org.jboss.arquillian.container.test.api.Deployment; -import org.jboss.arquillian.junit.Arquillian; -import org.jboss.as.arquillian.api.ServerSetup; -import org.jboss.as.test.shared.CLIServerSetupTask; -import org.jboss.as.test.shared.TimeoutUtil; -import org.jboss.shrinkwrap.api.ShrinkWrap; -import org.jboss.shrinkwrap.api.asset.EmptyAsset; -import org.jboss.shrinkwrap.api.spec.WebArchive; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.wildfly.test.integration.microprofile.reactive.EnableReactiveExtensionsSetupTask; - - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.PropertyPermission; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import static org.jboss.as.test.shared.integration.ejb.security.PermissionUtils.createPermissionsXmlAsset; - -/** - * @author Kabir Khan - */ -@RunWith(Arquillian.class) -@ServerSetup({EnableReactiveExtensionsSetupTask.class}) -public class ReactiveMessagingInMemoryUserApiTestCase { - - private static final long TIMEOUT = TimeoutUtil.adjust(15000); - - @Inject - InDepthMetadataBean inDepthMetadataBean; - - @Deployment - public static WebArchive getDeployment() { - - return ShrinkWrap.create(WebArchive.class, "reactive-messaging-connector-inmemory.war") - .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml") - .addPackage(ReactiveMessagingInMemoryUserApiTestCase.class.getPackage()) - .addClasses(EnableReactiveExtensionsSetupTask.class, CLIServerSetupTask.class) - .addAsWebInfResource(ReactiveMessagingInMemoryUserApiTestCase.class.getPackage(), "microprofile-config.properties", "classes/META-INF/microprofile-config.properties") - .addClass(TimeoutUtil.class) - .addAsManifestResource(createPermissionsXmlAsset( - new PropertyPermission(TimeoutUtil.FACTOR_SYS_PROP, "read") - ), "permissions.xml"); - } - - /* - * This tests that: - * - incoming Metadata is set (also for entry 6 which did not set any metadata) and contains the topic - * - the key is propagated from what was set in the outgoing metadata, and that it may be null when not set - * - Headers are propagated, if set in the outgoing metadata - * - offsets are unique per partition - * - the timestamp and type are set, and that the timestamp matches if we set it ourselves in the outgoing metadata - */ - @Test - public void testOutgoingAndIncomingMetadataExtensively() throws InterruptedException { - inDepthMetadataBean.getLatch().await(TIMEOUT, TimeUnit.MILLISECONDS); - Map> map = inDepthMetadataBean.getMetadatas(); - - Assert.assertEquals(6, map.size()); - Map> offsetsByPartition = new HashMap<>(); - - for (int i = 1; i <= 6; i++) { - IncomingKafkaRecordMetadata metadata = map.get(i); - Assert.assertNotNull(metadata); - if (i != 6) { - Assert.assertEquals("KEY-" + i, metadata.getKey()); - } else { - Assert.assertNull(metadata.getKey()); - } - Assert.assertEquals("testing1", metadata.getTopic()); - Set offsets = offsetsByPartition.get(metadata.getPartition()); - if (offsets == null) { - offsets = new HashSet<>(); - offsetsByPartition.put(metadata.getPartition(), offsets); - } - offsets.add(metadata.getOffset()); - Assert.assertNotNull(metadata.getTimestamp()); - if (i == 5) { - Assert.assertEquals(inDepthMetadataBean.getTimestampEntry5Topic1(), metadata.getTimestamp()); - } - Assert.assertEquals(TimestampType.CREATE_TIME, metadata.getTimestampType()); - Assert.assertNotNull(metadata.getRecord()); - - Headers headers = metadata.getHeaders(); - if (i != 5) { - Assert.assertEquals(0, headers.toArray().length); - } else { - Assert.assertEquals(1, headers.toArray().length); - Header header = headers.toArray()[0]; - Assert.assertEquals("simple", header.key()); - Assert.assertArrayEquals(new byte[]{0, 1, 2}, header.value()); - } - } - Assert.assertEquals(6, checkOffsetsByPartitionAndCalculateTotalEntries(offsetsByPartition)); - } - - private int checkOffsetsByPartitionAndCalculateTotalEntries(Map> offsetsByPartition) { - int total = 0; - for (Iterator> it = offsetsByPartition.values().iterator(); it.hasNext() ; ) { - Set offsets = it.next(); - long size = offsets.size(); - total += size; - for (long l = 0; l < size; l++) { - Assert.assertTrue(offsets.contains(l)); - } - } - return total; - } - - -} diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/microprofile-config.properties b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/microprofile-config.properties index 2a8de1564f2d..16db1ba65e9f 100644 --- a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/microprofile-config.properties +++ b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/inmemory/microprofile-config.properties @@ -17,18 +17,5 @@ ########################################################################################## # Config for InDepthMetadataBean mp.messaging.outgoing.to-inmemory.connector=smallrye-in-memory -mp.messaging.outgoing.to-inmemory.topic=testing1 -mp.messaging.outgoing.to-inmemory.key.serializer=org.apache.kafka.common.serialization.StringSerializer -mp.messaging.outgoing.to-inmemory.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer - mp.messaging.incoming.from-inmemory.connector=smallrye-in-memory -mp.messaging.incoming.from-inmemory.topic=testing1 -mp.messaging.incoming.from-inmemory.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer -mp.messaging.incoming.from-inmemory.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer -# Needed as per https://github.com/smallrye/smallrye-reactive-messaging/issues/845 since the consumer -# joins after the messages are sent -mp.messaging.incoming.from-inmemory.auto.offset.reset=earliest -# Config for InDepthMetadataBean - END -########################################################################################## -