Skip to content

Commit 3781129

Browse files
authored
Merge pull request #1 from StreetContxt/initial-impl
Scala Wrapper for KPL
2 parents 7394e25 + ca69050 commit 3781129

File tree

2 files changed

+81
-1
lines changed

2 files changed

+81
-1
lines changed

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,13 @@
11
# kpl-scala
2-
Kinesis Producer Library (KPL) Scala Wrapper
2+
A lightweight Scala wrapper around Kinesis Producer Library (KPL).
3+
4+
The main benefit of this library is working with Scala-native Futures when
5+
interacting with KPL.
6+
7+
## No Message Ordering
8+
Kinesis producer library **does not provide message ordering guarantees** at a reasonable throughput,
9+
see [this ticket](https://github.com/awslabs/amazon-kinesis-producer/issues/23) for more details.
10+
11+
## Integration Tests
12+
This library is tested as part of [kcl-akka-stream](https://github.com/StreetContxt/kcl-akka-stream)
13+
integration tests.
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.contxt.kinesis
2+
3+
import com.amazonaws.services.kinesis.producer.{ KinesisProducer, KinesisProducerConfiguration, UserRecordResult }
4+
import com.google.common.util.concurrent.ListenableFuture
5+
import java.nio.ByteBuffer
6+
import scala.concurrent._
7+
import scala.language.implicitConversions
8+
import scala.util.Try
9+
10+
/** A lightweight Scala wrapper around Kinesis Producer Library (KPL). */
11+
trait ScalaKinesisProducer {
12+
13+
/** Sends a record to a stream. See
14+
* [[[com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(String, String, String, ByteBuffer):ListenableFuture[UserRecordResult]*]]].
15+
*/
16+
def send(partitionKey: String, data: ByteBuffer, explicitHashKey: Option[String] = None): Future[UserRecordResult]
17+
18+
/** Flushes all the outgoing messages, returning a Future that completes when all the flushed messages have been sent.
19+
* See [[com.amazonaws.services.kinesis.producer.KinesisProducer.flushSync]].
20+
*/
21+
def flushAll(): Future[Unit]
22+
23+
/** Performs an orderly shutdown, waiting for all the outgoing messages before destroying the underlying producer. */
24+
def shutdown(): Future[Unit]
25+
}
26+
27+
object ScalaKinesisProducer {
28+
def apply(streamName: String, producerConfig: KinesisProducerConfiguration): ScalaKinesisProducer = {
29+
val producer = new KinesisProducer(producerConfig)
30+
new ScalaKinesisProducerImpl(streamName, producer)
31+
}
32+
33+
private[kinesis] implicit def listenableToScalaFuture[A](listenable: ListenableFuture[A]): Future[A] = {
34+
implicit val executionContext: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
35+
val promise = Promise[A]
36+
val callback = new Runnable {
37+
override def run(): Unit = promise.tryComplete(Try(listenable.get()))
38+
}
39+
listenable.addListener(callback, executionContext)
40+
promise.future
41+
}
42+
}
43+
44+
private[kinesis] class ScalaKinesisProducerImpl(
45+
val streamName: String,
46+
private val producer: KinesisProducer
47+
) extends ScalaKinesisProducer {
48+
import ScalaKinesisProducer.listenableToScalaFuture
49+
50+
def send(partitionKey: String, data: ByteBuffer, explicitHashKey: Option[String]): Future[UserRecordResult] = {
51+
producer.addUserRecord(streamName, partitionKey, explicitHashKey.orNull, data)
52+
}
53+
54+
def flushAll(): Future[Unit] = {
55+
import scala.concurrent.ExecutionContext.Implicits.global
56+
Future {
57+
blocking {
58+
producer.flushSync()
59+
}
60+
}
61+
}
62+
63+
def shutdown(): Future[Unit] = {
64+
import scala.concurrent.ExecutionContext.Implicits.global
65+
val allFlushedFuture = flushAll()
66+
allFlushedFuture.onComplete(_ => producer.destroy())
67+
allFlushedFuture
68+
}
69+
}

0 commit comments

Comments
 (0)