
Commit

Adding customer classification stream + persistence
hhiden committed May 31, 2018
1 parent 695a9d6 commit 642673e
Showing 11 changed files with 476 additions and 1 deletion.
3 changes: 2 additions & 1 deletion .gitignore
@@ -44,4 +44,5 @@ all.csv
large.csv
medium.csv

/demand-level-kstream/nbproject/private/
/demand-level-kstream/nbproject/private/
/customer-profile-kstream/nbproject/private/
86 changes: 86 additions & 0 deletions customer-profile-kstream/pom.xml
@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.redhat.datastreaming</groupId>
        <artifactId>clnr-demo</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <groupId>com.redhat</groupId>
    <artifactId>customer-profile-kstream</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>

    <build>
        <finalName>demand-level</finalName>

        <resources>
            <resource>
                <directory>src/main/config-openshift</directory>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <proc>none</proc>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.wildfly.swarm</groupId>
            <artifactId>microprofile</artifactId>
        </dependency>

        <dependency>
            <groupId>org.wildfly.swarm</groupId>
            <artifactId>ejb</artifactId>
        </dependency>

        <dependency>
            <groupId>org.aerogear.kafka</groupId>
            <artifactId>kafka-cdi-extension</artifactId>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <id>openshift</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>io.fabric8</groupId>
                        <artifactId>fabric8-maven-plugin</artifactId>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>resource</goal>
                                    <goal>build</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <resources>
                                <labels>
                                    <service>
                                        <expose>true</expose>
                                    </service>
                                </labels>
                            </resources>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>
</project>
13 changes: 13 additions & 0 deletions customer-profile-kstream/src/main/fabric8/deployment.yml
@@ -0,0 +1,13 @@
apiVersion: v1
kind: Deployment
metadata:
  name: ${project.artifactId}
spec:
  template:
    spec:
      containers:
        - env:
            - name: KAFKA_SERVICE_HOST
              value: "my-cluster-kafka"
            - name: KAFKA_SERVICE_PORT
              value: "9092"
12 changes: 12 additions & 0 deletions customer-profile-kstream/src/main/fabric8/topic-cm.yml
@@ -0,0 +1,12 @@
apiVersion: v1
kind: ConfigMap
metadata:
  name: profile.out
  labels:
    strimzi.io/kind: topic
    strimzi.io/cluster: my-cluster
data:
  name: profile.out
  partitions: "2"
  replicas: "1"

69 changes: 69 additions & 0 deletions customer-profile-kstream/src/main/java/com/redhat/demo/clnr/CustomerAggregatorBean.java
@@ -0,0 +1,69 @@
package com.redhat.demo.clnr;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.util.Date;
import javax.enterprise.context.ApplicationScoped;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.stream.JsonParser;
import org.aerogear.kafka.cdi.annotation.KafkaConfig;
import org.aerogear.kafka.cdi.annotation.KafkaStream;
import org.aerogear.kafka.serialization.CafdiSerdes;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

/**
 * This class attaches to a KStream of MeterReadings and aggregates the data into customer profiles.
 *
 * @author hhiden
 */
@ApplicationScoped
@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class CustomerAggregatorBean {

    private final ObjectMapper mapper = new ObjectMapper();

    @KafkaStream(input = "ingest.api.out", output = "profile.out")
    public KStream<String, String> demandStream(final KStream<String, JsonObject> source) {

        return source
                // Re-key each reading by its customerId
                .selectKey((key, value) -> value.getString("customerId"))
                // Convert the raw JSON reading into a MeterReading value object
                .map((key, value) -> {
                    MeterReading mr = new MeterReading();
                    mr.setCustomerId(value.getString("customerId"));
                    mr.setTimestamp(value.getString("timestamp"));
                    mr.setValue(value.getJsonNumber("kWh").doubleValue());
                    return new KeyValue<>(key, mr);
                })
                .groupByKey(Serialized.with(new Serdes.StringSerde(), new MeterReadingSerializer()))
                /*.windowedBy(TimeWindows.of(24 * 60 * 60 * 1000).until(96 * 60 * 60 * 1000))*/
                // Fold each customer's readings into a CustomerRecord profile
                .aggregate(() -> new CustomerRecord(), (k, v, a) -> a.update(v), CafdiSerdes.Generic(CustomerRecord.class))
                /*
                .aggregate(() -> new CustomerRecord(), (k, v, a) -> a.update(v),
                        Materialized.<String, CustomerRecord, WindowStore<Bytes, byte[]>>as("sum-store")
                                .withValueSerde(CafdiSerdes.Generic(CustomerRecord.class))
                                .withKeySerde(Serdes.String()))
                */
                // Emit the aggregated profile as JSON keyed by customerId
                .toStream().map((k, v) -> {
                    String json = "";
                    try {
                        json = mapper.writeValueAsString(v);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return new KeyValue<>(v.customerId, json);
                });
    }
}
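
For illustration, a minimal sketch (not part of this commit) of how a single ingest.api.out reading is folded into a customer profile by the pipeline above. MeterReading is not among the files shown here, and the customer id, timestamp format and kWh value are made-up examples assumed from the way CustomerAggregatorBean uses them:

MeterReading mr = new MeterReading();
mr.setCustomerId("C-0001");              // "customerId" field of the ingest JSON (hypothetical id)
mr.setTimestamp("2013-01-01 14:30:00");  // assumed timestamp format; getHourOfDay() must parse the hour from it
mr.setValue(0.25);                       // "kWh" field of the ingest JSON

CustomerRecord profile = new CustomerRecord("C-0001");
profile.update(mr);                      // adds 0.25 to the hour-14 bin
System.out.println(profile);             // C-0001:0.0000,...,0.2500,...,0.0000
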
73 changes: 73 additions & 0 deletions customer-profile-kstream/src/main/java/com/redhat/demo/clnr/CustomerRecord.java
@@ -0,0 +1,73 @@
package com.redhat.demo.clnr;

import java.io.Serializable;
import java.text.NumberFormat;
import java.util.Date;
import java.util.HashMap;

/**
 * Holds summary data for a specific customer.
 *
 * @author hhiden
 */
public class CustomerRecord implements Serializable {
    public static final long serialVersionUID = 0L;
    public String customerId;
    public HashMap<Integer, Double> hourBins = new HashMap<>();
    public Date windowStart;

    public CustomerRecord() {
        initHourBins();
    }

    public CustomerRecord(String customerId) {
        this.customerId = customerId;
        initHourBins();
    }

    public CustomerRecord update(MeterReading reading) {
        try {
            if (customerId == null) {
                customerId = reading.customerId;
            }
            int hour = reading.getHourOfDay();
            double existing = hourBins.get(hour);
            hourBins.put(hour, existing + reading.value);
            return this;
        } catch (Exception e) {
            System.out.println(e.getMessage());
            return this;
        }
    }

    private void initHourBins() {
        for (int i = 0; i < 24; i++) {
            hourBins.put(i, 0.0);
        }
    }

    @Override
    public String toString() {
        NumberFormat fmt = NumberFormat.getNumberInstance();
        fmt.setMinimumIntegerDigits(1);
        fmt.setMinimumFractionDigits(4);
        fmt.setMaximumFractionDigits(4);
        StringBuilder builder = new StringBuilder();
        builder.append(customerId);
        builder.append(":");
        for (int i = 0; i < hourBins.size(); i++) {
            if (i > 0) {
                builder.append(",");
            }
            builder.append(fmt.format(hourBins.get(i)));
        }
        return builder.toString();
    }

    public Date getWindowStart() {
        return windowStart;
    }

    public void setWindowStart(Date windowStart) {
        this.windowStart = windowStart;
    }
}
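
CustomerAggregatorBean writes these records to the profile.out topic with Jackson's writeValueAsString, and CustomerRecordMongoPersist (below) reads them back with readValue. A rough sketch of that round trip, assuming Jackson's default handling of the public fields above; the customer id and JSON layout are illustrative, not taken from the commit:

ObjectMapper mapper = new ObjectMapper();

CustomerRecord out = new CustomerRecord("C-0001");   // hypothetical customer id
String json = mapper.writeValueAsString(out);
// roughly: {"customerId":"C-0001","hourBins":{"0":0.0, ..., "23":0.0},"windowStart":null}

CustomerRecord in = mapper.readValue(json, CustomerRecord.class);
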
42 changes: 42 additions & 0 deletions customer-profile-kstream/src/main/java/com/redhat/demo/clnr/CustomerRecordMongoPersist.java
@@ -0,0 +1,42 @@
package com.redhat.demo.clnr;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.util.logging.Logger;
import javax.ejb.Stateless;
import javax.enterprise.context.ApplicationScoped;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.stream.JsonParser;
import org.aerogear.kafka.cdi.annotation.Consumer;
import org.aerogear.kafka.cdi.annotation.KafkaConfig;

/**
 * Attach to a KStream and persist data into MongoDB.
 *
 * @author hhiden
 */
@ApplicationScoped
@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class CustomerRecordMongoPersist {

    private static final Logger logger = Logger.getLogger(CustomerRecordMongoPersist.class.getName());

    ObjectMapper mapper = new ObjectMapper();

    public CustomerRecordMongoPersist() {
    }

    /** Respond to customer profile updates. */
    @Consumer(topics = "profile.out", groupId = "1")
    public void onMessage(String key, String value) {
        try {
            CustomerRecord record = mapper.readValue(value, CustomerRecord.class);
            logger.info(record.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
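
The consumer above currently only logs each profile; the MongoDB write it is named for is not implemented in this commit. A minimal sketch of what that step could look like, assuming the MongoDB Java sync driver (org.mongodb:mongodb-driver-sync) were added as a dependency, with a hypothetical "mongodb" service and made-up database, collection and class names:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import org.bson.Document;

public class ProfileStore {

    private final MongoCollection<Document> profiles;

    public ProfileStore() {
        // Hypothetical connection string and database/collection names
        MongoClient client = MongoClients.create("mongodb://mongodb:27017");
        profiles = client.getDatabase("clnr").getCollection("profiles");
    }

    /** Upsert one document per customer, keyed by customerId. */
    public void save(String customerId, String profileJson) {
        Document doc = Document.parse(profileJson).append("_id", customerId);
        profiles.replaceOne(Filters.eq("_id", customerId), doc, new ReplaceOptions().upsert(true));
    }
}

CustomerRecordMongoPersist.onMessage() could then call save(record.customerId, value) after the readValue step.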