Refactored async processing
david-streamlio committed May 14, 2023
1 parent b53f35f commit 1f0a4ce
Showing 8 changed files with 55 additions and 224 deletions.
2 changes: 1 addition & 1 deletion docker-image/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>io.streamnative.connectors</groupId>
<artifactId>nifi-pulsar-bundle</artifactId>
<version>1.20.0</version>
<version>1.15.3</version>
</parent>

<artifactId>docker-image</artifactId>
2 changes: 1 addition & 1 deletion docker-image/src/main/docker/nifi/Dockerfile
@@ -15,7 +15,7 @@
# limitations under the License.
#

FROM apache/nifi:1.20.0
FROM apache/nifi:1.15.3


#############################################################################
@@ -101,16 +101,6 @@ public abstract class AbstractPulsarProducerProcessor<T> extends AbstractProcess
.defaultValue("false")
.build();

public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
.name("MAX_ASYNC_REQUESTS")
.displayName("Maximum Async Requests")
.description("The maximum number of outstanding asynchronous publish requests for this processor. "
+ "Each asynchronous call requires memory, so avoid setting this value to high.")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("2")
.build();

public static final PropertyDescriptor AUTO_UPDATE_PARTITIONS = new PropertyDescriptor.Builder()
.name("AUTO_UPDATE_PARTITIONS")
.displayName("Auto update partitions")
@@ -259,7 +249,6 @@ public abstract class AbstractPulsarProducerProcessor<T> extends AbstractProcess
descriptorList.add(PULSAR_CLIENT_SERVICE);
descriptorList.add(TOPIC);
descriptorList.add(ASYNC_ENABLED);
descriptorList.add(MAX_ASYNC_REQUESTS);
descriptorList.add(AUTO_UPDATE_PARTITIONS);
descriptorList.add(AUTO_UPDATE_PARTITION_INTERVAL);
descriptorList.add(BATCHING_ENABLED);
@@ -294,131 +283,22 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

private PulsarClientService pulsarClientService;
private PulsarConsumerLRUCache<String, Producer<T>> producers;
private ExecutorService publisherPool;

// Used to sync between onTrigger method and shutdown code block.
protected AtomicBoolean canPublish = new AtomicBoolean(true);

// Used to track whether we are reporting errors back to the user or not.
protected AtomicBoolean trackFailures = new AtomicBoolean();

protected BlockingQueue<MessageTuple<T>> workQueue;
protected BlockingQueue<MessageTuple<T>> failureQueue;
protected List<AsyncPublisher> asyncPublishers;

@OnScheduled
public void init(ProcessContext context) {
int maxRequests = context.getProperty(MAX_ASYNC_REQUESTS).asInteger();
setPulsarClientService(context.getProperty(PULSAR_CLIENT_SERVICE).asControllerService(PulsarClientService.class));

if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
setPublisherPool(Executors.newFixedThreadPool(maxRequests));
setAsyncPublishers(new LinkedList<AsyncPublisher>());
// Limit the depth of the work queue to 500 per worker, to prevent long shutdown times.
workQueue = new LinkedBlockingQueue<>(500 * maxRequests);

if (context.hasConnection(REL_FAILURE)) {
failureQueue = new LinkedBlockingQueue<>();
trackFailures.set(true);
} else {
trackFailures.set(false);
}

for (int idx = 0; idx < maxRequests; idx++) {
AsyncPublisher worker = new AsyncPublisher();
getAsyncPublishers().add(worker);
getPublisherPool().submit(worker);
}
canPublish.set(true);
}
}

@OnUnscheduled
public void shutDown(final ProcessContext context) {
/*
 * If we are running in asynchronous mode, then we need to stop all the producer threads that
 * are running in the PublisherPool. After we have stopped them, we need to wait a bit
 * to ensure that all the messages are properly acked, in order to prevent re-processing of
 * the same messages if the processor is shut down and restarted, since the un-acked
 * messages would be replayed on startup.
 */
if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
try {
// Stop accepting incoming work
canPublish.set(false);

// Halt the background worker threads, allowing them to empty the workQueue
getAsyncPublishers().forEach(AsyncPublisher::halt);

// Flush all the pending messages in the producers
getProducers().values().forEach(producer -> {
try {
producer.flush();
} catch (PulsarClientException e) {
getLogger().error("Unable to flush messages to Pulsar", e);
}
});

// Shutdown the thread pool
getPublisherPool().shutdown();

boolean shutdown = false;

do {
shutdown = getPublisherPool().awaitTermination(1, TimeUnit.SECONDS);
} while (!shutdown);

} catch (InterruptedException e) {
getLogger().error("Unable to stop all the Pulsar Producers", e);
}
}
}

@OnStopped
public void cleanUp(final ProcessContext context) {
    if (context.getProperty(ASYNC_ENABLED).isSet() && context.getProperty(ASYNC_ENABLED).asBoolean()) {
        if (canPublish.get()) {
            shutDown(context);

            // Flush all the pending messages in the producers
            getProducers().values().forEach(producer -> {
                try {
                    producer.flush();
                } catch (PulsarClientException e) {
                    getLogger().error("Unable to flush messages to Pulsar", e);
                }
            });
        }

        workQueue.clear();
        getProducers().clear();
        getAsyncPublishers().clear();
    }
}

/**
 * If the processor is configured to run in asynchronous mode, then we need to periodically
 * check the failure queue and route those records to the FAILURE relationship, so that the
 * end user is aware of the failures and can handle them as they see fit.
 *
 * @param session - The current processor session
 */
protected void handleFailures(ProcessSession session) {

if (!trackFailures.get() || CollectionUtils.isEmpty(failureQueue)) {
return;
}

MessageTuple<T> failure = failureQueue.poll();

while (failure != null) {
FlowFile flowFile = session.create();
final byte[] value = (byte[]) failure.getContent();
flowFile = session.write(flowFile, out -> {
out.write(value);
});
session.putAttribute(flowFile, TOPIC_NAME, failure.getTopic());
session.transfer(flowFile, REL_FAILURE);
failure = failureQueue.poll();
}
}

private synchronized List<AbstractPulsarProducerProcessor<T>.AsyncPublisher> getAsyncPublishers() {
return asyncPublishers;
}

private synchronized void setAsyncPublishers(List<AbstractPulsarProducerProcessor<T>.AsyncPublisher> list) {
asyncPublishers = list;
}

protected synchronized Producer<T> getProducer(ProcessContext context, String topic) {
@@ -489,14 +369,6 @@ protected synchronized void setProducers(PulsarConsumerLRUCache<String, Producer
this.producers = producers;
}

protected synchronized ExecutorService getPublisherPool() {
return publisherPool;
}

protected synchronized void setPublisherPool(ExecutorService publisherPool) {
this.publisherPool = publisherPool;
}

protected String getMessageKey(ProcessContext context, final FlowFile ff) {
String key = context.getProperty(MESSAGE_KEY).evaluateAttributeExpressions(ff).getValue();

@@ -509,7 +381,6 @@ protected String getMessageKey(ProcessContext context, final FlowFile ff) {

protected Map<String, String> getMappedMessageProperties(ProcessContext context, final FlowFile ff) {
String mappings = context.getProperty(MAPPED_MESSAGE_PROPERTIES).getValue();

return PropertyMappingUtils.getMappedValues(mappings, (a) -> ff.getAttribute(a));
}

@@ -523,66 +394,14 @@ protected MessageId send(Producer<T> producer, String key, Map<String, String> p
return tmb.send();
}

private final class AsyncPublisher implements Runnable {
    private boolean keepRunning = true;

    public void halt() {
        keepRunning = false;

        // Finish up
        do {
            process();
        } while (!workQueue.isEmpty());
    }

    @Override
    public void run() {
        while (keepRunning) {
            process();
        }
    }

    private CompletableFuture<MessageId> sendAsync(Producer<T> producer, String key, Map<String, String> properties, T value) {
        TypedMessageBuilder<T> tmb = producer.newMessage().properties(properties).value(value);

        if (key != null) {
            tmb = tmb.key(key);
        }

        return tmb.sendAsync();
    }

    private void process() {
        try {
            MessageTuple<T> item = workQueue.poll(50, TimeUnit.MILLISECONDS);

            if (item == null) {
                return;
            }

            Producer<T> producer = getProducers().get(item.getTopic());

            if (!trackFailures.get()) {
                // We don't care about failures, so just fire & forget
                sendAsync(producer, item.getKey(), item.getProperties(), item.getContent());
            } else if (producer == null || !producer.isConnected()) {
                // We cannot get a valid producer, so add the item to the failure queue
                failureQueue.put(item);
            } else {
                try {
                    // Send the item asynchronously and confirm we get a messageId back from Pulsar.
                    if (sendAsync(producer, item.getKey(), item.getProperties(), item.getContent()).join() == null) {
                        // No messageId indicates failure
                        failureQueue.put(item);
                    }
                } catch (final Throwable t) {
                    // Any exception during sendAsync() call indicates failure
                    failureQueue.put(item);
                }
            }
        } catch (InterruptedException e) {
            // Ignore these
        }
    }
}

protected CompletableFuture<MessageId> sendAsync(Producer<T> producer, String key, Map<String, String> properties, T value) {
    TypedMessageBuilder<T> tmb = producer.newMessage().properties(properties).value(value);

    if (key != null) {
        tmb = tmb.key(key);
    }

    return tmb.sendAsync();
}

}
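With the internal worker pool gone, sendAsync(...) becomes a protected helper that concrete processors call directly, blocking on the returned futures before flushing. Below is a minimal, self-contained sketch of that pattern against the plain Pulsar client API; the service URL, topic name, and payloads are illustrative assumptions, not part of this commit:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class SendAsyncSketch {
    public static void main(String[] args) throws Exception {
        // Assumed local broker and demo topic; adjust for your environment.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                .topic("persistent://public/default/demo")
                .create();

        // One future per message, mirroring the loop in the refactored onTrigger().
        List<CompletableFuture<MessageId>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            futures.add(producer.newMessage()
                    .properties(Map.of("source", "sketch"))
                    .value(("message-" + i).getBytes(StandardCharsets.UTF_8))
                    .sendAsync());
        }

        // Block until every send is acknowledged by the broker, then flush,
        // just as the new PublishPulsar code does before reporting success.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        producer.flush();

        producer.close();
        client.close();
    }
}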
@@ -19,6 +19,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Map;

@@ -35,6 +38,7 @@
import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
import org.apache.nifi.processors.pulsar.MessageTuple;
import org.apache.nifi.stream.io.util.StreamDemarcator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;

@@ -52,8 +56,6 @@ public class PublishPulsar extends AbstractPulsarProducerProcessor<byte[]> {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {

handleFailures(session);

final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
@@ -86,18 +88,29 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
getLogger().error("Failed to connect to Pulsar Server due to {}", new Object[]{e});
session.transfer(flowFile, REL_FAILURE);
}
} else if (canPublish.get()) {
} else {
byte[] messageContent;

try (final InputStream in = session.read(flowFile);
final StreamDemarcator demarcator = new StreamDemarcator(in, demarcatorBytes, Integer.MAX_VALUE)) {
List<CompletableFuture<MessageId>> futureList = new ArrayList<>();

while ((messageContent = demarcator.nextToken()) != null) {
workQueue.put(new MessageTuple<>(
topic,
getMessageKey(context, flowFile),
getMappedMessageProperties(context, flowFile),
messageContent));
futureList.add(sendAsync(producer,
getMessageKey(context, flowFile),
getMappedMessageProperties(context, flowFile),
messageContent));

}
// Block here until all of the outstanding sendAsync() futures have completed and the
// producer has been flushed, so every message is on the wire before reporting success.
CompletableFuture<MessageId>[] futureArray = futureList.toArray(new CompletableFuture[0]);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureArray);
allFutures.join(); // wait for all sendAsync() futures to complete
producer.flush();

demarcator.close();
session.transfer(flowFile, REL_SUCCESS);
} catch (Throwable t) {
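One consequence of this refactor worth noting: per-record failures are no longer parked in a failureQueue for handleFailures() to drain. Instead, allFutures.join() throws a CompletionException as soon as any sendAsync() future has failed, and the surrounding catch (Throwable t) routes the FlowFile to REL_FAILURE. A small standalone sketch of that failure-surfacing behavior; the futures here are hand-built stand-ins, not real Pulsar sends:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AllOfFailureSketch {
    public static void main(String[] args) {
        CompletableFuture<String> acked = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unreachable"));

        try {
            // join() rethrows the failure wrapped in a CompletionException, which is
            // how the refactored onTrigger() detects that a publish did not succeed.
            CompletableFuture.allOf(acked, failed).join();
        } catch (CompletionException e) {
            System.out.println("send failed: " + e.getCause().getMessage());
        }
    }
}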
