Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert all remaining metrics to cloudwatch #383

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

_ "github.com/nyaruka/mailroom/core/handlers"
_ "github.com/nyaruka/mailroom/core/hooks"
_ "github.com/nyaruka/mailroom/core/tasks/analytics"
_ "github.com/nyaruka/mailroom/core/tasks/campaigns"
_ "github.com/nyaruka/mailroom/core/tasks/contacts"
_ "github.com/nyaruka/mailroom/core/tasks/expirations"
Expand All @@ -28,6 +27,7 @@ import (
_ "github.com/nyaruka/mailroom/core/tasks/incidents"
_ "github.com/nyaruka/mailroom/core/tasks/interrupts"
_ "github.com/nyaruka/mailroom/core/tasks/ivr"
_ "github.com/nyaruka/mailroom/core/tasks/metrics"
_ "github.com/nyaruka/mailroom/core/tasks/msgs"
_ "github.com/nyaruka/mailroom/core/tasks/schedules"
_ "github.com/nyaruka/mailroom/core/tasks/starts"
Expand Down
89 changes: 0 additions & 89 deletions core/tasks/analytics/cron.go

This file was deleted.

7 changes: 0 additions & 7 deletions core/tasks/campaigns/fire_campaign_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/triggers"
Expand Down Expand Up @@ -118,8 +117,6 @@ func (t *FireCampaignEventTask) Perform(ctx context.Context, rt *runtime.Runtime

// FireCampaignEvents tries to handle the given event fires, returning those that were handled (i.e. skipped, fired or deleted)
func FireCampaignEvents(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, fires []*models.EventFire, flowUUID assets.FlowUUID, campaign *triggers.CampaignReference, eventUUID triggers.CampaignEventUUID) ([]*models.EventFire, error) {
start := time.Now()

// get the campaign event object
dbEvent := oa.CampaignEventByID(fires[0].EventID)
if dbEvent == nil {
Expand Down Expand Up @@ -232,9 +229,5 @@ func FireCampaignEvents(ctx context.Context, rt *runtime.Runtime, oa *models.Org
slog.Error("error starting flow for campaign event", "error", err, "event", eventUUID)
}

// log both our total and average
analytics.Gauge("mr.campaign_event_elapsed", float64(time.Since(start))/float64(time.Second))
analytics.Gauge("mr.campaign_event_count", float64(len(handled)))

return handled, nil
}
4 changes: 2 additions & 2 deletions core/tasks/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Cron interface {
Run(context.Context, *runtime.Runtime) (map[string]any, error)

// AllInstances returns whether cron runs on all instances - i.e. locking is instance specific. This is for crons
// like analytics which report instance specific stats. Other crons are synchronized across all instances.
// like metrics which report instance specific stats. Other crons are synchronized across all instances.
AllInstances() bool
}

Expand Down Expand Up @@ -69,7 +69,7 @@ func recordCronExecution(name string, r func(context.Context, *runtime.Runtime)
elapsedSeconds := elapsed.Seconds()

rt.CW.Queue(types.MetricDatum{
MetricName: aws.String("CronTime"),
MetricName: aws.String("CronTaskDuration"),
Dimensions: []types.Dimension{{Name: aws.String("TaskName"), Value: aws.String(name)}},
Value: aws.Float64(elapsedSeconds),
Unit: types.StandardUnitSeconds,
Expand Down
18 changes: 12 additions & 6 deletions core/tasks/handler/handle_contact_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/mailroom/core/models"
Expand Down Expand Up @@ -98,11 +99,16 @@ func (t *HandleContactEventTask) Perform(ctx context.Context, rt *runtime.Runtim

err = performHandlerTask(ctx, rt, oa, t.ContactID, ctask)

// log our processing time to librato
analytics.Gauge(fmt.Sprintf("mr.%s_elapsed", taskPayload.Type), float64(time.Since(start))/float64(time.Second))

// and total latency for this task since it was queued
analytics.Gauge(fmt.Sprintf("mr.%s_latency", taskPayload.Type), float64(time.Since(taskPayload.QueuedOn))/float64(time.Second))
// send metrics for processing time and lag from queue time
rt.CW.Queue(types.MetricDatum{
MetricName: aws.String("HandlerTaskDuration"),
Value: aws.Float64(float64(time.Since(start)) / float64(time.Second)),
Unit: types.StandardUnitSeconds,
}, types.MetricDatum{
MetricName: aws.String("HandlerTaskLatency"),
Value: aws.Float64(float64(time.Since(taskPayload.QueuedOn)) / float64(time.Second)),
Unit: types.StandardUnitSeconds,
})

// if we get an error processing an event, requeue it for later and return our error
if err != nil {
Expand Down
132 changes: 132 additions & 0 deletions core/tasks/metrics/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package analytics

import (
"context"
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
)

func init() {
tasks.RegisterCron("metrics", &metricsCron{})
}

// metricsCron calculates a bunch of stats every minute and both logs them and sends them to cloudwatch
type metricsCron struct {
	// sqlx and redis both report wait durations as cumulative totals, so the totals seen on the previous run
	// are kept here and subtracted to get the wait time for just the latest period
	dbWaitDuration, redisWaitDuration time.Duration
}

// Next returns the time of the next run after the given last run, using a one minute interval.
func (c *metricsCron) Next(last time.Time) time.Time {
	const interval = time.Minute

	return tasks.CronNext(last, interval)
}

// AllInstances reports that this cron runs on every instance, since the stats it gathers are instance specific.
func (c *metricsCron) AllInstances() bool {
	const runsOnEveryInstance = true

	return runsOnEveryInstance
}

// Run gathers the current queue sizes plus the DB and redis connection pool stats, queues them as cloudwatch
// metrics, and returns the same values keyed by name. Wait durations are reported as the increase since the
// previous run rather than the cumulative totals the pools expose. The returned error is always nil.
func (c *metricsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string]any, error) {
	// TODO replace with offset passed to tasks.CronNext
	// We wait 15 seconds since we fire at the top of the minute, the same as expirations.
	// That way any metrics related to the size of our queue are a bit more accurate (all expirations can
	// usually be handled in 15 seconds). Something more complicated would take into account the age of
	// the items in our queues.
	time.Sleep(time.Second * 15)

	handlerSize, batchSize, throttledSize := getQueueSizes(rt)

	// get our DB and redis stats
	dbStats := rt.DB.Stats()
	redisStats := rt.RP.Stats()

	// pool wait durations are cumulative so subtract what we saw last time to get the wait in this period
	dbWaitDurationInPeriod := dbStats.WaitDuration - c.dbWaitDuration
	redisWaitDurationInPeriod := redisStats.WaitDuration - c.redisWaitDuration

	// remember the new totals for the next run
	c.dbWaitDuration = dbStats.WaitDuration
	c.redisWaitDuration = redisStats.WaitDuration

	dims := []types.Dimension{
		{Name: aws.String("Host"), Value: aws.String(rt.Config.InstanceID)},
		{Name: aws.String("App"), Value: aws.String("mailroom")},
	}

	// Seconds() keeps the fractional part - dividing the duration by time.Second first is an integer division
	// which would truncate any sub-second wait to zero
	rt.CW.Queue(types.MetricDatum{
		MetricName: aws.String("DBConnectionsInUse"),
		Dimensions: dims,
		Value:      aws.Float64(float64(dbStats.InUse)),
		Unit:       types.StandardUnitCount,
	}, types.MetricDatum{
		MetricName: aws.String("DBConnectionWaitDuration"),
		Dimensions: dims,
		Value:      aws.Float64(dbWaitDurationInPeriod.Seconds()),
		Unit:       types.StandardUnitSeconds,
	}, types.MetricDatum{
		MetricName: aws.String("RedisConnectionsInUse"),
		Dimensions: dims,
		Value:      aws.Float64(float64(redisStats.ActiveCount)),
		Unit:       types.StandardUnitCount,
	}, types.MetricDatum{
		MetricName: aws.String("RedisConnectionsWaitDuration"),
		Dimensions: dims,
		Value:      aws.Float64(redisWaitDurationInPeriod.Seconds()),
		Unit:       types.StandardUnitSeconds,
	})

	// queue sizes are reported as the same metric with a different QueueName dimension for each queue
	rt.CW.Queue(types.MetricDatum{
		MetricName: aws.String("QueuedTasks"),
		Dimensions: []types.Dimension{
			{Name: aws.String("QueueName"), Value: aws.String("handler")},
		},
		Value: aws.Float64(float64(handlerSize)),
		Unit:  types.StandardUnitCount,
	}, types.MetricDatum{
		MetricName: aws.String("QueuedTasks"),
		Dimensions: []types.Dimension{
			{Name: aws.String("QueueName"), Value: aws.String("batch")},
		},
		Value: aws.Float64(float64(batchSize)),
		Unit:  types.StandardUnitCount,
	}, types.MetricDatum{
		MetricName: aws.String("QueuedTasks"),
		Dimensions: []types.Dimension{
			{Name: aws.String("QueueName"), Value: aws.String("throttled")},
		},
		Value: aws.Float64(float64(throttledSize)),
		Unit:  types.StandardUnitCount,
	})

	return map[string]any{
		"db_inuse":       dbStats.InUse,
		"db_wait":        dbWaitDurationInPeriod,
		"redis_inuse":    redisStats.ActiveCount,
		"redis_wait":     redisWaitDurationInPeriod,
		"handler_size":   handlerSize,
		"batch_size":     batchSize,
		"throttled_size": throttledSize,
	}, nil
}

// getQueueSizes returns the current sizes of the handler, batch and throttled queues, in that order. A failure
// to read a queue's size is logged rather than returned, and that queue's size is reported as whatever value
// the failed call gave back.
func getQueueSizes(rt *runtime.Runtime) (int, int, int) {
	conn := rt.RP.Get()
	defer conn.Close()

	var (
		handlerSize, batchSize, throttledSize int
		err                                   error
	)

	if handlerSize, err = tasks.HandlerQueue.Size(conn); err != nil {
		slog.Error("error calculating handler queue size", "error", err)
	}

	if batchSize, err = tasks.BatchQueue.Size(conn); err != nil {
		slog.Error("error calculating batch queue size", "error", err)
	}

	if throttledSize, err = tasks.ThrottledQueue.Size(conn); err != nil {
		slog.Error("error calculating throttled queue size", "error", err)
	}

	return handlerSize, batchSize, throttledSize
}
9 changes: 0 additions & 9 deletions mailroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/appleboy/go-fcm"
"github.com/elastic/go-elasticsearch/v8"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/nyaruka/gocommon/aws/s3x"
Expand Down Expand Up @@ -133,13 +132,6 @@ func (mr *Mailroom) Start() error {
log.Info("elastic ok")
}

// if we have a librato token, configure it
if c.LibratoToken != "" {
analytics.RegisterBackend(analytics.NewLibrato(c.LibratoUsername, c.LibratoToken, c.InstanceID, time.Second, mr.wg))
}

analytics.Start()

// configure and start cloudwatch
mr.rt.CW, err = cwatch.NewService(c.AWSAccessKeyID, c.AWSSecretAccessKey, c.AWSRegion, c.CloudwatchNamespace, c.DeploymentID)
if err != nil {
Expand Down Expand Up @@ -176,7 +168,6 @@ func (mr *Mailroom) Stop() error {
mr.throttledForeman.Stop()

mr.rt.CW.StopQueue()
analytics.Stop()

close(mr.quit)
mr.cancel()
Expand Down
21 changes: 9 additions & 12 deletions runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,16 @@ type Config struct {
S3SessionsBucket string `help:"S3 bucket to write flow sessions to"`
S3Minio bool `help:"S3 is actually Minio or other compatible service"`

CourierAuthToken string `help:"the authentication token used for requests to Courier"`
LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
LibratoToken string `help:"the token that will be used to authenticate to Librato"`

CloudwatchNamespace string `help:"the namespace to use for cloudwatch metrics"`
DeploymentID string `help:"the deployment identifier to use for metrics"`
InstanceID string `help:"the instance identifier to use for metrics"`

CourierAuthToken string `help:"the authentication token used for requests to Courier"`
AndroidCredentialsFile string `help:"path to JSON file with FCM service account credentials used to sync Android relayers"`

InstanceID string `help:"the unique identifier of this instance, defaults to hostname"`
LogLevel slog.Level `help:"the logging level courier should use"`
UUIDSeed int `help:"seed to use for UUID generation in a testing environment"`
Version string `help:"the version of this mailroom install"`
LogLevel slog.Level `help:"the logging level courier should use"`
UUIDSeed int `help:"seed to use for UUID generation in a testing environment"`
Version string `help:"the version of this mailroom install"`
}

// NewDefaultConfig returns a new default configuration object
Expand Down Expand Up @@ -133,11 +130,11 @@ func NewDefaultConfig() *Config {

CloudwatchNamespace: "Temba",
DeploymentID: "dev",
InstanceID: hostname,

InstanceID: hostname,
LogLevel: slog.LevelWarn,
UUIDSeed: 0,
Version: "Dev",
LogLevel: slog.LevelWarn,
UUIDSeed: 0,
Version: "Dev",
}
}

Expand Down
Loading