Forked from github.com/rewardStyle/kinetic, with a PR of 100+ commits merged in.
Kinetic is an MIT-licensed high-performance AWS Kinesis Client for Go
Kinetic wraps aws-sdk-go to provide maximum throughput with built-in fault tolerance and retry logic for AWS Kinesis producers and consumers. The Kinetic producer can write to Kinesis or Firehose, and the Kinetic consumer can read stream data from Kinesis using either aws-sdk-go or the Kinesis Client Library (written in Java).
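To give a rough idea of what "retry logic" can look like, here is a minimal sketch of retrying an operation with exponential backoff. It uses only the standard library and is not kinetic's actual implementation; the names `retry` and `errTransient` are made up for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error used to simulate a temporary failure.
var errTransient = errors.New("transient failure")

// retry calls op up to maxAttempts times and doubles the delay after each
// failed attempt. It returns nil on the first success, or the last error
// wrapped with the attempt count.
func retry(maxAttempts int, initialDelay time.Duration, op func() error) error {
	if maxAttempts < 1 {
		return errors.New("retry: maxAttempts must be at least 1")
	}
	delay := initialDelay
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		// Sleep only if another attempt will follow.
		if attempt < maxAttempts {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("retry: gave up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	// Simulated flaky operation: fails twice, then succeeds.
	err := retry(5, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

In a real producer the operation passed to `retry` would be the write to Kinesis, and only errors known to be retryable would trigger another attempt.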
Before using kinetic, make sure you have created a Kinesis stream and that your configuration file has the credentials necessary to read from and write to the stream. Once this stream exists in AWS, kinetic will ensure it is in the "ACTIVE" state before running.
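The "ACTIVE" check can be pictured as a simple polling loop. The sketch below is an assumption about the general approach, not kinetic's code: `describeFunc` is a hypothetical stand-in for a call that reports the stream status (a real check would query the Kinesis DescribeStream API), and `waitUntilActive` and `errNotActive` are names invented here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// describeFunc is a hypothetical stand-in for a call that returns the
// current stream status, for example "CREATING" or "ACTIVE".
type describeFunc func() (string, error)

// errNotActive is returned when the stream never reports "ACTIVE".
var errNotActive = errors.New("stream did not become ACTIVE in time")

// waitUntilActive polls describe until it reports "ACTIVE", an error
// occurs, or maxPolls calls have been made.
func waitUntilActive(describe describeFunc, interval time.Duration, maxPolls int) error {
	for i := 0; i < maxPolls; i++ {
		status, err := describe()
		if err != nil {
			return err
		}
		if status == "ACTIVE" {
			return nil
		}
		// Sleep only if another poll will follow.
		if i < maxPolls-1 {
			time.Sleep(interval)
		}
	}
	return errNotActive
}

func main() {
	// Simulated status sequence for a stream that is still being created.
	statuses := []string{"CREATING", "CREATING", "ACTIVE"}
	i := 0
	describe := func() (string, error) {
		s := statuses[i]
		if i < len(statuses)-1 {
			i++
		}
		return s, nil
	}
	if err := waitUntilActive(describe, 10*time.Millisecond, 5); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("stream is ACTIVE")
}
```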
Tests are written using goconvey and kinesalite. Make sure you have kinesalite running locally before attempting to run the tests. They can be run either via the command line:
$ go test -v -cover -race
or via the goconvey web interface:
$ goconvey
Kinetic can be used to interface with Kinesis like so:
import (
	"sync"

	"github.com/jyu617/kinetic"
)

// Create a kinetic object associated with a local kinesalite stream
k, _ := kinetic.NewKinetic(
	kinetic.AwsConfigCredentials("some-access-key", "some-secret-key", "some-security-token"),
	kinetic.AwsConfigRegion("some-region"),
	kinetic.AwsConfigEndpoint("http://127.0.0.1:4567"),
)

// Create a kinetic producer
p, _ := kinetic.NewProducer(k.Session.Config, "stream-name")

// Create a kinetic consumer
c, err := kinetic.NewConsumer(k.Session.Config, "stream-name", "shard-name")
if err != nil {
	println(err.Error())
}

// Retrieve one message using the consumer's Retrieve function
msg, err := c.Retrieve()
if err != nil {
	println(err.Error())
}

// Using Listen - will block unless run in a goroutine
go c.Listen(func(b []byte, wg *sync.WaitGroup) {
	// Signal that this message has been handled
	defer wg.Done()
	println(string(b))
})

// Send a message using the producer
p.Send(&kinetic.Message{
	Data: []byte(`{"foo":"bar"}`),
})
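The Listen callback above receives a `*sync.WaitGroup` and must call `wg.Done()`. The sketch below shows one way such a callback contract can work, using only the standard library. It is an illustration under assumptions and not kinetic's internals; `messageFn`, `dispatch`, and `totalBytes` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// messageFn mirrors the callback shape used with Listen: it receives the
// payload and must call wg.Done() when it has finished.
type messageFn func(b []byte, wg *sync.WaitGroup)

// dispatch is a hypothetical helper: it runs fn once per message, each in
// its own goroutine, and returns after every callback has called wg.Done().
func dispatch(messages [][]byte, fn messageFn) {
	var wg sync.WaitGroup
	for _, m := range messages {
		wg.Add(1)
		go fn(m, &wg)
	}
	wg.Wait()
}

// totalBytes uses dispatch to sum the payload lengths concurrently.
func totalBytes(messages [][]byte) int {
	var mu sync.Mutex
	total := 0
	dispatch(messages, func(b []byte, wg *sync.WaitGroup) {
		// Without this call, dispatch would block forever in wg.Wait().
		defer wg.Done()
		mu.Lock()
		total += len(b)
		mu.Unlock()
	})
	return total
}

func main() {
	n := totalBytes([][]byte{[]byte("a"), []byte("bc")})
	fmt.Println("bytes handled:", n)
}
```

The point of the pattern is that the caller can track in-flight callbacks, which is why forgetting `wg.Done()` in a callback is likely to stall whatever is waiting on the group.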
For more examples, take a look at the tests or the test program in the testexec directory. For additional information, see the kinetic package documentation.