Skip to content

Commit

Permalink
fix(xp-treatment): Add field tags to poller configs and make XP plugi…
Browse files Browse the repository at this point in the history
…n pass poller configs (#88)

* Add field tags to poller configs

* Add poller config to plugin

* Update message queue config with json tags

* Update xp plugin to use message queue config from manager instead of xp management

* Reorder imports

* Change poller interval field to int

* Reorder imports

* Reorder imports

* Refactor poller into a service
  • Loading branch information
deadlycoconuts authored Jan 20, 2025
1 parent 397e59b commit 649e5f6
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 82 deletions.
4 changes: 2 additions & 2 deletions common/messagequeue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ const (

type MessageQueueConfig struct {
// The type of Message Queue for event updates
Kind MessageQueueKind `default:""`
Kind MessageQueueKind `json:"kind" default:""`

// PubSubConfig captures the config related to publishing and subscribing to a PubSub Message Queue
PubSubConfig *PubSubConfig
PubSubConfig *PubSubConfig `json:"pub_sub_config"`
}

type PubSubConfig struct {
Expand Down
19 changes: 11 additions & 8 deletions plugins/turing/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry"
common_mq_config "github.com/caraml-dev/xp/common/messagequeue"
"github.com/caraml-dev/xp/treatment-service/config"
"github.com/go-playground/validator/v10"
)
Expand Down Expand Up @@ -46,14 +47,16 @@ type TreatmentServicePluginConfig struct {
Port int `json:"port" default:"8080"`
PubSubTimeoutSeconds int `json:"pub_sub_timeout_seconds" validate:"required"`

AssignedTreatmentLogger config.AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"`
DebugConfig config.DebugConfig `json:"debug_config"`
DeploymentConfig config.DeploymentConfig `json:"deployment_config"`
ManagementService config.ManagementServiceConfig `json:"management_service"`
MonitoringConfig config.Monitoring `json:"monitoring_config"`
SwaggerConfig config.SwaggerConfig `json:"swagger_config"`
NewRelicConfig newrelic.Config `json:"new_relic_config"`
SentryConfig sentry.Config `json:"sentry_config"`
AssignedTreatmentLogger config.AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"`
DebugConfig config.DebugConfig `json:"debug_config"`
DeploymentConfig config.DeploymentConfig `json:"deployment_config"`
MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config"`
ManagementService config.ManagementServiceConfig `json:"management_service"`
MonitoringConfig config.Monitoring `json:"monitoring_config"`
SwaggerConfig config.SwaggerConfig `json:"swagger_config"`
NewRelicConfig newrelic.Config `json:"new_relic_config"`
SentryConfig sentry.Config `json:"sentry_config"`
ManagementServicePollerConfig config.ManagementServicePollerConfig `json:"management_service_poller_config"`
}

type Variable struct {
Expand Down
44 changes: 13 additions & 31 deletions plugins/turing/manager/experiment_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

xpclient "github.com/caraml-dev/xp/clients/management"
"github.com/caraml-dev/xp/common/api/schema"
common_mq_config "github.com/caraml-dev/xp/common/messagequeue"
_config "github.com/caraml-dev/xp/plugins/turing/config"
"github.com/caraml-dev/xp/treatment-service/config"
"github.com/go-playground/validator/v10"
Expand Down Expand Up @@ -142,37 +141,20 @@ func (em *experimentManager) MakeTreatmentServicePluginConfig(
projectID int,
) (*config.Config, error) {
pluginConfig := &config.Config{
Port: em.TreatmentServicePluginConfig.Port,
ProjectIds: []string{strconv.Itoa(projectID)},
AssignedTreatmentLogger: em.TreatmentServicePluginConfig.AssignedTreatmentLogger,
DebugConfig: em.TreatmentServicePluginConfig.DebugConfig,
DeploymentConfig: em.TreatmentServicePluginConfig.DeploymentConfig,
ManagementService: em.TreatmentServicePluginConfig.ManagementService,
MonitoringConfig: em.TreatmentServicePluginConfig.MonitoringConfig,
SwaggerConfig: em.TreatmentServicePluginConfig.SwaggerConfig,
NewRelicConfig: em.TreatmentServicePluginConfig.NewRelicConfig,
SentryConfig: em.TreatmentServicePluginConfig.SentryConfig,
SegmenterConfig: *treatmentServiceConfig.SegmenterConfig,
}
messageQueueKind := *treatmentServiceConfig.MessageQueueConfig.Kind
switch messageQueueKind {
case schema.MessageQueueKindPubsub:
pluginConfig.MessageQueueConfig = common_mq_config.MessageQueueConfig{
Kind: "pubsub",
PubSubConfig: &common_mq_config.PubSubConfig{
Project: *treatmentServiceConfig.MessageQueueConfig.PubSub.Project,
TopicName: *treatmentServiceConfig.MessageQueueConfig.PubSub.TopicName,
PubSubTimeoutSeconds: em.TreatmentServicePluginConfig.PubSubTimeoutSeconds,
},
}
case schema.MessageQueueKindNoop:
pluginConfig.MessageQueueConfig = common_mq_config.MessageQueueConfig{
Kind: "",
}
default:
return nil, fmt.Errorf("invalid message queue kind (%s) was provided", messageQueueKind)
Port: em.TreatmentServicePluginConfig.Port,
ProjectIds: []string{strconv.Itoa(projectID)},
AssignedTreatmentLogger: em.TreatmentServicePluginConfig.AssignedTreatmentLogger,
DebugConfig: em.TreatmentServicePluginConfig.DebugConfig,
DeploymentConfig: em.TreatmentServicePluginConfig.DeploymentConfig,
MessageQueueConfig: em.TreatmentServicePluginConfig.MessageQueueConfig,
ManagementService: em.TreatmentServicePluginConfig.ManagementService,
MonitoringConfig: em.TreatmentServicePluginConfig.MonitoringConfig,
SwaggerConfig: em.TreatmentServicePluginConfig.SwaggerConfig,
NewRelicConfig: em.TreatmentServicePluginConfig.NewRelicConfig,
SentryConfig: em.TreatmentServicePluginConfig.SentryConfig,
SegmenterConfig: *treatmentServiceConfig.SegmenterConfig,
ManagementServicePollerConfig: em.TreatmentServicePluginConfig.ManagementServicePollerConfig,
}

return pluginConfig, nil
}

Expand Down
8 changes: 8 additions & 0 deletions plugins/turing/manager/experiment_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ func TestNewExperimentManager(t *testing.T) {
"environment_type": "dev",
"max_go_routines": 200
},
"message_queue_config": {
"kind": "dev",
"pub_sub_config": {
"project":"dev",
"topic_name":"xp-update",
"pub_sub_timeout_seconds": 30
}
},
"management_service": {
"authorization_enabled": true,
"url": "http://xp-management.global.io/api/xp/v1"
Expand Down
3 changes: 3 additions & 0 deletions plugins/turing/runner/experiment_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ func (er *experimentRunner) startBackgroundServices(
}
}()
}
if er.appContext.PollerService != nil {
er.appContext.PollerService.Start()
}
}

func (er *experimentRunner) getRequestParams(
Expand Down
7 changes: 7 additions & 0 deletions treatment-service/appcontext/appcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type AppContext struct {

AssignedTreatmentLogger *monitoring.AssignedTreatmentLogger
LocalStorage *models.LocalStorage
PollerService *services.PollerService
}

func NewAppContext(cfg *config.Config) (*AppContext, error) {
Expand Down Expand Up @@ -122,6 +123,11 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) {
return nil, err
}

var pollerService *services.PollerService
if cfg.ManagementServicePollerConfig.Enabled {
pollerService = services.NewPollerService(cfg.ManagementServicePollerConfig, localStorage)
}

appContext := &AppContext{
ExperimentService: experimentSvc,
MetricService: metricService,
Expand All @@ -131,6 +137,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) {
AssignedTreatmentLogger: logger,
MessageQueueService: messageQueueService,
LocalStorage: localStorage,
PollerService: pollerService,
}

return appContext, nil
Expand Down
5 changes: 2 additions & 3 deletions treatment-service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package config
import (
"fmt"
"strconv"
"time"

"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry"
Expand Down Expand Up @@ -96,8 +95,8 @@ type ManagementServiceConfig struct {
}

type ManagementServicePollerConfig struct {
Enabled bool `default:"false"`
PollInterval time.Duration `default:"30s"`
Enabled bool `json:"enabled" default:"false"`
PollIntervalSeconds int `json:"poll_interval" default:"30"`
}

func (c *Config) GetProjectIds() []models.ProjectId {
Expand Down
9 changes: 4 additions & 5 deletions treatment-service/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package config

import (
"testing"
"time"

"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry"
Expand Down Expand Up @@ -66,8 +65,8 @@ func TestDefaultConfigs(t *testing.T) {
SentryConfig: sentry.Config{Enabled: false, Labels: emptyStringMap},
SegmenterConfig: make(map[string]interface{}),
ManagementServicePollerConfig: ManagementServicePollerConfig{
Enabled: false,
PollInterval: 30 * time.Second,
Enabled: false,
PollIntervalSeconds: 30,
},
}
cfg, err := Load()
Expand Down Expand Up @@ -133,8 +132,8 @@ func TestLoadMultipleConfigs(t *testing.T) {
SentryConfig: sentry.Config{Enabled: true, DSN: "my.amazing.sentry.dsn", Labels: map[string]string{"app": "xp-treatment-service"}},
SegmenterConfig: map[string]interface{}{"s2_ids": map[string]interface{}{"mins2celllevel": 9, "maxs2celllevel": 15}},
ManagementServicePollerConfig: ManagementServicePollerConfig{
Enabled: false,
PollInterval: 30 * time.Second,
Enabled: false,
PollIntervalSeconds: 30,
},
}

Expand Down
4 changes: 0 additions & 4 deletions treatment-service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/deepmap/oapi-codegen v1.11.0
github.com/getkin/kin-openapi v0.94.0
github.com/go-chi/chi/v5 v5.0.7
github.com/go-playground/validator/v10 v10.11.1
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
github.com/golang/geo v0.0.0-20210211234256-740aa86cb551
github.com/google/go-cmp v0.6.0
Expand Down Expand Up @@ -104,8 +103,6 @@ require (
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/gofrs/flock v0.8.1 // indirect
Expand Down Expand Up @@ -139,7 +136,6 @@ require (
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
6 changes: 0 additions & 6 deletions treatment-service/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -333,18 +333,13 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU=
github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho=
github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA=
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ=
github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
github.com/go-sql-driver/mysql v1.3.0 h1:pgwjLi/dvffoP9aabwkT3AKpXQM93QARkjFhDDqC1UE=
github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
Expand Down Expand Up @@ -604,7 +599,6 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.7.2/go.mod h1:xkCDAdFCIf8jsFQ5NnbK7oqaF/yU1A1X20Ltm0OvSks=
github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/lestrrat-go/backoff/v2 v2.0.8/go.mod h1:rHP/q/r9aT27n24JQLa7JhSQZCKBBOiM/uP402WwN8Y=
github.com/lestrrat-go/blackmagic v1.0.0/go.mod h1:TNgH//0vYSs8VXDCfkZLgIrVTTXQELZffUV0tz3MtdQ=
Expand Down
17 changes: 4 additions & 13 deletions treatment-service/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type Server struct {
subscribe bool
// cleanup captures all the actions to be executed on server shut down
cleanup []func()
// poller captures the poller instance
poller *Poller
}

// NewServer creates and configures an APIServer serving all application routes.
Expand Down Expand Up @@ -108,11 +106,6 @@ func NewServer(configFiles []string) (*Server, error) {
subscribe = true
}

var poller *Poller
if cfg.ManagementServicePollerConfig.Enabled {
poller = NewPoller(cfg.ManagementServicePollerConfig, appCtx.LocalStorage)
}

srv := http.Server{
Addr: cfg.ListenAddress(),
Handler: mux,
Expand All @@ -123,7 +116,6 @@ func NewServer(configFiles []string) (*Server, error) {
appContext: appCtx,
subscribe: subscribe,
cleanup: cleanup,
poller: poller,
}, nil
}

Expand All @@ -141,11 +133,6 @@ func (srv *Server) Start() {
}()
log.Printf("Listening on %s\n", srv.Addr)

if srv.poller != nil {
log.Println("Starting poller...")
srv.poller.Start()
}

stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)

Expand Down Expand Up @@ -193,5 +180,9 @@ func (srv *Server) startBackgroundService(errChannel chan error) context.CancelF
}
}()

if srv.appContext.PollerService != nil {
srv.appContext.PollerService.Start()
}

return cancel
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package services

import (
"log"
Expand All @@ -8,31 +8,33 @@ import (
"github.com/caraml-dev/xp/treatment-service/models"
)

type Poller struct {
type PollerService struct {
pollerConfig config.ManagementServicePollerConfig
localStorage *models.LocalStorage
stopChannel chan struct{}
}

// NewPoller creates a new Poller instance with the given configuration and local storage.
// NewPollerService creates a new PollerService instance with the given configuration and local storage.
// pollerConfig: configuration for the poller
// localStorage: local storage to be used by the poller
func NewPoller(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *Poller {
return &Poller{
func NewPollerService(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *PollerService {
return &PollerService{
pollerConfig: pollerConfig,
localStorage: localStorage,
stopChannel: make(chan struct{}),
}
}

func (p *Poller) Start() {
ticker := time.NewTicker(p.pollerConfig.PollInterval)
func (p *PollerService) Start() {
log.Println("Starting management service poller service...")
pollInterval := time.Duration(p.pollerConfig.PollIntervalSeconds) * time.Second
ticker := time.NewTicker(pollInterval)
go func() {
for {
select {
case <-ticker.C:
err := p.Refresh()
log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval)
log.Printf("Polling at %v with interval %v", time.Now(), pollInterval)
if err != nil {
log.Printf("Error updating local storage: %v", err)
continue
Expand All @@ -45,11 +47,11 @@ func (p *Poller) Start() {
}()
}

func (p *Poller) Stop() {
func (p *PollerService) Stop() {
close(p.stopChannel)
}

func (p *Poller) Refresh() error {
func (p *PollerService) Refresh() error {
err := p.localStorage.Init()
return err
}

0 comments on commit 649e5f6

Please sign in to comment.