Skip to content

Commit

Permalink
Treatment service Poller (#87)
Browse files Browse the repository at this point in the history
* Added changes for using queue only if poller is not enabled in config

* Added changes for poller config in treatment service

* Added changes for poller in treatment service

* Added changes for poller in treatment service

* Added changes for poller in treatment service

* Reverted poller changes for management-service

* Fixed treatment service config test case

* Refactored poller

* Fixed poller condition

* Refactored poller

* Removed unused struct

* Integrated PR review comments

* Integrated PR review comments

* Integrated PR review comments

* Added sample poller config

* Integrated PR review comments

* Integrated PR review comments

* Integrated PR review comments

* Integrated PR review comments

---------

Co-authored-by: Anirudh Rautela <[email protected]>
  • Loading branch information
rautelaanirudh and Anirudh Rautela authored Jan 2, 2025
1 parent d328d9e commit 397e59b
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 13 deletions.
2 changes: 2 additions & 0 deletions treatment-service/appcontext/appcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type AppContext struct {
SegmenterService services.SegmenterService

AssignedTreatmentLogger *monitoring.AssignedTreatmentLogger
LocalStorage *models.LocalStorage
}

func NewAppContext(cfg *config.Config) (*AppContext, error) {
Expand Down Expand Up @@ -129,6 +130,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) {
TreatmentService: treatmentSvc,
AssignedTreatmentLogger: logger,
MessageQueueService: messageQueueService,
LocalStorage: localStorage,
}

return appContext, nil
Expand Down
28 changes: 17 additions & 11 deletions treatment-service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package config
import (
"fmt"
"strconv"
"time"

"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry"

common_config "github.com/caraml-dev/xp/common/config"
common_mq_config "github.com/caraml-dev/xp/common/messagequeue"
"github.com/caraml-dev/xp/treatment-service/models"
Expand All @@ -24,16 +24,17 @@ type Config struct {
Port int `json:"port" default:"8080" validate:"required"`
ProjectIds []string `json:"project_ids" default:""`

AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"`
DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"`
NewRelicConfig newrelic.Config `json:"new_relic_config"`
SentryConfig sentry.Config `json:"sentry_config"`
DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"`
MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config" validate:"required,dive"`
ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"`
MonitoringConfig Monitoring `json:"monitoring_config"`
SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"`
SegmenterConfig map[string]interface{} `json:"segmenter_config"`
AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"`
DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"`
NewRelicConfig newrelic.Config `json:"new_relic_config"`
SentryConfig sentry.Config `json:"sentry_config"`
DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"`
MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config" validate:"required,dive"`
ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"`
MonitoringConfig Monitoring `json:"monitoring_config"`
SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"`
SegmenterConfig map[string]interface{} `json:"segmenter_config"`
ManagementServicePollerConfig ManagementServicePollerConfig `json:"management_service_poller_config" validate:"required,dive"`
}

type AssignedTreatmentLoggerConfig struct {
Expand Down Expand Up @@ -94,6 +95,11 @@ type ManagementServiceConfig struct {
AuthorizationEnabled bool `json:"authorization_enabled"`
}

type ManagementServicePollerConfig struct {
Enabled bool `default:"false"`
PollInterval time.Duration `default:"30s"`
}

func (c *Config) GetProjectIds() []models.ProjectId {
projectIds := make([]models.ProjectId, 0)
for _, projectIdString := range c.ProjectIds {
Expand Down
9 changes: 9 additions & 0 deletions treatment-service/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"testing"
"time"

"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry"
Expand Down Expand Up @@ -64,6 +65,10 @@ func TestDefaultConfigs(t *testing.T) {
},
SentryConfig: sentry.Config{Enabled: false, Labels: emptyStringMap},
SegmenterConfig: make(map[string]interface{}),
ManagementServicePollerConfig: ManagementServicePollerConfig{
Enabled: false,
PollInterval: 30 * time.Second,
},
}
cfg, err := Load()
require.NoError(t, err)
Expand Down Expand Up @@ -127,6 +132,10 @@ func TestLoadMultipleConfigs(t *testing.T) {
},
SentryConfig: sentry.Config{Enabled: true, DSN: "my.amazing.sentry.dsn", Labels: map[string]string{"app": "xp-treatment-service"}},
SegmenterConfig: map[string]interface{}{"s2_ids": map[string]interface{}{"mins2celllevel": 9, "maxs2celllevel": 15}},
ManagementServicePollerConfig: ManagementServicePollerConfig{
Enabled: false,
PollInterval: 30 * time.Second,
},
}

cfg, err := Load(configFiles...)
Expand Down
4 changes: 4 additions & 0 deletions treatment-service/config/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ SegmenterConfig:
S2_IDs:
MinS2CellLevel: 10
MaxS2CellLevel: 14

PollerConfig:
Enabled: true
PollInterval: 10s
4 changes: 4 additions & 0 deletions treatment-service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/deepmap/oapi-codegen v1.11.0
github.com/getkin/kin-openapi v0.94.0
github.com/go-chi/chi/v5 v5.0.7
github.com/go-playground/validator/v10 v10.11.1
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
github.com/golang/geo v0.0.0-20210211234256-740aa86cb551
github.com/google/go-cmp v0.6.0
Expand Down Expand Up @@ -103,6 +104,8 @@ require (
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/gofrs/flock v0.8.1 // indirect
Expand Down Expand Up @@ -136,6 +139,7 @@ require (
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
6 changes: 6 additions & 0 deletions treatment-service/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,18 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU=
github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho=
github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA=
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ=
github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
github.com/go-sql-driver/mysql v1.3.0 h1:pgwjLi/dvffoP9aabwkT3AKpXQM93QARkjFhDDqC1UE=
github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
Expand Down Expand Up @@ -599,6 +604,7 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.7.2/go.mod h1:xkCDAdFCIf8jsFQ5NnbK7oqaF/yU1A1X20Ltm0OvSks=
github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/lestrrat-go/backoff/v2 v2.0.8/go.mod h1:rHP/q/r9aT27n24JQLa7JhSQZCKBBOiM/uP402WwN8Y=
github.com/lestrrat-go/blackmagic v1.0.0/go.mod h1:TNgH//0vYSs8VXDCfkZLgIrVTTXQELZffUV0tz3MtdQ=
Expand Down
4 changes: 2 additions & 2 deletions treatment-service/models/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func (s *LocalStorage) DumpExperiments(filepath string) error {
return os.WriteFile(filepath, file, 0644)
}

func (s *LocalStorage) init() error {
func (s *LocalStorage) Init() error {
s.Lock()
defer s.Unlock()

Expand Down Expand Up @@ -592,7 +592,7 @@ func NewLocalStorage(
}
segmenterCache := make(map[ProjectId]map[string]schema.SegmenterType)
s := LocalStorage{managementClient: xpClient, subscribedProjectIds: projectIds, ProjectSegmenters: segmenterCache}
err = s.init()
err = s.Init()

return &s, err
}
Expand Down
55 changes: 55 additions & 0 deletions treatment-service/server/poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package server

import (
"log"
"time"

"github.com/caraml-dev/xp/treatment-service/config"
"github.com/caraml-dev/xp/treatment-service/models"
)

type Poller struct {
pollerConfig config.ManagementServicePollerConfig
localStorage *models.LocalStorage
stopChannel chan struct{}
}

// NewPoller creates a new Poller instance with the given configuration and local storage.
// pollerConfig: configuration for the poller
// localStorage: local storage to be used by the poller
func NewPoller(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *Poller {
return &Poller{
pollerConfig: pollerConfig,
localStorage: localStorage,
stopChannel: make(chan struct{}),
}
}

func (p *Poller) Start() {
ticker := time.NewTicker(p.pollerConfig.PollInterval)
go func() {
for {
select {
case <-ticker.C:
err := p.Refresh()
log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval)
if err != nil {
log.Printf("Error updating local storage: %v", err)
continue
}
case <-p.stopChannel:
ticker.Stop()
return
}
}
}()
}

func (p *Poller) Stop() {
close(p.stopChannel)
}

func (p *Poller) Refresh() error {
err := p.localStorage.Init()
return err
}
13 changes: 13 additions & 0 deletions treatment-service/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Server struct {
subscribe bool
// cleanup captures all the actions to be executed on server shut down
cleanup []func()
// poller captures the poller instance
poller *Poller
}

// NewServer creates and configures an APIServer serving all application routes.
Expand Down Expand Up @@ -106,6 +108,11 @@ func NewServer(configFiles []string) (*Server, error) {
subscribe = true
}

var poller *Poller
if cfg.ManagementServicePollerConfig.Enabled {
poller = NewPoller(cfg.ManagementServicePollerConfig, appCtx.LocalStorage)
}

srv := http.Server{
Addr: cfg.ListenAddress(),
Handler: mux,
Expand All @@ -116,6 +123,7 @@ func NewServer(configFiles []string) (*Server, error) {
appContext: appCtx,
subscribe: subscribe,
cleanup: cleanup,
poller: poller,
}, nil
}

Expand All @@ -133,6 +141,11 @@ func (srv *Server) Start() {
}()
log.Printf("Listening on %s\n", srv.Addr)

if srv.poller != nil {
log.Println("Starting poller...")
srv.poller.Start()
}

stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)

Expand Down

0 comments on commit 397e59b

Please sign in to comment.