Skip to content

Commit

Permalink
Merge pull request #17 from david-streamlio/schema-enablement
Browse files Browse the repository at this point in the history
Schema enablement
  • Loading branch information
david-streamlio committed Oct 27, 2021
2 parents fcc653b + 5dc459a commit 48061d2
Show file tree
Hide file tree
Showing 22 changed files with 251 additions and 238 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ FROM apache/nifi:1.14.0

COPY --chown=nifi:nifi lib/*.nar /opt/nifi/nifi-current/lib/

COPY --chown=nifi:nifi creds/*.json /tmp
COPY --chown=nifi:nifi creds/*.json /tmp
2 changes: 1 addition & 1 deletion nifi-pulsar-client-service-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-original</artifactId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public interface PulsarClientService extends ControllerService {
public PulsarClient getPulsarClient();

public String getPulsarBrokerRootURL();

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@

import org.apache.commons.collections4.map.LRUMap;

public class PulsarClientLRUCache<K, V extends Closeable> extends LRUMap<K, V> {
public class PulsarConsumerLRUCache<K, V extends Closeable> extends LRUMap<K, V> {

private static final long serialVersionUID = 730163138087670453L;
private final static float LOAD_FACTOR = 0.75F;
private final static boolean SCAN_UNTIL_REMOVABLE = false;

public PulsarClientLRUCache(int maxSize) {
public PulsarConsumerLRUCache(int maxSize) {
this(maxSize, LOAD_FACTOR, SCAN_UNTIL_REMOVABLE);
}

public PulsarClientLRUCache(int maxSize, float loadFactor, boolean scanUntilRemovable) {
public PulsarConsumerLRUCache(int maxSize, float loadFactor, boolean scanUntilRemovable) {
super(maxSize, loadFactor, scanUntilRemovable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void setUp() throws InterruptedException {
*/
@Test
public void simpleTest() {
PulsarClientLRUCache<String, Producer> cache = new PulsarClientLRUCache<String, Producer>(10);
PulsarConsumerLRUCache<String, Producer> cache = new PulsarConsumerLRUCache<String, Producer>(10);

for (Character i='A'; i<='E'; i++){
cache.put(i.toString(), mockedPulsarProducer);
Expand All @@ -60,7 +60,7 @@ public void simpleTest() {
@Test
public void evictionTest() {

PulsarClientLRUCache<String, Producer> cache = new PulsarClientLRUCache<String, Producer>(5);
PulsarConsumerLRUCache<String, Producer> cache = new PulsarConsumerLRUCache<String, Producer>(5);

for (Character i='A'; i<='Z'; i++){
cache.put(i.toString(), mockedPulsarProducer);
Expand All @@ -78,7 +78,7 @@ public void evictionTest() {
@Test
public void evictionLruTest() {

PulsarClientLRUCache<String, Producer> cache = new PulsarClientLRUCache<String, Producer>(5);
PulsarConsumerLRUCache<String, Producer> cache = new PulsarConsumerLRUCache<String, Producer>(5);

final Character A = 'A';

Expand All @@ -102,7 +102,7 @@ public void evictionLruTest() {

@Test
public void clearTest() throws PulsarClientException {
PulsarClientLRUCache<String, Producer> cache = new PulsarClientLRUCache<String, Producer>(26);
PulsarConsumerLRUCache<String, Producer> cache = new PulsarConsumerLRUCache<String, Producer>(26);

for (Character i='A'; i<='Z'; i++) {
cache.put(i.toString(), mockedPulsarProducer);
Expand Down
13 changes: 12 additions & 1 deletion nifi-pulsar-client-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,22 @@

<dependencies>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>${pulsar.version}</version>
</dependency>

<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-pulsar-client-service-api</artifactId>
<version>${version}</version>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ public class StandardPulsarClientService extends AbstractControllerService imple
.defaultValue("false")
.description("")
.displayName("Allow TLS Insecure Connection")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

public static final PropertyDescriptor AUTHENTICATION_SERVICE = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -96,9 +94,7 @@ public class StandardPulsarClientService extends AbstractControllerService imple
+ "It validates incoming x509 certificate and matches provided hostname(CN/SAN) with expected "
+ "broker's host name. It follows RFC 2818, 3.1. Server Identity hostname verification.")
.displayName("Enable TLS Hostname Verification")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -233,10 +229,10 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, UnsupportedAuthenticationException {
try {
client = getClientBuilder(context).build();
client = getClient(context);
brokerUrl = context.getProperty(PULSAR_SERVICE_URL).evaluateAttributeExpressions().getValue();
} catch (Exception e) {
throw new InitializationException("Unable to create Pulsar Client", e);
throw new InitializationException("Unable to connect to the Pulsar cluster ", e);
}
}

Expand All @@ -258,11 +254,11 @@ public String getPulsarBrokerRootURL() {
return brokerUrl;
}

private ClientBuilder getClientBuilder(ConfigurationContext context) throws UnsupportedAuthenticationException, MalformedURLException {
private PulsarClient getClient(ConfigurationContext context) throws MalformedURLException, PulsarClientException {

ClientBuilder builder = PulsarClient.builder()
.allowTlsInsecureConnection(context.getProperty(ALLOW_TLS_INSECURE_CONNECTION).evaluateAttributeExpressions().asBoolean())
.enableTlsHostnameVerification(context.getProperty(ENABLE_TLS_HOSTNAME_VERIFICATION).evaluateAttributeExpressions().asBoolean())
.allowTlsInsecureConnection(context.getProperty(ALLOW_TLS_INSECURE_CONNECTION).asBoolean())
.enableTlsHostnameVerification(context.getProperty(ENABLE_TLS_HOSTNAME_VERIFICATION).asBoolean())
.maxConcurrentLookupRequests(context.getProperty(CONCURRENT_LOOKUP_REQUESTS).evaluateAttributeExpressions().asInteger())
.connectionsPerBroker(context.getProperty(CONNECTIONS_PER_BROKER).evaluateAttributeExpressions().asInteger())
.ioThreads(context.getProperty(IO_THREADS).evaluateAttributeExpressions().asInteger())
Expand All @@ -288,7 +284,7 @@ private ClientBuilder getClientBuilder(ConfigurationContext context) throws Unsu
}

builder = builder.serviceUrl(context.getProperty(PULSAR_SERVICE_URL).evaluateAttributeExpressions().getValue());
return builder;
return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.nifi.pulsar.validator;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public class TestStandardPulsarClientService {

@Test
public void testService() throws InitializationException {
public void validServiceTest() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final PulsarClientService service = new StandardPulsarClientService();
runner.addControllerService("test-good", service);
Expand Down
51 changes: 31 additions & 20 deletions nifi-pulsar-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<artifactId>nifi-pulsar-processors</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand Down Expand Up @@ -56,9 +56,38 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-pulsar-client-service-api</artifactId>
<version>${version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
</dependency>

<dependency>
<groupId>org.apache.directory.studio</groupId>
<artifactId>org.apache.commons.io</artifactId>
<version>${commons.io.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson-core.version}</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf3.version}</version>
</dependency>

<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
Expand All @@ -77,24 +106,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>

<dependency>
<groupId>org.apache.directory.studio</groupId>
<artifactId>org.apache.commons.io</artifactId>
<version>${commons.io.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson-core.version}</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.pulsar.PropertyMappingUtils;
import org.apache.nifi.pulsar.PulsarClientService;
import org.apache.nifi.pulsar.cache.PulsarClientLRUCache;
import org.apache.nifi.pulsar.cache.PulsarConsumerLRUCache;
import org.apache.nifi.util.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericRecord;

public abstract class AbstractPulsarConsumerProcessor<T> extends AbstractProcessor {
protected static final String PULSAR_MESSAGE_KEY = "__KEY__";
Expand Down Expand Up @@ -263,9 +266,9 @@ public abstract class AbstractPulsarConsumerProcessor<T> extends AbstractProcess
}

private PulsarClientService pulsarClientService;
private PulsarClientLRUCache<String, Consumer<T>> consumers;
private PulsarConsumerLRUCache<String, Consumer<GenericRecord>> consumers;
private ExecutorService consumerPool;
private ExecutorCompletionService<List<Message<T>>> consumerService;
private ExecutorCompletionService<List<Message<GenericRecord>>> consumerService;
private ExecutorService ackPool;
private ExecutorCompletionService<Object> ackService;

Expand Down Expand Up @@ -367,14 +370,15 @@ protected String getConsumerId(final ProcessContext context, FlowFile flowFile)
return sb.toString();
}

protected void consumeAsync(final Consumer<T> consumer, ProcessContext context, ProcessSession session) throws PulsarClientException {
protected void consumeAsync(final Consumer<GenericRecord> consumer,
ProcessContext context, ProcessSession session) throws PulsarClientException {
try {
final int maxMessages = context.getProperty(CONSUMER_BATCH_SIZE).isSet() ? context.getProperty(CONSUMER_BATCH_SIZE)
.evaluateAttributeExpressions().asInteger() : Integer.MAX_VALUE;

getConsumerService().submit(() -> {
List<Message<T>> messages = new LinkedList<Message<T>>();
Message<T> msg = null;
List<Message<GenericRecord>> messages = new LinkedList<Message<GenericRecord>>();
Message<GenericRecord> msg = null;
AtomicInteger msgCount = new AtomicInteger(0);

while (((msg = consumer.receive(0, TimeUnit.SECONDS)) != null) && msgCount.get() < maxMessages) {
Expand All @@ -389,14 +393,14 @@ protected void consumeAsync(final Consumer<T> consumer, ProcessContext context,
}
}

protected synchronized Consumer<T> getConsumer(ProcessContext context, String topic) throws PulsarClientException {
protected synchronized Consumer<GenericRecord> getConsumer(ProcessContext context, String topic) throws PulsarClientException {

/* Avoid creating producers for non-existent topics */
if (StringUtils.isBlank(topic)) {
return null;
}

Consumer<T> consumer = getConsumers().get(topic);
Consumer<GenericRecord> consumer = getConsumers().get(topic);

if (consumer != null && consumer.isConnected()) {
return consumer;
Expand All @@ -411,15 +415,19 @@ protected synchronized Consumer<T> getConsumer(ProcessContext context, String to
return (consumer != null && consumer.isConnected()) ? consumer : null;
}

protected synchronized ConsumerBuilder<T> getConsumerBuilder(ProcessContext context) throws PulsarClientException {

ConsumerBuilder<T> builder = (ConsumerBuilder<T>) getPulsarClientService().getPulsarClient().newConsumer();
protected synchronized ConsumerBuilder<GenericRecord> getConsumerBuilder(ProcessContext context) throws PulsarClientException {

ConsumerBuilder<GenericRecord> builder =
getPulsarClientService().getPulsarClient().newConsumer(Schema.AUTO_CONSUME());

if (context.getProperty(TOPICS).isSet()) {
builder = builder.topic(Arrays.stream(context.getProperty(TOPICS).evaluateAttributeExpressions().getValue().split("[, ]"))
.map(String::trim).toArray(String[]::new));
String[] topics = Arrays.stream(context.getProperty(TOPICS).evaluateAttributeExpressions().getValue().split("[, ]"))
.map(String::trim).toArray(String[]::new);

builder = builder.topic(topics);
} else if (context.getProperty(TOPICS_PATTERN).isSet()) {
builder = builder.topicsPattern(context.getProperty(TOPICS_PATTERN).getValue());
String topicsPattern = context.getProperty(TOPICS_PATTERN).getValue();
builder = builder.topicsPattern(topicsPattern);
}

if (context.getProperty(CONSUMER_NAME).isSet()) {
Expand All @@ -433,19 +441,19 @@ protected synchronized ConsumerBuilder<T> getConsumerBuilder(ProcessContext cont
.subscriptionType(SubscriptionType.valueOf(context.getProperty(SUBSCRIPTION_TYPE).getValue()));
}

protected synchronized ExecutorService getConsumerPool() {
protected synchronized ExecutorService getConsumerPool() {
return consumerPool;
}

protected synchronized void setConsumerPool(ExecutorService pool) {
this.consumerPool = pool;
}

protected synchronized ExecutorCompletionService<List<Message<T>>> getConsumerService() {
protected synchronized ExecutorCompletionService<List<Message<GenericRecord>>> getConsumerService() {
return consumerService;
}

protected synchronized void setConsumerService(ExecutorCompletionService<List<Message<T>>> service) {
protected synchronized void setConsumerService(ExecutorCompletionService<List<Message<GenericRecord>>> service) {
this.consumerService = service;
}

Expand Down Expand Up @@ -473,21 +481,22 @@ protected synchronized void setPulsarClientService(PulsarClientService pulsarCli
this.pulsarClientService = pulsarClientService;
}

protected synchronized PulsarClientLRUCache<String, Consumer<T>> getConsumers() {
protected synchronized PulsarConsumerLRUCache<String, Consumer<GenericRecord>> getConsumers() {
if (consumers == null) {
consumers = new PulsarClientLRUCache<String, Consumer<T>>(20);
consumers = new PulsarConsumerLRUCache<String, Consumer<GenericRecord>>(20);
}
return consumers;
}

protected void setConsumers(PulsarClientLRUCache<String, Consumer<T>> consumers) {
protected void setConsumers(PulsarConsumerLRUCache<String, Consumer<GenericRecord>> consumers) {
this.consumers = consumers;
}

protected Map<String, String> getMappedFlowFileAttributes(ProcessContext context, final Message<T> message) {
protected Map<String, String> getMappedFlowFileAttributes(ProcessContext context, final Message<GenericRecord> msg) {
String mappings = context.getProperty(MAPPED_FLOWFILE_ATTRIBUTES).getValue();

return PropertyMappingUtils.getMappedValues(mappings, (p) -> PULSAR_MESSAGE_KEY.equals(p) ? message.getKey() : message.getProperty(p));
return PropertyMappingUtils.getMappedValues(mappings,
(p) -> PULSAR_MESSAGE_KEY.equals(p) ? msg.getKey() : msg.getProperty(p));
}

protected boolean isSharedSubscription(ProcessContext context) {
Expand Down
Loading

0 comments on commit 48061d2

Please sign in to comment.