Skip to content

Commit

Permalink
Add compression, tweaked some settings
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas-sa-schroeder committed Jan 10, 2019
1 parent 0fb6eb0 commit 87edcb0
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
5 changes: 3 additions & 2 deletions app/src/main/scala/segmenter/Segmenter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Segmenter(config: AppConfig, serdes: SegmenterSerdes) {
PROCESSING_GUARANTEE_CONFIG -> EXACTLY_ONCE,
COMMIT_INTERVAL_MS_CONFIG -> "500",
NUM_STANDBY_REPLICAS_CONFIG -> "1",
producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG) -> "lz4",
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG -> classOf[LogAndContinueExceptionHandler].getName,
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG -> classOf[SegmenterTimestampExtractor].getName,
consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG) -> "6000",
Expand Down Expand Up @@ -86,8 +87,8 @@ class Segmenter(config: AppConfig, serdes: SegmenterSerdes) {

// replace the event key with a session key based on the current session.
events
.join(sessions)((evt, session) => (evt, session)) // session lookup.
.map { case (key, (evt, session)) => SessionKey(key, session.sessionId) -> evt }
.join(sessions)((evt, session) => (evt, session.sessionId)) // session lookup.
.map { case (key, (evt, sessionId)) => SessionKey(key, sessionId) -> evt }
.to("session-events")

private def printMetrics(streams: KafkaStreams): Unit = {
Expand Down
8 changes: 5 additions & 3 deletions driver/src/main/scala/driver/Driver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ class Driver(config: AppConfig) {
Map(
CLIENT_ID_CONFIG -> "driver",
BOOTSTRAP_SERVERS_CONFIG -> config.kafka.bootstrapServers,
ENABLE_IDEMPOTENCE_CONFIG -> "true",
RETRIES_CONFIG -> Int.MaxValue.toString,
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> "1",
ACKS_CONFIG -> "all"
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> "5",
ACKS_CONFIG -> "all",
COMPRESSION_TYPE_CONFIG -> "lz4"
)
}

val keys: Seq[Device] = (1 to 5).map(i => Device(s"device-$i"))
val keys: Seq[Device] = (1 to 10).map(i => Device(s"device-$i"))
val metrics: Seq[String] = Seq("pressure", "temperature", "wind", "humidity")

val registry: String = config.kafka.schemaRegistry
Expand Down

0 comments on commit 87edcb0

Please sign in to comment.