From ba930513b35492f9f51763894bc2db7ce73c8eb6 Mon Sep 17 00:00:00 2001 From: Udi Luxenburg Date: Mon, 9 Mar 2020 17:12:41 +0200 Subject: [PATCH 1/2] support AWS SNS sink --- sink/helper.go | 2 + sink/sns.go | 123 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 sink/sns.go diff --git a/sink/helper.go b/sink/helper.go index 4f1657c2..854db48d 100644 --- a/sink/helper.go +++ b/sink/helper.go @@ -35,6 +35,8 @@ func GetSink(resourceName string) (Sink, error) { return NewSyslog() case "sqs": return NewSQS(resourceName) + case "sns": + return NewSNS() default: return nil, fmt.Errorf("Invalid SINK_TYPE: %s, Valid values: amqp, http, kafka, kinesis, mongodb, nsq, rabbitmq, redis, sqs, stdout, syslog", sinkType) } diff --git a/sink/sns.go b/sink/sns.go new file mode 100644 index 00000000..06a9f13d --- /dev/null +++ b/sink/sns.go @@ -0,0 +1,123 @@ +package sink + +import ( + "strconv" + "time" + + "os" + + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sns" + log "github.com/sirupsen/logrus" +) + +// SNSSink ... +type SNSSink struct { + session *session.Session + sns *sns.SNS + topicArn string + stopCh chan interface{} + putCh chan []byte + workerCount int +} + +// NewSNS ... +func NewSNS() (*SNSSink, error) { + topicArn := os.Getenv("SINK_SNS_TOPIC_ARN") + if topicArn == "" { + return nil, fmt.Errorf("[sink/sns] Missing SINK_SNS_TOPIC_ARN") + } + + workerCountStr := os.Getenv("SINK_SNS_WORKERS") + if workerCountStr == "" { + workerCountStr = "1" + } + + workerCount, err := strconv.Atoi(workerCountStr) + if err != nil { + return nil, fmt.Errorf("invalid SINK_SNS_WORKERS, must be an integer") + } + + sess := session.Must(session.NewSession()) + svc := sns.New(sess) + + return &SNSSink{ + session: sess, + sns: svc, + topicArn: topicArn, + stopCh: make(chan interface{}), + putCh: make(chan []byte, 1000), + workerCount: workerCount, + }, nil +} + +// Start ... +func (s *SNSSink) Start() error { + // Stop chan for all tasks to depend on + s.stopCh = make(chan interface{}) + + for i := 0; i < s.workerCount; i++ { + go s.write(i) + } + + // wait forever for a stop signal to happen + for { + select { + case <-s.stopCh: + break + } + break + } + + return nil +} + +// Stop ... +func (s *SNSSink) Stop() { + log.Infof("[sink/sns] ensure writer queue is empty (%d messages left)", len(s.putCh)) + + for len(s.putCh) > 0 { + log.Info("[sink/sns] Waiting for queue to drain - (%d messages left)", len(s.putCh)) + time.Sleep(1 * time.Second) + } + + close(s.stopCh) +} + +// Put .. +func (s *SNSSink) Put(data []byte) error { + s.putCh <- data + + return nil +} + +func (s *SNSSink) write(id int) { + log.Infof("[sink/sns/%d] Starting writer", id) + + topicArn := aws.String(s.topicArn) + + for { + select { + case data := <-s.putCh: + message := aws.String(string(data)) + putOutput, err := s.sns.Publish(&sns.PublishInput{ + Message: message, + TopicArn: topicArn, + }) + + if err != nil { + log.Errorf("[sink/sns/%d] %s", id, err) + } else { + log.Infof("[sink/sns/%d] %v", id, putOutput) + } + } + } +} + +// Name .. +func (s *SNSSink) Name() string { + return "sns" +} From a11794817ef3542bc85e9d68d6943c3061b29ded Mon Sep 17 00:00:00 2001 From: Udi Luxenburg Date: Mon, 9 Mar 2020 17:30:54 +0200 Subject: [PATCH 2/2] add sns sink --- Gopkg.lock | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Gopkg.lock b/Gopkg.lock index 19e85abf..9f283eb5 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -56,6 +56,7 @@ "private/protocol/xml/xmlutil", "service/kinesis", "service/sqs", + "service/sns", "service/sts", "service/sts/stsiface", ] @@ -496,6 +497,7 @@ "github.com/aws/aws-sdk-go/aws/session", "github.com/aws/aws-sdk-go/service/kinesis", "github.com/aws/aws-sdk-go/service/sqs", + "github.com/aws/aws-sdk-go/service/sns", "github.com/garyburd/redigo/redis", "github.com/hashicorp/consul/api", "github.com/hashicorp/nomad/api",