Skip to content

Commit

Permalink
feat: cas scaling ph 2 (s3 batching) (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 authored May 20, 2024
1 parent b6ea0be commit 9d90e04
Show file tree
Hide file tree
Showing 20 changed files with 266 additions and 96 deletions.
11 changes: 10 additions & 1 deletion cmd/cas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"github.com/joho/godotenv"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"

"github.com/ceramicnetwork/go-cas"
"github.com/ceramicnetwork/go-cas/common/aws/config"
"github.com/ceramicnetwork/go-cas/common/aws/ddb"
"github.com/ceramicnetwork/go-cas/common/aws/queue"
"github.com/ceramicnetwork/go-cas/common/aws/storage"
"github.com/ceramicnetwork/go-cas/common/db"
"github.com/ceramicnetwork/go-cas/common/loggers"
"github.com/ceramicnetwork/go-cas/common/metrics"
Expand Down Expand Up @@ -53,10 +55,16 @@ func main() {
// HTTP clients
dynamoDbClient := dynamodb.NewFromConfig(awsCfg)
sqsClient := sqs.NewFromConfig(awsCfg)
s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
o.UsePathStyle = true
})

stateDb := ddb.NewStateDb(serverCtx, logger, dynamoDbClient)
jobDb := ddb.NewJobDb(serverCtx, logger, dynamoDbClient)

casBucket := "ceramic-" + os.Getenv(cas.Env_Env) + "-cas"
batchStore := storage.NewS3Store(serverCtx, logger, s3Client, casBucket)

