Skip to content

Commit

Permalink
x/taskqueue first implementation draft
Browse files Browse the repository at this point in the history
  • Loading branch information
widmogrod committed Sep 30, 2023
1 parent 72fe0d1 commit f7e30a1
Show file tree
Hide file tree
Showing 7 changed files with 463 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.19
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.6
github.com/aws/aws-sdk-go-v2/service/kinesis v1.18.5
github.com/aws/aws-sdk-go-v2/service/sqs v1.24.5
github.com/bxcodec/faker/v3 v3.8.1
github.com/fatih/structtag v1.2.0
github.com/opensearch-project/opensearch-go/v2 v2.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 h1:0iKliEXAc
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.18.5 h1:naSZmQiFjoTLxNjfDy/KgEnWdG3odkR6gIEgTx21YOM=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.18.5/go.mod h1:0h3hOcyFXyjvI3wGt8C8vk2+II9XxHwFM7zH2KvLHmA=
github.com/aws/aws-sdk-go-v2/service/sqs v1.24.5 h1:RyDpTOMEJO6ycxw1vU/6s0KLFaH3M0z/z9gXHSndPTk=
github.com/aws/aws-sdk-go-v2/service/sqs v1.24.5/go.mod h1:RZBu4jmYz3Nikzpu/VuVvRnTEJ5a+kf36WT2fcl5Q+Q=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10 h1:UBQjaMTCKwyUYwiVnUt6toEJwGXsLBI6al083tpjJzY=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10 h1:PkHIIJs8qvq0e5QybnZoG1K/9QTrLr9OsqCIo59jOBA=
Expand Down
1 change: 1 addition & 0 deletions x/taskqueue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# x/taskqueue - Golang task queue with transactional support
37 changes: 37 additions & 0 deletions x/taskqueue/queue_inmemory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package taskqueue

import (
"context"
"github.com/widmogrod/mkunion/x/schema"
"github.com/widmogrod/mkunion/x/storage/schemaless"
)

func NewInMemoryQueue[T any]() *Queue[T] {
return &Queue[T]{
queue: make(chan Task[T], 100),
}
}

var _ Queuer[any] = (*Queue[any])(nil)

type Queue[T any] struct {
queue chan Task[T]
}

func (q *Queue[T]) Push(ctx context.Context, task Task[T]) error {
q.queue <- task
return nil
}

func (q *Queue[T]) Pop(ctx context.Context) ([]Task[T], error) {
return []Task[T]{<-q.queue}, nil
}

func (*Queue[T]) Delete(ctx context.Context, tasks []Task[schemaless.Record[schema.Schema]]) error {
return nil
}

func (q *Queue[T]) Close() error {
close(q.queue)
return nil
}
121 changes: 121 additions & 0 deletions x/taskqueue/queue_sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package taskqueue

import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/widmogrod/mkunion/x/schema"
"github.com/widmogrod/mkunion/x/storage/schemaless"
)

func NewSQSQueue(c *sqs.Client, queueURL string) *SQSQueue[schemaless.Record[schema.Schema]] {
return &SQSQueue[schemaless.Record[schema.Schema]]{
client: c,
queueURL: queueURL,
}
}

// SQSQueue is a queue that uses AWS SQS as a backend.
type SQSQueue[T any] struct {
client *sqs.Client
queueURL string
}

var _ Queuer[any] = (*SQSQueue[any])(nil)

func (queue *SQSQueue[T]) Push(ctx context.Context, task Task[T]) error {
schemed := schema.FromGo(task.Data)
body, err := schema.ToJSON(schemed)
if err != nil {
return fmt.Errorf("sqsQueue.Push: ToJSON=%w", err)
}

bodyStr := string(body)

var messageGroupId *string
if groupId, ok := task.Meta["SQS.MessageGroupId"]; ok {
if groupId != "" {
messageGroupId = &groupId
}
}

msg := &sqs.SendMessageInput{
MessageBody: &bodyStr,
QueueUrl: &queue.queueURL,
MessageGroupId: messageGroupId,
MessageDeduplicationId: &task.ID,
}

output, err := queue.client.SendMessage(ctx, msg)
if err != nil {
return fmt.Errorf("sqsQueue.Push: SendMessage=%w", err)
}

_ = output
_ = output.MessageId
_ = output.SequenceNumber

return nil
}

func (queue *SQSQueue[T]) Pop(ctx context.Context) ([]Task[T], error) {
output, err := queue.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: &queue.queueURL,
ReceiveRequestAttemptId: nil,
})
if err != nil {
return nil, fmt.Errorf("sqsQueue.Pop: ReceiveMessage=%w", err)
}

