Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Commit interval #76

Merged
merged 4 commits into from
Jul 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ RUN ./gradlew distTar && \
tar xf build/distributions/*.tar && \
rm build/distributions/*.tar

FROM confluentinc/cp-base:3.3.1
FROM confluentinc/cp-base:4.1.0

MAINTAINER Nivethika M <[email protected]> , Joris Borgdorff <[email protected]>
MAINTAINER Nivethika M <[email protected]> , Joris Borgdorff <[email protected]> , Yatharth Ranjan <[email protected]>

LABEL description="RADAR-CNS Backend streams and monitor"

Expand Down
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ plugins {
//---------------------------------------------------------------------------//

group = 'org.radarcns'
version = '0.2.4-SNAPSHOT'
version = '0.3.0-SNAPSHOT'

ext.description = 'Kafka backend for processing device data.'

Expand All @@ -25,9 +25,9 @@ sourceCompatibility = '1.8'

ext.boundaryVersion = '1.0.6'
ext.codacyVersion = '1.0.10'
ext.confluentVersion = '3.3.1'
ext.confluentVersion = '4.1.0'
ext.hamcrestVersion = '1.3'
ext.kafkaVersion = '0.11.0.2'
ext.kafkaVersion = '1.1.0'
ext.jacksonVersion='2.8.5'
ext.javaMailVersion = '1.5.6'
ext.junitVersion = '4.12'
Expand Down
8 changes: 6 additions & 2 deletions gradle/codacy.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@ configurations {
codacy
}

jacoco {
toolVersion = "0.8.1"
}

dependencies {
codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: '2.0.1'
codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: '4.0.1'
}

jacocoTestReport {
executionData test, integrationTest
reports {
xml.enabled true
csv.enabled false
html.enabled true
}
executionData test, integrationTest
}

task sendCoverageToCodacy(type: JavaExec, dependsOn: jacocoTestReport) {
Expand Down
2 changes: 1 addition & 1 deletion gradle/test.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies {
integrationTestImplementation group: 'org.slf4j', name: 'slf4j-log4j12', version: slf4jVersion

// For Topic name validation based on Kafka classes
testImplementation (group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion) {
testCompile (group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion) {
exclude group: 'org.apache.kafka', module: 'kafka-clients'
exclude group: 'net.sf.jopt-simple'
exclude group: 'com.yammer.metrics'
Expand Down
1 change: 0 additions & 1 deletion radar.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ broker:

#Kafka internal parameters
stream_properties:
auto_commit_interval_ms: 1000
max.request.size: 3500042 #Set message.max.bytes for kafka brokers higher than or equal to this value
retries: 15
session_timeout_ms: 20000
Expand Down
32 changes: 24 additions & 8 deletions src/integrationTest/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
---
version: '2'

networks:
kafka:
driver: bridge
services:
#---------------------------------------------------------------------------#
# Zookeeper Cluster #
#---------------------------------------------------------------------------#
zookeeper-1:
image: confluentinc/cp-zookeeper:3.3.1
image: confluentinc/cp-zookeeper:4.1.0
networks:
- kafka
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
Expand All @@ -19,9 +24,11 @@ services:
# Kafka Cluster #
#---------------------------------------------------------------------------#
kafka-1:
image: confluentinc/cp-kafka:3.3.1
image: confluentinc/cp-kafka:4.1.0
depends_on:
- zookeeper-1
networks:
- kafka
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
Expand All @@ -33,9 +40,11 @@ services:
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

kafka-2:
image: confluentinc/cp-kafka:3.3.1
image: confluentinc/cp-kafka:4.1.0
depends_on:
- zookeeper-1
networks:
- kafka
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
Expand All @@ -47,9 +56,11 @@ services:
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

kafka-3:
image: confluentinc/cp-kafka:3.3.1
image: confluentinc/cp-kafka:4.1.0
depends_on:
- zookeeper-1
networks:
- kafka
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
Expand All @@ -64,12 +75,14 @@ services:
# Schema Registry #
#---------------------------------------------------------------------------#
schema-registry-1:
image: confluentinc/cp-schema-registry:3.3.1
image: confluentinc/cp-schema-registry:4.1.0
depends_on:
- zookeeper-1
- kafka-1
- kafka-2
- kafka-3
networks:
- kafka
restart: always
ports:
- "8081:8081"
Expand All @@ -83,12 +96,14 @@ services:
# REST proxy #
#---------------------------------------------------------------------------#
rest-proxy-1:
image: confluentinc/cp-kafka-rest:3.3.1
image: confluentinc/cp-kafka-rest:4.1.0
depends_on:
- kafka-1
- kafka-2
- kafka-3
- schema-registry-1
networks:
- kafka
ports:
- "8082:8082"
environment:
Expand All @@ -109,8 +124,9 @@ services:
depends_on:
- kafka-1
- schema-registry-1
command:
- integrationTest
networks:
- kafka
command: integrationTest
volumes:
- ../../../build/jacoco:/code/build/jacoco
- ../../../build/reports:/code/build/reports
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public SourceStatisticsMonitor(RadarPropertyHandler radar,
SourceStatisticsMonitorConfig config) {
super(radar, config.getTopics(), Objects.requireNonNull(config.getName(),
"Source statistics monitor must have a name"), "1-"
+ config.getOutputTopic(),
+ config.getOutputTopic() + UUID.randomUUID(),
new SourceStatisticsState());

if (getStateStore() == null) {
Expand Down
42 changes: 41 additions & 1 deletion src/main/java/org/radarcns/stream/GeneralStreamGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected Collection<StreamDefinition> createWindowedSensorStream(String input,
Collection<StreamDefinition> streams = Arrays.stream(TimeWindowMetadata.values())
.map(w -> new StreamDefinition(
new KafkaTopic(input), new KafkaTopic(w.getTopicLabel(outputBase)),
w.getIntervalInMilliSec()))
w.getIntervalInMilliSec(), getCommitIntervalForTimeWindow(w)))
.collect(Collectors.toList());

topicNames.addAll(streams.stream()
Expand All @@ -135,6 +135,25 @@ public void addTopicNames(Collection<String> topicNames) {
this.topicNames.addAll(topicNames);
}

public long getCommitIntervalForTimeWindow(TimeWindowMetadata metadata) {
switch (metadata) {
case ONE_DAY:
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_DAY.getCommitInterval();
case ONE_MIN:
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_MIN.getCommitInterval();
case TEN_MIN:
return CommitInterval.COMMIT_INTERVAL_FOR_TEN_MIN.getCommitInterval();
case ONE_HOUR:
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_HOUR.getCommitInterval();
case ONE_WEEK:
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_WEEK.getCommitInterval();
case TEN_SECOND:
return CommitInterval.COMMIT_INTERVAL_FOR_TEN_SECOND.getCommitInterval();
default:
return CommitInterval.COMMIT_INTERVAL_DEFAULT.getCommitInterval();
}
}

@Override
public Collection<StreamDefinition> getStreamDefinition(String inputTopic) {
Collection<StreamDefinition> topic = topicMap.get(inputTopic);
Expand All @@ -150,4 +169,25 @@ public List<String> getTopicNames() {
topicList.sort(String.CASE_INSENSITIVE_ORDER);
return topicList;
}

public enum CommitInterval {
COMMIT_INTERVAL_FOR_TEN_SECOND(10_000L),
COMMIT_INTERVAL_FOR_ONE_MIN(30_000L),
COMMIT_INTERVAL_FOR_TEN_MIN(300_000L),
COMMIT_INTERVAL_FOR_ONE_HOUR(1800_000L),
COMMIT_INTERVAL_FOR_ONE_DAY(86400_000L),
COMMIT_INTERVAL_FOR_ONE_WEEK(86400_000L),
COMMIT_INTERVAL_DEFAULT(30_000L);

private final long commitInterval;


CommitInterval(long commitInterval) {
this.commitInterval = commitInterval;
}

public long getCommitInterval() {
return commitInterval;
}
}
}
7 changes: 6 additions & 1 deletion src/main/java/org/radarcns/stream/KStreamWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
Expand Down Expand Up @@ -126,8 +127,12 @@ protected Properties getStreamProperties(@Nonnull StreamDefinition definition) {
localClientId += '-' + window.sizeMs + '-' + window.advanceMs;
}

return kafkaProperty.getStreamProperties(localClientId, numThreads,
Properties props = kafkaProperty.getStreamProperties(localClientId, numThreads,
DeviceTimestampExtractor.class);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
String.valueOf(definition.getCommitIntervalMs()));

return props;
}

/**
Expand Down
31 changes: 28 additions & 3 deletions src/main/java/org/radarcns/stream/StreamDefinition.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.radarcns.stream;

import static org.radarcns.stream.GeneralStreamGroup.CommitInterval.COMMIT_INTERVAL_DEFAULT;
import static org.radarcns.util.Comparison.compare;

import java.util.Objects;
Expand All @@ -24,10 +25,12 @@
import org.apache.kafka.streams.kstream.TimeWindows;
import org.radarcns.topic.KafkaTopic;


public class StreamDefinition implements Comparable<StreamDefinition> {
private final KafkaTopic inputTopic;
private final KafkaTopic outputTopic;
private final TimeWindows window;
private final long commitIntervalMs;

/**
* Constructor. It takes in input the topic name to be consumed and to topic name where the
Expand All @@ -36,7 +39,7 @@ public class StreamDefinition implements Comparable<StreamDefinition> {
* @param output output {@link KafkaTopic}
*/
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output) {
this(input, output, 0L);
this(input, output, 0L, COMMIT_INTERVAL_DEFAULT.getCommitInterval());
}

