From 397e59bd5bcc464542bbcd8228705795bfc5fd49 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 2 Jan 2025 14:52:55 +0530 Subject: [PATCH] Treatment service Poller (#87) * Added changes for using queue only if poller is not enabled in config * Added changes for poller config in treatment service * Added changes for poller in treatment service * Added changes for poller in treatment service * Added changes for poller in treatment service * Reverted poller changes for management-service * Fixed treatment service config test case * Refactored poller * Fixed poller condition * Refactored poller * Removed unused struct * Integrated PR review comments * Integrated PR review comments * Integrated PR review comments * Added sample poller config * Integrated PR review comments * Integrated PR review comments * Integrated PR review comments * Integrated PR review comments --------- Co-authored-by: Anirudh Rautela --- treatment-service/appcontext/appcontext.go | 2 + treatment-service/config/config.go | 28 ++++++----- treatment-service/config/config_test.go | 9 ++++ treatment-service/config/example.yaml | 4 ++ treatment-service/go.mod | 4 ++ treatment-service/go.sum | 6 +++ treatment-service/models/storage.go | 4 +- treatment-service/server/poller.go | 55 ++++++++++++++++++++++ treatment-service/server/server.go | 13 +++++ 9 files changed, 112 insertions(+), 13 deletions(-) create mode 100644 treatment-service/server/poller.go diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index 58f4defd..8e961263 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -23,6 +23,7 @@ type AppContext struct { SegmenterService services.SegmenterService AssignedTreatmentLogger *monitoring.AssignedTreatmentLogger + LocalStorage *models.LocalStorage } func NewAppContext(cfg *config.Config) (*AppContext, error) { @@ -129,6 +130,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { TreatmentService: treatmentSvc, AssignedTreatmentLogger: logger, MessageQueueService: messageQueueService, + LocalStorage: localStorage, } return appContext, nil diff --git a/treatment-service/config/config.go b/treatment-service/config/config.go index 7058d6cc..a738b02d 100644 --- a/treatment-service/config/config.go +++ b/treatment-service/config/config.go @@ -3,10 +3,10 @@ package config import ( "fmt" "strconv" + "time" "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" - common_config "github.com/caraml-dev/xp/common/config" common_mq_config "github.com/caraml-dev/xp/common/messagequeue" "github.com/caraml-dev/xp/treatment-service/models" @@ -24,16 +24,17 @@ type Config struct { Port int `json:"port" default:"8080" validate:"required"` ProjectIds []string `json:"project_ids" default:""` - AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` - DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"` - NewRelicConfig newrelic.Config `json:"new_relic_config"` - SentryConfig sentry.Config `json:"sentry_config"` - DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"` - MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config" validate:"required,dive"` - ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"` - MonitoringConfig Monitoring `json:"monitoring_config"` - SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"` - SegmenterConfig map[string]interface{} `json:"segmenter_config"` + AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` + DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"` + NewRelicConfig newrelic.Config `json:"new_relic_config"` + SentryConfig sentry.Config `json:"sentry_config"` + DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"` + MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config" validate:"required,dive"` + ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"` + MonitoringConfig Monitoring `json:"monitoring_config"` + SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"` + SegmenterConfig map[string]interface{} `json:"segmenter_config"` + ManagementServicePollerConfig ManagementServicePollerConfig `json:"management_service_poller_config" validate:"required,dive"` } type AssignedTreatmentLoggerConfig struct { @@ -94,6 +95,11 @@ type ManagementServiceConfig struct { AuthorizationEnabled bool `json:"authorization_enabled"` } +type ManagementServicePollerConfig struct { + Enabled bool `default:"false"` + PollInterval time.Duration `default:"30s"` +} + func (c *Config) GetProjectIds() []models.ProjectId { projectIds := make([]models.ProjectId, 0) for _, projectIdString := range c.ProjectIds { diff --git a/treatment-service/config/config_test.go b/treatment-service/config/config_test.go index 621c50aa..9c04fd6c 100644 --- a/treatment-service/config/config_test.go +++ b/treatment-service/config/config_test.go @@ -2,6 +2,7 @@ package config import ( "testing" + "time" "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" @@ -64,6 +65,10 @@ func TestDefaultConfigs(t *testing.T) { }, SentryConfig: sentry.Config{Enabled: false, Labels: emptyStringMap}, SegmenterConfig: make(map[string]interface{}), + ManagementServicePollerConfig: ManagementServicePollerConfig{ + Enabled: false, + PollInterval: 30 * time.Second, + }, } cfg, err := Load() require.NoError(t, err) @@ -127,6 +132,10 @@ func TestLoadMultipleConfigs(t *testing.T) { }, SentryConfig: sentry.Config{Enabled: true, DSN: "my.amazing.sentry.dsn", Labels: map[string]string{"app": "xp-treatment-service"}}, SegmenterConfig: map[string]interface{}{"s2_ids": map[string]interface{}{"mins2celllevel": 9, "maxs2celllevel": 15}}, + ManagementServicePollerConfig: ManagementServicePollerConfig{ + Enabled: false, + PollInterval: 30 * time.Second, + }, } cfg, err := Load(configFiles...) diff --git a/treatment-service/config/example.yaml b/treatment-service/config/example.yaml index a353f080..9a8e2d11 100644 --- a/treatment-service/config/example.yaml +++ b/treatment-service/config/example.yaml @@ -30,3 +30,7 @@ SegmenterConfig: S2_IDs: MinS2CellLevel: 10 MaxS2CellLevel: 14 + +PollerConfig: + Enabled: true + PollInterval: 10s diff --git a/treatment-service/go.mod b/treatment-service/go.mod index fda4fabc..c4c1d473 100644 --- a/treatment-service/go.mod +++ b/treatment-service/go.mod @@ -14,6 +14,7 @@ require ( github.com/deepmap/oapi-codegen v1.11.0 github.com/getkin/kin-openapi v0.94.0 github.com/go-chi/chi/v5 v5.0.7 + github.com/go-playground/validator/v10 v10.11.1 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 github.com/google/go-cmp v0.6.0 @@ -103,6 +104,8 @@ require ( github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.3 // indirect + github.com/go-playground/locales v0.14.0 // indirect + github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-viper/mapstructure/v2 v2.0.0 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/gofrs/flock v0.8.1 // indirect @@ -136,6 +139,7 @@ require ( github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/leodido/go-urn v1.2.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect diff --git a/treatment-service/go.sum b/treatment-service/go.sum index f4f0b24d..788fc763 100644 --- a/treatment-service/go.sum +++ b/treatment-service/go.sum @@ -333,13 +333,18 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= +github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ= +github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-sql-driver/mysql v1.3.0 h1:pgwjLi/dvffoP9aabwkT3AKpXQM93QARkjFhDDqC1UE= github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -599,6 +604,7 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.7.2/go.mod h1:xkCDAdFCIf8jsFQ5NnbK7oqaF/yU1A1X20Ltm0OvSks= github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lestrrat-go/backoff/v2 v2.0.8/go.mod h1:rHP/q/r9aT27n24JQLa7JhSQZCKBBOiM/uP402WwN8Y= github.com/lestrrat-go/blackmagic v1.0.0/go.mod h1:TNgH//0vYSs8VXDCfkZLgIrVTTXQELZffUV0tz3MtdQ= diff --git a/treatment-service/models/storage.go b/treatment-service/models/storage.go index 04af00d6..52d0529a 100644 --- a/treatment-service/models/storage.go +++ b/treatment-service/models/storage.go @@ -486,7 +486,7 @@ func (s *LocalStorage) DumpExperiments(filepath string) error { return os.WriteFile(filepath, file, 0644) } -func (s *LocalStorage) init() error { +func (s *LocalStorage) Init() error { s.Lock() defer s.Unlock() @@ -592,7 +592,7 @@ func NewLocalStorage( } segmenterCache := make(map[ProjectId]map[string]schema.SegmenterType) s := LocalStorage{managementClient: xpClient, subscribedProjectIds: projectIds, ProjectSegmenters: segmenterCache} - err = s.init() + err = s.Init() return &s, err } diff --git a/treatment-service/server/poller.go b/treatment-service/server/poller.go new file mode 100644 index 00000000..da636d4f --- /dev/null +++ b/treatment-service/server/poller.go @@ -0,0 +1,55 @@ +package server + +import ( + "log" + "time" + + "github.com/caraml-dev/xp/treatment-service/config" + "github.com/caraml-dev/xp/treatment-service/models" +) + +type Poller struct { + pollerConfig config.ManagementServicePollerConfig + localStorage *models.LocalStorage + stopChannel chan struct{} +} + +// NewPoller creates a new Poller instance with the given configuration and local storage. +// pollerConfig: configuration for the poller +// localStorage: local storage to be used by the poller +func NewPoller(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *Poller { + return &Poller{ + pollerConfig: pollerConfig, + localStorage: localStorage, + stopChannel: make(chan struct{}), + } +} + +func (p *Poller) Start() { + ticker := time.NewTicker(p.pollerConfig.PollInterval) + go func() { + for { + select { + case <-ticker.C: + err := p.Refresh() + log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval) + if err != nil { + log.Printf("Error updating local storage: %v", err) + continue + } + case <-p.stopChannel: + ticker.Stop() + return + } + } + }() +} + +func (p *Poller) Stop() { + close(p.stopChannel) +} + +func (p *Poller) Refresh() error { + err := p.localStorage.Init() + return err +} diff --git a/treatment-service/server/server.go b/treatment-service/server/server.go index 924a65ce..e94f17d4 100644 --- a/treatment-service/server/server.go +++ b/treatment-service/server/server.go @@ -33,6 +33,8 @@ type Server struct { subscribe bool // cleanup captures all the actions to be executed on server shut down cleanup []func() + // poller captures the poller instance + poller *Poller } // NewServer creates and configures an APIServer serving all application routes. @@ -106,6 +108,11 @@ func NewServer(configFiles []string) (*Server, error) { subscribe = true } + var poller *Poller + if cfg.ManagementServicePollerConfig.Enabled { + poller = NewPoller(cfg.ManagementServicePollerConfig, appCtx.LocalStorage) + } + srv := http.Server{ Addr: cfg.ListenAddress(), Handler: mux, @@ -116,6 +123,7 @@ func NewServer(configFiles []string) (*Server, error) { appContext: appCtx, subscribe: subscribe, cleanup: cleanup, + poller: poller, }, nil } @@ -133,6 +141,11 @@ func (srv *Server) Start() { }() log.Printf("Listening on %s\n", srv.Addr) + if srv.poller != nil { + log.Println("Starting poller...") + srv.poller.Start() + } + stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt)