From fe2431f62fe66de8183330a7500575df32cb6907 Mon Sep 17 00:00:00 2001 From: david-streamlio Date: Thu, 21 Oct 2021 14:28:10 -0700 Subject: [PATCH 1/2] Added schema-awareness to consumers --- .../nifi/pulsar/PulsarClientService.java | 8 ++ ...Cache.java => PulsarConsumerLRUCache.java} | 6 +- .../cache/PulsarClientLRUCacheTest.java | 8 +- nifi-pulsar-client-service/pom.xml | 6 + .../pulsar/StandardPulsarClientService.java | 94 +++++++++++++-- .../TestStandardPulsarClientService.java | 3 +- nifi-pulsar-processors/pom.xml | 103 +++++++++++++--- .../AbstractPulsarConsumerProcessor.java | 34 ++++-- .../AbstractPulsarProducerProcessor.java | 10 +- .../pulsar/util/ConsumerBuilderFactory.java | 108 +++++++++++++++++ .../pubsub/mocks/MockPulsarClientService.java | 38 +++++- .../util/ConsumerBuilderFactoryTests.java | 111 ++++++++++++++++++ .../org/apache/nifi/testing/addressbook.proto | 30 +++++ pom.xml | 3 + 14 files changed, 503 insertions(+), 59 deletions(-) rename nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/{PulsarClientLRUCache.java => PulsarConsumerLRUCache.java} (88%) create mode 100644 nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactory.java create mode 100644 nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactoryTests.java create mode 100644 nifi-pulsar-processors/src/test/proto/org/apache/nifi/testing/addressbook.proto diff --git a/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java b/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java index 446364e..83cf563 100644 --- a/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java +++ b/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java @@ -19,7 +19,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.controller.ControllerService; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.schema.SchemaInfo; @Tags({"Pulsar", "client", "pool"}) @CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, " @@ -27,7 +29,13 @@ public interface PulsarClientService extends ControllerService { public PulsarClient getPulsarClient(); + + public PulsarAdmin getPulsarAdmin(); public String getPulsarBrokerRootURL(); + + public SchemaInfo getTopicSchema(String[] topicNames); + + public SchemaInfo getTopicSchemaByRegex(String regex); } diff --git a/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCache.java b/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarConsumerLRUCache.java similarity index 88% rename from nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCache.java rename to nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarConsumerLRUCache.java index 446cb88..644554f 100644 --- a/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCache.java +++ b/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/PulsarConsumerLRUCache.java @@ -21,17 +21,17 @@ import org.apache.commons.collections4.map.LRUMap; -public class PulsarClientLRUCache extends LRUMap { +public class PulsarConsumerLRUCache extends LRUMap { private static final long serialVersionUID = 730163138087670453L; private final static float LOAD_FACTOR = 0.75F; private final static boolean SCAN_UNTIL_REMOVABLE = false; - public PulsarClientLRUCache(int maxSize) { + public PulsarConsumerLRUCache(int maxSize) { this(maxSize, LOAD_FACTOR, SCAN_UNTIL_REMOVABLE); } - public PulsarClientLRUCache(int maxSize, float loadFactor, boolean scanUntilRemovable) { + public PulsarConsumerLRUCache(int maxSize, float loadFactor, boolean scanUntilRemovable) { super(maxSize, loadFactor, scanUntilRemovable); } diff --git a/nifi-pulsar-client-service-api/src/test/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCacheTest.java b/nifi-pulsar-client-service-api/src/test/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCacheTest.java index 42abe7d..9274898 100644 --- a/nifi-pulsar-client-service-api/src/test/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCacheTest.java +++ b/nifi-pulsar-client-service-api/src/test/java/org/apache/nifi/pulsar/cache/PulsarClientLRUCacheTest.java @@ -44,7 +44,7 @@ public void setUp() throws InterruptedException { */ @Test public void simpleTest() { - PulsarClientLRUCache cache = new PulsarClientLRUCache(10); + PulsarConsumerLRUCache cache = new PulsarConsumerLRUCache(10); for (Character i='A'; i<='E'; i++){ cache.put(i.toString(), mockedPulsarProducer); @@ -60,7 +60,7 @@ public void simpleTest() { @Test public void evictionTest() { - PulsarClientLRUCache cache = new PulsarClientLRUCache(5); + PulsarConsumerLRUCache cache = new PulsarConsumerLRUCache(5); for (Character i='A'; i<='Z'; i++){ cache.put(i.toString(), mockedPulsarProducer); @@ -78,7 +78,7 @@ public void evictionTest() { @Test public void evictionLruTest() { - PulsarClientLRUCache cache = new PulsarClientLRUCache(5); + PulsarConsumerLRUCache cache = new PulsarConsumerLRUCache(5); final Character A = 'A'; @@ -102,7 +102,7 @@ public void evictionLruTest() { @Test public void clearTest() throws PulsarClientException { - PulsarClientLRUCache cache = new PulsarClientLRUCache(26); + PulsarConsumerLRUCache cache = new PulsarConsumerLRUCache(26); for (Character i='A'; i<='Z'; i++) { cache.put(i.toString(), mockedPulsarProducer); diff --git a/nifi-pulsar-client-service/pom.xml b/nifi-pulsar-client-service/pom.xml index 476bb4b..4fb5b0a 100644 --- a/nifi-pulsar-client-service/pom.xml +++ b/nifi-pulsar-client-service/pom.xml @@ -27,6 +27,12 @@ + + org.apache.pulsar + pulsar-client-admin + ${pulsar.version} + + org.apache.nifi nifi-pulsar-client-service-api diff --git a/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java b/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java index a2e03fa..38c1eb0 100644 --- a/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java +++ b/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java @@ -36,10 +36,15 @@ import org.apache.nifi.pulsar.auth.PulsarClientAuthenticationService; import org.apache.nifi.pulsar.validator.PulsarBrokerUrlValidator; import org.apache.nifi.reporting.InitializationException; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse; +import org.apache.pulsar.common.schema.SchemaInfo; @Tags({"Pulsar", "client", "pool"}) @CapabilityDescription("Standard implementation of the PulsarClientService. " @@ -53,9 +58,7 @@ public class StandardPulsarClientService extends AbstractControllerService imple .defaultValue("false") .description("") .displayName("Allow TLS Insecure Connection") - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(false) - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); public static final PropertyDescriptor AUTHENTICATION_SERVICE = new PropertyDescriptor.Builder() @@ -96,9 +99,7 @@ public class StandardPulsarClientService extends AbstractControllerService imple + "It validates incoming x509 certificate and matches provided hostname(CN/SAN) with expected " + "broker's host name. It follows RFC 2818, 3.1. Server Identity hostname verification.") .displayName("Enable TLS Hostname Verification") - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(false) - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() @@ -173,6 +174,15 @@ public class StandardPulsarClientService extends AbstractControllerService imple .addValidator(new PulsarBrokerUrlValidator()) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); + + public static final PropertyDescriptor PULSAR_SERVICE_HTTP_URL = new PropertyDescriptor.Builder() + .name("PULSAR_SERVICE_HTTP_URL") + .displayName("Pulsar Service HTTP URL") + .description("URL for the administrative endpoint of the Pulsar cluster, e.g. pulsar://localhost:8080") + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); public static final PropertyDescriptor STATS_INTERVAL = new PropertyDescriptor.Builder() .name("STATS_INTERVAL") @@ -199,11 +209,13 @@ public class StandardPulsarClientService extends AbstractControllerService imple private static List properties; private volatile PulsarClient client; + private volatile PulsarAdmin admin; private String brokerUrl; static { final List props = new ArrayList<>(); props.add(PULSAR_SERVICE_URL); + props.add(PULSAR_SERVICE_HTTP_URL); props.add(AUTHENTICATION_SERVICE); props.add(CONCURRENT_LOOKUP_REQUESTS); props.add(CONNECTIONS_PER_BROKER); @@ -233,10 +245,11 @@ protected List getSupportedPropertyDescriptors() { @OnEnabled public void onEnabled(final ConfigurationContext context) throws InitializationException, UnsupportedAuthenticationException { try { - client = getClientBuilder(context).build(); + client = getClient(context); + admin = getAdmin(context); brokerUrl = context.getProperty(PULSAR_SERVICE_URL).evaluateAttributeExpressions().getValue(); } catch (Exception e) { - throw new InitializationException("Unable to create Pulsar Client", e); + throw new InitializationException("Unable to connect to the Pulsar cluster ", e); } } @@ -246,6 +259,10 @@ public void cleanup() throws PulsarClientException { if (client != null) { client.close(); } + + if (admin != null) { + admin.close(); + } } @Override @@ -253,16 +270,51 @@ public PulsarClient getPulsarClient() { return client; } + @Override + public PulsarAdmin getPulsarAdmin() { + return admin; + } + @Override public String getPulsarBrokerRootURL() { return brokerUrl; } + + public SchemaInfo getTopicSchema(String[] topicNames) { + if (topicNames == null || topicNames.length < 1) { + return null; + } + + SchemaInfo info = null; + + try { + for (String topic : topicNames) { + if (info == null) { + info = getPulsarAdmin().schemas().getSchemaInfo(topic); + } else { + IsCompatibilityResponse resp = getPulsarAdmin().schemas().testCompatibility(topic, info); + if (!resp.isCompatibility()) { + // The specified topics have incompatible schema types. + return null; + } + } + } + + } catch (final PulsarAdminException paEx) { + getLogger().error("Unable to retrieve topic schema information", paEx); + } + return info; + } + + public SchemaInfo getTopicSchemaByRegex(String regex) { + return null; + } - private ClientBuilder getClientBuilder(ConfigurationContext context) throws UnsupportedAuthenticationException, MalformedURLException { + private PulsarClient getClient(ConfigurationContext context) throws MalformedURLException, PulsarClientException { ClientBuilder builder = PulsarClient.builder() - .allowTlsInsecureConnection(context.getProperty(ALLOW_TLS_INSECURE_CONNECTION).evaluateAttributeExpressions().asBoolean()) - .enableTlsHostnameVerification(context.getProperty(ENABLE_TLS_HOSTNAME_VERIFICATION).evaluateAttributeExpressions().asBoolean()) + .allowTlsInsecureConnection(context.getProperty(ALLOW_TLS_INSECURE_CONNECTION).asBoolean()) + .enableTlsHostnameVerification(context.getProperty(ENABLE_TLS_HOSTNAME_VERIFICATION).asBoolean()) .maxConcurrentLookupRequests(context.getProperty(CONCURRENT_LOOKUP_REQUESTS).evaluateAttributeExpressions().asInteger()) .connectionsPerBroker(context.getProperty(CONNECTIONS_PER_BROKER).evaluateAttributeExpressions().asInteger()) .ioThreads(context.getProperty(IO_THREADS).evaluateAttributeExpressions().asInteger()) @@ -288,7 +340,29 @@ private ClientBuilder getClientBuilder(ConfigurationContext context) throws Unsu } builder = builder.serviceUrl(context.getProperty(PULSAR_SERVICE_URL).evaluateAttributeExpressions().getValue()); - return builder; + return builder.build(); + } + + private PulsarAdmin getAdmin(ConfigurationContext context) throws PulsarClientException { + PulsarAdminBuilder builder = PulsarAdmin.builder() + .allowTlsInsecureConnection(context.getProperty(ALLOW_TLS_INSECURE_CONNECTION).asBoolean()) + .enableTlsHostnameVerification(context.getProperty(ENABLE_TLS_HOSTNAME_VERIFICATION).asBoolean()) + .serviceHttpUrl(context.getProperty(PULSAR_SERVICE_HTTP_URL).evaluateAttributeExpressions().getValue()); + + // Configure Authentication + final PulsarClientAuthenticationService authenticationService = + context.getProperty(AUTHENTICATION_SERVICE) + .asControllerService(PulsarClientAuthenticationService.class); + + if (authenticationService != null) { + builder = builder.authentication(authenticationService.getAuthentication()); + + if (StringUtils.isNotBlank(authenticationService.getTlsTrustCertsFilePath())) { + builder = builder.tlsTrustCertsFilePath(authenticationService.getTlsTrustCertsFilePath()); + } + } + + return builder.build(); } } diff --git a/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java b/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java index f9c4bd9..e9aaab2 100644 --- a/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java +++ b/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java @@ -24,12 +24,13 @@ public class TestStandardPulsarClientService { @Test - public void testService() throws InitializationException { + public void validServiceTest() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); final PulsarClientService service = new StandardPulsarClientService(); runner.addControllerService("test-good", service); runner.setProperty(service, StandardPulsarClientService.PULSAR_SERVICE_URL, "pulsar://localhost:6650"); + runner.setProperty(service, StandardPulsarClientService.PULSAR_SERVICE_HTTP_URL, "http://localhost:8080"); runner.enableControllerService(service); runner.assertValid(service); diff --git a/nifi-pulsar-processors/pom.xml b/nifi-pulsar-processors/pom.xml index 3fb9fa8..a89f5d9 100644 --- a/nifi-pulsar-processors/pom.xml +++ b/nifi-pulsar-processors/pom.xml @@ -24,6 +24,67 @@ nifi-pulsar-processors jar + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + + + compile + compile-custom + test-compile + test-compile-custom + + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + initialize + + detect + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.2.0 + + + add-classes + generate-test-sources + + add-test-source + + + + target/generated-test-sources/protobuf/java + + + + + + + + @@ -59,6 +120,30 @@ provided + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + + org.apache.directory.studio + org.apache.commons.io + ${commons.io.version} + + + + com.fasterxml.jackson.core + jackson-core + ${jackson-core.version} + + + + com.google.protobuf + protobuf-java + ${protobuf3.version} + + org.apache.nifi nifi-mock @@ -77,24 +162,6 @@ test - - org.apache.commons - commons-lang3 - ${commons-lang3.version} - - - - org.apache.directory.studio - org.apache.commons.io - ${commons.io.version} - - - - com.fasterxml.jackson.core - jackson-core - ${jackson-core.version} - - diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java index 91ac5d6..c43262b 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java @@ -47,8 +47,9 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.util.ConsumerBuilderFactory; import org.apache.nifi.pulsar.PulsarClientService; -import org.apache.nifi.pulsar.cache.PulsarClientLRUCache; +import org.apache.nifi.pulsar.cache.PulsarConsumerLRUCache; import org.apache.nifi.util.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -263,7 +264,8 @@ public abstract class AbstractPulsarConsumerProcessor extends AbstractProcess } private PulsarClientService pulsarClientService; - private PulsarClientLRUCache> consumers; + private ConsumerBuilderFactory consumerFactory; + private PulsarConsumerLRUCache> consumers; private ExecutorService consumerPool; private ExecutorCompletionService>> consumerService; private ExecutorService ackPool; @@ -311,6 +313,7 @@ public void init(ProcessContext context) { } setPulsarClientService(context.getProperty(PULSAR_CLIENT_SERVICE).asControllerService(PulsarClientService.class)); + consumerFactory = new ConsumerBuilderFactory(getPulsarClientService().getPulsarClient()); } @OnUnscheduled @@ -411,15 +414,22 @@ protected synchronized Consumer getConsumer(ProcessContext context, String to return (consumer != null && consumer.isConnected()) ? consumer : null; } - protected synchronized ConsumerBuilder getConsumerBuilder(ProcessContext context) throws PulsarClientException { - - ConsumerBuilder builder = (ConsumerBuilder) getPulsarClientService().getPulsarClient().newConsumer(); + @SuppressWarnings("unchecked") + protected synchronized ConsumerBuilder getConsumerBuilder(ProcessContext context) throws PulsarClientException { + + @SuppressWarnings("rawtypes") + ConsumerBuilder builder = null; if (context.getProperty(TOPICS).isSet()) { - builder = builder.topic(Arrays.stream(context.getProperty(TOPICS).evaluateAttributeExpressions().getValue().split("[, ]")) - .map(String::trim).toArray(String[]::new)); + String[] topics = Arrays.stream(context.getProperty(TOPICS).evaluateAttributeExpressions().getValue().split("[, ]")) + .map(String::trim).toArray(String[]::new); + + builder = this.consumerFactory.build(getPulsarClientService().getTopicSchema(topics)); + builder = builder.topic(topics); } else if (context.getProperty(TOPICS_PATTERN).isSet()) { - builder = builder.topicsPattern(context.getProperty(TOPICS_PATTERN).getValue()); + String topicsPattern = context.getProperty(TOPICS_PATTERN).getValue(); + builder = consumerFactory.build(getPulsarClientService().getTopicSchemaByRegex(topicsPattern)); + builder = builder.topicsPattern(topicsPattern); } if (context.getProperty(CONSUMER_NAME).isSet()) { @@ -433,7 +443,7 @@ protected synchronized ConsumerBuilder getConsumerBuilder(ProcessContext cont .subscriptionType(SubscriptionType.valueOf(context.getProperty(SUBSCRIPTION_TYPE).getValue())); } - protected synchronized ExecutorService getConsumerPool() { + protected synchronized ExecutorService getConsumerPool() { return consumerPool; } @@ -473,14 +483,14 @@ protected synchronized void setPulsarClientService(PulsarClientService pulsarCli this.pulsarClientService = pulsarClientService; } - protected synchronized PulsarClientLRUCache> getConsumers() { + protected synchronized PulsarConsumerLRUCache> getConsumers() { if (consumers == null) { - consumers = new PulsarClientLRUCache>(20); + consumers = new PulsarConsumerLRUCache>(20); } return consumers; } - protected void setConsumers(PulsarClientLRUCache> consumers) { + protected void setConsumers(PulsarConsumerLRUCache> consumers) { this.consumers = consumers; } diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java index 05ed3eb..c7e3209 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java @@ -48,7 +48,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.pulsar.PulsarClientService; -import org.apache.nifi.pulsar.cache.PulsarClientLRUCache; +import org.apache.nifi.pulsar.cache.PulsarConsumerLRUCache; import org.apache.nifi.util.StringUtils; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.MessageId; @@ -271,7 +271,7 @@ protected List getSupportedPropertyDescriptors() { } private PulsarClientService pulsarClientService; - private PulsarClientLRUCache> producers; + private PulsarConsumerLRUCache> producers; private ExecutorService publisherPool; // Used to sync between onTrigger method and shutdown code block. @@ -441,14 +441,14 @@ protected synchronized void setPulsarClientService(PulsarClientService pulsarCli this.pulsarClientService = pulsarClientService; } - protected synchronized PulsarClientLRUCache> getProducers() { + protected synchronized PulsarConsumerLRUCache> getProducers() { if (producers == null) { - producers = new PulsarClientLRUCache>(20); + producers = new PulsarConsumerLRUCache>(20); } return producers; } - protected synchronized void setProducers(PulsarClientLRUCache> producers) { + protected synchronized void setProducers(PulsarConsumerLRUCache> producers) { this.producers = producers; } diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactory.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactory.java new file mode 100644 index 0000000..42ee1d9 --- /dev/null +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactory.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar.util; + +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.ProtobufSchema; +import org.apache.pulsar.common.schema.SchemaInfo; + +/** + * Creates a schema-aware consumer based on the SchemaInfo + * provided. This allows us to consume from existing Pulsar + * topics that have an associated schema. + * + * Currently, it only supports Primitive schema types. + */ +public class ConsumerBuilderFactory { + + private PulsarClient client; + + public ConsumerBuilderFactory(PulsarClient c) { + this.client = c; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public ConsumerBuilder build(SchemaInfo info) { + + switch (info.getType()) { + + case AVRO : + return client.newConsumer( + AvroSchema.of( + SchemaDefinition.builder() + .withJsonDef(new String(info.getSchema())) + .build()) + ); + + case JSON : + return client.newConsumer( + JSONSchema.of( + SchemaDefinition.builder() + .withJsonDef(new String(info.getSchema())) + .build()) + ); + + case PROTOBUF : + return client.newConsumer( + ProtobufSchema.of( + SchemaDefinition.builder() + .withJsonDef(new String(info.getSchema())) + .build()) + ); + + case BOOLEAN : return client.newConsumer(Schema.BOOL); + + case DATE : return client.newConsumer(Schema.DATE); + + case DOUBLE : return client.newConsumer(Schema.DOUBLE); + + case FLOAT : return client.newConsumer(Schema.FLOAT); + + case INSTANT : return client.newConsumer(Schema.INSTANT); + + case INT8 : return client.newConsumer(Schema.INT8); + + case INT16 : return client.newConsumer(Schema.INT16); + + case INT32 : return client.newConsumer(Schema.INT32); + + case INT64 : return client.newConsumer(Schema.INT64); + + case LOCAL_DATE : return client.newConsumer(Schema.LOCAL_DATE); + + case LOCAL_DATE_TIME : return client.newConsumer(Schema.LOCAL_DATE_TIME); + + case LOCAL_TIME : return client.newConsumer(Schema.LOCAL_TIME); + + case STRING : return client.newConsumer(Schema.STRING); + + case TIME : return client.newConsumer(Schema.TIME); + + case TIMESTAMP : return client.newConsumer(Schema.TIMESTAMP); + + case BYTES : + default: return client.newConsumer(Schema.BYTES); + + } + + } +} diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java index b7f9452..9dffb8f 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java @@ -25,6 +25,7 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -36,15 +37,15 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.mockito.ArgumentMatcher; -import org.mockito.Matchers; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.mockito.stubbing.OngoingStubbing; - import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; @@ -52,12 +53,16 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; +@SuppressWarnings("unchecked") public class MockPulsarClientService extends AbstractControllerService implements PulsarClientService { @Mock PulsarClient mockClient = mock(PulsarClient.class); - + @Mock + PulsarAdmin mockAdmin = mock(PulsarAdmin.class); + + @Mock ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class); @Mock @@ -74,6 +79,9 @@ public class MockPulsarClientService extends AbstractControllerService implem @Mock protected Message mockMessage; + + @Mock + SchemaInfo mockSchema = mock(SchemaInfo.class); @Mock MessageId mockMessageId = mock(MessageId.class); @@ -83,7 +91,8 @@ public class MockPulsarClientService extends AbstractControllerService implem public MockPulsarClientService() { when(mockClient.newProducer()).thenReturn((ProducerBuilder) mockProducerBuilder); when(mockClient.newConsumer()).thenReturn((ConsumerBuilder) mockConsumerBuilder); - + when(mockClient.newConsumer(any(Schema.class))).thenReturn((ConsumerBuilder) mockConsumerBuilder); + when(mockProducerBuilder.topic(anyString())).thenReturn(mockProducerBuilder); when(mockProducerBuilder.enableBatching(anyBoolean())).thenReturn(mockProducerBuilder); when(mockProducerBuilder.batchingMaxMessages(anyInt())).thenReturn(mockProducerBuilder); @@ -102,6 +111,8 @@ public MockPulsarClientService() { when(mockConsumerBuilder.priorityLevel(anyInt())).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.receiverQueueSize(anyInt())).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder); + + when(mockSchema.getType()).thenReturn(SchemaType.BYTES); try { when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); @@ -199,9 +210,24 @@ public TypedMessageBuilder getMockTypedMessageBuilder() { public PulsarClient getPulsarClient() { return mockClient; } + + @Override + public PulsarAdmin getPulsarAdmin() { + return mockAdmin; + } @Override public String getPulsarBrokerRootURL() { return "pulsar://mocked:6650"; } + + @Override + public SchemaInfo getTopicSchema(String[] topicNames) { + return mockSchema; + } + + @Override + public SchemaInfo getTopicSchemaByRegex(String regex) { + return mockSchema; + } } diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactoryTests.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactoryTests.java new file mode 100644 index 0000000..b2a4030 --- /dev/null +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactoryTests.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; + +import org.apache.nifi.testing.*; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ConsumerBuilderImpl; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.ProtobufSchema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import com.google.protobuf.GeneratedMessageV3; + +@SuppressWarnings("rawtypes") +public class ConsumerBuilderFactoryTests { + + private PulsarClient pulsarClient; + + @Mock + private SchemaInfo mockSchema = mock(SchemaInfo.class); + + private ConsumerBuilderFactory factory; + + @Before + public final void init() throws PulsarClientException { + pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); + factory = new ConsumerBuilderFactory(pulsarClient); + } + + @Test + public final void booleanSchemaTest() { + when(mockSchema.getType()).thenReturn(SchemaType.BOOLEAN); + ConsumerBuilderImpl impl = (ConsumerBuilderImpl)factory.build(mockSchema); + assertEquals(Schema.BOOL, impl.getSchema()); + } + + @Test + public final void dateSchemaTest() { + when(mockSchema.getType()).thenReturn(SchemaType.DATE); + ConsumerBuilderImpl impl = (ConsumerBuilderImpl)factory.build(mockSchema); + assertEquals(Schema.DATE, impl.getSchema()); + } + + @Test + public final void avroSchemaTest() { + String def = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.nifi.processors.pulsar.util.ConsumerBuilderFactoryTests\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}"; + + AvroSchema userSchema = AvroSchema.of(User.class); + when(mockSchema.getType()).thenReturn(userSchema.getSchemaInfo().getType()); + when(mockSchema.getSchema()).thenReturn(userSchema.getSchemaInfo().getSchema()); + + ConsumerBuilderImpl impl = (ConsumerBuilderImpl)factory.build(mockSchema); + assertEquals(def, impl.getSchema().getSchemaInfo().getSchemaDefinition()); + } + + @Test + public final void jsonSchemaTest() { + JSONSchema userSchema = JSONSchema.of(User.class); + + when(mockSchema.getType()).thenReturn(userSchema.getSchemaInfo().getType()); + when(mockSchema.getSchema()).thenReturn(userSchema.getSchemaInfo().getSchema()); + + ConsumerBuilderImpl impl = (ConsumerBuilderImpl)factory.build(mockSchema); + assertEquals("", impl.getSchema().getSchemaInfo().getSchemaDefinition()); + } + + @Test + public final void protobufTest() { + ProtobufSchema userSchema = + ProtobufSchema.of(Person.class, new HashMap()); + + when(mockSchema.getType()).thenReturn(userSchema.getSchemaInfo().getType()); + when(mockSchema.getSchema()).thenReturn(userSchema.getSchemaInfo().getSchema()); + + ConsumerBuilderImpl impl = (ConsumerBuilderImpl)factory.build(mockSchema); + assertEquals("", impl.getSchema().getSchemaInfo().getSchemaDefinition()); + } + + private static final class User { + private String name; + private int age; + } +} diff --git a/nifi-pulsar-processors/src/test/proto/org/apache/nifi/testing/addressbook.proto b/nifi-pulsar-processors/src/test/proto/org/apache/nifi/testing/addressbook.proto new file mode 100644 index 0000000..9f4dd0a --- /dev/null +++ b/nifi-pulsar-processors/src/test/proto/org/apache/nifi/testing/addressbook.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package tutorial; + +option java_multiple_files = true; +option java_package = "org.apache.nifi.testing"; +option java_outer_classname = "AddressBookProtos"; + +message Person { + string name = 1; + int32 id = 2; + string email = 3; + + enum PhoneType { + MOBILE = 0; + HOME = 1; + WORK = 2; + } + + message PhoneNumber { + string number = 1; + PhoneType type = 2; + } + + repeated PhoneNumber phones = 4; +} + +message AddressBook { + repeated Person people = 1; +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index e3c5917..0a08873 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,10 @@ 3.8.1 2.4 2.5.2 + 1.41.0 2.12.3 + 3.11.4 + 3.11.4 2.8.0 11 From 5dc459aad4ca8e4b07a5e7c83fab47b0d6160b54 Mon Sep 17 00:00:00 2001 From: david-streamlio Date: Tue, 26 Oct 2021 18:30:36 -0700 Subject: [PATCH 2/2] Changed Consumer to use GenericRecord rather than byte[] Schema --- docker/Dockerfile | 2 +- nifi-pulsar-client-service-api/pom.xml | 2 +- .../nifi/pulsar/PulsarClientService.java | 8 -- nifi-pulsar-client-service/pom.xml | 7 +- .../pulsar/StandardPulsarClientService.java | 78 ------------ .../validator/PulsarBrokerUrlValidator.java | 2 +- .../TestStandardPulsarClientService.java | 1 - nifi-pulsar-processors/pom.xml | 68 +---------- .../AbstractPulsarConsumerProcessor.java | 45 ++++--- .../pulsar/pubsub/ConsumePulsar.java | 45 +++---- .../pulsar/pubsub/ConsumePulsarRecord.java | 75 ++++++------ .../pulsar/util/ConsumerBuilderFactory.java | 108 ----------------- .../pulsar/pubsub/TestConsumePulsar.java | 63 ++++------ .../pubsub/TestConsumePulsarRecord.java | 29 +++-- .../pubsub/async/TestAsyncConsumePulsar.java | 21 +--- .../async/TestAsyncConsumePulsarRecord.java | 7 +- .../pubsub/mocks/MockPulsarClientService.java | 40 ++----- .../pubsub/sync/TestSyncConsumePulsar.java | 24 +--- .../sync/TestSyncConsumePulsarRecord.java | 6 +- .../util/ConsumerBuilderFactoryTests.java | 111 ------------------ .../org/apache/nifi/testing/addressbook.proto | 30 ----- pom.xml | 3 +- 22 files changed, 172 insertions(+), 603 deletions(-) delete mode 100644 nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactory.java delete mode 100644 nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactoryTests.java delete mode 100644 nifi-pulsar-processors/src/test/proto/org/apache/nifi/testing/addressbook.proto diff --git a/docker/Dockerfile b/docker/Dockerfile index 323a35f..8d0dd43 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -2,4 +2,4 @@ FROM apache/nifi:1.14.0 COPY --chown=nifi:nifi lib/*.nar /opt/nifi/nifi-current/lib/ -COPY --chown=nifi:nifi creds/*.json /tmp \ No newline at end of file +COPY --chown=nifi:nifi creds/*.json /tmp diff --git a/nifi-pulsar-client-service-api/pom.xml b/nifi-pulsar-client-service-api/pom.xml index ce3e8d6..d2f0e00 100644 --- a/nifi-pulsar-client-service-api/pom.xml +++ b/nifi-pulsar-client-service-api/pom.xml @@ -35,7 +35,7 @@ org.apache.pulsar - pulsar-client-original + pulsar-client ${pulsar.version} diff --git a/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java b/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java index 83cf563..9561e21 100644 --- a/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java +++ b/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientService.java @@ -19,9 +19,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.controller.ControllerService; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.common.schema.SchemaInfo; @Tags({"Pulsar", "client", "pool"}) @CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, " @@ -29,13 +27,7 @@ public interface PulsarClientService extends ControllerService { public PulsarClient getPulsarClient(); - - public PulsarAdmin getPulsarAdmin(); public String getPulsarBrokerRootURL(); - public SchemaInfo getTopicSchema(String[] topicNames); - - public SchemaInfo getTopicSchemaByRegex(String regex); - } diff --git a/nifi-pulsar-client-service/pom.xml b/nifi-pulsar-client-service/pom.xml index 4fb5b0a..cb53d2a 100644 --- a/nifi-pulsar-client-service/pom.xml +++ b/nifi-pulsar-client-service/pom.xml @@ -27,6 +27,12 @@ + + org.apache.pulsar + pulsar-client + ${pulsar.version} + + org.apache.pulsar pulsar-client-admin @@ -37,7 +43,6 @@ org.apache.nifi nifi-pulsar-client-service-api ${version} - provided diff --git a/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java b/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java index 38c1eb0..e69b76f 100644 --- a/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java +++ b/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java @@ -36,15 +36,10 @@ import org.apache.nifi.pulsar.auth.PulsarClientAuthenticationService; import org.apache.nifi.pulsar.validator.PulsarBrokerUrlValidator; import org.apache.nifi.reporting.InitializationException; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminBuilder; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; -import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse; -import org.apache.pulsar.common.schema.SchemaInfo; @Tags({"Pulsar", "client", "pool"}) @CapabilityDescription("Standard implementation of the PulsarClientService. " @@ -174,15 +169,6 @@ public class StandardPulsarClientService extends AbstractControllerService imple .addValidator(new PulsarBrokerUrlValidator()) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); - - public static final PropertyDescriptor PULSAR_SERVICE_HTTP_URL = new PropertyDescriptor.Builder() - .name("PULSAR_SERVICE_HTTP_URL") - .displayName("Pulsar Service HTTP URL") - .description("URL for the administrative endpoint of the Pulsar cluster, e.g. pulsar://localhost:8080") - .required(true) - .addValidator(StandardValidators.URL_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .build(); public static final PropertyDescriptor STATS_INTERVAL = new PropertyDescriptor.Builder() .name("STATS_INTERVAL") @@ -209,13 +195,11 @@ public class StandardPulsarClientService extends AbstractControllerService imple private static List properties; private volatile PulsarClient client; - private volatile PulsarAdmin admin; private String brokerUrl; static { final List props = new ArrayList<>(); props.add(PULSAR_SERVICE_URL); - props.add(PULSAR_SERVICE_HTTP_URL); props.add(AUTHENTICATION_SERVICE); props.add(CONCURRENT_LOOKUP_REQUESTS); props.add(CONNECTIONS_PER_BROKER); @@ -246,7 +230,6 @@ protected List getSupportedPropertyDescriptors() { public void onEnabled(final ConfigurationContext context) throws InitializationException, UnsupportedAuthenticationException { try { client = getClient(context); - admin = getAdmin(context); brokerUrl = context.getProperty(PULSAR_SERVICE_URL).evaluateAttributeExpressions().getValue(); } catch (Exception e) { throw new InitializationException("Unable to connect to the Pulsar cluster ", e); @@ -259,10 +242,6 @@ public void cleanup() throws PulsarClientException { if (client != null) { client.close(); } - - if (admin != null) { - admin.close(); - } } @Override @@ -270,45 +249,10 @@ public PulsarClient getPulsarClient() { return client; } - @Override - public PulsarAdmin getPulsarAdmin() { - return admin; - } - @Override public String getPulsarBrokerRootURL() { return brokerUrl; } - - public SchemaInfo getTopicSchema(String[] topicNames) { - if (topicNames == null || topicNames.length < 1) { - return null; - } - - SchemaInfo info = null; - - try { - for (String topic : topicNames) { - if (info == null) { - info = getPulsarAdmin().schemas().getSchemaInfo(topic); - } else { - IsCompatibilityResponse resp = getPulsarAdmin().schemas().testCompatibility(topic, info); - if (!resp.isCompatibility()) { - // The specified topics have incompatible schema types. - return null; - } - } - } - - } catch (final PulsarAdminException paEx) { - getLogger().error("Unable to retrieve topic schema information", paEx); - } - return info; - } - - public SchemaInfo getTopicSchemaByRegex(String regex) { - return null; - } private PulsarClient getClient(ConfigurationContext context) throws MalformedURLException, PulsarClientException { @@ -342,27 +286,5 @@ private PulsarClient getClient(ConfigurationContext context) throws MalformedURL builder = builder.serviceUrl(context.getProperty(PULSAR_SERVICE_URL).evaluateAttributeExpressions().getValue()); return builder.build(); } - - private PulsarAdmin getAdmin(ConfigurationContext context) throws PulsarClientException { - PulsarAdminBuilder builder = PulsarAdmin.builder() - .allowTlsInsecureConnection(context.getProperty(ALLOW_TLS_INSECURE_CONNECTION).asBoolean()) - .enableTlsHostnameVerification(context.getProperty(ENABLE_TLS_HOSTNAME_VERIFICATION).asBoolean()) - .serviceHttpUrl(context.getProperty(PULSAR_SERVICE_HTTP_URL).evaluateAttributeExpressions().getValue()); - - // Configure Authentication - final PulsarClientAuthenticationService authenticationService = - context.getProperty(AUTHENTICATION_SERVICE) - .asControllerService(PulsarClientAuthenticationService.class); - - if (authenticationService != null) { - builder = builder.authentication(authenticationService.getAuthentication()); - - if (StringUtils.isNotBlank(authenticationService.getTlsTrustCertsFilePath())) { - builder = builder.tlsTrustCertsFilePath(authenticationService.getTlsTrustCertsFilePath()); - } - } - - return builder.build(); - } } diff --git a/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/validator/PulsarBrokerUrlValidator.java b/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/validator/PulsarBrokerUrlValidator.java index 8f11ed2..b0d21c5 100644 --- a/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/validator/PulsarBrokerUrlValidator.java +++ b/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/validator/PulsarBrokerUrlValidator.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.pulsar.validator; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; diff --git a/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java b/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java index e9aaab2..1efb941 100644 --- a/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java +++ b/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java @@ -30,7 +30,6 @@ public void validServiceTest() throws InitializationException { runner.addControllerService("test-good", service); runner.setProperty(service, StandardPulsarClientService.PULSAR_SERVICE_URL, "pulsar://localhost:6650"); - runner.setProperty(service, StandardPulsarClientService.PULSAR_SERVICE_HTTP_URL, "http://localhost:8080"); runner.enableControllerService(service); runner.assertValid(service); diff --git a/nifi-pulsar-processors/pom.xml b/nifi-pulsar-processors/pom.xml index a89f5d9..af044c1 100644 --- a/nifi-pulsar-processors/pom.xml +++ b/nifi-pulsar-processors/pom.xml @@ -25,67 +25,6 @@ nifi-pulsar-processors jar - - - - org.xolstice.maven.plugins - protobuf-maven-plugin - 0.6.1 - - com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} - grpc-java - io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} - - - - - compile - compile-custom - test-compile - test-compile-custom - - - - - - - kr.motd.maven - os-maven-plugin - 1.6.2 - - - initialize - - detect - - - - - - - - org.codehaus.mojo - build-helper-maven-plugin - 3.2.0 - - - add-classes - generate-test-sources - - add-test-source - - - - target/generated-test-sources/protobuf/java - - - - - - - - - org.apache.nifi @@ -117,7 +56,6 @@ org.apache.nifi nifi-pulsar-client-service-api ${version} - provided @@ -126,6 +64,12 @@ ${commons-lang3.version} + + org.apache.commons + commons-compress + ${commons-compress.version} + + org.apache.directory.studio org.apache.commons.io diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java index c43262b..93638d4 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java @@ -47,7 +47,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.pulsar.util.ConsumerBuilderFactory; +import org.apache.nifi.processors.pulsar.PropertyMappingUtils; import org.apache.nifi.pulsar.PulsarClientService; import org.apache.nifi.pulsar.cache.PulsarConsumerLRUCache; import org.apache.nifi.util.StringUtils; @@ -56,7 +56,9 @@ import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.schema.GenericRecord; public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { protected static final String PULSAR_MESSAGE_KEY = "__KEY__"; @@ -264,10 +266,9 @@ public abstract class AbstractPulsarConsumerProcessor extends AbstractProcess } private PulsarClientService pulsarClientService; - private ConsumerBuilderFactory consumerFactory; - private PulsarConsumerLRUCache> consumers; + private PulsarConsumerLRUCache> consumers; private ExecutorService consumerPool; - private ExecutorCompletionService>> consumerService; + private ExecutorCompletionService>> consumerService; private ExecutorService ackPool; private ExecutorCompletionService ackService; @@ -313,7 +314,6 @@ public void init(ProcessContext context) { } setPulsarClientService(context.getProperty(PULSAR_CLIENT_SERVICE).asControllerService(PulsarClientService.class)); - consumerFactory = new ConsumerBuilderFactory(getPulsarClientService().getPulsarClient()); } @OnUnscheduled @@ -370,14 +370,15 @@ protected String getConsumerId(final ProcessContext context, FlowFile flowFile) return sb.toString(); } - protected void consumeAsync(final Consumer consumer, ProcessContext context, ProcessSession session) throws PulsarClientException { + protected void consumeAsync(final Consumer consumer, + ProcessContext context, ProcessSession session) throws PulsarClientException { try { final int maxMessages = context.getProperty(CONSUMER_BATCH_SIZE).isSet() ? context.getProperty(CONSUMER_BATCH_SIZE) .evaluateAttributeExpressions().asInteger() : Integer.MAX_VALUE; getConsumerService().submit(() -> { - List> messages = new LinkedList>(); - Message msg = null; + List> messages = new LinkedList>(); + Message msg = null; AtomicInteger msgCount = new AtomicInteger(0); while (((msg = consumer.receive(0, TimeUnit.SECONDS)) != null) && msgCount.get() < maxMessages) { @@ -392,14 +393,14 @@ protected void consumeAsync(final Consumer consumer, ProcessContext context, } } - protected synchronized Consumer getConsumer(ProcessContext context, String topic) throws PulsarClientException { + protected synchronized Consumer getConsumer(ProcessContext context, String topic) throws PulsarClientException { /* Avoid creating producers for non-existent topics */ if (StringUtils.isBlank(topic)) { return null; } - Consumer consumer = getConsumers().get(topic); + Consumer consumer = getConsumers().get(topic); if (consumer != null && consumer.isConnected()) { return consumer; @@ -414,21 +415,18 @@ protected synchronized Consumer getConsumer(ProcessContext context, String to return (consumer != null && consumer.isConnected()) ? consumer : null; } - @SuppressWarnings("unchecked") - protected synchronized ConsumerBuilder getConsumerBuilder(ProcessContext context) throws PulsarClientException { + protected synchronized ConsumerBuilder getConsumerBuilder(ProcessContext context) throws PulsarClientException { - @SuppressWarnings("rawtypes") - ConsumerBuilder builder = null; + ConsumerBuilder builder = + getPulsarClientService().getPulsarClient().newConsumer(Schema.AUTO_CONSUME()); if (context.getProperty(TOPICS).isSet()) { String[] topics = Arrays.stream(context.getProperty(TOPICS).evaluateAttributeExpressions().getValue().split("[, ]")) .map(String::trim).toArray(String[]::new); - builder = this.consumerFactory.build(getPulsarClientService().getTopicSchema(topics)); builder = builder.topic(topics); } else if (context.getProperty(TOPICS_PATTERN).isSet()) { String topicsPattern = context.getProperty(TOPICS_PATTERN).getValue(); - builder = consumerFactory.build(getPulsarClientService().getTopicSchemaByRegex(topicsPattern)); builder = builder.topicsPattern(topicsPattern); } @@ -451,11 +449,11 @@ protected synchronized void setConsumerPool(ExecutorService pool) { this.consumerPool = pool; } - protected synchronized ExecutorCompletionService>> getConsumerService() { + protected synchronized ExecutorCompletionService>> getConsumerService() { return consumerService; } - protected synchronized void setConsumerService(ExecutorCompletionService>> service) { + protected synchronized void setConsumerService(ExecutorCompletionService>> service) { this.consumerService = service; } @@ -483,21 +481,22 @@ protected synchronized void setPulsarClientService(PulsarClientService pulsarCli this.pulsarClientService = pulsarClientService; } - protected synchronized PulsarConsumerLRUCache> getConsumers() { + protected synchronized PulsarConsumerLRUCache> getConsumers() { if (consumers == null) { - consumers = new PulsarConsumerLRUCache>(20); + consumers = new PulsarConsumerLRUCache>(20); } return consumers; } - protected void setConsumers(PulsarConsumerLRUCache> consumers) { + protected void setConsumers(PulsarConsumerLRUCache> consumers) { this.consumers = consumers; } - protected Map getMappedFlowFileAttributes(ProcessContext context, final Message message) { + protected Map getMappedFlowFileAttributes(ProcessContext context, final Message msg) { String mappings = context.getProperty(MAPPED_FLOWFILE_ATTRIBUTES).getValue(); - return PropertyMappingUtils.getMappedValues(mappings, (p) -> PULSAR_MESSAGE_KEY.equals(p) ? message.getKey() : message.getProperty(p)); + return PropertyMappingUtils.getMappedValues(mappings, + (p) -> PULSAR_MESSAGE_KEY.equals(p) ? msg.getKey() : msg.getProperty(p)); } protected boolean isSharedSubscription(ProcessContext context) { diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java index ae12a9e..98fe2e4 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.commons.io.IOUtils; @SeeAlso({PublishPulsar.class, ConsumePulsarRecord.class, PublishPulsarRecord.class}) @@ -58,7 +59,7 @@ public class ConsumePulsar extends AbstractPulsarConsumerProcessor { @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { try { - Consumer consumer = getConsumer(context, getConsumerId(context, session.get())); + Consumer consumer = getConsumer(context, getConsumerId(context, session.get())); if (consumer == null) { context.yield(); @@ -78,9 +79,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro } } - private void handleAsync(final Consumer consumer, ProcessContext context, ProcessSession session) { + private void handleAsync(final Consumer consumer, ProcessContext context, ProcessSession session) { try { - Future>> done = getConsumerService().poll(5, TimeUnit.SECONDS); + Future>> done = getConsumerService().poll(5, TimeUnit.SECONDS); if (done != null) { @@ -90,7 +91,7 @@ private void handleAsync(final Consumer consumer, ProcessContext context // Cumulative acks are NOT permitted on Shared subscriptions. final boolean shared = isSharedSubscription(context); - List> messages = done.get(); + List> messages = done.get(); if (CollectionUtils.isNotEmpty(messages)) { FlowFile flowFile = null; @@ -98,10 +99,10 @@ private void handleAsync(final Consumer consumer, ProcessContext context AtomicInteger msgCount = new AtomicInteger(0); Map lastAttributes = null; - Message lastMessage = null; + Message lastMessage = null; Map currentAttributes = null; - for (Message msg : messages) { + for (Message msg : messages) { currentAttributes = getMappedFlowFileAttributes(context, msg); if (lastAttributes != null && !lastAttributes.equals(currentAttributes)) { @@ -114,7 +115,7 @@ private void handleAsync(final Consumer consumer, ProcessContext context session.commitAsync(); if (!shared) { - final Message finalMessage = lastMessage; + final Message finalMessage = lastMessage; // Cumulatively acknowledge consuming the messages for non-shared subs getAckService().submit(new Callable() { @@ -156,8 +157,13 @@ public Object call() throws Exception { out.write(demarcatorBytes); } - out.write(msg.getValue()); - msgCount.getAndIncrement(); + byte[] data = msg.getData(); + + if (data != null && data.length > 0) { + out.write(data); + msgCount.getAndIncrement(); + } + } catch (final IOException ioEx) { session.rollback(); return; @@ -186,7 +192,7 @@ public Object call() throws Exception { } } - private void consume(Consumer consumer, ProcessContext context, ProcessSession session) throws PulsarClientException { + private void consume(Consumer consumer, ProcessContext context, ProcessSession session) throws PulsarClientException { try { final int maxMessages = context.getProperty(CONSUMER_BATCH_SIZE).isSet() ? context.getProperty(CONSUMER_BATCH_SIZE) @@ -200,8 +206,8 @@ private void consume(Consumer consumer, ProcessContext context, ProcessS FlowFile flowFile = null; OutputStream out = null; - Message msg = null; - Message lastMsg = null; + Message msg = null; + Message lastMsg = null; AtomicInteger msgCount = new AtomicInteger(0); AtomicInteger loopCounter = new AtomicInteger(0); @@ -249,20 +255,19 @@ private void consume(Consumer consumer, ProcessContext context, ProcessS if (shared) { consumer.acknowledge(msg); } - - byte[] msgValue = msg.getValue(); - // Skip empty messages, as they cause NPE's when we write them to the OutputStream - if (msgValue == null || msgValue.length < 1) { - continue; - } // only write demarcators between messages if (msgCount.get() > 0) { out.write(demarcatorBytes); } - out.write(msgValue); - msgCount.getAndIncrement(); + byte[] data = msg.getData(); + + if (data != null && data.length > 0) { + out.write(data); + msgCount.getAndIncrement(); + } + } catch (final IOException ioEx) { getLogger().error("Unable to create flow file ", ioEx); session.rollback(); diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java index aa0a018..9b9b68b 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java @@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.schema.GenericRecord; @CapabilityDescription("Consumes messages from Apache Pulsar. " + "The complementary NiFi processor for sending messages is PublishPulsarRecord. Please note that, at this time, " @@ -81,7 +82,7 @@ }) @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @SeeAlso({PublishPulsar.class, ConsumePulsar.class, PublishPulsarRecord.class}) -public class ConsumePulsarRecord extends AbstractPulsarConsumerProcessor { +public class ConsumePulsarRecord extends AbstractPulsarConsumerProcessor { public static final String MSG_COUNT = "record.count"; private static final String RECORD_SEPARATOR = "\n"; @@ -159,7 +160,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro .evaluateAttributeExpressions().getValue().getBytes() : RECORD_SEPARATOR.getBytes(); try { - Consumer consumer = getConsumer(context, getConsumerId(context, session.get())); + Consumer consumer = getConsumer(context, getConsumerId(context, session.get())); if (consumer == null) { /* If we aren't connected to Pulsar, then just yield */ context.yield(); @@ -187,9 +188,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro * @return A List of Messages * @throws PulsarClientException in the event we cannot communicate with the Pulsar broker. */ - private List> getMessages(final Consumer consumer, int maxMessages) throws PulsarClientException { - List> messages = new LinkedList>(); - Message msg = null; + private List> getMessages(final Consumer consumer, int maxMessages) throws PulsarClientException { + List> messages = new LinkedList>(); + Message msg = null; AtomicInteger msgCount = new AtomicInteger(0); while (((msg = consumer.receive(0, TimeUnit.SECONDS)) != null) && msgCount.get() < maxMessages) { @@ -212,34 +213,37 @@ private List> getMessages(final Consumer consumer, int m * @param writerFactory - The factory used to write the messages. * @throws PulsarClientException if there is an issue communicating with Apache Pulsar. */ - private void consumeMessages(ProcessContext context, ProcessSession session, final Consumer consumer, final List> messages, - final RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, final byte[] demarcator, final boolean async) throws PulsarClientException { + private void consumeMessages(ProcessContext context, ProcessSession session, + final Consumer consumer, final List> messages, + final RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, + final byte[] demarcator, final boolean async) throws PulsarClientException { if (CollectionUtils.isEmpty(messages)) { return; } + final BlockingQueue> parseFailures = + new LinkedBlockingQueue>(); + RecordSchema schema = null; - final BlockingQueue> parseFailures = new LinkedBlockingQueue>(); FlowFile flowFile = null; OutputStream rawOut = null; RecordSetWriter writer = null; Map lastAttributes = null; - Message lastMessage = null; + Message lastMessage = null; Map currentAttributes = null; // Cumulative acks are NOT permitted on Shared subscriptions final boolean shared = isSharedSubscription(context); try { - for (Message msg : messages) { + for (Message msg : messages) { currentAttributes = getMappedFlowFileAttributes(context, msg); - byte[] msgValue = msg.getValue(); - + // if the current message's mapped attribute values differ from the previous set's, // write out the active record set and clear various references so that we'll start a new one - if (lastMessage != null && !lastAttributes.equals(currentAttributes)) { + if (lastAttributes != null && !lastAttributes.equals(currentAttributes)) { WriteResult result = writer.finishRecordSet(); IOUtils.closeQuietly(writer); IOUtils.closeQuietly(rawOut); @@ -257,7 +261,7 @@ private void consumeMessages(ProcessContext context, ProcessSession session, fin parseFailures.clear(); if (!shared) { - acknowledgeCumulative(consumer, lastMessage, async); + acknowledgeCumulative(consumer, lastMessage, async); } lastAttributes = null; @@ -265,19 +269,19 @@ private void consumeMessages(ProcessContext context, ProcessSession session, fin } // if there's no record set actively being written, begin one + byte[] data = msg.getData(); if (lastMessage == null) { flowFile = session.create(); flowFile = session.putAllAttributes(flowFile, currentAttributes); - schema = getSchema(flowFile, readerFactory, msgValue); + schema = getSchema(flowFile, readerFactory, data); rawOut = session.write(flowFile); - writer = getRecordWriter(writerFactory, schema, rawOut); + writer = getRecordWriter(writerFactory, schema, rawOut, flowFile); if (schema == null || writer == null) { parseFailures.add(msg); session.remove(flowFile); IOUtils.closeQuietly(rawOut); getLogger().error("Unable to create a record writer to consume from the Pulsar topic"); - continue; } @@ -288,13 +292,14 @@ private void consumeMessages(ProcessContext context, ProcessSession session, fin lastMessage = msg; if (shared) { - acknowledge(consumer, msg, async); + acknowledge(consumer, msg, async); } - // write each of records in the current message to the active record set. These will each + // write each of the records in the current message to the active record set. These will each // have the same mapped flowfile attribute values, which means that it's ok that they are all placed // in the same output flowfile. - final InputStream in = new ByteArrayInputStream(msgValue); + + final InputStream in = new ByteArrayInputStream(data); try { RecordReader r = readerFactory.createRecordReader(flowFile, in, getLogger()); for (Record record = r.nextRecord(); record != null; record = r.nextRecord()) { @@ -325,11 +330,11 @@ private void consumeMessages(ProcessContext context, ProcessSession session, fin handleFailures(session, parseFailures, demarcator); if (!shared) { - acknowledgeCumulative(consumer, messages.get(messages.size() - 1), async); + acknowledgeCumulative(consumer, messages.get(messages.size() - 1), async); } } - private void acknowledge(final Consumer consumer, final Message msg, final boolean async) throws PulsarClientException { + private void acknowledge(final Consumer consumer, final Message msg, final boolean async) throws PulsarClientException { if (async) { getAckService().submit(new Callable() { @Override @@ -343,7 +348,7 @@ public Object call() throws Exception { } } - private void acknowledgeCumulative(final Consumer consumer, final Message msg, final boolean async) throws PulsarClientException { + private void acknowledgeCumulative(final Consumer consumer, final Message msg, final boolean async) throws PulsarClientException { if (async) { getAckService().submit(new Callable() { @Override @@ -357,7 +362,8 @@ public Object call() throws Exception { } } - private void handleFailures(ProcessSession session, BlockingQueue> parseFailures, byte[] demarcator) { + private void handleFailures(ProcessSession session, + BlockingQueue> parseFailures, byte[] demarcator) { if (CollectionUtils.isEmpty(parseFailures)) { return; @@ -367,18 +373,17 @@ private void handleFailures(ProcessSession session, BlockingQueue> failureIterator = parseFailures.iterator(); + Iterator> failureIterator = parseFailures.iterator(); for (int idx = 0; failureIterator.hasNext(); idx++) { - Message msg = failureIterator.next(); - byte[] msgValue = msg.getValue(); + Message msg = failureIterator.next(); - if (msgValue != null && msgValue.length > 0) { + if (msg != null && msg.getData() != null) { if (idx > 0) { rawOut.write(demarcator); } - rawOut.write(msgValue); + rawOut.write(msg.getData()); } } IOUtils.closeQuietly(rawOut); @@ -392,18 +397,18 @@ private void handleFailures(ProcessSession session, BlockingQueue consumer, + protected void handleAsync(ProcessContext context, ProcessSession session, final Consumer consumer, final RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, byte[] demarcator) throws PulsarClientException { final Integer queryTimeout = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue(); try { - Future>> done = null; + Future>> done = null; do { done = getConsumerService().poll(queryTimeout, TimeUnit.SECONDS); if (done != null) { - List> messages = done.get(); + List> messages = done.get(); if (CollectionUtils.isNotEmpty(messages)) { consumeMessages(context, session, consumer, messages, readerFactory, writerFactory, demarcator, true); } @@ -423,6 +428,7 @@ private RecordSchema getSchema(FlowFile flowFile, RecordReaderFactory readerFact in = new ByteArrayInputStream(msgValue); schema = readerFactory.createRecordReader(flowFile, in, getLogger()).getSchema(); } catch (MalformedRecordException | IOException | SchemaNotFoundException e) { + getLogger().error("Unable to determine the schema", e); return null; } finally { IOUtils.closeQuietly(in); @@ -431,10 +437,11 @@ private RecordSchema getSchema(FlowFile flowFile, RecordReaderFactory readerFact return schema; } - private RecordSetWriter getRecordWriter(RecordSetWriterFactory writerFactory, RecordSchema srcSchema, OutputStream out) { + private RecordSetWriter getRecordWriter(RecordSetWriterFactory writerFactory, + RecordSchema srcSchema, OutputStream out, FlowFile flowFile) { try { RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), srcSchema); - return writerFactory.createWriter(getLogger(), writeSchema, out); + return writerFactory.createWriter(getLogger(), writeSchema, out, flowFile); } catch (SchemaNotFoundException | IOException e) { return null; } diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactory.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactory.java deleted file mode 100644 index 42ee1d9..0000000 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactory.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.pulsar.util; - -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.schema.SchemaDefinition; -import org.apache.pulsar.client.impl.schema.AvroSchema; -import org.apache.pulsar.client.impl.schema.JSONSchema; -import org.apache.pulsar.client.impl.schema.ProtobufSchema; -import org.apache.pulsar.common.schema.SchemaInfo; - -/** - * Creates a schema-aware consumer based on the SchemaInfo - * provided. This allows us to consume from existing Pulsar - * topics that have an associated schema. - * - * Currently, it only supports Primitive schema types. - */ -public class ConsumerBuilderFactory { - - private PulsarClient client; - - public ConsumerBuilderFactory(PulsarClient c) { - this.client = c; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public ConsumerBuilder build(SchemaInfo info) { - - switch (info.getType()) { - - case AVRO : - return client.newConsumer( - AvroSchema.of( - SchemaDefinition.builder() - .withJsonDef(new String(info.getSchema())) - .build()) - ); - - case JSON : - return client.newConsumer( - JSONSchema.of( - SchemaDefinition.builder() - .withJsonDef(new String(info.getSchema())) - .build()) - ); - - case PROTOBUF : - return client.newConsumer( - ProtobufSchema.of( - SchemaDefinition.builder() - .withJsonDef(new String(info.getSchema())) - .build()) - ); - - case BOOLEAN : return client.newConsumer(Schema.BOOL); - - case DATE : return client.newConsumer(Schema.DATE); - - case DOUBLE : return client.newConsumer(Schema.DOUBLE); - - case FLOAT : return client.newConsumer(Schema.FLOAT); - - case INSTANT : return client.newConsumer(Schema.INSTANT); - - case INT8 : return client.newConsumer(Schema.INT8); - - case INT16 : return client.newConsumer(Schema.INT16); - - case INT32 : return client.newConsumer(Schema.INT32); - - case INT64 : return client.newConsumer(Schema.INT64); - - case LOCAL_DATE : return client.newConsumer(Schema.LOCAL_DATE); - - case LOCAL_DATE_TIME : return client.newConsumer(Schema.LOCAL_DATE_TIME); - - case LOCAL_TIME : return client.newConsumer(Schema.LOCAL_TIME); - - case STRING : return client.newConsumer(Schema.STRING); - - case TIME : return client.newConsumer(Schema.TIME); - - case TIMESTAMP : return client.newConsumer(Schema.TIMESTAMP); - - case BYTES : - default: return client.newConsumer(Schema.BYTES); - - } - - } -} diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java index cbda218..8e81064 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java @@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -31,8 +32,6 @@ import org.mockito.junit.MockitoRule; import static org.junit.Assert.assertEquals; - -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -44,11 +43,12 @@ public class TestConsumePulsar extends AbstractPulsarProcessorTest { @Mock - protected Message mockMessage; + protected Message mockMessage; @Rule public MockitoRule mockitoRule = MockitoJUnit.rule(); - @Before + @SuppressWarnings("unchecked") + @Before public void init() throws InitializationException { runner = TestRunners.newTestRunner(ConsumePulsar.class); mockMessage = mock(Message.class); @@ -94,38 +94,10 @@ public void multipleAsyncMessagesBatchTest() throws PulsarClientException { public void multipleAsyncMessagesBatchSharedSubTest() throws PulsarClientException { this.batchMessages("Mocked Message", "foo", "bar", true, 40, "Shared"); } - - /* - * Verify that the consumer gets closed. - */ - @Test - public void onStoppedTest() throws NoSuchMethodException, SecurityException, PulsarClientException { - when(mockMessage.getValue()).thenReturn("Mocked Message".getBytes()); - mockClientService.setMockMessage(mockMessage); - - runner.setProperty(ConsumePulsar.TOPICS, "foo"); - runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); - runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); - runner.run(10, true); - runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); - - runner.assertQueueEmpty(); - - // Verify that the receive method on the consumer was called 10 times - int batchSize = Integer.parseInt(ConsumePulsar.CONSUMER_BATCH_SIZE.getDefaultValue()); - verify(mockClientService.getMockConsumer(), atLeast(10 * batchSize)).receive(0, TimeUnit.SECONDS); - - // Verify that each message was acknowledged - verify(mockClientService.getMockConsumer(), times(10)).acknowledgeCumulative(mockMessage); - - // Verify that the consumer was closed - verify(mockClientService.getMockConsumer(), times(1)).close(); - - } @Test public void keySharedTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("Mocked Message".getBytes()); + when(mockMessage.getData()).thenReturn("Mocked Message".getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.TOPICS, "foo"); @@ -144,7 +116,7 @@ protected void batchMessages(String msg, String topic, String sub, boolean async } protected void batchMessages(String msg, String topic, String sub, boolean async, int batchSize, String subType) throws PulsarClientException { - when(mockMessage.getValue()).thenReturn(msg.getBytes()); + when(mockMessage.getData()).thenReturn(msg.getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.ASYNC_ENABLED, Boolean.toString(async)); @@ -200,7 +172,7 @@ protected void sendMessages(String msg, String topic, String sub, boolean async, protected void sendMessages(String msg, String topic, String sub, boolean async, int iterations, String subType) throws PulsarClientException { - when(mockMessage.getValue()).thenReturn(msg.getBytes()); + when(mockMessage.getData()).thenReturn(msg.getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.ASYNC_ENABLED, Boolean.toString(async)); @@ -240,9 +212,24 @@ protected void sendMessages(String msg, String topic, String sub, boolean async, } protected void doMappedAttributesTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("A".getBytes()).thenReturn("B".getBytes()).thenReturn("C".getBytes()).thenReturn("D".getBytes()); - when(mockMessage.getProperty("prop")).thenReturn(null).thenReturn(null).thenReturn("val").thenReturn("val"); - when(mockMessage.getKey()).thenReturn(null).thenReturn(null).thenReturn(null).thenReturn("K"); + when(mockMessage.getData()) + .thenReturn("A".getBytes()) + .thenReturn("B".getBytes()) + .thenReturn("C".getBytes()) + .thenReturn("D".getBytes()); + + when(mockMessage.getProperty("prop")) + .thenReturn(null) + .thenReturn(null) + .thenReturn("val") + .thenReturn("val"); + + when(mockMessage.getKey()) + .thenReturn(null) + .thenReturn(null) + .thenReturn(null) + .thenReturn("K"); + mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.MAPPED_FLOWFILE_ATTRIBUTES, "prop,key=__KEY__"); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord.java index 696ef71..b915162 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord.java @@ -33,12 +33,12 @@ import org.apache.nifi.processors.pulsar.pubsub.mocks.MockRecordParser; import org.apache.nifi.processors.pulsar.pubsub.mocks.MockRecordWriter; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunners; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -51,12 +51,13 @@ public class TestConsumePulsarRecord extends AbstractPulsarProcessorTest protected static String DEFAULT_SUB = "bar"; @Mock - protected Message mockMessage; + protected Message mockMessage; protected MockRecordParser readerService; protected MockRecordWriter writerService; - @Before + @SuppressWarnings("unchecked") + @Before public void setup() throws InitializationException { mockMessage = mock(Message.class); @@ -113,7 +114,7 @@ protected void doFailedParseHandlingTest(String msg, String topic, String sub, b runner.setProperty(ConsumePulsarRecord.MAX_WAIT_TIME, "0 sec"); } - when(mockMessage.getValue()).thenReturn(msg.getBytes()); + when(mockMessage.getData()).thenReturn(msg.getBytes()); mockClientService.setMockMessage(mockMessage); runner.run(); @@ -142,7 +143,7 @@ protected List sendMessages(String msg, String topic, String sub, } protected List sendMessages(String msg, String topic, String sub, boolean async, int iterations, int batchSize, String subType) throws PulsarClientException { - when(mockMessage.getValue()).thenReturn(msg.getBytes()); + when(mockMessage.getData()).thenReturn(msg.getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.ASYNC_ENABLED, Boolean.toString(async)); @@ -186,9 +187,21 @@ protected List sendMessages(String msg, String topic, String sub, } protected void doMappedAttributesTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("A,10".getBytes()).thenReturn("B,10".getBytes()).thenReturn("C,10".getBytes()).thenReturn("D,10".getBytes()); - when(mockMessage.getProperty("prop")).thenReturn(null).thenReturn(null).thenReturn("val").thenReturn("val"); - when(mockMessage.getKey()).thenReturn(null).thenReturn(null).thenReturn(null).thenReturn("K"); + when(mockMessage.getData()).thenReturn("A,10".getBytes()) + .thenReturn("B,10".getBytes()) + .thenReturn("C,10".getBytes()) + .thenReturn("D,10".getBytes()); + + when(mockMessage.getProperty("prop")).thenReturn(null) + .thenReturn(null) + .thenReturn("val") + .thenReturn("val"); + + when(mockMessage.getKey()).thenReturn(null) + .thenReturn(null) + .thenReturn(null) + .thenReturn("K"); + mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.MAPPED_FLOWFILE_ATTRIBUTES, "prop,key=__KEY__"); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsar.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsar.java index 074738b..aedee67 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsar.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsar.java @@ -52,7 +52,7 @@ public void pulsarClientExceptionTest() throws PulsarClientException { @Test public void emptyMessageTest() { - when(mockMessage.getValue()).thenReturn("".getBytes()); + when(mockMessage.getData()).thenReturn("".getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.TOPICS, "foo"); @@ -76,25 +76,6 @@ public void multipleMessagesTest() throws PulsarClientException { this.sendMessages("Mocked Message", "foo", "bar", true, 40); } - /* - * Verify that the consumer gets closed. - */ - @Test - public void onStoppedTest() throws NoSuchMethodException, SecurityException, PulsarClientException { - when(mockMessage.getValue()).thenReturn("Mocked Message".getBytes()); - mockClientService.setMockMessage(mockMessage); - - runner.setProperty(ConsumePulsar.TOPICS, "foo"); - runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); - runner.run(10, true); - runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); - - runner.assertQueueEmpty(); - - // Verify that the consumer was closed - verify(mockClientService.getMockConsumer(), times(1)).close(); - } - @Test public void mappedAttributesTest() throws PulsarClientException { runner.setProperty(ConsumePulsar.ASYNC_ENABLED, Boolean.toString(true)); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsarRecord.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsarRecord.java index 15806f0..d8dd007 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsarRecord.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/async/TestAsyncConsumePulsarRecord.java @@ -24,7 +24,6 @@ import java.util.List; -import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar; import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsarRecord; import org.apache.nifi.processors.pulsar.pubsub.TestConsumePulsarRecord; import org.apache.nifi.util.MockFlowFile; @@ -35,7 +34,7 @@ public class TestAsyncConsumePulsarRecord extends TestConsumePulsarRecord { @Test public void emptyMessageTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("".getBytes()); + when(mockMessage.getData()).thenReturn("".getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.TOPICS, DEFAULT_TOPIC); @@ -51,7 +50,7 @@ public void emptyMessageTest() throws PulsarClientException { @Test public void singleMalformedMessageTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn(BAD_MSG.getBytes()); + when(mockMessage.getData()).thenReturn(BAD_MSG.getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.TOPICS, DEFAULT_TOPIC); @@ -110,7 +109,7 @@ public void singleMessageWithGoodAndBadRecordsTest() throws PulsarClientExceptio } } - when(mockMessage.getValue()).thenReturn(input.toString().getBytes()); + when(mockMessage.getData()).thenReturn(input.toString().getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.ASYNC_ENABLED, Boolean.toString(false)); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java index 9dffb8f..4821564 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/mocks/MockPulsarClientService.java @@ -40,7 +40,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.mockito.Mock; @@ -66,19 +66,19 @@ public class MockPulsarClientService extends AbstractControllerService implem ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class); @Mock - ConsumerBuilder mockConsumerBuilder = mock(ConsumerBuilder.class); + ConsumerBuilder mockConsumerBuilder = mock(ConsumerBuilder.class); @Mock Producer mockProducer = mock(Producer.class); @Mock - Consumer mockConsumer = mock(Consumer.class); + Consumer mockConsumer = mock(Consumer.class); @Mock TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilder.class); @Mock - protected Message mockMessage; + protected Message mockMessage; @Mock SchemaInfo mockSchema = mock(SchemaInfo.class); @@ -90,8 +90,8 @@ public class MockPulsarClientService extends AbstractControllerService implem public MockPulsarClientService() { when(mockClient.newProducer()).thenReturn((ProducerBuilder) mockProducerBuilder); - when(mockClient.newConsumer()).thenReturn((ConsumerBuilder) mockConsumerBuilder); - when(mockClient.newConsumer(any(Schema.class))).thenReturn((ConsumerBuilder) mockConsumerBuilder); + when(mockClient.newConsumer(Schema.AUTO_CONSUME())).thenReturn((ConsumerBuilder) mockConsumerBuilder); + when(mockClient.newConsumer(any(Schema.class))).thenReturn((ConsumerBuilder) mockConsumerBuilder); when(mockProducerBuilder.topic(anyString())).thenReturn(mockProducerBuilder); when(mockProducerBuilder.enableBatching(anyBoolean())).thenReturn(mockProducerBuilder); @@ -118,8 +118,8 @@ public MockPulsarClientService() { when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); when(mockConsumer.isConnected()).thenReturn(true); when(mockConsumer.receive()).thenReturn(mockMessage); - doAnswer(new Answer>() { - public Message answer(InvocationOnMock invocation) { + doAnswer(new Answer>() { + public Message answer(InvocationOnMock invocation) { return mockMessage; } }).when(mockConsumer).receive(0, TimeUnit.SECONDS); @@ -131,8 +131,8 @@ public Message answer(InvocationOnMock invocation) { } } - public void setMockMessage(Message msg) { - this.mockMessage = msg; + public void setMockMessage(Message mockMessage2) { + this.mockMessage = mockMessage2; // Configure the consumer behavior try { @@ -141,7 +141,7 @@ public void setMockMessage(Message msg) { e.printStackTrace(); } - CompletableFuture> future = CompletableFuture.supplyAsync(() -> { + CompletableFuture> future = CompletableFuture.supplyAsync(() -> { return mockMessage; }); @@ -190,7 +190,7 @@ private void defineDefaultProducerBehavior() { } } - public Consumer getMockConsumer() { + public Consumer getMockConsumer() { return mockConsumer; } @@ -198,7 +198,7 @@ public ProducerBuilder getMockProducerBuilder() { return mockProducerBuilder; } - public ConsumerBuilder getMockConsumerBuilder() { + public ConsumerBuilder getMockConsumerBuilder() { return mockConsumerBuilder; } @@ -210,24 +210,10 @@ public TypedMessageBuilder getMockTypedMessageBuilder() { public PulsarClient getPulsarClient() { return mockClient; } - - @Override - public PulsarAdmin getPulsarAdmin() { - return mockAdmin; - } @Override public String getPulsarBrokerRootURL() { return "pulsar://mocked:6650"; } - @Override - public SchemaInfo getTopicSchema(String[] topicNames) { - return mockSchema; - } - - @Override - public SchemaInfo getTopicSchemaByRegex(String regex) { - return mockSchema; - } } diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java index 00fafd6..c35f4fc 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java @@ -28,7 +28,6 @@ import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar; import org.apache.nifi.processors.pulsar.pubsub.TestConsumePulsar; import org.apache.nifi.util.MockFlowFile; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.junit.Test; @@ -73,7 +72,7 @@ public void pulsarClientExceptionTest() throws PulsarClientException { @Test public void emptyMessageTest() throws PulsarClientException { when(mockClientService.getMockConsumer().receive(0, TimeUnit.SECONDS)).thenReturn(mockMessage).thenReturn(null); - when(mockMessage.getValue()).thenReturn("".getBytes()); + when(mockMessage.getData()).thenReturn("".getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsar.TOPICS, "foo"); @@ -104,27 +103,6 @@ public final void batchMessageTest() throws PulsarClientException { this.batchMessages("Mocked Message", "foo", "bar", false, 400); } - /* - * Verify that the consumer gets closed. - */ - @Test - public void onStoppedTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("Mocked Message".getBytes()); - mockClientService.setMockMessage(mockMessage); - - runner.setProperty(ConsumePulsar.TOPICS, "foo"); - runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); - runner.setProperty(ConsumePulsar.CONSUMER_BATCH_SIZE, 1 + ""); - runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); - runner.run(1, true); - runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); - - runner.assertQueueEmpty(); - - // Verify that the consumer was closed - verify(mockClientService.getMockConsumer(), times(1)).close(); - } - @Test public void mappedAttributesTest() throws PulsarClientException { super.doMappedAttributesTest(); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsarRecord.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsarRecord.java index bc5d96d..1ab2d6c 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsarRecord.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsarRecord.java @@ -34,7 +34,7 @@ public class TestSyncConsumePulsarRecord extends TestConsumePulsarRecord { @Test public void emptyMessageTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn("".getBytes()); + when(mockMessage.getData()).thenReturn("".getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.TOPICS, DEFAULT_TOPIC); @@ -49,7 +49,7 @@ public void emptyMessageTest() throws PulsarClientException { @Test public void singleMalformedMessageTest() throws PulsarClientException { - when(mockMessage.getValue()).thenReturn(BAD_MSG.getBytes()); + when(mockMessage.getData()).thenReturn(BAD_MSG.getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.TOPICS, DEFAULT_TOPIC); @@ -107,7 +107,7 @@ public void singleMessageWithGoodAndBadRecordsTest() throws PulsarClientExceptio } } - when(mockMessage.getValue()).thenReturn(input.toString().getBytes()); + when(mockMessage.getData()).thenReturn(input.toString().getBytes()); mockClientService.setMockMessage(mockMessage); runner.setProperty(ConsumePulsarRecord.ASYNC_ENABLED, Boolean.toString(false)); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactoryTests.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactoryTests.java deleted file mode 100644 index b2a4030..0000000 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/util/ConsumerBuilderFactoryTests.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.pulsar.util; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; - -import org.apache.nifi.testing.*; - -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.ConsumerBuilderImpl; -import org.apache.pulsar.client.impl.schema.AvroSchema; -import org.apache.pulsar.client.impl.schema.JSONSchema; -import org.apache.pulsar.client.impl.schema.ProtobufSchema; -import org.apache.pulsar.common.schema.SchemaInfo; -import org.apache.pulsar.common.schema.SchemaType; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; - -import com.google.protobuf.GeneratedMessageV3; - -@SuppressWarnings("rawtypes") -public class ConsumerBuilderFactoryTests { - - private PulsarClient pulsarClient; - - @Mock - private SchemaInfo mockSchema = mock(SchemaInfo.class); - - private ConsumerBuilderFactory factory; - - @Before - public final void init() throws PulsarClientException { - pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); - factory = new ConsumerBuilderFactory(pulsarClient); - } - - @Test - public final void booleanSchemaTest() { - when(mockSchema.getType()).thenReturn(SchemaType.BOOLEAN); - ConsumerBuilderImpl impl = (ConsumerBuilderImpl)factory.build(mockSchema); - assertEquals(Schema.BOOL, impl.getSchema()); - } - - @Test - public final void dateSchemaTest() { - when(mockSchema.getType()).thenReturn(SchemaType.DATE); - ConsumerBuilderImpl impl = (ConsumerBuilderImpl)factory.build(mockSchema); - assertEquals(Schema.DATE, impl.getSchema()); - } - - @Test - public final void avroSchemaTest() { - String def = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.nifi.processors.pulsar.util.ConsumerBuilderFactoryTests\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}"; - - AvroSchema userSchema = AvroSchema.of(User.class); - when(mockSchema.getType()).thenReturn(userSchema.getSchemaInfo().getType()); - when(mockSchema.getSchema()).thenReturn(userSchema.getSchemaInfo().getSchema()); - - ConsumerBuilderImpl impl = (ConsumerBuilderImpl)factory.build(mockSchema); - assertEquals(def, impl.getSchema().getSchemaInfo().getSchemaDefinition()); - } - - @Test - public final void jsonSchemaTest() { - JSONSchema userSchema = JSONSchema.of(User.class); - - when(mockSchema.getType()).thenReturn(userSchema.getSchemaInfo().getType()); - when(mockSchema.getSchema()).thenReturn(userSchema.getSchemaInfo().getSchema()); - - ConsumerBuilderImpl impl = (ConsumerBuilderImpl)factory.build(mockSchema); - assertEquals("", impl.getSchema().getSchemaInfo().getSchemaDefinition()); - } - - @Test - public final void protobufTest() { - ProtobufSchema userSchema = - ProtobufSchema.of(Person.class, new HashMap()); - - when(mockSchema.getType()).thenReturn(userSchema.getSchemaInfo().getType()); - when(mockSchema.getSchema()).thenReturn(userSchema.getSchemaInfo().getSchema()); - - ConsumerBuilderImpl impl = (ConsumerBuilderImpl)factory.build(mockSchema); - assertEquals("", impl.getSchema().getSchemaInfo().getSchemaDefinition()); - } - - private static final class User { - private String name; - private int age; - } -} diff --git a/nifi-pulsar-processors/src/test/proto/org/apache/nifi/testing/addressbook.proto b/nifi-pulsar-processors/src/test/proto/org/apache/nifi/testing/addressbook.proto deleted file mode 100644 index 9f4dd0a..0000000 --- a/nifi-pulsar-processors/src/test/proto/org/apache/nifi/testing/addressbook.proto +++ /dev/null @@ -1,30 +0,0 @@ -syntax = "proto3"; - -package tutorial; - -option java_multiple_files = true; -option java_package = "org.apache.nifi.testing"; -option java_outer_classname = "AddressBookProtos"; - -message Person { - string name = 1; - int32 id = 2; - string email = 3; - - enum PhoneType { - MOBILE = 0; - HOME = 1; - WORK = 2; - } - - message PhoneNumber { - string number = 1; - PhoneType type = 2; - } - - repeated PhoneNumber phones = 4; -} - -message AddressBook { - repeated Person people = 1; -} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 0a08873..77d9581 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,7 @@ 4.2 + 1.21 3.8.1 2.4 2.5.2 @@ -31,7 +32,7 @@ 2.12.3 3.11.4 3.11.4 - 2.8.0 + 2.8.1 11