From ea96cbe2fa29627ba146acf4144b11814f3e00e1 Mon Sep 17 00:00:00 2001 From: dezeroku Date: Sun, 29 Sep 2024 04:36:55 +0200 Subject: [PATCH] go: add small app implementation --- go.mod | 28 +++++++++++++++ go.sum | 38 ++++++++++++++++++++ helpers.go | 65 ++++++++++++++++++++++++++++++++++ main.go | 102 +++++++++++++++++++++++++++++++++++++++++++++++++++++ types.go | 30 ++++++++++++++++ 5 files changed, 263 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 helpers.go create mode 100644 main.go create mode 100644 types.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9c7702e --- /dev/null +++ b/go.mod @@ -0,0 +1,28 @@ +module ses_local_email + +go 1.23.1 + +require ( + github.com/aws/aws-sdk-go-v2 v1.31.0 + github.com/aws/aws-sdk-go-v2/config v1.27.39 + github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3 + github.com/aws/aws-sdk-go-v2/service/sqs v1.35.3 +) + +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.37 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.18 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.20 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.18 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.23.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.31.3 // indirect + github.com/aws/smithy-go v1.21.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..5b7ac97 --- /dev/null +++ b/go.sum @@ -0,0 +1,38 @@ +github.com/aws/aws-sdk-go-v2 v1.31.0 h1:3V05LbxTSItI5kUqNwhJrrrY1BAXxXt0sN0l72QmG5U= +github.com/aws/aws-sdk-go-v2 v1.31.0/go.mod h1:ztolYtaEUtdpf9Wftr31CJfLVjOnD/CVRkKOOYgF8hA= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5 h1:xDAuZTn4IMm8o1LnBZvmrL8JA1io4o3YWNXgohbf20g= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5/go.mod h1:wYSv6iDS621sEFLfKvpPE2ugjTuGlAG7iROg0hLOkfc= +github.com/aws/aws-sdk-go-v2/config v1.27.39 h1:FCylu78eTGzW1ynHcongXK9YHtoXD5AiiUqq3YfJYjU= +github.com/aws/aws-sdk-go-v2/config v1.27.39/go.mod h1:wczj2hbyskP4LjMKBEZwPRO1shXY+GsQleab+ZXT2ik= +github.com/aws/aws-sdk-go-v2/credentials v1.17.37 h1:G2aOH01yW8X373JK419THj5QVqu9vKEwxSEsGxihoW0= +github.com/aws/aws-sdk-go-v2/credentials v1.17.37/go.mod h1:0ecCjlb7htYCptRD45lXJ6aJDQac6D2NlKGpZqyTG6A= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 h1:C/d03NAmh8C4BZXhuRNboF/DqhBkBCeDiJDcaqIT5pA= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14/go.mod h1:7I0Ju7p9mCIdlrfS+JCgqcYD0VXz/N4yozsox+0o078= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 h1:kYQ3H1u0ANr9KEKlGs/jTLrBFPo8P8NaH/w7A01NeeM= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18/go.mod h1:r506HmK5JDUh9+Mw4CfGJGSSoqIiLCndAuqXuhbv67Y= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 h1:Z7IdFUONvTcvS7YuhtVxN99v2cCoHRXOS4mTr0B/pUc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18/go.mod h1:DkKMmksZVVyat+Y+r1dEOgJEfUeA7UngIHWeKsi0yNc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.18 h1:OWYvKL53l1rbsUmW7bQyJVsYU/Ii3bbAAQIIFNbM0Tk= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.18/go.mod h1:CUx0G1v3wG6l01tUB+j7Y8kclA8NSqK4ef0YG79a4cg= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 h1:QFASJGfT8wMXtuP3D5CRmMjARHv9ZmzFUMJznHDOY3w= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5/go.mod h1:QdZ3OmoIjSX+8D1OPAzPxDfjXASbBMDsz9qvtyIhtik= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.20 h1:rTWjG6AvWekO2B1LHeM3ktU7MqyX9rzWQ7hgzneZW7E= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.20/go.mod h1:RGW2DDpVc8hu6Y6yG8G5CHVmVOAn1oV8rNKOHRJyswg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 h1:Xbwbmk44URTiHNx6PNo0ujDE6ERlsCKJD3u1zfnzAPg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20/go.mod h1:oAfOFzUB14ltPZj1rWwRc3d/6OgD76R8KlvU3EqM9Fg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.18 h1:eb+tFOIl9ZsUe2259/BKPeniKuz4/02zZFH/i4Nf8Rg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.18/go.mod h1:GVCC2IJNJTmdlyEsSmofEy7EfJncP7DNnXDzRjJ5Keg= +github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3 h1:3zt8qqznMuAZWDTDpcwv9Xr11M/lVj2FsRR7oYBt0OA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3/go.mod h1:NLTqRLe3pUNu3nTEHI6XlHLKYmc8fbHUdMxAB6+s41Q= +github.com/aws/aws-sdk-go-v2/service/sqs v1.35.3 h1:Lcs658WFW235QuUfpAdxd8RCy8Va2VUA7/U9iIrcjcY= +github.com/aws/aws-sdk-go-v2/service/sqs v1.35.3/go.mod h1:WuGxWQhu2LXoPGA2HBIbotpwhM6T4hAz0Ip/HjdxfJg= +github.com/aws/aws-sdk-go-v2/service/sso v1.23.3 h1:rs4JCczF805+FDv2tRhZ1NU0RB2H6ryAvsWPanAr72Y= +github.com/aws/aws-sdk-go-v2/service/sso v1.23.3/go.mod h1:XRlMvmad0ZNL+75C5FYdMvbbLkd6qiqz6foR1nA1PXY= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3 h1:S7EPdMVZod8BGKQQPTBK+FcX9g7bKR7c4+HxWqHP7Vg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3/go.mod h1:FnvDM4sfa+isJ3kDXIzAB9GAwVSzFzSy97uZ3IsHo4E= +github.com/aws/aws-sdk-go-v2/service/sts v1.31.3 h1:VzudTFrDCIDakXtemR7l6Qzt2+JYsVqo2MxBPt5k8T8= +github.com/aws/aws-sdk-go-v2/service/sts v1.31.3/go.mod h1:yMWe0F+XG0DkRZK5ODZhG7BEFYhLXi2dqGsv6tX0cgI= +github.com/aws/smithy-go v1.21.0 h1:H7L8dtDRk0P1Qm6y0ji7MCYMQObJ5R9CRpyPhRUkLYA= +github.com/aws/smithy-go v1.21.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..4932490 --- /dev/null +++ b/helpers.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "io" + "log" + "os" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sqs" +) + +func DownloadFile(s3Client *s3.Client, bucketName string, objectKey string, fileName string) error { + log.Printf("Downloading object %v:%v as %v", bucketName, objectKey, fileName) + result, err := s3Client.GetObject(context.TODO(), &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + if err != nil { + return err + } + defer result.Body.Close() + file, err := os.Create(fileName) + if err != nil { + return err + } + defer file.Close() + body, err := io.ReadAll(result.Body) + if err != nil { + return err + } + _, err = file.Write(body) + return err +} + +func DeleteFile(s3Client *s3.Client, bucketName string, objectKey string) error { + log.Printf("Deleting object %v:%v", bucketName, objectKey) + _, err := s3Client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + + return err +} + +func DeleteMessage(sqsClient *sqs.Client, queueUrl string, receiptHandle string) error { + log.Printf("Deleting message %v from queue %v", receiptHandle, queueUrl) + _, err := sqsClient.DeleteMessage(context.TODO(), &sqs.DeleteMessageInput{ + QueueUrl: aws.String(queueUrl), + ReceiptHandle: aws.String(receiptHandle), + }) + + return err +} + +func ReceiveMessage(sqsClient *sqs.Client, queueUrl string, waitTimeSeconds int32) (*sqs.ReceiveMessageOutput, error) { + log.Printf("Polling messages from queue %v", queueUrl) + msgResult, err := sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{ + QueueUrl: aws.String(queueUrl), + WaitTimeSeconds: waitTimeSeconds, + }) + + return msgResult, err +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..1aa4d56 --- /dev/null +++ b/main.go @@ -0,0 +1,102 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "os" + "path" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sqs" +) + +var ( + queueUrlEnvVar = "QUEUE_URL" + bucketNameEnvVar = "BUCKET_NAME" + storagePathEnvVar = "STORAGE_PATH" + idleSleepDurationEnvVar = "IDLE_SLEEP_DURATION" + + addedObjectSuffix = ".eml" +) + +func requireEnvVariable(name string) string { + value, present := os.LookupEnv(name) + if !present { + log.Fatalf("Env variable %s is required, but not present", name) + } + return value +} + +func main() { + queueUrl := requireEnvVariable(queueUrlEnvVar) + bucketName := requireEnvVariable(bucketNameEnvVar) + + storagePathVar := requireEnvVariable(storagePathEnvVar) + storagePath := path.Clean(storagePathVar) + + os.MkdirAll(storagePath, os.ModePerm) + + var idleSleepDuration time.Duration + idleSleepDurationVar, present := os.LookupEnv(idleSleepDurationEnvVar) + if !present { + idleSleepDuration = 300 * time.Second + } else { + var err error + idleSleepDuration, err = time.ParseDuration(idleSleepDurationVar) + if err != nil { + log.Fatal(err) + } + } + + log.Printf("idleSleepDuration set to %v", idleSleepDuration) + + cfg, err := config.LoadDefaultConfig(context.TODO()) + if err != nil { + log.Fatal(err) + } + + s3Client := s3.NewFromConfig(cfg) + sqsClient := sqs.NewFromConfig(cfg) + + for { + msgResult, err := ReceiveMessage(sqsClient, queueUrl, 20) + if err != nil { + log.Fatal(err) + } + + for _, message := range msgResult.Messages { + receiptHandle := *message.ReceiptHandle + log.Printf("New event received: %s", receiptHandle) + + body := *message.Body + var event ObjectCreatedEvent + if err := json.Unmarshal([]byte(body), &event); err != nil { + log.Fatal(err) + } + + objectKey := event.Detail.Object.Key + fileName := objectKey + addedObjectSuffix + + if err := DownloadFile(s3Client, bucketName, objectKey, path.Join(storagePath, fileName)); err != nil { + log.Fatal(err) + } + + if err := DeleteFile(s3Client, bucketName, objectKey); err != nil { + log.Fatal(err) + } + + if err := DeleteMessage(sqsClient, queueUrl, receiptHandle); err != nil { + log.Fatal(err) + } + } + + if len(msgResult.Messages) == 0 { + log.Printf("Idle sleeping for %v", idleSleepDuration) + time.Sleep(idleSleepDuration) + } + } +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..cc6239b --- /dev/null +++ b/types.go @@ -0,0 +1,30 @@ +package main + +type ObjectCreatedEventDetailObject struct { + Key string `json:"key"` + Size int `json:"size"` + Etag string `json:"etag"` + Sequencer string `json:"sequencer"` +} + +type ObjectCreatedEventDetail struct { + Version string `json:"version"` + Bucket map[string]string `json:"bucket"` + Object ObjectCreatedEventDetailObject `json:"object"` + RequestID string `json:"request-id"` + Requester string `json:"requester"` + SourceIPAddress string `json:"source-ip-address"` + Reason string `json:"reason"` +} + +type ObjectCreatedEvent struct { + Version string `json:"version"` + Id string `json:"id"` + DetailType string `json:"detail-type"` + Source string `json:"source"` + Account string `json:"account"` + Time string `json:"time"` + Region string `json:"region"` + Resources []string `json:"resources"` + Detail ObjectCreatedEventDetail `json:"detail"` +}