From 8b273b580fcac39485b22e7d48e5d904896dcd62 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 13 Dec 2024 12:03:05 -0500 Subject: [PATCH] Add cloudwatch and start sending cron times there --- core/tasks/cron.go | 10 ++++++++++ go.mod | 15 ++++++++------- go.sum | 30 ++++++++++++++++-------------- mailroom.go | 13 +++++++++++++ runtime/runtime.go | 2 ++ testsuite/testsuite.go | 8 +++++++- 6 files changed, 56 insertions(+), 22 deletions(-) diff --git a/core/tasks/cron.go b/core/tasks/cron.go index bfa1a6dbb..7e51e5219 100644 --- a/core/tasks/cron.go +++ b/core/tasks/cron.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/mailroom/runtime" @@ -66,8 +68,16 @@ func recordCronExecution(name string, r func(context.Context, *runtime.Runtime) elapsed := time.Since(started) elapsedSeconds := elapsed.Seconds() + analytics.Gauge("mr.cron_"+name, elapsedSeconds) + rt.CW.Queue(types.MetricDatum{ + MetricName: aws.String("CronTime"), + Dimensions: []types.Dimension{{Name: aws.String("TaskName"), Value: aws.String(name)}}, + Value: aws.Float64(elapsedSeconds), + Unit: types.StandardUnitSeconds, + }) + rc := rt.RP.Get() defer rc.Close() diff --git a/go.mod b/go.mod index 54f96e27f..d8b771cbb 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/Masterminds/semver v1.5.0 github.com/appleboy/go-fcm v1.2.1 github.com/aws/aws-sdk-go-v2 v1.32.6 - github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.20 + github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.21 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0 github.com/aws/aws-sdk-go-v2/service/s3 v1.71.0 github.com/buger/jsonparser v1.1.1 @@ -22,11 +22,11 @@ require ( github.com/jmoiron/sqlx v1.4.0 github.com/lib/pq v1.10.9 github.com/nyaruka/ezconf v0.3.0 - github.com/nyaruka/gocommon v1.59.3 + github.com/nyaruka/gocommon v1.60.2 github.com/nyaruka/goflow v0.225.8 github.com/nyaruka/null/v3 v3.0.0 github.com/nyaruka/redisx v0.8.1 - github.com/nyaruka/rp-indexer/v9 v9.2.1 + github.com/nyaruka/rp-indexer/v9 v9.3.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.61.0 @@ -34,13 +34,13 @@ require ( github.com/samber/slog-sentry v1.2.2 github.com/shopspring/decimal v1.4.0 github.com/stretchr/testify v1.10.0 - google.golang.org/api v0.210.0 + google.golang.org/api v0.211.0 ) require ( cel.dev/expr v0.19.1 // indirect cloud.google.com/go v0.116.0 // indirect - cloud.google.com/go/auth v0.12.0 // indirect + cloud.google.com/go/auth v0.12.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect cloud.google.com/go/compute/metadata v0.5.2 // indirect cloud.google.com/go/firestore v1.17.0 // indirect @@ -62,7 +62,8 @@ require ( github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25 // indirect - github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8 // indirect + github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3 // indirect + github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.9 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.6 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.6 // indirect @@ -116,7 +117,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect go.opentelemetry.io/otel/trace v1.32.0 // indirect golang.org/x/crypto v0.30.0 // indirect - golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d // indirect + golang.org/x/exp v0.0.0-20241210194714-1829a127f884 // indirect golang.org/x/net v0.32.0 // indirect golang.org/x/oauth2 v0.24.0 // indirect golang.org/x/sync v0.10.0 // indirect diff --git a/go.sum b/go.sum index 4ab291d3b..eece132fa 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,8 @@ cel.dev/expr v0.19.1/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE= cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U= -cloud.google.com/go/auth v0.12.0 h1:ARAD8r0lkiHw2go7kEnmviF6TOYhzLM+yDGcDt9mP68= -cloud.google.com/go/auth v0.12.0/go.mod h1:xxA5AqpDrvS+Gkmo9RqrGGRh6WSNKKOXhY3zNOr38tI= +cloud.google.com/go/auth v0.12.1 h1:n2Bj25BUMM0nvE9D2XLTiImanwZhO3DkfWSYS/SAJP4= +cloud.google.com/go/auth v0.12.1/go.mod h1:BFMu+TNpF3DmvfBO9ClqTR/SiqVIm7LukKF9mbendF4= cloud.google.com/go/auth/oauth2adapt v0.2.6 h1:V6a6XDu2lTwPZWOawrAa9HUK+DB2zfJyTuciBG5hFkU= cloud.google.com/go/auth/oauth2adapt v0.2.6/go.mod h1:AlmsELtlEBnaNTL7jCj8VQFLy6mbZv0s4Q7NGBeQ5E8= cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo= @@ -54,8 +54,8 @@ github.com/aws/aws-sdk-go-v2/config v1.28.6 h1:D89IKtGrs/I3QXOLNTH93NJYtDhm8SYa9 github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko= github.com/aws/aws-sdk-go-v2/credentials v1.17.47 h1:48bA+3/fCdi2yAwVt+3COvmatZ6jUDNkDTIsqDiMUdw= github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w= -github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.20 h1:bwHhhCScKRAYJtaWVT+jDpt74GybN2nxI6+InkRjqGM= -github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.20/go.mod h1:/RfYH8CUMQuq/3CIEVGHLkqkA9KtbBF5omt2Ae8xc0s= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.21 h1:FdDxp4HNtJWPBAOdkJ+84Dfx2TOA7Dq+cH72GDHhjnA= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.21/go.mod h1:doHEXGiMWQBxcTJy3YN1Ao2HCgCuMWumuvTULGndCuQ= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 h1:AmoU1pziydclFT/xRV+xXE/Vb8fttJCLRPv8oAkprc0= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 h1:s/fF4+yDQDoElYhfIVvSNyeCydfbuTKzhxSXDXCPasU= @@ -66,10 +66,12 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvK github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25 h1:r67ps7oHCYnflpgDy2LZU0MAQtQbYIOqNNnqGO6xQkE= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25/go.mod h1:GrGY+Q4fIokYLtjCVB/aFfCVL6hhGUFl8inD18fDalE= +github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3 h1:nQLG9irjDGUFXVPDHzjCGEEwh0hZ6BcxTvHOod1YsP4= +github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3/go.mod h1:URs8sqsyaxiAZkKP6tOEmhcs9j2ynFIomqOKY/CAHJc= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0 h1:isKhHsjpQR3CypQJ4G1g8QWx7zNpiC/xKw1zjgJYVno= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0/go.mod h1:xDvUyIkwBwNtVZJdHEwAuhFly3mezwdEWkbJ5oNYwIw= -github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8 h1:ntqHwZb+ZyVz0CFYUG0sQ02KMMJh+iXeV3bXoba+s4A= -github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8/go.mod h1:Hcjb2SiUo9v1GhpXjRNW7hAwfzAPfrsgnlKpP5UYEPY= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.9 h1:yhB2XYpHeWeAv5u3w9PFiSVIariSyhK5jcyQUFJpnIQ= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.9/go.mod h1:Hcjb2SiUo9v1GhpXjRNW7hAwfzAPfrsgnlKpP5UYEPY= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.6 h1:HCpPsWqmYQieU7SS6E9HXfdAMSud0pteVXieJmcpIRI= @@ -222,8 +224,8 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8= github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= github.com/nyaruka/ezconf v0.3.0 h1:kGvJqVN8AHowb4HdaHAviJ0Z3yI5Pyekp1WqibFEaGk= github.com/nyaruka/ezconf v0.3.0/go.mod h1:89GUW6EPRNLIxT7lC4LWnjWTgZeQwRoX7lBmc8ralAU= -github.com/nyaruka/gocommon v1.59.3 h1:fdjs9Z7aH+zog7FXlEpvJ0GtI6XNdNdBtjFxw5kVB7s= -github.com/nyaruka/gocommon v1.59.3/go.mod h1:peOpluiVBMeQu81Ar+7EPQVT7vawN6ho9Kh1k/Gj8Vk= +github.com/nyaruka/gocommon v1.60.2 h1:AvvSSAV70SV49ocNtvjpdb9NlcdiA2OQAL4NYVUcuV0= +github.com/nyaruka/gocommon v1.60.2/go.mod h1:kFJuOq8COneV7ssfK6xgCMJ8gP8fQifLQnNXBnE4YL0= github.com/nyaruka/goflow v0.225.8 h1:rDc3P3KL8sNlXUFBi0UmBvMOd3eCUhUrkO3RbYYiW7o= github.com/nyaruka/goflow v0.225.8/go.mod h1:spXtSWgS7dusHIfUFCvJGjSMc7d4FX9Abl6S7tg49ks= github.com/nyaruka/librato v1.1.1 h1:0nTYtJLl3Sn7lX3CuHsLf+nXy1k/tGV0OjVxLy3Et4s= @@ -236,8 +238,8 @@ github.com/nyaruka/phonenumbers v1.4.3 h1:tR71UJ+DZu7TSkxoG8JI8HzHJkPD/m4KNiUX34 github.com/nyaruka/phonenumbers v1.4.3/go.mod h1:gv+CtldaFz+G3vHHnasBSirAi3O2XLqZzVWz4V1pl2E= github.com/nyaruka/redisx v0.8.1 h1:d9Hc8nfSKTSEU+bx+YrB13d6bzAgiiHygk4jg/Q4nb4= github.com/nyaruka/redisx v0.8.1/go.mod h1:2TUmkDvprPInnmInR5AEbCm0zRRewkvSDVLsO+Do6iI= -github.com/nyaruka/rp-indexer/v9 v9.2.1 h1:gQa0QHiU+LjhmgpToHpoGRKRC8oI1EdW4dDaN9inhSk= -github.com/nyaruka/rp-indexer/v9 v9.2.1/go.mod h1:NzcuE4Zxrzde7gQinlWfwq2jeyEbamBj8hqVkm+eQLg= +github.com/nyaruka/rp-indexer/v9 v9.3.0 h1:8Thnt7k6/anEYcM3hIY+ObzF3JtO2/8EbDmb4JEAzsc= +github.com/nyaruka/rp-indexer/v9 v9.3.0/go.mod h1:YrHQx+ImBKRUQ4RWFJad1IlcMWlMyry/72SxAVCCgIU= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= @@ -301,8 +303,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d h1:0olWaB5pg3+oychR51GUVCEsGkeCU/2JxjBgIo4f3M0= -golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c= +golang.org/x/exp v0.0.0-20241210194714-1829a127f884 h1:Y/Mj/94zIQQGHVSv1tTtQBDaQaJe62U9bkDZKKyhPCU= +golang.org/x/exp v0.0.0-20241210194714-1829a127f884/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -356,8 +358,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/api v0.210.0 h1:HMNffZ57OoZCRYSbdWVRoqOa8V8NIHLL0CzdBPLztWk= -google.golang.org/api v0.210.0/go.mod h1:B9XDZGnx2NtyjzVkOVTGrFSAVZgPcbedzKg/gTLwqBs= +google.golang.org/api v0.211.0 h1:IUpLjq09jxBSV1lACO33CGY3jsRcbctfGzhj+ZSE/Bg= +google.golang.org/api v0.211.0/go.mod h1:XOloB4MXFH4UTlQSGuNUxw0UT74qdENK8d6JNsXKLi0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine/v2 v2.0.6 h1:LvPZLGuchSBslPBp+LAhihBeGSiRh1myRoYK4NtuBIw= diff --git a/mailroom.go b/mailroom.go index ebf04620b..cf33cfeb0 100644 --- a/mailroom.go +++ b/mailroom.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/jmoiron/sqlx" "github.com/nyaruka/gocommon/analytics" + "github.com/nyaruka/gocommon/aws/cwatch" "github.com/nyaruka/gocommon/aws/dynamo" "github.com/nyaruka/gocommon/aws/s3x" "github.com/nyaruka/mailroom/core/tasks" @@ -139,6 +140,16 @@ func (mr *Mailroom) Start() error { analytics.Start() + // configure and start cloudwatch + mr.rt.CW, err = cwatch.NewService(c.AWSAccessKeyID, c.AWSSecretAccessKey, c.AWSRegion, c.CloudwatchNamespace, c.DeploymentID) + if err != nil { + log.Error("cloudwatch not available", "error", err) + } else { + log.Info("cloudwatch ok") + } + + mr.rt.CW.StartQueue(mr.wg, time.Second*3) + // init our foremen and start it mr.handlerForeman.Start() mr.batchForeman.Start() @@ -164,7 +175,9 @@ func (mr *Mailroom) Stop() error { mr.batchForeman.Stop() mr.throttledForeman.Stop() + mr.rt.CW.StopQueue() analytics.Stop() + close(mr.quit) mr.cancel() diff --git a/runtime/runtime.go b/runtime/runtime.go index 81b43e046..64e0ee643 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -8,6 +8,7 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" + "github.com/nyaruka/gocommon/aws/cwatch" "github.com/nyaruka/gocommon/aws/dynamo" "github.com/nyaruka/gocommon/aws/s3x" ) @@ -21,6 +22,7 @@ type Runtime struct { Dynamo *dynamo.Service S3 *s3x.Service ES *elasticsearch.TypedClient + CW *cwatch.Service FCM FCMClient Config *Config } diff --git a/testsuite/testsuite.go b/testsuite/testsuite.go index 979ea1cf2..34bcbf36b 100644 --- a/testsuite/testsuite.go +++ b/testsuite/testsuite.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" + "github.com/nyaruka/gocommon/aws/cwatch" "github.com/nyaruka/gocommon/aws/dynamo" "github.com/nyaruka/gocommon/aws/s3x" "github.com/nyaruka/gocommon/jsonx" @@ -22,6 +23,7 @@ import ( "github.com/nyaruka/mailroom/runtime" "github.com/nyaruka/redisx/assertredis" "github.com/nyaruka/rp-indexer/v9/indexers" + ixruntime "github.com/nyaruka/rp-indexer/v9/runtime" ) var _db *sqlx.DB @@ -89,6 +91,9 @@ func Runtime() (context.Context, *runtime.Runtime) { s3svc, err := s3x.NewService(cfg.AWSAccessKeyID, cfg.AWSSecretAccessKey, cfg.AWSRegion, cfg.S3Endpoint, cfg.S3Minio) noError(err) + cwSvc, err := cwatch.NewService(cfg.AWSAccessKeyID, cfg.AWSSecretAccessKey, cfg.AWSRegion, cfg.CloudwatchNamespace, cfg.DeploymentID) + noError(err) + dbx := getDB() rt := &runtime.Runtime{ DB: dbx, @@ -97,6 +102,7 @@ func Runtime() (context.Context, *runtime.Runtime) { Dynamo: dyna, S3: s3svc, ES: getES(), + CW: cwSvc, FCM: &MockFCMClient{ValidTokens: []string{"FCMID3", "FCMID4", "FCMID5"}}, Config: cfg, } @@ -112,7 +118,7 @@ func ReindexElastic(ctx context.Context) { es := getES() contactsIndexer := indexers.NewContactIndexer(elasticURL, elasticContactsIndex, 1, 1, 100) - contactsIndexer.Index(db.DB, false, false) + contactsIndexer.Index(&ixruntime.Runtime{DB: db.DB}, false, false) _, err := es.Indices.Refresh().Index(elasticContactsIndex).Do(ctx) noError(err)