File tree Expand file tree Collapse file tree 1 file changed +16
-4
lines changed
src/main/scala/com/contxt/kinesis Expand file tree Collapse file tree 1 file changed +16
-4
lines changed Original file line number Diff line number Diff line change @@ -71,13 +71,17 @@ private[kinesis] class ScalaKinesisProducerImpl(
7171 }
7272 }
7373
74- def shutdown (): Future [Unit ] = {
74+ def shutdown (): Future [Unit ] = shutdownOnce
75+
76+ private lazy val shutdownOnce : Future [Unit ] = {
7577 val allFlushedFuture = flushAll()
78+ val shutdownPromise = Promise [Unit ]
7679 allFlushedFuture.onComplete { _ =>
77- producer.destroy()
78- stats.reportShutdown(streamId)
80+ shutdownPromise.completeWith(destroyProducer())
7981 }
80- allFlushedFuture
82+ val combinedFuture = allFlushedFuture.zip(shutdownPromise.future).map(_ => ())
83+ combinedFuture.onComplete(_ => stats.reportShutdown(streamId))
84+ combinedFuture
8185 }
8286
8387 private def throwSendFailedException (result : UserRecordResult ): Nothing = {
@@ -95,4 +99,12 @@ private[kinesis] class ScalaKinesisProducerImpl(
9599 }
96100 }
97101 }
102+
103+ private def destroyProducer (): Future [Unit ] = {
104+ Future {
105+ blocking {
106+ producer.destroy()
107+ }
108+ }
109+ }
98110}
You can’t perform that action at this time.
0 commit comments