From d9f4fe02e575aa41591c0a543e3ca7452ebbca48 Mon Sep 17 00:00:00 2001 From: SJ Date: Fri, 12 Apr 2019 09:30:45 -0700 Subject: [PATCH] performance improvement for EventHubs writer (#441) --- .../eventhubs/client/EventHubsClient.scala | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala b/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala index 8a01d3ece..a3c6896d6 100644 --- a/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala +++ b/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala @@ -28,9 +28,11 @@ import org.json4s.jackson.Serialization import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ ArrayBuffer, ListBuffer } +import scala.compat.java8.FutureConverters import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{ Await, Future } +import scala.util.{ Failure, Success } /** * A [[Client]] which connects to an event hub instance. All interaction @@ -48,6 +50,8 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf) private implicit val formats = Serialization.formats(NoTypeHints) + private var pendingWorks = new ListBuffer[Future[Any]] + private var _client: EventHubClient = _ private def client = synchronized { @@ -83,17 +87,19 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf) p.putAll(properties.get.asJava) } - if (partition.isDefined) { + val sendTask = if (partition.isDefined) { if (partitionSender.getPartitionId.toInt != partition.get) { logInfo("Recreating partition sender.") createPartitionSender(partition.get) } - partitionSender.sendSync(event) + partitionSender.send(event) } else if (partitionKey.isDefined) { - client.sendSync(event, partitionKey.get) + client.send(event, partitionKey.get) } else { - client.sendSync(event) + client.send(event) } + + pendingWorks += FutureConverters.toScala(sendTask) } /** @@ -177,10 +183,25 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf) */ override def close(): Unit = { logInfo("close: Closing EventHubsClient.") + + Future.sequence(pendingWorks).onComplete { + case Success(_) => cleanup() + case Failure(e) => + logError(s"failed to complete pending tasks. $ehConf: ", e) + cleanup() + + throw e + } + } + + private def cleanup(): Unit = { + pendingWorks.clear() + if (partitionSender != null) { partitionSender.closeSync() partitionSender = null } + if (_client != null) { ClientConnectionPool.returnClient(ehConf, _client) _client = null