Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: cas scaling #38

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 149 additions & 128 deletions cmd/cas/main.go

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions common/aws/ddb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.JobRepository = &JobDatabase{}

type JobDatabase struct {
ddbClient *dynamodb.Client
table string
Expand All @@ -43,17 +45,15 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error {

func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
jobParams := map[string]interface{}{
job.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
job.JobParam_Overrides: map[string]string{},
}
// Only enable continuous anchoring on Clay and Prod
if (jdb.env == cas.EnvTag_Tnet) || (jdb.env == cas.EnvTag_Prod) {
jobParams[job.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_AppMode] = models.AnchorAppMode_ContinualAnchoring
jobParams[job.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_SchedulerStopAfterNoOp] = "true"
job.AnchorJobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
job.AnchorJobParam_Overrides: map[string]string{
models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring,
models.AnchorOverrides_SchedulerStopAfterNoOp: "true",
},
}
// If an override anchor contract address is available, pass it through to the job.
if contractAddress, found := os.LookupEnv(models.Env_AnchorContractAddress); found {
jobParams[job.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
jobParams[job.AnchorJobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
}
newJob := models.NewJob(job.JobType_Anchor, jobParams)
attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) {
Expand All @@ -74,7 +74,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
if err != nil {
return "", err
} else {
return newJob.Job, nil
return newJob.JobId, nil
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions common/aws/ddb/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.StateRepository = &StateDatabase{}

type StateDatabase struct {
client *dynamodb.Client
checkpointTable string
Expand Down
47 changes: 0 additions & 47 deletions common/aws/queue/consumer.go

This file was deleted.

2 changes: 2 additions & 0 deletions common/aws/queue/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.QueueMonitor = &Monitor{}

type Monitor struct {
queueUrl string
client *sqs.Client
Expand Down
32 changes: 32 additions & 0 deletions common/aws/queue/multiMonitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package queue

import (
"context"

"github.com/ceramicnetwork/go-cas/models"
)

// Compile-time check that *MultiMonitor satisfies models.QueueMonitor.
var _ models.QueueMonitor = (*MultiMonitor)(nil)

// MultiMonitor aggregates utilization metrics from several queue monitors,
// presenting them to callers as a single models.QueueMonitor.
type MultiMonitor struct {
	monitors []models.QueueMonitor
}

// NewMultiMonitor wraps the given monitors in a single QueueMonitor whose
// utilization figures are the sums across all sub-monitors.
//
// NOTE(review): the slice is stored without copying, so the caller must not
// mutate it after construction.
func NewMultiMonitor(monitors []models.QueueMonitor) models.QueueMonitor {
	return &MultiMonitor{monitors: monitors}
}

// GetUtilization returns the total number of unprocessed messages and the
// total number of in-flight messages summed over all sub-monitors. It fails
// fast, returning zeros and the error from the first sub-monitor that fails.
//
// The receiver is a pointer for consistency with the constructor, which hands
// out *MultiMonitor; the original mixed a value receiver with pointer
// construction.
func (m *MultiMonitor) GetUtilization(ctx context.Context) (int, int, error) {
	totalMsgsUnprocessed := 0
	totalMsgsInFlight := 0
	for _, monitor := range m.monitors {
		numMsgsUnprocessed, numMsgsInFlight, err := monitor.GetUtilization(ctx)
		if err != nil {
			return 0, 0, err
		}
		totalMsgsUnprocessed += numMsgsUnprocessed
		totalMsgsInFlight += numMsgsInFlight
	}
	return totalMsgsUnprocessed, totalMsgsInFlight, nil
}
86 changes: 86 additions & 0 deletions common/aws/queue/multiQueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package queue

import (
"context"
"math"
"sync/atomic"

"github.com/aws/aws-sdk-go-v2/service/sqs"

"github.com/abevier/go-sqs/gosqs"

"github.com/ceramicnetwork/go-cas/models"
)

// Compile-time checks that *multiQueue satisfies both the queue and the
// publisher interfaces.
var _ models.Queue = &multiQueue{}
var _ models.QueuePublisher = &multiQueue{}

// multiQueue fans one logical queue out over several sub-queues, publishing
// round-robin across them and aggregating their monitoring data.
type multiQueue struct {
	// queues holds the sub-queues; messages are distributed across them.
	queues []models.Queue
	// monitor aggregates utilization across all sub-queue monitors.
	monitor models.QueueMonitor
	// index is the round-robin publish counter; accessed only via
	// sync/atomic, so it must not be read or written directly.
	index int32
}

// NewMultiQueue creates numQueues independent sub-queues (at least one) that
// share a single message callback, and returns a models.Queue that
// round-robins published messages across them.
//
// If opts.NumWorkers is set, the worker budget is divided evenly among the
// sub-queues, rounding up so the combined capacity is never below the
// requested total.
func NewMultiQueue(
	ctx context.Context,
	metricService models.MetricService,
	logger models.Logger,
	sqsClient *sqs.Client,
	opts Opts,
	callback gosqs.MessageCallbackFunc,
	numQueues int,
) (models.Queue, error) {
	// Create at least one sub-queue
	numQueues = int(math.Max(1, float64(numQueues)))
	queues := make([]models.Queue, numQueues)
	monitors := make([]models.QueueMonitor, numQueues)
	if opts.NumWorkers != nil {
		// Divide the workers evenly among the sub-queues. Both operands must
		// be converted to float64 BEFORE dividing: the original
		// float64(*opts.NumWorkers / numQueues) performed integer division
		// first, making the Ceil a no-op and silently dropping the remainder
		// (e.g. 10 workers over 3 queues yielded 3 each instead of 4).
		workersPerQueue := int(math.Ceil(float64(*opts.NumWorkers) / float64(numQueues)))
		// Point at a local copy so the caller's int is not mutated through
		// the shared pointer.
		opts.NumWorkers = &workersPerQueue
	}
	for i := 0; i < numQueues; i++ {
		if q, _, err := newQueue(ctx, metricService, logger, sqsClient, opts, callback, i); err != nil {
			return nil, err
		} else {
			queues[i] = q
			monitors[i] = q.Monitor()
		}
	}
	return &multiQueue{queues: queues, monitor: NewMultiMonitor(monitors)}, nil
}

// SendMessage publishes the event to one of the sub-queues, distributing
// successive calls round-robin, and returns that sub-queue's message ID.
func (m *multiQueue) SendMessage(ctx context.Context, event any) (string, error) {
	// Use an atomic counter to round-robin between sub-queues. Convert to
	// uint32 BEFORE the modulo: once the int32 counter wraps past MaxInt32 it
	// goes negative, and Go's % preserves the dividend's sign, which would
	// produce a negative slice index and panic. The two's-complement
	// conversion keeps the sequence continuous and non-negative.
	i := uint32(atomic.AddInt32(&m.index, 1)-1) % uint32(len(m.queues))
	return m.queues[i].Publisher().SendMessage(ctx, event)
}

// Start brings every sub-queue online.
func (m *multiQueue) Start() {
	for i := range m.queues {
		m.queues[i].Start()
	}
}

// Shutdown stops every sub-queue.
func (m *multiQueue) Shutdown() {
	for i := range m.queues {
		m.queues[i].Shutdown()
	}
}

// WaitForRxShutdown blocks until every sub-queue has finished shutting down
// its receive side, waiting on each in turn.
func (m *multiQueue) WaitForRxShutdown() {
	for i := range m.queues {
		m.queues[i].WaitForRxShutdown()
	}
}

// Monitor returns the aggregated monitor spanning all sub-queues.
func (m *multiQueue) Monitor() models.QueueMonitor {
	return m.monitor
}

// Publisher returns the round-robin publisher (the multiQueue itself).
func (m *multiQueue) Publisher() models.QueuePublisher {
	return m
}
60 changes: 0 additions & 60 deletions common/aws/queue/publisher.go

This file was deleted.

Loading
Loading