Skip to content
This repository has been archived by the owner on May 5, 2022. It is now read-only.

Commit

Permalink
Merge pull request #30 from simno/nakadi-timeouts
Browse files Browse the repository at this point in the history
Nakadi timeouts
  • Loading branch information
dr4ke616 committed Oct 4, 2016
2 parents 768c2ea + 594247e commit 23d1895
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.zalando.react.nakadi

import akka.stream.actor.ActorPublisher
import akka.actor.{ActorLogging, ActorRef, PoisonPill, Props}
import akka.actor.{ActorLogging, ActorRef, PoisonPill, Props, ReceiveTimeout}
import org.zalando.react.nakadi.commit.OffsetMap
import org.zalando.react.nakadi.LeaseManagerActor.Flush
import org.zalando.react.nakadi.client.models.EventStreamBatch
Expand All @@ -10,6 +10,7 @@ import org.zalando.react.nakadi.NakadiActorPublisher.CommitOffsets
import org.zalando.react.nakadi.NakadiMessages.{Offset, StringConsumerMessage}

import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal


Expand All @@ -24,6 +25,7 @@ object NakadiActorPublisher {
}
}

case class NakadiTimeout(error: String) extends Exception(error)

class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManager: ActorRef) extends ActorPublisher[StringConsumerMessage]
with ActorLogging {
Expand All @@ -37,7 +39,7 @@ class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManage
private val partition: String = consumerAndProps.properties.partition
private val client: ActorRef = consumerAndProps.nakadiClient
private var streamSupervisor: Option[ActorRef] = None

private val reconnectTimeout: Duration = consumerAndProps.properties.batchFlushTimeoutInSeconds * 1.5
private val MaxBufferSize = 100
private var buf = Vector.empty[StringConsumerMessage]

Expand All @@ -53,6 +55,7 @@ class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManage
case NakadiActorPublisher.Stop => stop()
case CommitOffsets(offsetMap) => executeCommit(offsetMap)
case NonFatal(err) => onError(err)
case ReceiveTimeout => throw NakadiTimeout(s"No events from Nakadi in the last $reconnectTimeout seconds...")
}

private def registerSupervisor(ref: ActorRef) = {
Expand Down Expand Up @@ -124,6 +127,7 @@ class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManage

def start() = {
if (!isRunning) {
context.setReceiveTimeout(reconnectTimeout)
isRunning = true
client ! ConsumeCommand.Start
}
Expand Down

0 comments on commit 23d1895

Please sign in to comment.