discordHandler, err := notifs.NewDiscordHandler(logger)
if err != nil {
logger.Fatalf("error creating discord handler: %v", err)
Expand Down Expand Up @@ -127,6 +135,7 @@ func main() {
// The Ready and Batch queues will need larger visibility timeouts than the other queues. Requests pulled from the
// Ready queue will remain in flight for the batch linger duration. Batches from the Batch queue will remain in
// flight as long as it takes for them to get anchored.
//
// These queues will thus allow a smaller maximum receive count before messages fall through to the DLQ. Detecting
// failures is harder given the longer visibility timeouts, so it's important that they be detected as soon as
// possible.
Expand Down Expand Up @@ -250,7 +259,7 @@ func main() {
// are available in the queue. The 2 multiplier is arbitrary but will allow two batches worth of requests to be read
// and processed in parallel.
maxBatchQueueWorkers := anchorBatchSize * 2
batchingService := services.NewBatchingService(serverCtx, logger, batchQueue, metricService)
batchingService := services.NewBatchingService(serverCtx, batchQueue, batchStore, metricService, logger)
batchingConsumer := queue.NewConsumer(logger, readyQueue, batchingService.Batch, &maxBatchQueueWorkers)

// The Validation service reads from the Validate queue and posts to the Ready and Status queues
Expand Down
12 changes: 7 additions & 5 deletions common/aws/ddb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.JobRepository = &JobDatabase{}

type JobDatabase struct {
ddbClient *dynamodb.Client
table string
Expand All @@ -43,15 +45,15 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error {

func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
jobParams := map[string]interface{}{
job.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
job.JobParam_Overrides: map[string]string{
models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring,
job.AnchorJobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
job.AnchorJobParam_Overrides: map[string]string{
models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring,
models.AnchorOverrides_SchedulerStopAfterNoOp: "true",
},
}
// If an override anchor contract address is available, pass it through to the job.
if contractAddress, found := os.LookupEnv(models.Env_AnchorContractAddress); found {
jobParams[job.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
jobParams[job.AnchorJobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
}
newJob := models.NewJob(job.JobType_Anchor, jobParams)
attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) {
Expand All @@ -72,7 +74,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
if err != nil {
return "", err
} else {
return newJob.Job, nil
return newJob.JobId, nil
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions common/aws/ddb/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.StateRepository = &StateDatabase{}

type StateDatabase struct {
client *dynamodb.Client
checkpointTable string
Expand Down
2 changes: 2 additions & 0 deletions common/aws/queue/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.QueueMonitor = &Monitor{}

type Monitor struct {
queueUrl string
client *sqs.Client
Expand Down
74 changes: 74 additions & 0 deletions common/aws/storage/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package storage

import (
"bytes"
"context"
"encoding/json"
"errors"
"os"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"

"github.com/ceramicnetwork/go-cas/common"
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.KeyValueRepository = &S3Store{}

type S3Store struct {
client *s3.Client
logger models.Logger
bucket string
}

func NewS3Store(ctx context.Context, logger models.Logger, s3Client *s3.Client, bucket string) *S3Store {
// Create the bucket if it doesn't exist
if err := createBucket(ctx, s3Client, bucket); err != nil {
logger.Fatalf("failed to create bucket %s: %v", bucket, err)
}
return &S3Store{s3Client, logger, bucket}
}

func (s *S3Store) Store(ctx context.Context, key string, value interface{}) error {
if jsonBytes, err := json.Marshal(value); err != nil {
return err
} else {
httpCtx, httpCancel := context.WithTimeout(ctx, common.DefaultRpcWaitTime)
defer httpCancel()

putObjectIn := s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: bytes.NewReader(jsonBytes),
ContentType: aws.String("application/json"),
}
if _, err = s.client.PutObject(httpCtx, &putObjectIn); err != nil {
return err
} else {
s.logger.Debugf("stored key: %s", key)
}
}
return nil
}

func createBucket(ctx context.Context, client *s3.Client, bucket string) error {
httpCtx, httpCancel := context.WithTimeout(ctx, common.DefaultRpcWaitTime)
defer httpCancel()

if _, err := client.CreateBucket(httpCtx, &s3.CreateBucketInput{
Bucket: aws.String(bucket),
CreateBucketConfiguration: &types.CreateBucketConfiguration{
LocationConstraint: types.BucketLocationConstraint(os.Getenv("AWS_REGION")),
},
}); err != nil {
var ownedByYouErr *types.BucketAlreadyOwnedByYou
if errors.As(err, &ownedByYouErr) {
// This means that the bucket already exists and is owned by us, which is fine.
return nil
}
return err
}
return nil
}
2 changes: 2 additions & 0 deletions common/db/anchor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.AnchorRepository = &AnchorDatabase{}

type AnchorDatabase struct {
opts anchorDbOpts
logger models.Logger
Expand Down
2 changes: 2 additions & 0 deletions common/metrics/otl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.MetricService = &OtlMetricService{}

type OtlMetricService struct {
caller string
meterProvider *sdk.MeterProvider
Expand Down
2 changes: 2 additions & 0 deletions common/notifs/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.Notifier = &DiscordHandler{}

type DiscordColor int

const (
Expand Down
2 changes: 1 addition & 1 deletion env/.env.dev
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ANCHOR_AUDIT_ENABLED=true
ANCHOR_BATCH_LINGER=1h
ANCHOR_BATCH_SIZE=6144
ANCHOR_BATCH_SIZE=16383
ANCHOR_BATCH_MONITOR_TICK=10s
MAX_ANCHOR_WORKERS=5
LONG_QUEUE_VISIBILITY_TIMEOUT=2h
Expand Down
2 changes: 1 addition & 1 deletion env/.env.prod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ANCHOR_AUDIT_ENABLED=true
ANCHOR_BATCH_LINGER=1h
ANCHOR_BATCH_SIZE=6144
ANCHOR_BATCH_SIZE=16383
ANCHOR_BATCH_MONITOR_TICK=10s
MAX_ANCHOR_WORKERS=5
LONG_QUEUE_VISIBILITY_TIMEOUT=2h
Expand Down
2 changes: 1 addition & 1 deletion env/.env.tnet
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ANCHOR_AUDIT_ENABLED=true
ANCHOR_BATCH_LINGER=1h
ANCHOR_BATCH_SIZE=6144
ANCHOR_BATCH_SIZE=16383
ANCHOR_BATCH_MONITOR_TICK=10s
MAX_ANCHOR_WORKERS=5
LONG_QUEUE_VISIBILITY_TIMEOUT=2h
Expand Down
47 changes: 26 additions & 21 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ replace github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 => github.c

require (
dagger.io/dagger v0.7.1
github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20231026142209-de0959fd5908
github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20231026142209-de0959fd5908
github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20240320183457-7556cceed3e2
github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20240320183457-7556cceed3e2
github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3
github.com/abevier/tsk v0.0.0-20230712145722-249b1e98b01c
github.com/alexflint/go-arg v1.4.2
github.com/aws/aws-sdk-go-v2 v1.21.2
github.com/aws/aws-sdk-go-v2/config v1.18.4
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.7
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.23.0
github.com/aws/aws-sdk-go-v2/service/ecr v1.18.11
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0
github.com/aws/aws-sdk-go-v2 v1.27.0
github.com/aws/aws-sdk-go-v2/config v1.27.15
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.17
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.32.3
github.com/aws/aws-sdk-go-v2/service/ecr v1.28.2
github.com/aws/aws-sdk-go-v2/service/s3 v1.54.2
github.com/aws/aws-sdk-go-v2/service/sqs v1.32.2
github.com/disgoorg/disgo v0.16.4
github.com/disgoorg/snowflake/v2 v2.0.1
github.com/go-playground/validator v9.31.0+incompatible
Expand Down Expand Up @@ -46,19 +47,23 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
github.com/alexflint/go-scalar v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.37 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 // indirect
github.com/aws/smithy-go v1.15.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.15 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.8 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.8 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.9 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
Expand Down
Loading

0 comments on commit 9d90e04

Please sign in to comment.