An aggregated records producer for Amazon Kinesis

Countingup/kinesis-producer


Amazon Kinesis producer

A KPL-like batch producer for Amazon Kinesis, built on top of the official AWS SDK for Go
and using the same aggregation format that the KPL uses.
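For readers on the consuming side, the KPL aggregation format frames each aggregated record as a 4-byte magic prefix, a protobuf-encoded payload, and a trailing MD5 digest of that payload. The following is a minimal sketch of detecting that framing; the helper names `wrap` and `isAggregated` are hypothetical and are not part of this package, and the payload here is an arbitrary byte string standing in for a real encoded message.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
)

// magic is the 4-byte prefix the KPL places in front of an aggregated record.
var magic = []byte{0xF3, 0x89, 0x9A, 0xC2}

// wrap (hypothetical helper) frames an already-encoded payload:
// magic prefix, payload, then the MD5 digest of the payload.
func wrap(payload []byte) []byte {
	sum := md5.Sum(payload)
	out := make([]byte, 0, len(magic)+len(payload)+md5.Size)
	out = append(out, magic...)
	out = append(out, payload...)
	return append(out, sum[:]...)
}

// isAggregated (hypothetical helper) reports whether data carries the
// magic prefix and a trailing checksum that matches the payload.
func isAggregated(data []byte) bool {
	if len(data) < len(magic)+md5.Size || !bytes.HasPrefix(data, magic) {
		return false
	}
	payload := data[len(magic) : len(data)-md5.Size]
	sum := md5.Sum(payload)
	return bytes.Equal(sum[:], data[len(data)-md5.Size:])
}

func main() {
	framed := wrap([]byte("stand-in for an encoded payload"))
	fmt.Println(isAggregated(framed))        // true
	fmt.Println(isAggregated([]byte("foo"))) // false
}
```

A consumer would run a check like this before attempting to decode the payload, and treat anything that fails it as a plain, non-aggregated record.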

Example

package main

import (
	"context"
	"time"

	producer "github.com/Countingup/kinesis-producer"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	log "github.com/sirupsen/logrus" // provides log.Error and log.WithError used below
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	client := kinesis.NewFromConfig(cfg)
	pr := producer.New(&producer.Config{
		StreamName:   "test",
		BacklogCount: 2000,
		Client:       client,
	})

	pr.Start()

	// Handle failures
	go func() {
		for r := range pr.NotifyFailures() {
			// r contains `Data`, `PartitionKey` and `Error()`
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			err := pr.Put([]byte("foo"), "bar")
			if err != nil {
				log.WithError(err).Fatal("error producing")
			}
		}
	}()

	// Crude wait for the goroutines above before shutting the producer down.
	time.Sleep(3 * time.Second)
	pr.Stop()
}
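
The example above waits a fixed three seconds before stopping the producer. One alternative, sketched here under assumptions, is to wait for the producing goroutines with a `sync.WaitGroup`. The `putter` interface and `fakeProducer` type are hypothetical stand-ins so the sketch runs without AWS; only the `Put(data, partitionKey)` shape is taken from the example.

```go
package main

import (
	"fmt"
	"sync"
)

// putter is a hypothetical interface matching the Put call in the example.
type putter interface {
	Put(data []byte, partitionKey string) error
}

// fakeProducer is a stand-in that only counts the records it receives.
type fakeProducer struct {
	mu    sync.Mutex
	count int
}

func (f *fakeProducer) Put(data []byte, partitionKey string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.count++
	return nil
}

// putAll starts `workers` goroutines that each put n records, blocks until
// all of them return, and reports one of the errors if any worker failed.
func putAll(p putter, workers, n int) error {
	var wg sync.WaitGroup
	errs := make(chan error, workers) // each worker sends at most one error
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < n; i++ {
				if err := p.Put([]byte("foo"), "bar"); err != nil {
					errs <- err
					return
				}
			}
		}()
	}
	wg.Wait()
	close(errs)
	return <-errs // nil when no worker reported an error
}

func main() {
	p := &fakeProducer{}
	if err := putAll(p, 2, 2500); err != nil {
		fmt.Println("error producing:", err)
		return
	}
	fmt.Println(p.count) // 5000
	// With the real producer, the call to pr.Stop() would follow here.
}
```

This only guarantees that every `Put` call has returned; whether buffered records have reached Kinesis by then depends on the producer's own flushing behaviour.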

Specifying logger implementation

producer.Config takes an optional logging.Logger implementation.

Using a custom logger
customLogger := &CustomLogger{}

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       customLogger,
}
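
This README does not show what `CustomLogger` has to implement. The sketch below assumes an interface with an `Info` method taking name/value pairs and an `Error` method taking an error; the `Logger` and `LogValue` types are declared locally as stand-ins for that assumption, so check the package's GoDoc for the actual method set before copying it.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// LogValue and Logger are local stand-ins (an assumption, not the
// package's confirmed API) for the types a custom logger must satisfy.
type LogValue struct {
	Name  string
	Value interface{}
}

type Logger interface {
	Info(msg string, values ...LogValue)
	Error(msg string, err error)
}

// CustomLogger forwards messages to a standard library *log.Logger.
type CustomLogger struct {
	out *log.Logger
}

// Info writes the message followed by each value as name=value.
func (c *CustomLogger) Info(msg string, values ...LogValue) {
	var b bytes.Buffer
	for _, v := range values {
		fmt.Fprintf(&b, " %s=%v", v.Name, v.Value)
	}
	c.out.Printf("INFO %s%s", msg, b.String())
}

// Error writes the message followed by the error text.
func (c *CustomLogger) Error(msg string, err error) {
	c.out.Printf("ERROR %s: %v", msg, err)
}

// Compile-time check that CustomLogger satisfies the assumed interface.
var _ Logger = (*CustomLogger)(nil)

// newBufferedLogger returns a logger that writes into an in-memory buffer,
// which keeps the demo below free of timestamps and external output.
func newBufferedLogger() (*CustomLogger, *bytes.Buffer) {
	buf := &bytes.Buffer{}
	return &CustomLogger{out: log.New(buf, "", 0)}, buf
}

func main() {
	l, buf := newBufferedLogger()
	l.Info("flushing records", LogValue{Name: "count", Value: 3})
	fmt.Print(buf.String()) // INFO flushing records count=3
}
```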

Using logrus

import (
	"github.com/sirupsen/logrus"
	producer "github.com/Countingup/kinesis-producer"
	"github.com/Countingup/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       loggers.Logrus(log),
}

kinesis-producer ships with three logger implementations:

  • producer.Standard uses the standard library logger
  • loggers.Logrus uses the logrus logger
  • loggers.Zap uses the zap logger

License

MIT
