@@ -2,68 +2,95 @@ package com.contxt.kinesis
22
33import com .amazonaws .services .kinesis .producer .{ KinesisProducer , KinesisProducerConfiguration , UserRecordResult }
44import com .google .common .util .concurrent .ListenableFuture
5+ import com .typesafe .config .{ Config , ConfigFactory }
56import java .nio .ByteBuffer
67import scala .concurrent ._
78import scala .language .implicitConversions
89import scala .util .Try
10+ import scala .collection .JavaConversions ._
11+ import scala .concurrent .ExecutionContext .Implicits .global
912
1013/** A lightweight Scala wrapper around Kinesis Producer Library (KPL). */
1114trait ScalaKinesisProducer {
1215
16+ def streamId : StreamId
17+
1318 /** Sends a record to a stream. See
1419 * [[[com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(String, String, String, ByteBuffer):ListenableFuture[UserRecordResult]*]]].
1520 */
1621 def send (partitionKey : String , data : ByteBuffer , explicitHashKey : Option [String ] = None ): Future [UserRecordResult ]
1722
18- /** Flushes all the outgoing messages, returning a Future that completes when all the flushed messages have been sent.
19- * See [[com.amazonaws.services.kinesis.producer.KinesisProducer.flushSync ]].
20- */
21- def flushAll (): Future [Unit ]
22-
2323 /** Performs an orderly shutdown, waiting for all the outgoing messages before destroying the underlying producer. */
2424 def shutdown (): Future [Unit ]
2525}
2626
2727object ScalaKinesisProducer {
28- def apply (streamName : String , producerConfig : KinesisProducerConfiguration ): ScalaKinesisProducer = {
29- val producer = new KinesisProducer (producerConfig)
30- new ScalaKinesisProducerImpl (streamName, producer)
28+ def apply (
29+ streamName : String ,
30+ kplConfig : KinesisProducerConfiguration ,
31+ config : Config = ConfigFactory .load()
32+ ): ScalaKinesisProducer = {
33+ val producerStats = ProducerStats .getInstance(config)
34+ ScalaKinesisProducer (streamName, kplConfig, producerStats)
35+ }
36+
37+ def apply (
38+ streamName : String ,
39+ kplConfig : KinesisProducerConfiguration ,
40+ producerStats : ProducerStats
41+ ): ScalaKinesisProducer = {
42+ val streamId = StreamId (kplConfig.getRegion, streamName)
43+ val producer = new KinesisProducer (kplConfig)
44+ new ScalaKinesisProducerImpl (streamId, producer, producerStats)
3145 }
3246
3347 private [kinesis] implicit def listenableToScalaFuture [A ](listenable : ListenableFuture [A ]): Future [A ] = {
34- implicit val executionContext : ExecutionContextExecutor = scala.concurrent.ExecutionContext .global
3548 val promise = Promise [A ]
3649 val callback = new Runnable {
3750 override def run (): Unit = promise.tryComplete(Try (listenable.get()))
3851 }
39- listenable.addListener(callback, executionContext )
52+ listenable.addListener(callback, global )
4053 promise.future
4154 }
4255}
4356
4457private [kinesis] class ScalaKinesisProducerImpl (
45- val streamName : String ,
46- private val producer : KinesisProducer
58+ val streamId : StreamId ,
59+ private val producer : KinesisProducer ,
60+ private val stats : ProducerStats
4761) extends ScalaKinesisProducer {
4862 import ScalaKinesisProducer .listenableToScalaFuture
4963
5064 def send (partitionKey : String , data : ByteBuffer , explicitHashKey : Option [String ]): Future [UserRecordResult ] = {
51- producer.addUserRecord(streamName, partitionKey, explicitHashKey.orNull, data)
52- }
53-
54- def flushAll (): Future [Unit ] = {
55- import scala .concurrent .ExecutionContext .Implicits .global
56- Future {
57- blocking {
58- producer.flushSync()
65+ stats.trackSend(streamId, data.remaining) {
66+ producer.addUserRecord(streamId.streamName, partitionKey, explicitHashKey.orNull, data).map { result =>
67+ if (! result.isSuccessful) throwSendFailedException(result) else result
5968 }
6069 }
6170 }
6271
6372 def shutdown (): Future [Unit ] = {
64- import scala .concurrent .ExecutionContext .Implicits .global
6573 val allFlushedFuture = flushAll()
66- allFlushedFuture.onComplete(_ => producer.destroy())
74+ allFlushedFuture.onComplete { _ =>
75+ producer.destroy()
76+ stats.reportShutdown(streamId)
77+ }
6778 allFlushedFuture
6879 }
80+
81+ private def throwSendFailedException (result : UserRecordResult ): Nothing = {
82+ val attemptCount = result.getAttempts.size
83+ val errorMessage = result.getAttempts.lastOption.map(_.getErrorMessage)
84+ throw new RuntimeException (
85+ s " Sending a record to $streamId failed after $attemptCount attempts, last error message: $errorMessage. "
86+ )
87+ }
88+
89+ private def flushAll (): Future [Unit ] = {
90+ Future {
91+ blocking {
92+ producer.flushSync()
93+ }
94+ }
95+ }
6996}
0 commit comments