From 27954334ea8a1878c2230061b9e2dabc070d93bb Mon Sep 17 00:00:00 2001 From: Simon McGloin Date: Fri, 30 Sep 2016 16:15:28 +0100 Subject: [PATCH 1/3] Throw an exception if Nakadi doesn't send an event within the configured timeout. --- .../org/zalando/react/nakadi/NakadiActorPublisher.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala b/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala index d518186..7e4b3da 100644 --- a/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala +++ b/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala @@ -1,7 +1,7 @@ package org.zalando.react.nakadi import akka.stream.actor.ActorPublisher -import akka.actor.{ActorLogging, ActorRef, PoisonPill, Props} +import akka.actor.{ActorLogging, ActorRef, PoisonPill, Props, ReceiveTimeout} import org.zalando.react.nakadi.commit.OffsetMap import org.zalando.react.nakadi.LeaseManagerActor.Flush import org.zalando.react.nakadi.client.models.EventStreamBatch @@ -10,6 +10,8 @@ import org.zalando.react.nakadi.NakadiActorPublisher.CommitOffsets import org.zalando.react.nakadi.NakadiMessages.{Offset, StringConsumerMessage} import scala.annotation.tailrec +import scala.concurrent.duration._ +import scala.language.postfixOps import scala.util.control.NonFatal @@ -24,6 +26,7 @@ object NakadiActorPublisher { } } +case class NakadiTimeout(error: String) extends Exception(error) class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManager: ActorRef) extends ActorPublisher[StringConsumerMessage] with ActorLogging { @@ -37,6 +40,8 @@ class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManage private val partition: String = consumerAndProps.properties.partition private val client: ActorRef = consumerAndProps.nakadiClient private var streamSupervisor: Option[ActorRef] = None + private val reconnectTimeout: Duration = consumerAndProps.properties.batchFlushTimeoutInSeconds * 1.5 + context.setReceiveTimeout(reconnectTimeout) private val MaxBufferSize = 100 private var buf = Vector.empty[StringConsumerMessage] @@ -53,6 +58,7 @@ class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManage case NakadiActorPublisher.Stop => stop() case CommitOffsets(offsetMap) => executeCommit(offsetMap) case NonFatal(err) => onError(err) + case ReceiveTimeout => throw NakadiTimeout(s"No events from Nakadi in the last $reconnectTimeout seconds...") } private def registerSupervisor(ref: ActorRef) = { From da13da6fdcd02908440dbcfdd5bdf62e4d89fc26 Mon Sep 17 00:00:00 2001 From: Simon McGloin Date: Fri, 30 Sep 2016 16:17:29 +0100 Subject: [PATCH 2/3] Removing unneeded imports --- .../scala/org/zalando/react/nakadi/NakadiActorPublisher.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala b/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala index 7e4b3da..6618896 100644 --- a/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala +++ b/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala @@ -10,8 +10,7 @@ import org.zalando.react.nakadi.NakadiActorPublisher.CommitOffsets import org.zalando.react.nakadi.NakadiMessages.{Offset, StringConsumerMessage} import scala.annotation.tailrec -import scala.concurrent.duration._ -import scala.language.postfixOps +import scala.concurrent.duration.Duration import scala.util.control.NonFatal From 594247eb4201b4e0e1856245dd260fa384d1332c Mon Sep 17 00:00:00 2001 From: Simon McGloin Date: Tue, 4 Oct 2016 10:43:18 +0100 Subject: [PATCH 3/3] Moved timeout config to start method --- .../scala/org/zalando/react/nakadi/NakadiActorPublisher.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala b/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala index 6618896..ab1c2d1 100644 --- a/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala +++ b/src/main/scala/org/zalando/react/nakadi/NakadiActorPublisher.scala @@ -40,8 +40,6 @@ class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManage private val client: ActorRef = consumerAndProps.nakadiClient private var streamSupervisor: Option[ActorRef] = None private val reconnectTimeout: Duration = consumerAndProps.properties.batchFlushTimeoutInSeconds * 1.5 - context.setReceiveTimeout(reconnectTimeout) - private val MaxBufferSize = 100 private var buf = Vector.empty[StringConsumerMessage] @@ -129,6 +127,7 @@ class NakadiActorPublisher(consumerAndProps: ReactiveNakadiConsumer, leaseManage def start() = { if (!isRunning) { + context.setReceiveTimeout(reconnectTimeout) isRunning = true client ! ConsumeCommand.Start }