diff --git a/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala b/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala index d518186..ab1c2d1 100644 --- a/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala +++ b/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala @@ -1,7 +1,7 @@ package org.zalando.react.nakadi import akka.stream.actor.ActorPublisher -import akka.actor.{ActorLogging, ActorRef, PoisonPill, Props} +import akka.actor.{ActorLogging, ActorRef, PoisonPill, Props, ReceiveTimeout} import org.zalando.react.nakadi.commit.OffsetMap import org.zalando.react.nakadi.LeaseManagerActor.Flush import org.zalando.react.nakadi.client.models.EventStreamBatch @@ -10,6 +10,7 @@ import org.zalando.react.nakadi.NakadiActorPublisher.CommitOffsets import org.zalando.react.nakadi.NakadiMessages.{Offset, StringConsumerMessage} import scala.annotation.tailrec +import scala.concurrent.duration.Duration import scala.util.control.NonFatal @@ -24,6 +25,7 @@ object NakadiActorPublisher { } } +case class NakadiTimeout(error: String) extends Exception(error) class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManager: ActorRef) extends ActorPublisher[StringConsumerMessage] with ActorLogging { @@ -37,7 +39,7 @@ class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManage private val partition: String = consumerAndProps.properties.partition private val client: ActorRef = consumerAndProps.nakadiClient private var streamSupervisor: Option[ActorRef] = None - + private val reconnectTimeout: Duration = consumerAndProps.properties.batchFlushTimeoutInSeconds * 1.5 private val MaxBufferSize = 100 private var buf = Vector.empty[StringConsumerMessage] @@ -53,6 +55,7 @@ class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManage case NakadiActorPublisher.Stop => stop() case CommitOffsets(offsetMap) => executeCommit(offsetMap) case NonFatal(err) => onError(err) + case ReceiveTimeout => throw NakadiTimeout(s"No events from Nakadi in the last $reconnectTimeout seconds...") } private def registerSupervisor(ref: ActorRef) = { @@ -124,6 +127,7 @@ class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManage def start() = { if (!isRunning) { + context.setReceiveTimeout(reconnectTimeout) isRunning = true client ! ConsumeCommand.Start }