Skip to content

Commit

Permalink
[skip ci] wip: redrive from sqs
Browse files Browse the repository at this point in the history
  • Loading branch information
mmalenic committed Nov 7, 2024
1 parent 93565eb commit d051089
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 12 deletions.
48 changes: 48 additions & 0 deletions lib/workload/stateless/stacks/fmannotator/cmd/redrivedlq/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"encoding/json"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-secretsmanager-caching-go/secretcache"
"github.com/umccr/orcabus/lib/workload/stateless/stacks/fmannotator"
"log/slog"
)

const (
TokenIdField = "id_token"
)

var (
secretCache, _ = secretcache.New()
)

// Handler for the redrivedlq function.
func Handler() error {
level, err := fmannotator.GetLogLevel()
if err != nil {
return err
}
slog.SetLogLoggerLevel(level)

config, err := fmannotator.LoadConfig()
if err != nil {
return err
}

secret, err := secretCache.GetSecretString(config.FileManagerSecretName)
if err != nil {
return err
}

secretKeys := make(map[string]string)
err = json.Unmarshal([]byte(secret), &secretKeys)
if err != nil {
return err
}

return fmannotator.RedriveDLQ(&config, secretKeys[TokenIdField])
}

func main() {
lambda.Start(Handler)
}
1 change: 1 addition & 0 deletions lib/workload/stateless/stacks/fmannotator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
type Config struct {
FileManagerEndpoint string `required:"true" split_words:"true"`
FileManagerSecretName string `required:"true" split_words:"true"`
DlqName string `required:"true" split_words:"true"`
}

// LoadConfig Load config from the environment.
Expand Down
9 changes: 5 additions & 4 deletions lib/workload/stateless/stacks/fmannotator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.22

