diff --git a/cmd/mailroom/main.go b/cmd/mailroom/main.go index 1ab05b52c..5a1ce1e6a 100644 --- a/cmd/mailroom/main.go +++ b/cmd/mailroom/main.go @@ -19,7 +19,6 @@ import ( _ "github.com/nyaruka/mailroom/core/handlers" _ "github.com/nyaruka/mailroom/core/hooks" - _ "github.com/nyaruka/mailroom/core/tasks/analytics" _ "github.com/nyaruka/mailroom/core/tasks/campaigns" _ "github.com/nyaruka/mailroom/core/tasks/contacts" _ "github.com/nyaruka/mailroom/core/tasks/expirations" @@ -28,6 +27,7 @@ import ( _ "github.com/nyaruka/mailroom/core/tasks/incidents" _ "github.com/nyaruka/mailroom/core/tasks/interrupts" _ "github.com/nyaruka/mailroom/core/tasks/ivr" + _ "github.com/nyaruka/mailroom/core/tasks/metrics" _ "github.com/nyaruka/mailroom/core/tasks/msgs" _ "github.com/nyaruka/mailroom/core/tasks/schedules" _ "github.com/nyaruka/mailroom/core/tasks/starts" diff --git a/core/tasks/analytics/cron.go b/core/tasks/analytics/cron.go deleted file mode 100644 index b7752c897..000000000 --- a/core/tasks/analytics/cron.go +++ /dev/null @@ -1,89 +0,0 @@ -package analytics - -import ( - "context" - "log/slog" - "time" - - "github.com/nyaruka/gocommon/analytics" - "github.com/nyaruka/mailroom/core/tasks" - "github.com/nyaruka/mailroom/runtime" -) - -func init() { - tasks.RegisterCron("analytics", &analyticsCron{}) -} - -// calculates a bunch of stats every minute and both logs them and sends them to librato -type analyticsCron struct { - // both sqlx and redis provide wait stats which are cummulative that we need to make into increments - dbWaitDuration time.Duration - dbWaitCount int64 - redisWaitDuration time.Duration - redisWaitCount int64 -} - -func (c *analyticsCron) Next(last time.Time) time.Time { - return tasks.CronNext(last, time.Minute) -} - -func (c *analyticsCron) AllInstances() bool { - return true -} - -func (c *analyticsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string]any, error) { - // We wait 15 seconds since we fire at the top of the minute, the same as expirations. - // That way any metrics related to the size of our queue are a bit more accurate (all expirations can - // usually be handled in 15 seconds). Something more complicated would take into account the age of - // the items in our queues. - time.Sleep(time.Second * 15) - - rc := rt.RP.Get() - defer rc.Close() - - // calculate size of batch queue - batchSize, err := tasks.BatchQueue.Size(rc) - if err != nil { - slog.Error("error calculating batch queue size", "error", err) - } - - // and size of handler queue - handlerSize, err := tasks.HandlerQueue.Size(rc) - if err != nil { - slog.Error("error calculating handler queue size", "error", err) - } - - // get our DB and redis stats - dbStats := rt.DB.Stats() - redisStats := rt.RP.Stats() - - dbWaitDurationInPeriod := dbStats.WaitDuration - c.dbWaitDuration - dbWaitCountInPeriod := dbStats.WaitCount - c.dbWaitCount - redisWaitDurationInPeriod := redisStats.WaitDuration - c.redisWaitDuration - redisWaitCountInPeriod := redisStats.WaitCount - c.redisWaitCount - - c.dbWaitDuration = dbStats.WaitDuration - c.dbWaitCount = dbStats.WaitCount - c.redisWaitDuration = redisStats.WaitDuration - c.redisWaitCount = redisStats.WaitCount - - analytics.Gauge("mr.db_busy", float64(dbStats.InUse)) - analytics.Gauge("mr.db_idle", float64(dbStats.Idle)) - analytics.Gauge("mr.db_wait_ms", float64(dbWaitDurationInPeriod/time.Millisecond)) - analytics.Gauge("mr.db_wait_count", float64(dbWaitCountInPeriod)) - analytics.Gauge("mr.redis_wait_ms", float64(redisWaitDurationInPeriod/time.Millisecond)) - analytics.Gauge("mr.redis_wait_count", float64(redisWaitCountInPeriod)) - analytics.Gauge("mr.handler_queue", float64(handlerSize)) - analytics.Gauge("mr.batch_queue", float64(batchSize)) - - return map[string]any{ - "db_busy": dbStats.InUse, - "db_idle": dbStats.Idle, - "db_wait_time": dbWaitDurationInPeriod, - "db_wait_count": dbWaitCountInPeriod, - "redis_wait_time": dbWaitDurationInPeriod, - "redis_wait_count": dbWaitCountInPeriod, - "handler_size": handlerSize, - "batch_size": batchSize, - }, nil -} diff --git a/core/tasks/campaigns/fire_campaign_event.go b/core/tasks/campaigns/fire_campaign_event.go index da1ff365d..dadce54d8 100644 --- a/core/tasks/campaigns/fire_campaign_event.go +++ b/core/tasks/campaigns/fire_campaign_event.go @@ -10,7 +10,6 @@ import ( "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" - "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/flows" "github.com/nyaruka/goflow/flows/triggers" @@ -118,8 +117,6 @@ func (t *FireCampaignEventTask) Perform(ctx context.Context, rt *runtime.Runtime // FireCampaignEvents tries to handle the given event fires, returning those that were handled (i.e. skipped, fired or deleted) func FireCampaignEvents(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, fires []*models.EventFire, flowUUID assets.FlowUUID, campaign *triggers.CampaignReference, eventUUID triggers.CampaignEventUUID) ([]*models.EventFire, error) { - start := time.Now() - // get the capmaign event object dbEvent := oa.CampaignEventByID(fires[0].EventID) if dbEvent == nil { @@ -232,9 +229,5 @@ func FireCampaignEvents(ctx context.Context, rt *runtime.Runtime, oa *models.Org slog.Error("error starting flow for campaign event", "error", err, "event", eventUUID) } - // log both our total and average - analytics.Gauge("mr.campaign_event_elapsed", float64(time.Since(start))/float64(time.Second)) - analytics.Gauge("mr.campaign_event_count", float64(len(handled))) - return handled, nil } diff --git a/core/tasks/cron.go b/core/tasks/cron.go index 5c8277b12..50c02732e 100644 --- a/core/tasks/cron.go +++ b/core/tasks/cron.go @@ -40,7 +40,7 @@ type Cron interface { Run(context.Context, *runtime.Runtime) (map[string]any, error) // AllInstances returns whether cron runs on all instances - i.e. locking is instance specific. This is for crons - // like analytics which report instance specific stats. Other crons are synchronized across all instances. + // like metrics which report instance specific stats. Other crons are synchronized across all instances. AllInstances() bool } @@ -69,7 +69,7 @@ func recordCronExecution(name string, r func(context.Context, *runtime.Runtime) elapsedSeconds := elapsed.Seconds() rt.CW.Queue(types.MetricDatum{ - MetricName: aws.String("CronTime"), + MetricName: aws.String("CronTaskDuration"), Dimensions: []types.Dimension{{Name: aws.String("TaskName"), Value: aws.String(name)}}, Value: aws.Float64(elapsedSeconds), Unit: types.StandardUnitSeconds, diff --git a/core/tasks/handler/handle_contact_event.go b/core/tasks/handler/handle_contact_event.go index 089a9cee6..dc5211871 100644 --- a/core/tasks/handler/handle_contact_event.go +++ b/core/tasks/handler/handle_contact_event.go @@ -7,9 +7,10 @@ import ( "log/slog" "time" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" - "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/gocommon/dbutil" "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/mailroom/core/models" @@ -98,11 +99,16 @@ func (t *HandleContactEventTask) Perform(ctx context.Context, rt *runtime.Runtim err = performHandlerTask(ctx, rt, oa, t.ContactID, ctask) - // log our processing time to librato - analytics.Gauge(fmt.Sprintf("mr.%s_elapsed", taskPayload.Type), float64(time.Since(start))/float64(time.Second)) - - // and total latency for this task since it was queued - analytics.Gauge(fmt.Sprintf("mr.%s_latency", taskPayload.Type), float64(time.Since(taskPayload.QueuedOn))/float64(time.Second)) + // send metrics for processing time and lag from queue time + rt.CW.Queue(types.MetricDatum{ + MetricName: aws.String("HandlerTaskDuration"), + Value: aws.Float64(float64(time.Since(start)) / float64(time.Second)), + Unit: types.StandardUnitSeconds, + }, types.MetricDatum{ + MetricName: aws.String("HandlerTaskLatency"), + Value: aws.Float64(float64(time.Since(taskPayload.QueuedOn)) / float64(time.Second)), + Unit: types.StandardUnitSeconds, + }) // if we get an error processing an event, requeue it for later and return our error if err != nil { diff --git a/core/tasks/metrics/cron.go b/core/tasks/metrics/cron.go new file mode 100644 index 000000000..02c84e79a --- /dev/null +++ b/core/tasks/metrics/cron.go @@ -0,0 +1,132 @@ +package analytics + +import ( + "context" + "log/slog" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + "github.com/nyaruka/mailroom/core/tasks" + "github.com/nyaruka/mailroom/runtime" +) + +func init() { + tasks.RegisterCron("metrics", &metricsCron{}) +} + +// calculates a bunch of stats every minute and both logs them and sends them to cloudwatch +type metricsCron struct { + // both sqlx and redis provide wait stats which are cummulative that we need to make into increments + dbWaitDuration time.Duration + redisWaitDuration time.Duration +} + +func (c *metricsCron) Next(last time.Time) time.Time { + return tasks.CronNext(last, time.Minute) +} + +func (c *metricsCron) AllInstances() bool { + return true +} + +func (c *metricsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string]any, error) { + // TODO replace with offset passed to tasks.CronNext + // We wait 15 seconds since we fire at the top of the minute, the same as expirations. + // That way any metrics related to the size of our queue are a bit more accurate (all expirations can + // usually be handled in 15 seconds). Something more complicated would take into account the age of + // the items in our queues. + time.Sleep(time.Second * 15) + + handlerSize, batchSize, throttledSize := getQueueSizes(rt) + + // get our DB and redis stats + dbStats := rt.DB.Stats() + redisStats := rt.RP.Stats() + + dbWaitDurationInPeriod := dbStats.WaitDuration - c.dbWaitDuration + redisWaitDurationInPeriod := redisStats.WaitDuration - c.redisWaitDuration + + c.dbWaitDuration = dbStats.WaitDuration + c.redisWaitDuration = redisStats.WaitDuration + + dims := []types.Dimension{ + {Name: aws.String("Host"), Value: aws.String(rt.Config.InstanceID)}, + {Name: aws.String("App"), Value: aws.String("mailroom")}, + } + + rt.CW.Queue(types.MetricDatum{ + MetricName: aws.String("DBConnectionsInUse"), + Dimensions: dims, + Value: aws.Float64(float64(dbStats.InUse)), + Unit: types.StandardUnitCount, + }, types.MetricDatum{ + MetricName: aws.String("DBConnectionWaitDuration"), + Dimensions: dims, + Value: aws.Float64(float64(dbWaitDurationInPeriod / time.Second)), + Unit: types.StandardUnitSeconds, + }, types.MetricDatum{ + MetricName: aws.String("RedisConnectionsInUse"), + Dimensions: dims, + Value: aws.Float64(float64(redisStats.ActiveCount)), + Unit: types.StandardUnitCount, + }, types.MetricDatum{ + MetricName: aws.String("RedisConnectionsWaitDuration"), + Dimensions: dims, + Value: aws.Float64(float64(redisWaitDurationInPeriod / time.Second)), + Unit: types.StandardUnitSeconds, + }) + + rt.CW.Queue(types.MetricDatum{ + MetricName: aws.String("QueuedTasks"), + Dimensions: []types.Dimension{ + {Name: aws.String("QueueName"), Value: aws.String("handler")}, + }, + Value: aws.Float64(float64(handlerSize)), + Unit: types.StandardUnitCount, + }, types.MetricDatum{ + MetricName: aws.String("QueuedTasks"), + Dimensions: []types.Dimension{ + {Name: aws.String("QueueName"), Value: aws.String("batch")}, + }, + Value: aws.Float64(float64(batchSize)), + Unit: types.StandardUnitCount, + }, types.MetricDatum{ + MetricName: aws.String("QueuedTasks"), + Dimensions: []types.Dimension{ + {Name: aws.String("QueueName"), Value: aws.String("throttled")}, + }, + Value: aws.Float64(float64(throttledSize)), + Unit: types.StandardUnitCount, + }) + + return map[string]any{ + "db_inuse": dbStats.InUse, + "db_wait": dbWaitDurationInPeriod, + "redis_inuse": redisStats.ActiveCount, + "redis_wait": redisWaitDurationInPeriod, + "handler_size": handlerSize, + "batch_size": batchSize, + "throttled_size": throttledSize, + }, nil +} + +func getQueueSizes(rt *runtime.Runtime) (int, int, int) { + rc := rt.RP.Get() + defer rc.Close() + + handler, err := tasks.HandlerQueue.Size(rc) + if err != nil { + slog.Error("error calculating handler queue size", "error", err) + } + batch, err := tasks.BatchQueue.Size(rc) + if err != nil { + slog.Error("error calculating batch queue size", "error", err) + } + throttled, err := tasks.ThrottledQueue.Size(rc) + if err != nil { + slog.Error("error calculating throttled queue size", "error", err) + } + + return handler, batch, throttled +} diff --git a/mailroom.go b/mailroom.go index cf33cfeb0..68344332c 100644 --- a/mailroom.go +++ b/mailroom.go @@ -11,7 +11,6 @@ import ( "github.com/appleboy/go-fcm" "github.com/elastic/go-elasticsearch/v8" "github.com/jmoiron/sqlx" - "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/gocommon/aws/cwatch" "github.com/nyaruka/gocommon/aws/dynamo" "github.com/nyaruka/gocommon/aws/s3x" @@ -133,13 +132,6 @@ func (mr *Mailroom) Start() error { log.Info("elastic ok") } - // if we have a librato token, configure it - if c.LibratoToken != "" { - analytics.RegisterBackend(analytics.NewLibrato(c.LibratoUsername, c.LibratoToken, c.InstanceID, time.Second, mr.wg)) - } - - analytics.Start() - // configure and start cloudwatch mr.rt.CW, err = cwatch.NewService(c.AWSAccessKeyID, c.AWSSecretAccessKey, c.AWSRegion, c.CloudwatchNamespace, c.DeploymentID) if err != nil { @@ -176,7 +168,6 @@ func (mr *Mailroom) Stop() error { mr.throttledForeman.Stop() mr.rt.CW.StopQueue() - analytics.Stop() close(mr.quit) mr.cancel() diff --git a/runtime/config.go b/runtime/config.go index fc630c8ed..8eda94afd 100644 --- a/runtime/config.go +++ b/runtime/config.go @@ -69,19 +69,16 @@ type Config struct { S3SessionsBucket string `help:"S3 bucket to write flow sessions to"` S3Minio bool `help:"S3 is actually Minio or other compatible service"` - CourierAuthToken string `help:"the authentication token used for requests to Courier"` - LibratoUsername string `help:"the username that will be used to authenticate to Librato"` - LibratoToken string `help:"the token that will be used to authenticate to Librato"` - CloudwatchNamespace string `help:"the namespace to use for cloudwatch metrics"` DeploymentID string `help:"the deployment identifier to use for metrics"` + InstanceID string `help:"the instance identifier to use for metrics"` + CourierAuthToken string `help:"the authentication token used for requests to Courier"` AndroidCredentialsFile string `help:"path to JSON file with FCM service account credentials used to sync Android relayers"` - InstanceID string `help:"the unique identifier of this instance, defaults to hostname"` - LogLevel slog.Level `help:"the logging level courier should use"` - UUIDSeed int `help:"seed to use for UUID generation in a testing environment"` - Version string `help:"the version of this mailroom install"` + LogLevel slog.Level `help:"the logging level courier should use"` + UUIDSeed int `help:"seed to use for UUID generation in a testing environment"` + Version string `help:"the version of this mailroom install"` } // NewDefaultConfig returns a new default configuration object @@ -133,11 +130,11 @@ func NewDefaultConfig() *Config { CloudwatchNamespace: "Temba", DeploymentID: "dev", + InstanceID: hostname, - InstanceID: hostname, - LogLevel: slog.LevelWarn, - UUIDSeed: 0, - Version: "Dev", + LogLevel: slog.LevelWarn, + UUIDSeed: 0, + Version: "Dev", } }