diff --git a/docker/Dockerfile b/docker/Dockerfile index 323a35f..8d0dd43 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -2,4 +2,4 @@ FROM apache/nifi:1.14.0 COPY --chown=nifi:nifi lib/*.nar /opt/nifi/nifi-current/lib/ -COPY --chown=nifi:nifi creds/*.json /tmp \ No newline at end of file +COPY --chown=nifi:nifi creds/*.json /tmp diff --git a/nifi-pulsar-client-service-api/pom.xml b/nifi-pulsar-client-service-api/pom.xml index ce3e8d6..d2f0e00 100644 --- a/nifi-pulsar-client-service-api/pom.xml +++ b/nifi-pulsar-client-service-api/pom.xml @@ -35,7 +35,7 @@ org.apache.pulsar - pulsar-client-original + pulsar-client ${pulsar.version} diff --git a/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java b/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java index 446364e..9561e21 100644 --- a/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java +++ b/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java @@ -29,5 +29,5 @@ public interface PulsarClientService extends ControllerService { public PulsarClient getPulsarClient(); public String getPulsarBrokerRootURL(); - + } diff --git a/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCache.java b/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarConsumerLRUCache.java similarity index 88% rename from nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCache.java rename to nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarConsumerLRUCache.java index 446cb88..644554f 100644 --- a/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCache.java +++ b/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarConsumerLRUCache.java @@ -21,17 +21,17 @@ import org.apache.commons.collections4.map.LRUMap; -public class PulsarClientLRUCache extends LRUMap { +public class PulsarConsumerLRUCache extends LRUMap { private static final long serialVersionUID = 730163138087670453L; private final static float LOAD_FACTOR = 0.75F; private final static boolean SCAN_UNTIL_REMOVABLE = false; - public PulsarClientLRUCache(int maxSize) { + public PulsarConsumerLRUCache(int maxSize) { this(maxSize, LOAD_FACTOR, SCAN_UNTIL_REMOVABLE); } - public PulsarClientLRUCache(int maxSize, float loadFactor, boolean scanUntilRemovable) { + public PulsarConsumerLRUCache(int maxSize, float loadFactor, boolean scanUntilRemovable) { super(maxSize, loadFactor, scanUntilRemovable); } diff --git a/nifi-pulsar-client-service-api/src/test/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCacheTest.java b/nifi-pulsar-client-service-api/src/test/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCacheTest.java index 42abe7d..9274898 100644 --- a/nifi-pulsar-client-service-api/src/test/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCacheTest.java +++ b/nifi-pulsar-client-service-api/src/test/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCacheTest.java @@ -44,7 +44,7 @@ public void setUp() throws InterruptedException { */ @Test public void simpleTest() { - PulsarClientLRUCache cache = new PulsarClientLRUCache(10); + PulsarConsumerLRUCache cache = new PulsarConsumerLRUCache(10); for (Character i='A'; i<='E'; i++){ cache.put(i.toString(), mockedPulsarProducer); @@ -60,7 +60,7 @@ public void simpleTest() { @Test public void evictionTest() { - PulsarClientLRUCache cache = new PulsarClientLRUCache(5); + PulsarConsumerLRUCache cache = new PulsarConsumerLRUCache(5); for (Character i='A'; i<='Z'; i++){ cache.put(i.toString(), mockedPulsarProducer); @@ -78,7 +78,7 @@ public void evictionTest() { @Test public void evictionLruTest() { - PulsarClientLRUCache cache = new PulsarClientLRUCache(5); + PulsarConsumerLRUCache cache = new PulsarConsumerLRUCache(5); final Character A = 'A'; @@ -102,7 +102,7 @@ public void evictionLruTest() { @Test public void clearTest() throws PulsarClientException { - PulsarClientLRUCache cache = new PulsarClientLRUCache(26); + PulsarConsumerLRUCache cache = new PulsarConsumerLRUCache(26); for (Character i='A'; i<='Z'; i++) { cache.put(i.toString(), mockedPulsarProducer); diff --git a/nifi-pulsar-client-service/pom.xml b/nifi-pulsar-client-service/pom.xml index 476bb4b..cb53d2a 100644 --- a/nifi-pulsar-client-service/pom.xml +++ b/nifi-pulsar-client-service/pom.xml @@ -27,11 +27,22 @@ + + org.apache.pulsar + pulsar-client + ${pulsar.version} + + + + org.apache.pulsar + pulsar-client-admin + ${pulsar.version} + + org.apache.nifi nifi-pulsar-client-service-api ${version} - provided diff --git a/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java b/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java index a2e03fa..e69b76f 100644 --- a/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java +++ b/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java @@ -53,9 +53,7 @@ public class StandardPulsarClientService extends AbstractControllerService imple .defaultValue("false") .description("") .displayName("Allow TLS Insecure Connection") - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(false) - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); public static final PropertyDescriptor AUTHENTICATION_SERVICE = new PropertyDescriptor.Builder() @@ -96,9 +94,7 @@ public class StandardPulsarClientService extends AbstractControllerService imple + "It validates incoming x509 certificate and matches provided hostname(CN/SAN) with expected " + "broker's host name. It follows RFC 2818, 3.1. Server Identity hostname verification.") .displayName("Enable TLS Hostname Verification") - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(false) - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() @@ -233,10 +229,10 @@ protected List getSupportedPropertyDescriptors() { @OnEnabled public void onEnabled(final ConfigurationContext context) throws InitializationException, UnsupportedAuthenticationException { try { - client = getClientBuilder(context).build(); + client = getClient(context); brokerUrl = context.getProperty(PULSAR_SERVICE_URL).evaluateAttributeExpressions().getValue(); } catch (Exception e) { - throw new InitializationException("Unable to create Pulsar Client", e); + throw new InitializationException("Unable to connect to the Pulsar cluster ", e); } } @@ -258,11 +254,11 @@ public String getPulsarBrokerRootURL() { return brokerUrl; } - private ClientBuilder getClientBuilder(ConfigurationContext context) throws UnsupportedAuthenticationException, MalformedURLException { + private PulsarClient getClient(ConfigurationContext context) throws MalformedURLException, PulsarClientException { ClientBuilder builder = PulsarClient.builder() - .allowTlsInsecureConnection(context.getProperty(ALLOW_TLS_INSECURE_CONNECTION).evaluateAttributeExpressions().asBoolean()) - .enableTlsHostnameVerification(context.getProperty(ENABLE_TLS_HOSTNAME_VERIFICATION).evaluateAttributeExpressions().asBoolean()) + .allowTlsInsecureConnection(context.getProperty(ALLOW_TLS_INSECURE_CONNECTION).asBoolean()) + .enableTlsHostnameVerification(context.getProperty(ENABLE_TLS_HOSTNAME_VERIFICATION).asBoolean()) .maxConcurrentLookupRequests(context.getProperty(CONCURRENT_LOOKUP_REQUESTS).evaluateAttributeExpressions().asInteger()) .connectionsPerBroker(context.getProperty(CONNECTIONS_PER_BROKER).evaluateAttributeExpressions().asInteger()) .ioThreads(context.getProperty(IO_THREADS).evaluateAttributeExpressions().asInteger()) @@ -288,7 +284,7 @@ private ClientBuilder getClientBuilder(ConfigurationContext context) throws Unsu } builder = builder.serviceUrl(context.getProperty(PULSAR_SERVICE_URL).evaluateAttributeExpressions().getValue()); - return builder; + return builder.build(); } } diff --git a/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/validator/PulsarBrokerUrlValidator.java b/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/validator/PulsarBrokerUrlValidator.java index 8f11ed2..b0d21c5 100644 --- a/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/validator/PulsarBrokerUrlValidator.java +++ b/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/validator/PulsarBrokerUrlValidator.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.pulsar.validator; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; diff --git a/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java b/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java index f9c4bd9..1efb941 100644 --- a/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java +++ b/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java @@ -24,7 +24,7 @@ public class TestStandardPulsarClientService { @Test - public void testService() throws InitializationException { + public void validServiceTest() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); final PulsarClientService service = new StandardPulsarClientService(); runner.addControllerService("test-good", service); diff --git a/nifi-pulsar-processors/pom.xml b/nifi-pulsar-processors/pom.xml index 3fb9fa8..af044c1 100644 --- a/nifi-pulsar-processors/pom.xml +++ b/nifi-pulsar-processors/pom.xml @@ -24,7 +24,7 @@ nifi-pulsar-processors jar - + org.apache.nifi @@ -56,9 +56,38 @@ org.apache.nifi nifi-pulsar-client-service-api ${version} - provided + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + + org.apache.commons + commons-compress + ${commons-compress.version} + + + + org.apache.directory.studio + org.apache.commons.io + ${commons.io.version} + + + + com.fasterxml.jackson.core + jackson-core + ${jackson-core.version} + + + + com.google.protobuf + protobuf-java + ${protobuf3.version} + + org.apache.nifi nifi-mock @@ -77,24 +106,6 @@ test - - org.apache.commons - commons-lang3 - ${commons-lang3.version} - - - - org.apache.directory.studio - org.apache.commons.io - ${commons.io.version} - - - - com.fasterxml.jackson.core - jackson-core - ${jackson-core.version} - - diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java index 91ac5d6..93638d4 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java @@ -47,15 +47,18 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.PropertyMappingUtils; import org.apache.nifi.pulsar.PulsarClientService; -import org.apache.nifi.pulsar.cache.PulsarClientLRUCache; +import org.apache.nifi.pulsar.cache.PulsarConsumerLRUCache; import org.apache.nifi.util.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.schema.GenericRecord; public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { protected static final String PULSAR_MESSAGE_KEY = "__KEY__"; @@ -263,9 +266,9 @@ public abstract class AbstractPulsarConsumerProcessor extends AbstractProcess } private PulsarClientService pulsarClientService; - private PulsarClientLRUCache> consumers; + private PulsarConsumerLRUCache> consumers; private ExecutorService consumerPool; - private ExecutorCompletionService>> consumerService; + private ExecutorCompletionService>> consumerService; private ExecutorService ackPool; private ExecutorCompletionService ackService; @@ -367,14 +370,15 @@ protected String getConsumerId(final ProcessContext context, FlowFile flowFile) return sb.toString(); } - protected void consumeAsync(final Consumer consumer, ProcessContext context, ProcessSession session) throws PulsarClientException { + protected void consumeAsync(final Consumer consumer, + ProcessContext context, ProcessSession session) throws PulsarClientException { try { final int maxMessages = context.getProperty(CONSUMER_BATCH_SIZE).isSet() ? context.getProperty(CONSUMER_BATCH_SIZE) .evaluateAttributeExpressions().asInteger() : Integer.MAX_VALUE; getConsumerService().submit(() -> { - List> messages = new LinkedList>(); - Message msg = null; + List> messages = new LinkedList>(); + Message msg = null; AtomicInteger msgCount = new AtomicInteger(0); while (((msg = consumer.receive(0, TimeUnit.SECONDS)) != null) && msgCount.get() < maxMessages) { @@ -389,14 +393,14 @@ protected void consumeAsync(final Consumer consumer, ProcessContext context, } } - protected synchronized Consumer getConsumer(ProcessContext context, String topic) throws PulsarClientException { + protected synchronized Consumer getConsumer(ProcessContext context, String topic) throws PulsarClientException { /* Avoid creating producers for non-existent topics */ if (StringUtils.isBlank(topic)) { return null; } - Consumer consumer = getConsumers().get(topic); + Consumer consumer = getConsumers().get(topic); if (consumer != null && consumer.isConnected()) { return consumer; @@ -411,15 +415,19 @@ protected synchronized Consumer getConsumer(ProcessContext context, String to return (consumer != null && consumer.isConnected()) ? consumer : null; } - protected synchronized ConsumerBuilder getConsumerBuilder(ProcessContext context) throws PulsarClientException { - - ConsumerBuilder builder = (ConsumerBuilder) getPulsarClientService().getPulsarClient().newConsumer(); + protected synchronized ConsumerBuilder getConsumerBuilder(ProcessContext context) throws PulsarClientException { + + ConsumerBuilder builder = + getPulsarClientService().getPulsarClient().newConsumer(Schema.AUTO_CONSUME()); if (context.getProperty(TOPICS).isSet()) { - builder = builder.topic(Arrays.stream(context.getProperty(TOPICS).evaluateAttributeExpressions().getValue().split("[, ]")) - .map(String::trim).toArray(String[]::new)); + String[] topics = Arrays.stream(context.getProperty(TOPICS).evaluateAttributeExpressions().getValue().split("[, ]")) + .map(String::trim).toArray(String[]::new); + + builder = builder.topic(topics); } else if (context.getProperty(TOPICS_PATTERN).isSet()) { - builder = builder.topicsPattern(context.getProperty(TOPICS_PATTERN).getValue()); + String topicsPattern = context.getProperty(TOPICS_PATTERN).getValue(); + builder = builder.topicsPattern(topicsPattern); } if (context.getProperty(CONSUMER_NAME).isSet()) { @@ -433,7 +441,7 @@ protected synchronized ConsumerBuilder getConsumerBuilder(ProcessContext cont .subscriptionType(SubscriptionType.valueOf(context.getProperty(SUBSCRIPTION_TYPE).getValue())); } - protected synchronized ExecutorService getConsumerPool() { + protected synchronized ExecutorService getConsumerPool() { return consumerPool; } @@ -441,11 +449,11 @@ protected synchronized void setConsumerPool(ExecutorService pool) { this.consumerPool = pool; } - protected synchronized ExecutorCompletionService>> getConsumerService() { + protected synchronized ExecutorCompletionService>> getConsumerService() { return consumerService; } - protected synchronized void setConsumerService(ExecutorCompletionService>> service) { + protected synchronized void setConsumerService(ExecutorCompletionService>> service) { this.consumerService = service; } @@ -473,21 +481,22 @@ protected synchronized void setPulsarClientService(PulsarClientService pulsarCli this.pulsarClientService = pulsarClientService; } - protected synchronized PulsarClientLRUCache> getConsumers() { + protected synchronized PulsarConsumerLRUCache> getConsumers() { if (consumers == null) { - consumers = new PulsarClientLRUCache>(20); + consumers = new PulsarConsumerLRUCache>(20); } return consumers; } - protected void setConsumers(PulsarClientLRUCache> consumers) { + protected void setConsumers(PulsarConsumerLRUCache> consumers) { this.consumers = consumers; } - protected Map getMappedFlowFileAttributes(ProcessContext context, final Message message) { + protected Map getMappedFlowFileAttributes(ProcessContext context, final Message msg) { String mappings = context.getProperty(MAPPED_FLOWFILE_ATTRIBUTES).getValue(); - return PropertyMappingUtils.getMappedValues(mappings, (p) -> PULSAR_MESSAGE_KEY.equals(p) ? message.getKey() : message.getProperty(p)); + return PropertyMappingUtils.getMappedValues(mappings, + (p) -> PULSAR_MESSAGE_KEY.equals(p) ? msg.getKey() : msg.getProperty(p)); } protected boolean isSharedSubscription(ProcessContext context) { diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java index 05ed3eb..c7e3209 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java @@ -48,7 +48,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.pulsar.PulsarClientService; -import org.apache.nifi.pulsar.cache.PulsarClientLRUCache; +import org.apache.nifi.pulsar.cache.PulsarConsumerLRUCache; import org.apache.nifi.util.StringUtils; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.MessageId; @@ -271,7 +271,7 @@ protected List getSupportedPropertyDescriptors() { } private PulsarClientService pulsarClientService; - private PulsarClientLRUCache> producers; + private PulsarConsumerLRUCache> producers; private ExecutorService publisherPool; // Used to sync between onTrigger method and shutdown code block. @@ -441,14 +441,14 @@ protected synchronized void setPulsarClientService(PulsarClientService pulsarCli this.pulsarClientService = pulsarClientService; } - protected synchronized PulsarClientLRUCache> getProducers() { + protected synchronized PulsarConsumerLRUCache> getProducers() { if (producers == null) { - producers = new PulsarClientLRUCache>(20); + producers = new PulsarConsumerLRUCache>(20); } return producers; } - protected synchronized void setProducers(PulsarClientLRUCache> producers) { + protected synchronized void setProducers(PulsarConsumerLRUCache> producers) { this.producers = producers; } diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java index ae12a9e..98fe2e4 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.commons.io.IOUtils; @SeeAlso({PublishPulsar.class, ConsumePulsarRecord.class, PublishPulsarRecord.class}) @@ -58,7 +59,7 @@ public class ConsumePulsar extends AbstractPulsarConsumerProcessor { @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { try { - Consumer consumer = getConsumer(context, getConsumerId(context, session.get())); + Consumer consumer = getConsumer(context, getConsumerId(context, session.get())); if (consumer == null) { context.yield(); @@ -78,9 +79,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro } } - private void handleAsync(final Consumer consumer, ProcessContext context, ProcessSession session) { + private void handleAsync(final Consumer consumer, ProcessContext context, ProcessSession session) { try { - Future>> done = getConsumerService().poll(5, TimeUnit.SECONDS); + Future>> done = getConsumerService().poll(5, TimeUnit.SECONDS); if (done != null) { @@ -90,7 +91,7 @@ private void handleAsync(final Consumer consumer, ProcessContext context // Cumulative acks are NOT permitted on Shared subscriptions. final boolean shared = isSharedSubscription(context); - List> messages = done.get(); + List> messages = done.get(); if (CollectionUtils.isNotEmpty(messages)) { FlowFile flowFile = null; @@ -98,10 +99,10 @@ private void handleAsync(final Consumer consumer, ProcessContext context AtomicInteger msgCount = new AtomicInteger(0); Map lastAttributes = null; - Message lastMessage = null; + Message lastMessage = null; Map currentAttributes = null; - for (Message msg : messages) { + for (Message msg : messages) { currentAttributes = getMappedFlowFileAttributes(context, msg); if (lastAttributes != null && !lastAttributes.equals(currentAttributes)) { @@ -114,7 +115,7 @@ private void handleAsync(final Consumer consumer, ProcessContext context session.commitAsync(); if (!shared) { - final Message finalMessage = lastMessage; + final Message finalMessage = lastMessage; // Cumulatively acknowledge consuming the messages for non-shared subs getAckService().submit(new Callable() { @@ -156,8 +157,13 @@ public Object call() throws Exception { out.write(demarcatorBytes); } - out.write(msg.getValue()); - msgCount.getAndIncrement(); + byte[] data = msg.getData(); + + if (data != null && data.length > 0) { + out.write(data); + msgCount.getAndIncrement(); + } + } catch (final IOException ioEx) { session.rollback(); return; @@ -186,7 +192,7 @@ public Object call() throws Exception { } } - private void consume(Consumer consumer, ProcessContext context, ProcessSession session) throws PulsarClientException { + private void consume(Consumer consumer, ProcessContext context, ProcessSession session) throws PulsarClientException { try { final int maxMessages = context.getProperty(CONSUMER_BATCH_SIZE).isSet() ? context.getProperty(CONSUMER_BATCH_SIZE) @@ -200,8 +206,8 @@ private void consume(Consumer consumer, ProcessContext context, ProcessS FlowFile flowFile = null; OutputStream out = null; - Message msg = null; - Message lastMsg = null; + Message msg = null; + Message lastMsg = null; AtomicInteger msgCount = new AtomicInteger(0); AtomicInteger loopCounter = new AtomicInteger(0); @@ -249,20 +255,19 @@ private void consume(Consumer consumer, ProcessContext context, ProcessS if (shared) { consumer.acknowledge(msg); } - - byte[] msgValue = msg.getValue(); - // Skip empty messages, as they cause NPE's when we write them to the OutputStream - if (msgValue == null || msgValue.length < 1) { - continue; - } // only write demarcators between messages if (msgCount.get() > 0) { out.write(demarcatorBytes); } - out.write(msgValue); - msgCount.getAndIncrement(); + byte[] data = msg.getData(); + + if (data != null && data.length > 0) { + out.write(data); + msgCount.getAndIncrement(); + } + } catch (final IOException ioEx) { getLogger().error("Unable to create flow file ", ioEx); session.rollback(); diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java index aa0a018..9b9b68b 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java @@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.schema.GenericRecord; @CapabilityDescription("Consumes messages from Apache Pulsar. " + "The complementary NiFi processor for sending messages is PublishPulsarRecord. Please note that, at this time, " @@ -81,7 +82,7 @@ }) @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @SeeAlso({PublishPulsar.class, ConsumePulsar.class, PublishPulsarRecord.class}) -public class ConsumePulsarRecord extends AbstractPulsarConsumerProcessor { +public class ConsumePulsarRecord extends AbstractPulsarConsumerProcessor { public static final String MSG_COUNT = "record.count"; private static final String RECORD_SEPARATOR = "\n"; @@ -159,7 +160,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro .evaluateAttributeExpressions().getValue().getBytes() : RECORD_SEPARATOR.getBytes(); try { - Consumer consumer = getConsumer(context, getConsumerId(context, session.get())); + Consumer consumer = getConsumer(context, getConsumerId(context, session.get())); if (consumer == null) { /* If we aren't connected to Pulsar, then just yield */ context.yield(); @@ -187,9 +188,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro * @return A List of Messages * @throws PulsarClientException in the event we cannot communicate with the Pulsar broker. */ - private List> getMessages(final Consumer consumer, int maxMessages) throws PulsarClientException { - List> messages = new LinkedList>(); - Message msg = null; + private List> getMessages(final Consumer consumer, int maxMessages) throws PulsarClientException { + List> messages = new LinkedList>(); + Message msg = null; AtomicInteger msgCount = new AtomicInteger(0); while (((msg = consumer.receive(0, TimeUnit.SECONDS)) != null) && msgCount.get() < maxMessages) { @@ -212,34 +213,37 @@ private List> getMessages(final Consumer consumer, int m * @param writerFactory - The factory used to write the messages. * @throws PulsarClientException if there is an issue communicating with Apache Pulsar. */ - private void consumeMessages(ProcessContext context, ProcessSession session, final Consumer consumer, final List> messages, - final RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, final byte[] demarcator, final boolean async) throws PulsarClientException { + private void consumeMessages(ProcessContext context, ProcessSession session, + final Consumer consumer, final List> messages, + final RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, + final byte[] demarcator, final boolean async) throws PulsarClientException { if (CollectionUtils.isEmpty(messages)) { return; } + final BlockingQueue> parseFailures = + new LinkedBlockingQueue>(); + RecordSchema schema = null; - final BlockingQueue> parseFailures = new LinkedBlockingQueue>(); FlowFile flowFile = null; OutputStream rawOut = null; RecordSetWriter writer = null; Map lastAttributes = null; - Message lastMessage = null; + Message lastMessage = null; Map currentAttributes = null; // Cumulative acks are NOT permitted on Shared subscriptions final boolean shared = isSharedSubscription(context); try { - for (Message msg : messages) { + for (Message msg : messages) { currentAttributes = getMappedFlowFileAttributes(context, msg); - byte[] msgValue = msg.getValue(); - + // if the current message's mapped attribute values differ from the previous set's, // write out the active record set and clear various references so that we'll start a new one - if (lastMessage != null && !lastAttributes.equals(currentAttributes)) { + if (lastAttributes != null && !lastAttributes.equals(currentAttributes)) { WriteResult result = writer.finishRecordSet(); IOUtils.closeQuietly(writer); IOUtils.closeQuietly(rawOut); @@ -257,7 +261,7 @@ private void consumeMessages(ProcessContext context, ProcessSession session, fin parseFailures.clear(); if (!shared) { - acknowledgeCumulative(consumer, lastMessage, async); + acknowledgeCumulative(consumer, lastMessage, async); } lastAttributes = null; @@ -265,19 +269,19 @@ private void consumeMessages(ProcessContext context, ProcessSession session, fin } // if there's no record set actively being written, begin one + byte[] data = msg.getData(); if (lastMessage == null) { flowFile = session.create(); flowFile = session.putAllAttributes(flowFile, currentAttributes); - schema = getSchema(flowFile, readerFactory, msgValue); + schema = getSchema(flowFile, readerFactory, data); rawOut = session.write(flowFile); - writer = getRecordWriter(writerFactory, schema, rawOut); + writer = getRecordWriter(writerFactory, schema, rawOut, flowFile); if (schema == null || writer == null) { parseFailures.add(msg); session.remove(flowFile); IOUtils.closeQuietly(rawOut); getLogger().error("Unable to create a record writer to consume from the Pulsar topic"); - continue; } @@ -288,13 +292,14 @@ private void consumeMessages(ProcessContext context, ProcessSession session, fin lastMessage = msg; if (shared) { - acknowledge(consumer, msg, async); + acknowledge(consumer, msg, async); } - // write each of records in the current message to the active record set. These will each + // write each of the records in the current message to the active record set. These will each // have the same mapped flowfile attribute values, which means that it's ok that they are all placed // in the same output flowfile. - final InputStream in = new ByteArrayInputStream(msgValue); + + final InputStream in = new ByteArrayInputStream(data); try { RecordReader r = readerFactory.createRecordReader(flowFile, in, getLogger()); for (Record record = r.nextRecord(); record != null; record = r.nextRecord()) { @@ -325,11 +330,11 @@ private void consumeMessages(ProcessContext context, ProcessSession session, fin handleFailures(session, parseFailures, demarcator); if (!shared) { - acknowledgeCumulative(consumer, messages.get(messages.size() - 1), async); + acknowledgeCumulative(consumer, messages.get(messages.size() - 1), async); } } - private void acknowledge(final Consumer consumer, final Message msg, final boolean async) throws PulsarClientException { + private void acknowledge(final Consumer consumer, final Message msg, final boolean async) throws PulsarClientException { if (async) { getAckService().submit(new Callable() { @Override @@ -343,7 +348,7 @@ public Object call() throws Exception { } } - private void acknowledgeCumulative(final Consumer consumer, final Message msg, final boolean async) throws PulsarClientException { + private void acknowledgeCumulative(final Consumer consumer, final Message msg, final boolean async) throws PulsarClientException { if (async) { getAckService().submit(new Callable() { @Override @@ -357,7 +362,8 @@ public Object call() throws Exception { } } - private void handleFailures(ProcessSession session, BlockingQueue> parseFailures, byte[] demarcator) { + private void handleFailures(ProcessSession session, + BlockingQueue> parseFailures, byte[] demarcator) { if (CollectionUtils.isEmpty(parseFailures)) { return; @@ -367,18 +373,17 @@ private void handleFailures(ProcessSession session, BlockingQueue> failureIterator = parseFailures.iterator(); + Iterator> failureIterator = parseFailures.iterator(); for (int idx = 0; failureIterator.hasNext(); idx++) { - Message msg = failureIterator.next(); - byte[] msgValue = msg.getValue(); + Message msg = failureIterator.next(); - if (msgValue != null && msgValue.length > 0) { + if (msg != null && msg.getData() != null) { if (idx > 0) { rawOut.write(demarcator); } - rawOut.write(msgValue); + rawOut.write(msg.getData()); } } IOUtils.closeQuietly(rawOut); @@ -392,18 +397,18 @@ private void handleFailures(ProcessSession session, BlockingQueue consumer, + protected void handleAsync(ProcessContext context, ProcessSession session, final Consumer consumer, final RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, byte[] demarcator) throws PulsarClientException { final Integer queryTimeout = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue(); try { - Future>> done = null; + Future>> done = null; do { done = getConsumerService().poll(queryTimeout, TimeUnit.SECONDS); if (done != null) { - List> messages = done.get(); + List> messages = done.get(); if (CollectionUtils.isNotEmpty(messages)) { consumeMessages(context, session, consumer, messages, readerFactory, writerFactory, demarcator, true); } @@ -423,6 +428,7 @@ private RecordSchema getSchema(FlowFile flowFile, RecordReaderFactory readerFact in = new ByteArrayInputStream(msgValue); schema = readerFactory.createRecordReader(flowFile, in, getLogger()).getSchema(); } catch (MalformedRecordException | IOException | SchemaNotFoundException e) { + getLogger().error("Unable to determine the schema", e); return null; } finally { IOUtils.closeQuietly(in); @@ -431,10 +437,11 @@ private RecordSchema getSchema(FlowFile flowFile, RecordReaderFactory readerFact return schema; } - private RecordSetWriter getRecordWriter(RecordSetWriterFactory writerFactory, RecordSchema srcSchema, OutputStream out) { + private RecordSetWriter getRecordWriter(RecordSetWriterFactory writerFactory, + RecordSchema srcSchema, OutputStream out, FlowFile flowFile) { try { RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), srcSchema); - return writerFactory.createWriter(getLogger(), writeSchema, out); + return writerFactory.createWriter(getLogger(), writeSchema, out, flowFile); } catch (SchemaNotFoundException | IOException e) { return null; } diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java index cbda218..8e81064 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java @@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -31,8 +32,6 @@ import org.mockito.junit.MockitoRule; import static org.junit.Assert.assertEquals; - -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -44,11 +43,12 @@ public class TestConsumePulsar extends AbstractPulsarProcessorTest { @Mock - protected Message mockMessage; + protected Message mockMessage; @Rule public MockitoRule mockitoRule = MockitoJUnit.rule(); - @Before + @SuppressWarnings("unchecked") + @Before public void init() throws InitializationException { runner = TestRunners.newTestRunner(ConsumePulsar.class); mockMessage = mock(Message.class); @@ -94,38 +94,10 @@ public void multipleAsyncMessagesBatchTest() throws PulsarClientException { public void multipleAsyncMessagesBatchSharedSubTest() throws PulsarClientException { this.batchMessages("Mocked Message", "foo", "bar", true, 40, "Shared"); } - - /* - * Verify that the consumer gets closed. - */ - @Test - public void onStoppedTest() throws NoSuchMethodException, SecurityException, PulsarClientException { - when(mockMessage.getValue()).thenReturn("Mocked Message".getBytes()); - mockClientService.setMockMessage(mockMessage); - - runner.setProperty(ConsumePulsar.TOPICS, "foo"); - runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); - runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); - runner.run(10, true); - runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); - - runner.assertQueueEmpty(); - - // Verify that the receive method on the consumer was called 10 times - int batchSize = Integer.parseInt(ConsumePulsar.CONSUMER_BATCH_SIZE.getDefaultValue()); - verify(mockClientService.getMockConsumer(), atLeast(10 * batchSize)).receive(0, TimeUnit.SECONDS); - - // Verify that each message was acknowledged - verify(mockClientService.getMockConsumer(), times(10)).acknowledgeCumulative(mockMessage); - - // Verify that the consumer was closed - verify(mockClientService.getMockConsumer(), times(1)).close(); - - } @Test public void keySharedTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("Mocked Message".getBytes()); + when(mockMessage.getData()).thenReturn("Mocked Message".getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.TOPICS, "foo"); @@ -144,7 +116,7 @@ protected void batchMessages(String msg, String topic, String sub, boolean async } protected void batchMessages(String msg, String topic, String sub, boolean async, int batchSize, String subType) throws PulsarClientException { - when(mockMessage.getValue()).thenReturn(msg.getBytes()); + when(mockMessage.getData()).thenReturn(msg.getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.ASYNC_ENABLED, Boolean.toString(async)); @@ -200,7 +172,7 @@ protected void sendMessages(String msg, String topic, String sub, boolean async, protected void sendMessages(String msg, String topic, String sub, boolean async, int iterations, String subType) throws PulsarClientException { - when(mockMessage.getValue()).thenReturn(msg.getBytes()); + when(mockMessage.getData()).thenReturn(msg.getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.ASYNC_ENABLED, Boolean.toString(async)); @@ -240,9 +212,24 @@ protected void sendMessages(String msg, String topic, String sub, boolean async, } protected void doMappedAttributesTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("A".getBytes()).thenReturn("B".getBytes()).thenReturn("C".getBytes()).thenReturn("D".getBytes()); - when(mockMessage.getProperty("prop")).thenReturn(null).thenReturn(null).thenReturn("val").thenReturn("val"); - when(mockMessage.getKey()).thenReturn(null).thenReturn(null).thenReturn(null).thenReturn("K"); + when(mockMessage.getData()) + .thenReturn("A".getBytes()) + .thenReturn("B".getBytes()) + .thenReturn("C".getBytes()) + .thenReturn("D".getBytes()); + + when(mockMessage.getProperty("prop")) + .thenReturn(null) + .thenReturn(null) + .thenReturn("val") + .thenReturn("val"); + + when(mockMessage.getKey()) + .thenReturn(null) + .thenReturn(null) + .thenReturn(null) + .thenReturn("K"); + mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.MAPPED_FLOWFILE_ATTRIBUTES, "prop,key=__KEY__"); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord.java index 696ef71..b915162 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord.java @@ -33,12 +33,12 @@ import org.apache.nifi.processors.pulsar.pubsub.mocks.MockRecordParser; import org.apache.nifi.processors.pulsar.pubsub.mocks.MockRecordWriter; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunners; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -51,12 +51,13 @@ public class TestConsumePulsarRecord extends AbstractPulsarProcessorTest protected static String DEFAULT_SUB = "bar"; @Mock - protected Message mockMessage; + protected Message mockMessage; protected MockRecordParser readerService; protected MockRecordWriter writerService; - @Before + @SuppressWarnings("unchecked") + @Before public void setup() throws InitializationException { mockMessage = mock(Message.class); @@ -113,7 +114,7 @@ protected void doFailedParseHandlingTest(String msg, String topic, String sub, b runner.setProperty(ConsumePulsarRecord.MAX_WAIT_TIME, "0 sec"); } - when(mockMessage.getValue()).thenReturn(msg.getBytes()); + when(mockMessage.getData()).thenReturn(msg.getBytes()); mockClientService.setMockMessage(mockMessage); runner.run(); @@ -142,7 +143,7 @@ protected List sendMessages(String msg, String topic, String sub, } protected List sendMessages(String msg, String topic, String sub, boolean async, int iterations, int batchSize, String subType) throws PulsarClientException { - when(mockMessage.getValue()).thenReturn(msg.getBytes()); + when(mockMessage.getData()).thenReturn(msg.getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.ASYNC_ENABLED, Boolean.toString(async)); @@ -186,9 +187,21 @@ protected List sendMessages(String msg, String topic, String sub, } protected void doMappedAttributesTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("A,10".getBytes()).thenReturn("B,10".getBytes()).thenReturn("C,10".getBytes()).thenReturn("D,10".getBytes()); - when(mockMessage.getProperty("prop")).thenReturn(null).thenReturn(null).thenReturn("val").thenReturn("val"); - when(mockMessage.getKey()).thenReturn(null).thenReturn(null).thenReturn(null).thenReturn("K"); + when(mockMessage.getData()).thenReturn("A,10".getBytes()) + .thenReturn("B,10".getBytes()) + .thenReturn("C,10".getBytes()) + .thenReturn("D,10".getBytes()); + + when(mockMessage.getProperty("prop")).thenReturn(null) + .thenReturn(null) + .thenReturn("val") + .thenReturn("val"); + + when(mockMessage.getKey()).thenReturn(null) + .thenReturn(null) + .thenReturn(null) + .thenReturn("K"); + mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.MAPPED_FLOWFILE_ATTRIBUTES, "prop,key=__KEY__"); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsar.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsar.java index 074738b..aedee67 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsar.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsar.java @@ -52,7 +52,7 @@ public void pulsarClientExceptionTest() throws PulsarClientException { @Test public void emptyMessageTest() { - when(mockMessage.getValue()).thenReturn("".getBytes()); + when(mockMessage.getData()).thenReturn("".getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.TOPICS, "foo"); @@ -76,25 +76,6 @@ public void multipleMessagesTest() throws PulsarClientException { this.sendMessages("Mocked Message", "foo", "bar", true, 40); } - /* - * Verify that the consumer gets closed. - */ - @Test - public void onStoppedTest() throws NoSuchMethodException, SecurityException, PulsarClientException { - when(mockMessage.getValue()).thenReturn("Mocked Message".getBytes()); - mockClientService.setMockMessage(mockMessage); - - runner.setProperty(ConsumePulsar.TOPICS, "foo"); - runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); - runner.run(10, true); - runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); - - runner.assertQueueEmpty(); - - // Verify that the consumer was closed - verify(mockClientService.getMockConsumer(), times(1)).close(); - } - @Test public void mappedAttributesTest() throws PulsarClientException { runner.setProperty(ConsumePulsar.ASYNC_ENABLED, Boolean.toString(true)); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsarRecord.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsarRecord.java index 15806f0..d8dd007 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsarRecord.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsarRecord.java @@ -24,7 +24,6 @@ import java.util.List; -import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar; import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsarRecord; import org.apache.nifi.processors.pulsar.pubsub.TestConsumePulsarRecord; import org.apache.nifi.util.MockFlowFile; @@ -35,7 +34,7 @@ public class TestAsyncConsumePulsarRecord extends TestConsumePulsarRecord { @Test public void emptyMessageTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("".getBytes()); + when(mockMessage.getData()).thenReturn("".getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.TOPICS, DEFAULT_TOPIC); @@ -51,7 +50,7 @@ public void emptyMessageTest() throws PulsarClientException { @Test public void singleMalformedMessageTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn(BAD_MSG.getBytes()); + when(mockMessage.getData()).thenReturn(BAD_MSG.getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.TOPICS, DEFAULT_TOPIC); @@ -110,7 +109,7 @@ public void singleMessageWithGoodAndBadRecordsTest() throws PulsarClientExceptio } } - when(mockMessage.getValue()).thenReturn(input.toString().getBytes()); + when(mockMessage.getData()).thenReturn(input.toString().getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.ASYNC_ENABLED, Boolean.toString(false)); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java index b7f9452..4821564 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java @@ -25,6 +25,7 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -36,15 +37,15 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.mockito.ArgumentMatcher; -import org.mockito.Matchers; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.mockito.stubbing.OngoingStubbing; - import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; @@ -52,28 +53,35 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; +@SuppressWarnings("unchecked") public class MockPulsarClientService extends AbstractControllerService implements PulsarClientService { @Mock PulsarClient mockClient = mock(PulsarClient.class); - + @Mock + PulsarAdmin mockAdmin = mock(PulsarAdmin.class); + + @Mock ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class); @Mock - ConsumerBuilder mockConsumerBuilder = mock(ConsumerBuilder.class); + ConsumerBuilder mockConsumerBuilder = mock(ConsumerBuilder.class); @Mock Producer mockProducer = mock(Producer.class); @Mock - Consumer mockConsumer = mock(Consumer.class); + Consumer mockConsumer = mock(Consumer.class); @Mock TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilder.class); @Mock - protected Message mockMessage; + protected Message mockMessage; + + @Mock + SchemaInfo mockSchema = mock(SchemaInfo.class); @Mock MessageId mockMessageId = mock(MessageId.class); @@ -82,8 +90,9 @@ public class MockPulsarClientService extends AbstractControllerService implem public MockPulsarClientService() { when(mockClient.newProducer()).thenReturn((ProducerBuilder) mockProducerBuilder); - when(mockClient.newConsumer()).thenReturn((ConsumerBuilder) mockConsumerBuilder); - + when(mockClient.newConsumer(Schema.AUTO_CONSUME())).thenReturn((ConsumerBuilder) mockConsumerBuilder); + when(mockClient.newConsumer(any(Schema.class))).thenReturn((ConsumerBuilder) mockConsumerBuilder); + when(mockProducerBuilder.topic(anyString())).thenReturn(mockProducerBuilder); when(mockProducerBuilder.enableBatching(anyBoolean())).thenReturn(mockProducerBuilder); when(mockProducerBuilder.batchingMaxMessages(anyInt())).thenReturn(mockProducerBuilder); @@ -102,13 +111,15 @@ public MockPulsarClientService() { when(mockConsumerBuilder.priorityLevel(anyInt())).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.receiverQueueSize(anyInt())).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder); + + when(mockSchema.getType()).thenReturn(SchemaType.BYTES); try { when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); when(mockConsumer.isConnected()).thenReturn(true); when(mockConsumer.receive()).thenReturn(mockMessage); - doAnswer(new Answer>() { - public Message answer(InvocationOnMock invocation) { + doAnswer(new Answer>() { + public Message answer(InvocationOnMock invocation) { return mockMessage; } }).when(mockConsumer).receive(0, TimeUnit.SECONDS); @@ -120,8 +131,8 @@ public Message answer(InvocationOnMock invocation) { } } - public void setMockMessage(Message msg) { - this.mockMessage = msg; + public void setMockMessage(Message mockMessage2) { + this.mockMessage = mockMessage2; // Configure the consumer behavior try { @@ -130,7 +141,7 @@ public void setMockMessage(Message msg) { e.printStackTrace(); } - CompletableFuture> future = CompletableFuture.supplyAsync(() -> { + CompletableFuture> future = CompletableFuture.supplyAsync(() -> { return mockMessage; }); @@ -179,7 +190,7 @@ private void defineDefaultProducerBehavior() { } } - public Consumer getMockConsumer() { + public Consumer getMockConsumer() { return mockConsumer; } @@ -187,7 +198,7 @@ public ProducerBuilder getMockProducerBuilder() { return mockProducerBuilder; } - public ConsumerBuilder getMockConsumerBuilder() { + public ConsumerBuilder getMockConsumerBuilder() { return mockConsumerBuilder; } @@ -204,4 +215,5 @@ public PulsarClient getPulsarClient() { public String getPulsarBrokerRootURL() { return "pulsar://mocked:6650"; } + } diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java index 00fafd6..c35f4fc 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java @@ -28,7 +28,6 @@ import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar; import org.apache.nifi.processors.pulsar.pubsub.TestConsumePulsar; import org.apache.nifi.util.MockFlowFile; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.junit.Test; @@ -73,7 +72,7 @@ public void pulsarClientExceptionTest() throws PulsarClientException { @Test public void emptyMessageTest() throws PulsarClientException { when(mockClientService.getMockConsumer().receive(0, TimeUnit.SECONDS)).thenReturn(mockMessage).thenReturn(null); - when(mockMessage.getValue()).thenReturn("".getBytes()); + when(mockMessage.getData()).thenReturn("".getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.TOPICS, "foo"); @@ -104,27 +103,6 @@ public final void batchMessageTest() throws PulsarClientException { this.batchMessages("Mocked Message", "foo", "bar", false, 400); } - /* - * Verify that the consumer gets closed. - */ - @Test - public void onStoppedTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("Mocked Message".getBytes()); - mockClientService.setMockMessage(mockMessage); - - runner.setProperty(ConsumePulsar.TOPICS, "foo"); - runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); - runner.setProperty(ConsumePulsar.CONSUMER_BATCH_SIZE, 1 + ""); - runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); - runner.run(1, true); - runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); - - runner.assertQueueEmpty(); - - // Verify that the consumer was closed - verify(mockClientService.getMockConsumer(), times(1)).close(); - } - @Test public void mappedAttributesTest() throws PulsarClientException { super.doMappedAttributesTest(); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsarRecord.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsarRecord.java index bc5d96d..1ab2d6c 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsarRecord.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsarRecord.java @@ -34,7 +34,7 @@ public class TestSyncConsumePulsarRecord extends TestConsumePulsarRecord { @Test public void emptyMessageTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("".getBytes()); + when(mockMessage.getData()).thenReturn("".getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.TOPICS, DEFAULT_TOPIC); @@ -49,7 +49,7 @@ public void emptyMessageTest() throws PulsarClientException { @Test public void singleMalformedMessageTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn(BAD_MSG.getBytes()); + when(mockMessage.getData()).thenReturn(BAD_MSG.getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.TOPICS, DEFAULT_TOPIC); @@ -107,7 +107,7 @@ public void singleMessageWithGoodAndBadRecordsTest() throws PulsarClientExceptio } } - when(mockMessage.getValue()).thenReturn(input.toString().getBytes()); + when(mockMessage.getData()).thenReturn(input.toString().getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.ASYNC_ENABLED, Boolean.toString(false)); diff --git a/pom.xml b/pom.xml index e3c5917..77d9581 100644 --- a/pom.xml +++ b/pom.xml @@ -24,11 +24,15 @@ 4.2 + 1.21 3.8.1 2.4 2.5.2 + 1.41.0 2.12.3 - 2.8.0 + 3.11.4 + 3.11.4 + 2.8.1 11