Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic Properties, Camel Headers, Body return #1

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@ The processor has 5 properties:
* Maximum Deliveries: Maximum of retries in case of an error
* Delivery Delay: Delay between retries
* Log Level: The loglevel of the Camel route
* Return Headers: true/false, if Camel headers must be returned into Nifi attributes


The processor accepts dynamic properties prefixed with "camel."
![Alt text](docs/dynamic-properties.jpg?raw=true "Dynamic Properties")


Camel headers can be returned into Nifi attributes
![Alt text](docs/camel-headers.jpg?raw=true "Camel Headers")


For the URI format of the camel component see [Camel's component reference](https://camel.apache.org/components/latest/). For
example to use File uri: file://C/out
example to use File uri: file://C/out
Binary file added docs/camel-headers.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/dynamic-properties.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.Processor;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
Expand All @@ -28,6 +30,8 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
Expand All @@ -37,17 +41,24 @@
import org.apache.nifi.processor.util.StandardValidators;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import java.util.UUID;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;

import org.assimbly.connector.Connector;
import org.assimbly.docconverter.DocConverter;
Expand Down Expand Up @@ -87,7 +98,7 @@ public class ProduceWithCamel extends AbstractProcessor {

public static final PropertyDescriptor REDELIVERY_DELAY = new PropertyDescriptor
.Builder().name("REDELIVERY_DELAY")
.displayName("Redelivery delay")
.displayName("Redelivery Delay")
.description("Delay in ms between redeliveries")
.defaultValue("3000")
.required(true)
Expand All @@ -102,8 +113,37 @@ public class ProduceWithCamel extends AbstractProcessor {
.defaultValue("OFF")
.allowableValues("OFF","INFO","WARN","ERROR","DEBUG","TRACE")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

.build();

public static final PropertyDescriptor RETURN_HEADERS = new PropertyDescriptor
.Builder().name("RETURN_HEADERS")
.displayName("Return Headers")
.description("Return Camel exchange out headers into Nifi flowfile attributes")
.required(true)
.allowableValues("true","false")
.defaultValue("false")
.build();

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
if (propertyDescriptorName.startsWith("camel.")) {
String displayName;
String description;
displayName = propertyDescriptorName.substring(6);
description = String.format("Camel equivalent to <setHeader name=\"%s\"><simple>xxxx<simple></setHeader> (nifi expressions supported)", displayName);
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
.description(description)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.build();
}
return null;
}

public static final Relationship SUCCESS = new Relationship.Builder()
.name("SUCCESS")
.description("Succes relationship")
Expand Down Expand Up @@ -135,6 +175,7 @@ protected void init(final ProcessorInitializationContext context) {
descriptors.add(MAXIMUM_REDELIVERIES);
descriptors.add(REDELIVERY_DELAY);
descriptors.add(LOG_LEVEL);
descriptors.add(RETURN_HEADERS);
this.descriptors = Collections.unmodifiableList(descriptors);

final Set<Relationship> relationships = new HashSet<Relationship>();
Expand Down Expand Up @@ -207,22 +248,57 @@ public void onScheduled(final ProcessContext context) {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

final AtomicReference<byte[]> body = new AtomicReference<>(new byte[]{});
final Map<String, Object> attributes = new ConcurrentHashMap<>();

FlowFile flowfile = session.get();

if ( flowfile == null ) {
return;
}


Map<String, String> headers = new HashMap<>();
//Embed evaluated expression into headers map
for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
if (propertyDescriptor.isDynamic()) {
PropertyValue myValue = context.newPropertyValue( context.getProperty(propertyDescriptor).getValue());
headers.put(propertyDescriptor.getName().substring(6), myValue.evaluateAttributeExpressions(flowfile).getValue().toString());
}
}

session.read(flowfile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try{

//Convert flowFile to a string
input = DocConverter.convertStreamToString(in);

//Send the message to the Camel route
template.sendBody(input);
//Convert flowFile to a string
input = DocConverter.convertStreamToString(in);

//Enrich and process the exchange as InOut
Exchange exchange = template.request("direct:nifi-" + flowId, new Processor() {
public void process(Exchange exchange) throws Exception {
for (Map.Entry<String, String> entry : headers.entrySet()) {
exchange.getIn().setHeader(entry.getKey(), entry.getValue());
}
exchange.getOut().setHeaders(exchange.getIn().getHeaders());
exchange.getIn().setBody(input);
}
});

//Retrieve out headers
Map<String, Object> map = exchange.getOut().getHeaders();
for (Map.Entry<String, Object> entry : map.entrySet()) {
try {
if (!entry.getKey().startsWith("Assimbly")) {
attributes.put(String.format("camel.%s", entry.getKey()), entry.getValue().toString());
}
}catch(Exception ex){
//Some returned headers may not be string safe...
getLogger().debug(String.format("Unconverted back camel header: \"%s\"", entry.getKey()));
}
}
//TODO: would it be possible to fail converting body into bytes ?
body.set(exchange.getOut().getBody(byte[].class));

}catch(Exception ex){
ex.printStackTrace();
Expand All @@ -231,6 +307,23 @@ public void process(InputStream in) throws IOException {
}
});

if (context.getProperty(RETURN_HEADERS).getValue().equals("true")) {
//Write the camel headers into attributes
for (Map.Entry<String, Object> entry : attributes.entrySet()) {
flowfile = session.putAttribute(flowfile, entry.getKey(), entry.getValue().toString());
}
}

if (body.get() != null) {
//To write the body/result back out into flow file
flowfile = session.write(flowfile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(body.get());
}
});
}

session.transfer(flowfile, SUCCESS);

}
Expand Down