require (
github.com/aws/aws-lambda-go v1.47.0
github.com/aws/aws-sdk-go-v2 v1.32.4
github.com/aws/aws-sdk-go-v2/service/sqs v1.36.4
github.com/aws/aws-secretsmanager-caching-go v1.2.0
github.com/docker/go-connections v0.5.0
github.com/go-testfixtures/testfixtures/v3 v3.10.0
Expand All @@ -28,19 +30,18 @@ require (
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/aws/aws-sdk-go v1.47.10 // indirect
github.com/aws/aws-sdk-go-v2 v1.24.1 // indirect
github.com/aws/aws-sdk-go-v2/config v1.26.6 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.16.16 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.18.7 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.26.7 // indirect
github.com/aws/smithy-go v1.19.0 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/goterm v1.0.4 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand Down
18 changes: 10 additions & 8 deletions lib/workload/stateless/stacks/fmannotator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,26 @@ github.com/aws/aws-lambda-go v1.47.0 h1:0H8s0vumYx/YKs4sE7YM0ktwL2eWse+kfopsRI1s
github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A=
github.com/aws/aws-sdk-go v1.47.10 h1:cvufN7WkD1nlOgpRopsmxKQlFp5X1MfyAw4r7BBORQc=
github.com/aws/aws-sdk-go v1.47.10/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.24.1 h1:xAojnj+ktS95YZlDf0zxWBkbFtymPeDP+rvUQIH3uAU=
github.com/aws/aws-sdk-go-v2 v1.24.1/go.mod h1:LNh45Br1YAkEKaAqvmE1m8FUx6a5b/V0oAKV7of29b4=
github.com/aws/aws-sdk-go-v2 v1.32.4 h1:S13INUiTxgrPueTmrm5DZ+MiAo99zYzHEFh1UNkOxNE=
github.com/aws/aws-sdk-go-v2 v1.32.4/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo=
github.com/aws/aws-sdk-go-v2/config v1.26.6 h1:Z/7w9bUqlRI0FFQpetVuFYEsjzE3h7fpU6HuGmfPL/o=
github.com/aws/aws-sdk-go-v2/config v1.26.6/go.mod h1:uKU6cnDmYCvJ+pxO9S4cWDb2yWWIH5hra+32hVh1MI4=
github.com/aws/aws-sdk-go-v2/credentials v1.16.16 h1:8q6Rliyv0aUFAVtzaldUEcS+T5gbadPbWdV1WcAddK8=
github.com/aws/aws-sdk-go-v2/credentials v1.16.16/go.mod h1:UHVZrdUsv63hPXFo1H7c5fEneoVo9UXiz36QG1GEPi0=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.11 h1:c5I5iH+DZcH3xOIMlz3/tCKJDaHFwYEmxvlh2fAcFo8=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.11/go.mod h1:cRrYDYAMUohBJUtUnOhydaMHtiK/1NZ0Otc9lIb6O0Y=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.10 h1:vF+Zgd9s+H4vOXd5BMaPWykta2a6Ih0AKLq/X6NYKn4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.10/go.mod h1:6BkRjejp/GR4411UGqkX8+wFMbFbqsUIimfK4XjOKR4=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.10 h1:nYPe006ktcqUji8S2mqXf9c/7NdiKriOwMvWQHgYztw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.10/go.mod h1:6UV4SZkVvmODfXKql4LCbaZUpF7HO2BX38FgBf9ZOLw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 h1:A2w6m6Tmr+BNXjDsr7M90zkWjsu4JXHwrzPg235STs4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23/go.mod h1:35EVp9wyeANdujZruvHiQUAo9E3vbhnIO1mTCAxMlY0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 h1:pgYW9FCabt2M25MoHYCfMrVY2ghiiBKYWUVXfwZs+sU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23/go.mod h1:c48kLgzO19wAu3CPkDWC28JbaJ+hfQlsdl7I2+oqIbk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.3 h1:n3GDfwqF2tzEkXlv5cuy4iy7LpKDtqDMcNLfZDu9rls=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.3/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 h1:/b31bi3YVNlkzkBrm9LfpaKoaYZUxIAj4sHfOTmLfqw=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4/go.mod h1:2aGXHFmbInwgP9ZfpmdIfOELL79zhdNYNmReK8qDfdQ=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10 h1:DBYTXwIGQSGs9w4jKm60F5dmCQ3EEruxdc0MFh+3EY4=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10/go.mod h1:wohMUQiFdzo0NtxbBg0mSRGZ4vL3n0dKjLTINdcIino=
github.com/aws/aws-sdk-go-v2/service/sqs v1.36.4 h1:vo02KRxWcY96S69VoH6096WC4UmuEV/mHbX8Zhvo3y8=
github.com/aws/aws-sdk-go-v2/service/sqs v1.36.4/go.mod h1:YXj6Y1BjZNj1PKi78CX2hBkVpCCuJ0TRtyd6wrKVQ64=
github.com/aws/aws-sdk-go-v2/service/sso v1.18.7 h1:eajuO3nykDPdYicLlP3AGgOyVN3MOlFmZv7WGTuJPow=
github.com/aws/aws-sdk-go-v2/service/sso v1.18.7/go.mod h1:+mJNDdF+qiUlNKNC3fxn74WWNN+sOiGOEImje+3ScPM=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.7 h1:QPMJf+Jw8E1l7zqhZmMlFw6w1NmfkfiSK8mS4zOx3BA=
Expand All @@ -72,8 +74,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.26.7 h1:NzO4Vrau795RkUdSHKEwiR01FaGz
github.com/aws/aws-sdk-go-v2/service/sts v1.26.7/go.mod h1:6h2YuIoxaMSCFf5fi1EgZAwdfkGMgDY+DVfa61uLe4U=
github.com/aws/aws-secretsmanager-caching-go v1.2.0 h1:gUA+CVKvFLj4OUSknhIrnt4dF7Y37+JrChKqfaehJME=
github.com/aws/aws-secretsmanager-caching-go v1.2.0/go.mod h1:6t2/zQIsigFMlnpOdGj503Dgaz24tMqIRhass9uoTBo=
github.com/aws/smithy-go v1.19.0 h1:KWFKQV80DpP3vJrrA9sVAHQ5gc2z8i4EzrLhLlWXcBM=
github.com/aws/smithy-go v1.19.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/beorn7/perks v0.0.0-20150223135152-b965b613227f/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
20 changes: 20 additions & 0 deletions lib/workload/stateless/stacks/fmannotator/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package fmannotator

import (
"bytes"
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/umccr/orcabus/lib/workload/stateless/stacks/fmannotator/schema/orcabus_workflowmanager/workflowrunstatechange"
"log/slog"
"net/url"
Expand Down Expand Up @@ -44,3 +47,20 @@ func PortalRunId(event workflowrunstatechange.Event, config *Config, token strin

return nil
}

// RedriveDLQ Re-drive failed events from the DLQ.
func RedriveDLQ(annotatorConfig *Config, token string) (err error) {
ctx := context.Background()
sdkConfig, err := config.LoadDefaultConfig(ctx)
if err != nil {
return err
}
sqsClient := sqs.NewFromConfig(sdkConfig)

messages, err := SqsActions(*sqsClient).GetMessages(ctx, annotatorConfig.DlqName, 100, 60)
if err != nil {
return err
}

return nil
}
40 changes: 40 additions & 0 deletions lib/workload/stateless/stacks/fmannotator/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package fmannotator

import (
"context"
"encoding/json"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/umccr/orcabus/lib/workload/stateless/stacks/fmannotator/schema/orcabus_workflowmanager/workflowrunstatechange"
)

// SqsActions encapsulates the Amazon Simple Queue Service (Amazon SQS) actions
// used in the examples.
type SqsActions struct {
SqsClient *sqs.Client
}

// GetMessages uses the ReceiveMessage action to get messages from an Amazon SQS queue.
func (actor SqsActions) GetMessages(ctx context.Context, queueUrl string, maxMessages int32, waitTime int32) ([]workflowrunstatechange.Event, error) {
var messages []workflowrunstatechange.Event
result, err := actor.SqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueUrl),
MaxNumberOfMessages: maxMessages,
WaitTimeSeconds: waitTime,
})
if err != nil {
return nil, err
}

for _, message := range result.Messages {
var event workflowrunstatechange.Event
err := json.Unmarshal([]byte(*message.Body), &event)
if err != nil {
return nil, err
}

messages = append(messages, event)
}

return messages, nil
}

0 comments on commit d051089

Please sign in to comment.