var tasks []Task[T]
for _, message := range output.Messages {
schemed, err := schema.FromJSON([]byte(*message.Body))
if err != nil {
return nil, fmt.Errorf("sqsQueue.Pop: FromJSON=%w", err)
}

data, err := schema.ToGoG[T](schemed, nil)
if err != nil {
return nil, fmt.Errorf("sqsQueue.Pop: ToGo=%w", err)
}

task := Task[T]{
ID: *message.MessageId,
Data: data,
Meta: map[string]string{
"SQS.ReceiptHandle": *message.ReceiptHandle,
},
}
tasks = append(tasks, task)
}

return tasks, nil
}

func (queue *SQSQueue[T]) Delete(ctx context.Context, tasks []Task[schemaless.Record[schema.Schema]]) error {
if len(tasks) == 0 {
return nil
}

var entries []types.DeleteMessageBatchRequestEntry
for _, task := range tasks {
receiptHandle, ok := task.Meta["SQS.ReceiptHandle"]
if !ok {
return fmt.Errorf("sqsQueue.Delete: missing SQS.ReceiptHandle in taskID=%s", task.ID)
}
entries = append(entries, types.DeleteMessageBatchRequestEntry{
Id: &task.ID,
ReceiptHandle: &receiptHandle,
})
}
_, err := queue.client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
Entries: entries,
QueueUrl: &queue.queueURL,
})
if err != nil {
return fmt.Errorf("sqsQueue.Delete: DeleteMessageBatch=%w", err)
}

return nil
}
128 changes: 128 additions & 0 deletions x/taskqueue/taskqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package taskqueue

import (
"context"
"github.com/widmogrod/mkunion/x/schema"
"github.com/widmogrod/mkunion/x/storage/predicate"
"github.com/widmogrod/mkunion/x/storage/schemaless"
"time"
)

func NewTaskQueue(desc *Description, queue Queuer[schemaless.Record[schema.Schema]], find Repository, proc Processor[schemaless.Record[schema.Schema]]) *TaskQueue {
return &TaskQueue{
desc: desc,
queue: queue,
find: find,
proc: proc,
}
}

type Queuer[T any] interface {
Push(ctx context.Context, task Task[T]) error
Pop(ctx context.Context) ([]Task[T], error)
Delete(ctx context.Context, tasks []Task[schemaless.Record[schema.Schema]]) error
}

type Repository interface {
FindingRecords(query schemaless.FindingRecords[schemaless.Record[schema.Schema]]) (schemaless.PageResult[schemaless.Record[schema.Schema]], error)
}

type Processor[T any] interface {
Process(task Task[T]) error
}

type TaskQueue struct {
desc *Description
queue Queuer[schemaless.Record[schema.Schema]]
find Repository
proc Processor[schemaless.Record[schema.Schema]]
}

func (q *TaskQueue) RunSelector(ctx context.Context) error {
for {
var after = &schemaless.FindingRecords[schemaless.Record[schema.Schema]]{
RecordType: q.desc.Entity,
Where: predicate.MustWhere(q.desc.Filter, predicate.ParamBinds{}),
Limit: 10,
}

for {
records, err := q.find.FindingRecords(*after)
if err != nil {
return err
}

for _, record := range records.Items {
err := q.queue.Push(ctx, Task[schemaless.Record[schema.Schema]]{
Data: record,
})
if err != nil {
panic(err)
return err
}
}

if !records.HasNext() {
break
}

after = records.Next
}

time.Sleep(1 * time.Second)
}
}

func (q *TaskQueue) RunProcessor(ctx context.Context) error {
for {
tasks, err := q.queue.Pop(ctx)
if err != nil {
panic(err)
return err
}

for _, task := range tasks {
err = q.proc.Process(task)
if err != nil {
panic(err)
return err
}
}
err = q.queue.Delete(ctx, tasks)
if err != nil {
panic(err)
return err
}
}
}

type Description struct {
Change []string
Entity string
Filter string
}

type Task[T any] struct {
ID string
Data T
Meta map[string]string
}

type FunctionProcessor[T any] struct {
f func(task Task[schemaless.Record[T]])
}

func (proc *FunctionProcessor[T]) Process(task Task[schemaless.Record[schema.Schema]]) error {
t, err := schemaless.RecordAs[T](task.Data)
if err != nil {
panic(err)
}

proc.f(Task[schemaless.Record[T]]{
Data: t,
})

return nil
}

var _ Processor[schemaless.Record[schema.Schema]] = &FunctionProcessor[schemaless.Record[schema.Schema]]{}
Loading

0 comments on commit f7e30a1

Please sign in to comment.