/**
Expand All @@ -47,24 +50,41 @@ public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output) {
* @param window time window for aggregation.
*/
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output, long window) {
this(input, output, window == 0 ? null : TimeWindows.of(window));
this(input, output, window == 0 ? null : TimeWindows.of(window),
COMMIT_INTERVAL_DEFAULT.getCommitInterval());
}

/**
* Constructor. It takes in input the topic name to be consumed and to topic name where the
* related stream will write the computed values.
* @param input source {@link KafkaTopic}
* @param output output {@link KafkaTopic}
* @param window time window for aggregation.
* @param commitIntervalMs The commit.interval.ms config for the stream
*/
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output, long window,
long commitIntervalMs) {
this(input, output, window == 0 ? null : TimeWindows.of(window), commitIntervalMs);
}


/**
* Constructor. It takes in input the topic name to be consumed and to topic name where the
* related stream will write the computed values.
* @param input source {@link KafkaTopic}
* @param output output {@link KafkaTopic}
* @param window time window for aggregation.
* @param commitIntervalMs The commit.interval.ms config for the stream
*/
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output,
@Nullable TimeWindows window) {
@Nullable TimeWindows window, @Nonnull long commitIntervalMs) {
Objects.requireNonNull(input);
Objects.requireNonNull(output);

this.inputTopic = input;
this.outputTopic = output;
this.window = window;
this.commitIntervalMs = commitIntervalMs;
}

@Nonnull
Expand Down Expand Up @@ -94,6 +114,11 @@ public TimeWindows getTimeWindows() {
return window;
}

@Nullable
public long getCommitIntervalMs(){
return commitIntervalMs;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down