Skip to content

Commit 581b1bb

Browse files
authored
Merge pull request #9 from StreetContxt/readme-how-to
Readme Usage Section (and minor code tweaks)
2 parents bef8419 + 29d3785 commit 581b1bb

File tree

2 files changed

+55
-4
lines changed

2 files changed

+55
-4
lines changed

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,45 @@ The main benefit of this library is working with Scala-native Futures when
55
interacting with KPL.
66

77

8+
## Installation
9+
10+
```
11+
resolvers in ThisBuild += Resolver.bintrayRepo("streetcontxt", "maven")
12+
libraryDependencies += "com.contxt" %% "kpl-scala" % "1.0.4"
13+
```
14+
15+
16+
## Usage
17+
18+
Here is a simple app that initializes the Kinesis producer and sends a string message.
19+
20+
```
21+
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
22+
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration
23+
import com.contxt.kinesis.ScalaKinesisProducer
24+
import java.nio.ByteBuffer
25+
import scala.concurrent.Await
26+
import scala.concurrent.duration._
27+
28+
object Main {
29+
def main(args: Array[String]): Unit = {
30+
val producerConfig = new KinesisProducerConfiguration()
31+
.setCredentialsProvider(new DefaultAWSCredentialsProviderChain)
32+
.setRegion("us-east-1")
33+
34+
val producer = ScalaKinesisProducer("myStream", producerConfig)
35+
36+
val sendFuture = producer.send(
37+
partitionKey = "myKey",
38+
data = ByteBuffer.wrap("myMessage".getBytes("UTF-8"))
39+
)
40+
Await.result(sendFuture, 10.seconds)
41+
Await.result(producer.shutdown(), Duration.Inf)
42+
}
43+
}
44+
```
45+
46+
847
## Amazon Licensing Restrictions
948
**KPL license is not compatible with open source licenses!** See
1049
[this discussion](https://issues.apache.org/jira/browse/LEGAL-198) for more details.

src/main/scala/com/contxt/kinesis/ScalaKinesisProducer.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,17 @@ private[kinesis] class ScalaKinesisProducerImpl(
7171
}
7272
}
7373

74-
def shutdown(): Future[Unit] = {
74+
def shutdown(): Future[Unit] = shutdownOnce
75+
76+
private lazy val shutdownOnce: Future[Unit] = {
7577
val allFlushedFuture = flushAll()
78+
val shutdownPromise = Promise[Unit]
7679
allFlushedFuture.onComplete { _ =>
77-
producer.destroy()
78-
stats.reportShutdown(streamId)
80+
shutdownPromise.completeWith(destroyProducer())
7981
}
80-
allFlushedFuture
82+
val combinedFuture = allFlushedFuture.zip(shutdownPromise.future).map(_ => ())
83+
combinedFuture.onComplete(_ => stats.reportShutdown(streamId))
84+
combinedFuture
8185
}
8286

8387
private def throwSendFailedException(result: UserRecordResult): Nothing = {
@@ -95,4 +99,12 @@ private[kinesis] class ScalaKinesisProducerImpl(
9599
}
96100
}
97101
}
102+
103+
private def destroyProducer(): Future[Unit] = {
104+
Future {
105+
blocking {
106+
producer.destroy()
107+
}
108+
}
109+
}
98110
}

0 commit comments

Comments
 (0)