Skip to content

Commit

Permalink
Convert types.Event -> []byte for sqs dest
Browse files Browse the repository at this point in the history
  • Loading branch information
ejcx committed Aug 27, 2023
1 parent 86a43a4 commit 9fde325
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
4 changes: 2 additions & 2 deletions cmd/kawad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func init() {
loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] {
return &PrinterConfig{}
})
loader.Register("sqs", func() loader.Builder[kawa.Destination[types.Event]] {
loader.Register("sqs", func() loader.Builder[kawa.Destination[[]byte]] {
return &SQSConfig{}
})
loader.Register("s3", func() loader.Builder[kawa.Destination[types.Event]] {
Expand Down Expand Up @@ -121,7 +121,7 @@ func (c *S3Config) Configure() (kawa.Destination[types.Event], error) {
), nil
}

func (c *SQSConfig) Configure() (kawa.Destination[types.Event], error) {
func (c *SQSConfig) Configure() (kawa.Destination[[]byte], error) {
slog.Info("configuring sqs")
return sqs.New(
sqs.WithQueueURL(c.QueueURL),
Expand Down
13 changes: 6 additions & 7 deletions internal/destinations/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
awssqs "github.com/aws/aws-sdk-go/service/sqs"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
batch "github.com/runreveal/kawa/x/batcher"
"github.com/segmentio/ksuid"
)
Expand Down Expand Up @@ -49,7 +48,7 @@ func WithBatchSize(batchSize int) Option {
}

type sqs struct {
batcher *batch.Destination[types.Event]
batcher *batch.Destination[[]byte]

queueURL string
region string
Expand All @@ -68,7 +67,7 @@ func New(opts ...Option) *sqs {
if ret.batchSize == 0 {
ret.batchSize = 100
}
ret.batcher = batch.NewDestination[types.Event](ret,
ret.batcher = batch.NewDestination[[]byte](ret,
batch.FlushLength(ret.batchSize),
batch.FlushFrequency(5*time.Second),
)
Expand All @@ -83,12 +82,12 @@ func (s *sqs) Run(ctx context.Context) error {
return s.batcher.Run(ctx)
}

func (s *sqs) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error {
func (s *sqs) Send(ctx context.Context, ack func(), msgs ...kawa.Message[[]byte]) error {
return s.batcher.Send(ctx, ack, msgs...)
}

// Flush sends the given messages of type kawa.Message[type.Event] to an sqs queue
func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error {
// Flush sends the given messages of type kawa.Message[[]byte] to an sqs queue
func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[[]byte]) error {

var config = &aws.Config{}
if s.accessKeyID != "" && s.accessSecretKey != "" {
Expand All @@ -107,7 +106,7 @@ func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error
for _, msg := range msgs {
messageRequest := &awssqs.SendMessageBatchRequestEntry{
Id: aws.String(ksuid.New().String()),
MessageBody: aws.String(string(msg.Value.RawLog)),
MessageBody: aws.String(string(msg.Value)),
}
if strings.HasSuffix(s.queueURL, ".fifo") {
messageRequest.MessageGroupId = messageRequest.Id
Expand Down

0 comments on commit 9fde325

Please sign in to comment.