Skip to content

Commit

Permalink
add https connection pool for HttpSink and FirebaseSink
Browse files Browse the repository at this point in the history
  • Loading branch information
bennfocus committed Mar 1, 2019
1 parent 17a7896 commit 227271e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.thenetcircle.event_bus.story.tasks.http

import akka.http.scaladsl.model.HttpHeader.ParsingResult.Ok
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader, HttpRequest}
import akka.http.scaladsl.model._
import com.thenetcircle.event_bus.event.Event
import com.thenetcircle.event_bus.story.interfaces.ITaskBuilder
import com.thenetcircle.event_bus.{AppContext, BuildInfo}
Expand Down Expand Up @@ -68,6 +68,8 @@ class FirebaseSinkBuilder() extends HttpSinkBuilder with ITaskBuilder[FirebaseSi
| expected-response = ""
| allow-extra-signals = false
|
| use-https-connection-pool = true
|
| pool {
| max-connections = 4
| min-connections = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ case class HttpSinkSettings(
requestBufferSize: Int = 100,
expectedResponse: Option[String] = Some("ok"),
allowExtraSignals: Boolean = true,
requestContentType: ContentType.NonBinary = ContentTypes.`text/plain(UTF-8)`
requestContentType: ContentType.NonBinary = ContentTypes.`text/plain(UTF-8)`,
useHttpsConnectionPool: Boolean = false
)

case class RetrySenderSettings(
Expand Down Expand Up @@ -242,33 +243,49 @@ class HttpSink(val settings: HttpSinkSettings) extends ISink with ITaskLogging {

implicit val materializer: Materializer = runningContext.getMaterializer()

val http = Http()(runningContext.getActorSystem())
val host = settings.defaultRequest.uri.authority.host.toString()
val port = settings.defaultRequest.uri.effectivePort

val poolClientFlow = if (settings.connectionPoolSettings.isDefined) {
val connectionPoolSettings = settings.connectionPoolSettings.get
taskLogger.info(
s"Initializing a new HttpSender of $host:$port with connection pool settings: $connectionPoolSettings"
)
http.newHostConnectionPool[Promise[HttpResponse]](host, port, connectionPoolSettings)
} else {
taskLogger.info(s"Initializing a new HttpSender of $host:$port")
http.newHostConnectionPool[Promise[HttpResponse]](host, port)
}
val connectionPool = newHostConnectionPool()

httpSender = Some(
Source
.queue[(HttpRequest, Promise[HttpResponse])](settings.requestBufferSize, OverflowStrategy.backpressure)
.via(poolClientFlow)
.via(connectionPool)
.toMat(Sink.foreach {
case (Success(resp), p) => p.success(resp)
case (Failure(e), p) => p.failure(e)
})(Keep.left)
.run()
)

taskLogger.info(s"The HttpSender of $host:$port is initialized.")
taskLogger.info(s"HttpSender is initialized.")
}

protected def newHostConnectionPool()(
implicit runningContext: TaskRunningContext
): Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] = {
implicit val materializer: Materializer = runningContext.getMaterializer()

val http = Http()(runningContext.getActorSystem())
val host = settings.defaultRequest.uri.authority.host.toString()
val port = settings.defaultRequest.uri.effectivePort

if (settings.connectionPoolSettings.isDefined) {
val connectionPoolSettings = settings.connectionPoolSettings.get
taskLogger.info(
s"Initializing a new HttpSender of $host:$port with connection pool settings: $connectionPoolSettings"
)
if (settings.useHttpsConnectionPool) {
http.newHostConnectionPoolHttps[Promise[HttpResponse]](host, port, settings = connectionPoolSettings)
} else {
http.newHostConnectionPool[Promise[HttpResponse]](host, port, settings = connectionPoolSettings)
}
} else {
taskLogger.info(s"Initializing a new HttpSender of $host:$port")
if (settings.useHttpsConnectionPool) {
http.newHostConnectionPoolHttps[Promise[HttpResponse]](host, port)
} else {
http.newHostConnectionPool[Promise[HttpResponse]](host, port)
}
}
}

protected var retrySender: Option[ActorRef] = None
Expand Down Expand Up @@ -447,6 +464,8 @@ class HttpSinkBuilder() extends ITaskBuilder[HttpSink] with Logging {
| retry-duration = 12 h
| }
|
| use-https-connection-pool = false
|
| # pool settings will override the default settings of akka.http.host-connection-pool
| pool {
| max-connections = 32
Expand Down Expand Up @@ -512,7 +531,8 @@ class HttpSinkBuilder() extends ITaskBuilder[HttpSink] with Logging {
config.as[Int]("concurrent-requests"),
config.as[Int]("request-buffer-size"),
config.as[Option[String]]("expected-response").filter(_.trim != ""),
config.as[Boolean]("allow-extra-signals")
config.as[Boolean]("allow-extra-signals"),
useHttpsConnectionPool = config.as[Boolean]("use-https-connection-pool")
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class FirebaseSinkBuilderTest extends TestBase {
settings.requestBufferSize shouldEqual 100
settings.expectedResponse shouldEqual None
settings.allowExtraSignals shouldEqual false
settings.useHttpsConnectionPool shouldEqual true

settings.useRetrySender shouldEqual true
settings.retrySenderSettings.minBackoff shouldEqual 1.second
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class HttpSinkBuilderTest extends TestBase {
settings.requestBufferSize shouldEqual 100
settings.expectedResponse shouldEqual Some("ok")
settings.allowExtraSignals shouldEqual true
settings.useHttpsConnectionPool shouldEqual false

settings.useRetrySender shouldEqual true
settings.retrySenderSettings.minBackoff shouldEqual 1.second
Expand Down

0 comments on commit 227271e

Please sign in to comment.