Skip to content

Commit 50f0e15

Browse files
author
Aleksey Nikiforov
committed
Added ScalaKinesisProducer implementation.
1 parent 7394e25 commit 50f0e15

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.contxt.kinesis
2+
3+
import com.amazonaws.services.kinesis.producer.{ KinesisProducer, KinesisProducerConfiguration, UserRecordResult }
4+
import com.google.common.util.concurrent.ListenableFuture
5+
import java.nio.ByteBuffer
6+
import scala.concurrent._
7+
import scala.language.implicitConversions
8+
import scala.util.Try
9+
10+
/** A lightweight Scala wrapper around Kinesis Producer Library (KPL). */
11+
trait ScalaKinesisProducer {
12+
13+
/** Sends a record to a stream. See
14+
* [[[com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(String, String, String, ByteBuffer):ListenableFuture[UserRecordResult]*]]].
15+
*/
16+
def send(partitionKey: String, data: ByteBuffer, explicitHashKey: Option[String] = None): Future[UserRecordResult]
17+
18+
/** Flushes all the outgoing messages, returning a Future that completes when all the flushed messages have been sent.
19+
* See [[com.amazonaws.services.kinesis.producer.KinesisProducer.flushSync]].
20+
*/
21+
def flushAll(): Future[Unit]
22+
23+
/** Performs an orderly shutdown, waiting for all the outgoing messages before destroying the underlying producer. */
24+
def shutdown(): Future[Unit]
25+
}
26+
27+
object ScalaKinesisProducer {
28+
def apply(streamName: String, producerConfig: KinesisProducerConfiguration): ScalaKinesisProducer = {
29+
val producer = new KinesisProducer(producerConfig)
30+
new ScalaKinesisProducerImpl(streamName, producer)
31+
}
32+
33+
private[kinesis] implicit def listenableToScalaFuture[A](listenable: ListenableFuture[A]): Future[A] = {
34+
implicit val executionContext: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
35+
val promise = Promise[A]
36+
val callback = new Runnable {
37+
override def run(): Unit = promise.tryComplete(Try(listenable.get()))
38+
}
39+
listenable.addListener(callback, executionContext)
40+
promise.future
41+
}
42+
}
43+
44+
private[kinesis] class ScalaKinesisProducerImpl(
45+
val streamName: String,
46+
private val producer: KinesisProducer
47+
) extends ScalaKinesisProducer {
48+
import ScalaKinesisProducer.listenableToScalaFuture
49+
50+
def send(partitionKey: String, data: ByteBuffer, explicitHashKey: Option[String]): Future[UserRecordResult] = {
51+
producer.addUserRecord(streamName, partitionKey, explicitHashKey.orNull, data)
52+
}
53+
54+
def flushAll(): Future[Unit] = {
55+
import scala.concurrent.ExecutionContext.Implicits.global
56+
Future {
57+
blocking {
58+
producer.flushSync()
59+
}
60+
}
61+
}
62+
63+
def shutdown(): Future[Unit] = {
64+
import scala.concurrent.ExecutionContext.Implicits.global
65+
val allFlushedFuture = flushAll()
66+
allFlushedFuture.onComplete(_ => producer.destroy())
67+
allFlushedFuture
68+
}
69+
}

0 commit comments

Comments
 (0)