Skip to content

Commit

Permalink
Merge pull request #2 from kaiwaehner/master
Browse files Browse the repository at this point in the history
Sync with upstream
  • Loading branch information
jukkakarvanen authored Apr 3, 2019
2 parents 93759fb + 1179c72 commit 21f3a43
Show file tree
Hide file tree
Showing 67 changed files with 1,709 additions and 640 deletions.
19 changes: 19 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Eclipse
.classpath
.project
.settings/

# Intellij
.idea/
*.iml
*.iws

# Mac
.DS_Store

# Maven
log/
target/

# Visual Studio Code
.vscode/
165 changes: 165 additions & 0 deletions dl4j-deeplearning-iris/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.github.kaiwaehner.kafka.streams.machinelearning</groupId>
<artifactId>dl4j-deeplearning-iris</artifactId>
<version>CP51_AK21</version>

<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>

<properties>
<java.version>1.8</java.version>
<kafka.version>2.1.0</kafka.version>
<kafka.scala.version>2.11</kafka.scala.version>
<scala.version>${kafka.scala.version}.8</scala.version>
<confluent.version>5.1.0</confluent.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>

<!-- Kafka Streams API is the only dependency you need for building Streams
applications. The only other thing is the specific Machine Learning dependencies
for the models you want to build and / or deploy -->

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>

<!-- DeepLearning4J needs the following dependencies: 1) deeplearning4j-core,
which contains the neural network implementations 2) nd4j-native-platform,
the CPU version of the ND4J library that powers DL4J 3) datavec-api - Datavec
is our library vectorizing and loading data -->

<!-- ND4J backend. You need one in every DL4J project. Normally define
artifactId as either "nd4j-native-platform" or "nd4j-cuda-7.5-platform" -->
<!-- https://mvnrepository.com/artifact/org.nd4j/nd4j-native-platform -->
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-native-platform</artifactId>
<version>1.0.0-beta3</version>
</dependency>

<!-- Core DL4J functionality -->
<!-- https://mvnrepository.com/artifact/org.deeplearning4j/deeplearning4j-core -->
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>deeplearning4j-core</artifactId>
<version>1.0.0-beta3</version>
</dependency>

<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>deeplearning4j-modelimport</artifactId>
<version>1.0.0-beta3</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.version}</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${confluent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${confluent.version}</version>
<!-- Required for e.g. schema registry's RestApp -->
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!--force java 8 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<!--package as one fat jar -->
<!-- this example does not contain actual application yet
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.2</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>com.github.megachucky.kafka.streams.machinelearning.StreamsStarterApp</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
-->
</plugins>
</build>
</project>
18 changes: 18 additions & 0 deletions dl4j-deeplearning-iris/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Machine Learning + Kafka Streams Examples

General info in main [Readme](../readme.md)

### Example 3 - Iris Prediction using a Neural Network with DeepLearning4J (DL4J)
**Use Case**

Iris Species Prediction using a Neural Network.
This is a famous example: Prediction of the Iris Species - implemented with many different ML algorithms. Here I use DeepLearning4J (DL4J) to build a neural network using Iris Dataset.

**Machine Learning Technology**
* [DeepLearning4J](https://deeplearning4j.org)
* Pretty simple example to demo how to build, save and load neural networks with DL4J. [MultiLayerNetwork](https://deeplearning4j.org/doc/org/deeplearning4j/nn/multilayer/MultiLayerNetwork.html) and [INDArray](http://nd4j.org/doc/org/nd4j/linalg/api/ndarray/INDArray.html) are the key APIs to look at if you want to understand the details.
* The model is created via [DeepLearning4J_CSV_Model.java](src/main/java/com/github/megachucky/kafka/streams/machinelearning/models/DeepLearning4J_CSV_Model.java) and stored in the resources: [DL4J_Iris_Model.zip](https://github.com/kaiwaehner/kafka-streams-machine-learning-examples/tree/master/src/main/resources/generatedModels/DL4J). No need to re-train, just for reference. Kudos to Adam Gibson who created this example as part of the DL4J project.

**Unit Test**
[Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java](src/test/java/com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java)

File renamed without changes.
5 changes: 5 additions & 0 deletions dl4j-deeplearning-iris/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%p %m (%c:%L) %n
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.github.jukkakarvanen.kafka.streams.integration.utils;

import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/** This is helper class to workaround for Failing stream tests in Windows environment KAFKA-6647.
*
* @author Jukka Karvanen
*
* The causing issue is https://issues.apache.org/jira/browse/KAFKA-6647
* Replacing EmbeddedKafkaCluster with TestEmbeddedKafkaCluster will catch and ignore the exception
* happening during the tear down of the test
* The exception does not have affect to functionality
*/

public class TestEmbeddedKafkaCluster extends EmbeddedKafkaCluster {
private static final Logger log = LoggerFactory.getLogger(TestEmbeddedKafkaCluster.class);

public TestEmbeddedKafkaCluster(int numBrokers) {
super(numBrokers);
}

public TestEmbeddedKafkaCluster(int numBrokers, Properties brokerConfig) {
super(numBrokers, brokerConfig);
}

public TestEmbeddedKafkaCluster(int numBrokers, Properties brokerConfig, long mockTimeMillisStart) {
super(numBrokers, brokerConfig, mockTimeMillisStart);
}

public TestEmbeddedKafkaCluster(int numBrokers, Properties brokerConfig, long mockTimeMillisStart, long mockTimeNanoStart) {
super(numBrokers, brokerConfig, mockTimeMillisStart, mockTimeNanoStart);
}

public void after() {
try {
super.after();
} catch (RuntimeException e) {
log.warn("Ignoring exception, test failing in Windows due this exception {}", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.github.jukkakarvanen.kafka.streams.integration.utils;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/** This is helper class to workaround for Failing stream tests in Windows environment KAFKA-6647.
*
* @author Jukka Karvanen
*
* The causing issue is https://issues.apache.org/jira/browse/KAFKA-6647
* Replacing KafkaStreams with TestKafkaStreams will catch and ignore the exception caused by cleanUp
* The exception does not have affect to functionality
*/

public class TestKafkaStreams extends KafkaStreams {
private static final Logger log = LoggerFactory.getLogger(TestKafkaStreams.class);

public TestKafkaStreams(Topology topology, Properties props) {
super(topology, props);
}

public TestKafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier) {
super(topology, props, clientSupplier);
}

public TestKafkaStreams(Topology topology, Properties props, Time time) {
super(topology, props, time);
}

public TestKafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, Time time) {
super(topology, props, clientSupplier, time);
}

public void cleanUp() {
try {
super.cleanUp();
} catch (RuntimeException e) {
log.warn("Ignoring exception, test failing in Windows due this exception {}", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Properties;
import java.util.stream.Stream;

import com.github.jukkakarvanen.kafka.streams.integration.utils.TestEmbeddedKafkaCluster;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
Expand Down Expand Up @@ -43,7 +44,7 @@
public class Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest {

@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
public static final EmbeddedKafkaCluster CLUSTER = new TestEmbeddedKafkaCluster(1);

private static final String inputTopic = "IrisInputTopic";
private static final String outputTopic = "IrisOutputTopic";
Expand Down
Loading

0 comments on commit 21f3a43

Please sign in to comment.