Skip to content

Commit

Permalink
Removed outdated comment, renamings
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas-sa-schroeder committed Jan 5, 2019
1 parent 41700af commit 0fb6eb0
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
10 changes: 4 additions & 6 deletions app/src/main/scala/segmenter/Segmenter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Segmenter(config: AppConfig, serdes: SegmenterSerdes) {
BOOTSTRAP_SERVERS_CONFIG -> config.kafka.bootstrapServers,
producerPrefix(ProducerConfig.ACKS_CONFIG) -> "all",
PROCESSING_GUARANTEE_CONFIG -> EXACTLY_ONCE,
COMMIT_INTERVAL_MS_CONFIG -> 500.toString,
COMMIT_INTERVAL_MS_CONFIG -> "500",
NUM_STANDBY_REPLICAS_CONFIG -> "1",
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG -> classOf[LogAndContinueExceptionHandler].getName,
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG -> classOf[SegmenterTimestampExtractor].getName,
Expand Down Expand Up @@ -65,25 +65,23 @@ class Segmenter(config: AppConfig, serdes: SegmenterSerdes) {
private def rndSessionId(): String = Random.alphanumeric.take(8).mkString

// upsert sessions by aggregating events by device.
val sessions: KTable[Device, Session] = events.groupByKey.aggregate[Session](null) { (key, evt, last) =>
val sessions: KTable[Device, Session] = events.groupByKey.aggregate[Session](null) { (key, evt, current) =>
def newSession(completed: Boolean = false) = Session(key, rndSessionId(), evt.timestamp, completed)

evt.event match {
case DeviceWokeUp() => newSession()

case AllDataReceived() =>
if (last != null) last.copy(completed = true, timestamp = evt.timestamp)
if (current != null) current.copy(completed = true, timestamp = evt.timestamp)
else newSession(completed = true)

case _ =>
if (last != null && !last.completed) last.copy(timestamp = evt.timestamp)
if (current != null && !current.completed) current.copy(timestamp = evt.timestamp)
else newSession() // create a new session if no session exists or the previous one was completed
}
}

// eventually purge sessions (completed ones immediately, others via timeout)
// the purger does not really process the output from sessions - it uses the sessions stream as
// clock / time provider to trigger and control purging.
sessions.toStream.process(() => new SessionPurger(), "sessions")

// replace the event key with a session key based on the current session.
Expand Down
7 changes: 4 additions & 3 deletions app/src/main/scala/segmenter/SessionPurger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ class SessionPurger extends Processor[Device, Session] with Punctuator {
purging = context.schedule(Duration.ofSeconds(4), PunctuationType.STREAM_TIME, this)
}

override def process(key: Device, value: Session): Unit = if (value != null && value.completed) sessions.delete(key)

override def close(): Unit = purging.cancel()
override def process(key: Device, session: Session): Unit =
if (session != null && session.completed) sessions.delete(key)

override def punctuate(timestamp: Long): Unit = sessions.all().forEachRemaining { kv =>
val sessionAge = timestamp - kv.value.timestamp.toEpochMilli
if (sessionAge >= timeout) sessions.delete(kv.key)
}

override def close(): Unit = purging.cancel()
}

0 comments on commit 0fb6eb0

Please sign in to comment.