add support for eventhubs
Don Tregonning committed Apr 18, 2019
1 parent ca7f6d8 commit 2e389b5
Showing 10 changed files with 211 additions and 47 deletions.
5 changes: 4 additions & 1 deletion build.gradle
@@ -9,10 +9,13 @@ repositories {
dependencies {
compile 'args4j:args4j-site:2.0.25'
compile 'args4j:args4j:2.33'
compile 'org.apache.kafka:kafka-clients:2.0.0'
compile 'org.apache.kafka:kafka-clients:2.2.0'
compile 'org.slf4j:slf4j-simple:1.7.5'
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.9.1'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.9.1'
// https://mvnrepository.com/artifact/com.microsoft.azure/azure-eventhubs
compile group: 'com.microsoft.azure', name: 'azure-eventhubs', version: '2.3.0'
compile group: 'com.google.code.gson', name: 'gson', version: '2.3.1'
}

jar {
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
#Tue Sep 12 13:53:46 PDT 2017
#Thu Apr 11 13:17:47 PDT 2019
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-all.zip
3 changes: 3 additions & 0 deletions out/production/main/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
Manifest-Version: 1.0
Main-Class: generator.DataGenerator

21 changes: 21 additions & 0 deletions out/production/resources/log4j2.xml
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<File name="data-gen-log" fileName="data-gen.log">
<PatternLayout>
<Pattern>%d %p %c{1.} [%t] %m%n</Pattern>
</PatternLayout>
</File>
</Appenders>
<Loggers>
<Logger name="generator" level="info" additivity="false">
<appender-ref ref="data-gen-log" level="info"/>
</Logger>
<Root level="error">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
33 changes: 29 additions & 4 deletions src/main/java/generator/CommandLineParams.java
@@ -22,6 +22,29 @@ public class CommandLineParams {
@Option(name="-worker-thread-count", usage="Event generating worker threads, default = 4")
public String workerThreadCount;

@Option(name="-output-kafka", usage="output to Kafka, default = true")
public String outputToKafka;

@Option(name="-output-stdout", usage="output to STDOUT, default = true")
public String outputToStdout;

@Option(name="-output-eventhubs", usage="output to Eventhubs, default = false")
public String outputToEventhubs;

//EventHub Specific Options
@Option(name="-eventhub.name", usage="EventHub name")
public String eventHubName;

@Option(name="-eventhub.namespace", usage="EventHub namespace name")
public String eventHubNameSpace;

@Option(name="-eventhub.saskeyname", usage="Sas Key name used for EventHubs")
public String eventHubSaskeyname;

@Option(name="-eventhub.saskey", usage="Sas Key used for EventHubs")
public String eventHubSaskey;

//Kafka Specific Options
@Option(name="-topic", usage="(Required)Kafka Topic to send messages to")
public String topic;

@@ -46,9 +69,6 @@ public class CommandLineParams {
@Option(name="-event-format", usage="event format selector, default = json")
public String eventFormat;

@Option(name="-output-stdout", usage="output to STDOUT, default = true")
public String outputToStdout;

@Option(name="-generate-kafka-headers", usage="Will generate kafka headers for testing, default = false")
public String includeKafkaHeaders;

@@ -58,6 +78,10 @@
public String toString() {
return "[Command Line Parameters]"
+ "{ message-count: " + messageCount
+ ", eventhub.name: " + eventHubName
+ ", eventhub.namespace: " + eventHubNameSpace
+ ", eventhub.saskey: " + eventHubSaskey
+ ", message-size: " + messageSize
+ ", message-size: " + messageSize
+ ", topic : " + topic
+ ", eps: " + eps
@@ -70,8 +94,9 @@ public String toString() {
+ ", kafka-buffer-memory: " + kafkaBufferMemory
+ ", event-format: " + eventFormat
+ ", output-stdout: " + outputToStdout
+ ", output-eventhubs: " + outputToEventhubs
+ ", include-kafka-headers: " + includeKafkaHeaders
+ ", header-gen-profile" + headerGenProfile
+ "}";
}
}
}
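
For orientation, here is a minimal sketch, not part of this commit, of how the new Event Hubs flags could be fed through args4j into CommandLineParams. The ParseExample class and all flag values are purely illustrative placeholders; only the flag names come from the options above.

import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;

// Assumes this sketch lives alongside generator.CommandLineParams (same package).
public class ParseExample {
    public static void main(String[] ignored) throws CmdLineException {
        // Hypothetical arguments; substitute real namespace, hub name, and SAS key material.
        String[] args = {
                "-output-eventhubs", "true",
                "-output-kafka", "false",
                "-eventhub.namespace", "my-namespace",
                "-eventhub.name", "my-hub",
                "-eventhub.saskeyname", "RootManageSharedAccessKey",
                "-eventhub.saskey", "<sas-key>"
        };
        CommandLineParams params = new CommandLineParams();
        new CmdLineParser(params).parseArgument(args);   // populates the @Option fields above
        System.out.println(params);                      // prints via the toString() shown above
    }
}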
53 changes: 42 additions & 11 deletions src/main/java/generator/DataGenerator.java
@@ -39,28 +39,46 @@ public static void main(String[] args) {
System.exit(2);
}

if (params.outputToStdout == "false") {
// Check for required parameters
if (params.bootStrapServers == null || params.topic == null) {
logger.error("Missing required commandline parameter - quiting kafka-data-gen - Exit Status 1");
System.exit(1);
}

if (Boolean.parseBoolean(params.outputToEventhubs) == Boolean.parseBoolean(params.outputToKafka)) {
logger.error("output-kafka and output-eventhubs cannot both be enabled or both be disabled - quitting kafka-data-gen - Exit Status 1");
System.exit(1);
}

//Set defaults for non-required params and log the default or configured values
if(params.workerThreadCount == null) { params.workerThreadCount = "4";}
if(params.eps == null) { params.eps = "0";}
if(params.eventFormat == null) { params.eventFormat = "json";}
if(params.messageSize == null) { params.messageSize = "256";}
if(params.messageCount == null) { params.messageCount = "10000";}
if(params.includeKafkaHeaders == null) { params.includeKafkaHeaders = "false";}
if(params.headerGenProfile == null) { params.headerGenProfile = "-1";}

Properties props = new Properties();

if (Boolean.parseBoolean(params.outputToKafka) == true) {
// Check for required parameters
if (params.bootStrapServers == null || params.topic == null) {
logger.error("Missing required commandline parameter - quiting kafka-data-gen - Exit Status 1");
System.exit(1);
}

if(params.includeKafkaHeaders == null) { params.includeKafkaHeaders = "false";}
if(params.headerGenProfile == null) { params.headerGenProfile = "-1";}
props = parseKafkaArguments(params, props);

}
if (Boolean.parseBoolean(params.outputToEventhubs) == true) {
// Check for required parameters
if (params.eventHubName == null || params.eventHubNameSpace == null
|| params.eventHubSaskey == null || params.eventHubSaskeyname == null) {
logger.error("Missing required commandline parameters for eventhub - quiting kafka-data-gen - Exit Status 1");
System.exit(1);
}
System.out.println("Configuring EventHubs Props");
props = parseEventHubsArguments(params, props);
}

logger.info(params);

//Create and configure Kafka Producer variables. Store in Properties object
Properties props = new Properties();
props = parseKafkaArguments(params, props);

EPSToken epsToken = new EPSToken();

@@ -99,6 +117,19 @@ public static void main(String[] args) {
logger.info("Program run and complete without interruption");
}

public static Properties parseEventHubsArguments(CommandLineParams params, Properties props) {
try {
props.put("eventhub.name", params.eventHubName);
props.put("eventhub.namespace", params.eventHubNameSpace);
props.put("eventhub.saskeyname", params.eventHubSaskeyname);
props.put("eventhub.saskey", params.eventHubSaskey);
} catch (java.lang.NullPointerException e) {
logger.error(e.getMessage());
logger.error("Config Value Not provided");
}
return props;
}

public static Properties parseKafkaArguments(CommandLineParams params, Properties props) {
try {
props.put("bootstrap.servers", params.bootStrapServers);
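
As a side note (not part of the commit): parseEventHubsArguments above simply copies the four Event Hubs values into plain string properties, and the worker threads in EPSThread below read them back by key. A minimal sketch of that round trip, assuming a CommandLineParams instance already populated by args4j:

import java.util.Properties;

// Assumes this sketch sits in the generator package next to DataGenerator.
class EventHubsPropsSketch {
    static Properties build(CommandLineParams params) {
        Properties props = new Properties();
        props = DataGenerator.parseEventHubsArguments(params, props);
        // EPSThread (next file) rebuilds the connection string from these same keys.
        String namespace = props.getProperty("eventhub.namespace");
        String hub = props.getProperty("eventhub.name");
        System.out.println("Event Hubs target: " + namespace + "/" + hub);
        return props;
    }
}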
124 changes: 96 additions & 28 deletions src/main/java/generator/EPSThread.java
@@ -1,12 +1,26 @@
package generator;

import org.apache.kafka.common.header.Headers;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.Random;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class EPSThread implements Runnable {
private static Logger logger = LogManager.getLogger(EPSThread.class);
private static EPSToken epsTokenObj;
@@ -51,7 +65,7 @@ public void run() {
logger.info("Total Message count reached, cleaning up program for exit.");
}

} else if(thrd.getName().compareTo("MetricsCalculatorThread") == 0) {
} else if(thrd.getName().compareTo("MetricsCalculatorThread") == 0 && Boolean.parseBoolean(params.outputToKafka) == true) {
if(Boolean.parseBoolean(params.outputToStdout) != true) {
do {
Thread.sleep(5000);
@@ -60,34 +74,66 @@
}
}
else {
if(Boolean.parseBoolean(params.outputToStdout) == true) {
do {
if (epsTokenObj.takeToken()) {
shipEvent(epsTokenObj, params);
if(Boolean.parseBoolean(params.outputToKafka) == true) {
if (Boolean.parseBoolean(params.outputToStdout) == true) {
do {
if (epsTokenObj.takeToken()) {
shipEvent(epsTokenObj, params);
}
} while (epsTokenObj.complete() == false);
} else {
Producer<String, String> producer = new KafkaProducer<>(props);
if (!metricsCalc.addProducer(producer)) {
logger.warn("Error adding producer for metrics Calculator, Metric Calculations may be incorrect" + thrd.getName());
}
} while (epsTokenObj.complete() == false);
}
else {
Producer<String, String> producer = new KafkaProducer<>(props);
if (!metricsCalc.addProducer(producer)) {
logger.warn("Error adding producer for metrics Calculator, Metric Calculations may be incorrect" + thrd.getName());

do {
if (epsTokenObj.takeToken()) {
shipEvent(producer, epsTokenObj, params);
}
} while (epsTokenObj.complete() == false);

producer.close();
}
}
else if(Boolean.parseBoolean(params.outputToEventhubs) == true){
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setNamespaceName(props.getProperty("eventhub.namespace"))
.setEventHubName(props.getProperty("eventhub.name"))
.setSasKeyName(props.getProperty("eventhub.saskeyname"))
.setSasKey(props.getProperty("eventhub.saskey"));
logger.info(connStr);


final Gson gson = new GsonBuilder().create();

// The executor handles all asynchronous tasks and is passed to the EventHubClient instance.
// This lets the user segregate the thread pool based on workload, and the pool can be
// shared across multiple EventHubClient instances.
// A scheduled pool of four threads is used here for the single EventHubClient instance
// handling ingestion to Event Hubs.
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);

// Each EventHubClient instance spins up a new TCP/SSL connection, which is expensive.
// It is best practice to reuse these instances; one client is created here and reused for every send.
final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);

do {
if (epsTokenObj.takeToken()) {
shipEvent(producer, epsTokenObj, params);
shipEventhubEvent(ehClient, epsTokenObj, params);
}
} while (epsTokenObj.complete() == false);

producer.close();
ehClient.closeSync();
executorService.shutdown();
}
}
} catch (InterruptedException exc) {
} catch (InterruptedException | EventHubException | IOException exc ) {
System.out.println("Thread Interrupted");
}
}

public static void shipEvent(Producer<String, String> producer,EPSToken epsTokenObj , CommandLineParams params) {
public static void shipEvent(Producer<String, String> producer, EPSToken epsTokenObj, CommandLineParams params) {
int sequenceNumber = epsTokenObj.getMessageKeyAndInc();

//TODO: Smarter Live Logging, hardcoded 10000 value. 10% of total messages?
@@ -106,6 +152,40 @@
}
}

public static void shipEventhubEvent(EventHubClient ehClient, EPSToken epsTokenObj , CommandLineParams params) {
try {
int sequenceNumber = epsTokenObj.getMessageKeyAndInc();
if(sequenceNumber % 100000 == 0) { logger.info("Current message with sequence number: " + sequenceNumber); }
byte[] event = createEvent(params, sequenceNumber);

//byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
EventData sendEvent = EventData.create(event);

// Send - not tied to any partition
// Event Hubs service will round-robin the events across all Event Hubs partitions.
// This is the recommended & most reliable way to send to Event Hubs.
ehClient.sendSync(sendEvent);

logger.debug("Event batched" + event.toString());
} catch (Exception e) {
e.printStackTrace();
}
}

public static void shipEvent(EPSToken epsTokenObj , CommandLineParams params) {
int sequenceNumber = epsTokenObj.getMessageKeyAndInc();

if(sequenceNumber % 100000 == 0) { logger.info("Current message with sequence number: " + sequenceNumber); }

byte[] event = createEvent(params, sequenceNumber);
try {
String s = new String(event);
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
}

public static ProducerRecord<String, String> includeKafkaHeaders(ProducerRecord<String, String> record, int sequenceNumber) {
Random random = new Random();
//same header values
@@ -208,19 +288,7 @@ else if(Integer.parseInt(params.headerGenProfile) == -1) {
return record;
}

public static void shipEvent(EPSToken epsTokenObj , CommandLineParams params) {
int sequenceNumber = epsTokenObj.getMessageKeyAndInc();

if(sequenceNumber % 100000 == 0) { logger.info("Current message with sequence number: " + sequenceNumber); }

byte[] event = createEvent(params, sequenceNumber);
try {
String s = new String(event);
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
}

public static byte[] createEvent(CommandLineParams params, int eventKey) {
String s = "";
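
Distilled from the new EPSThread path above, here is a standalone sketch (not part of the commit) of the Event Hubs send cycle using the same azure-eventhubs 2.3.0 calls: ConnectionStringBuilder, EventHubClient.createSync, EventData.create, sendSync, and closeSync. The connection values are placeholders, not working credentials.

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;

public class EventHubsSendSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection values -- substitute real namespace, hub, and key material.
        ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setNamespaceName("my-namespace")
                .setEventHubName("my-hub")
                .setSasKeyName("RootManageSharedAccessKey")
                .setSasKey("<sas-key>");

        // One executor and one client, reused for all sends (clients are expensive to create).
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        EventHubClient client = EventHubClient.createSync(connStr.toString(), executor);
        try {
            byte[] payload = "{\"message\":\"hello\"}".getBytes(StandardCharsets.UTF_8);
            // Not tied to any partition; the service round-robins across partitions.
            client.sendSync(EventData.create(payload));
        } finally {
            client.closeSync();
            executor.shutdown();
        }
    }
}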
4 changes: 3 additions & 1 deletion src/main/java/generator/MetricsCalculator.java
@@ -77,7 +77,9 @@ public double getKafkaProducerMetrics(String metricName, String metricGroup) {
String name = entry.getKey().name();
String group = entry.getKey().group();
if (name.equalsIgnoreCase(metricName) && group.equalsIgnoreCase(metricGroup)) {
consolidatedMetricValue += entry.getValue().value();
Object metricValue = entry.getValue().metricValue();
if (metricValue instanceof Double)
consolidatedMetricValue += (Double)metricValue;
}
}
}
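
For reference, a small sketch, not part of the commit, of the kafka-clients 2.x metric-access pattern the change above adopts: Metric.value() is deprecated in favor of metricValue(), which returns an Object rather than a double, hence the instanceof Double guard. The metric and group names passed in are up to the caller.

import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

class MetricReadSketch {
    // Sums a named metric across a producer's metrics map; the producer is assumed to be configured elsewhere.
    static double readMetric(KafkaProducer<String, String> producer, String name, String group) {
        double total = 0.0;
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            if (entry.getKey().name().equalsIgnoreCase(name)
                    && entry.getKey().group().equalsIgnoreCase(group)) {
                Object v = entry.getValue().metricValue();   // Object in kafka-clients 2.x
                if (v instanceof Double) {                    // not every metric is numeric
                    total += (Double) v;
                }
            }
        }
        return total;
    }
}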
11 changes: 11 additions & 0 deletions src/main/main.iml
@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/java" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
