Skip to content

Commit

Permalink
Added config watcher and supporting metrics reloading
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade committed Apr 4, 2024
1 parent a5ad247 commit 64d701b
Show file tree
Hide file tree
Showing 17 changed files with 558 additions and 73 deletions.
1 change: 1 addition & 0 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func initFlags() {
rootCmd.PersistentFlags().IntVar(&opts.Profile.Port, "profile.port", 0, "Go pprof tool port (default: disabled)")
rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field")
rootCmd.PersistentFlags().StringVar(&opts.Parameters, "parameters", "", "json of config file parameters field")
rootCmd.PersistentFlags().StringVar(&opts.DynamicParameters, "dynamicParameters", "", "json of configmap location for dynamic parameters")
rootCmd.PersistentFlags().StringVar(&opts.MetricsSettings, "metricsSettings", "", "json for global metrics settings")
}

Expand Down
42 changes: 32 additions & 10 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,35 @@ import (
)

type Options struct {
PipeLine string
Parameters string
MetricsSettings string
Health Health
Profile Profile
PipeLine string
Parameters string
DynamicParameters string
MetricsSettings string
Health Health
Profile Profile
}

// (nolint => needs refactoring)
//
//nolint:revive
type ConfigFileStruct struct {
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
DynamicParameters DynamicParameters `yaml:"dynamicParameters,omitempty" json:"dynamicParameters,omitempty"`
}

type DynamicParameters struct {
Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty"`
Name string `yaml:"name,omitempty" json:"name,omitempty"`
FileName string `yaml:"fileName,omitempty" json:"fileName,omitempty"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
}

type HotReloadStruct struct {
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
}

type Health struct {
Expand Down Expand Up @@ -154,6 +167,15 @@ func ParseConfig(opts *Options) (ConfigFileStruct, error) {
}
logrus.Debugf("params = %v ", out.Parameters)

if opts.DynamicParameters != "" {
err = JSONUnmarshalStrict([]byte(opts.DynamicParameters), &out.DynamicParameters)
if err != nil {
logrus.Errorf("error when parsing dynamic pipeline parameters: %v", err)
return out, err
}
logrus.Debugf("dynamicParams = %v ", out.DynamicParameters)
}

if opts.MetricsSettings != "" {
err = JSONUnmarshalStrict([]byte(opts.MetricsSettings), &out.MetricsSettings)
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions pkg/config/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,33 @@ func (b *PipelineBuilderStage) GetStageParams() []StageParam {
return b.pipeline.config
}

func isStaticParam(param StageParam) bool {
if param.Encode != nil && param.Encode.Type == api.PromType {
return false
}
return true
}

func (b *PipelineBuilderStage) GetStaticStageParams() []StageParam {
res := []StageParam{}
for _, param := range b.pipeline.config {
if isStaticParam(param) {
res = append(res, param)
}
}
return res
}

func (b *PipelineBuilderStage) GetDynamicStageParams() []StageParam {
res := []StageParam{}
for _, param := range b.pipeline.config {
if !isStaticParam(param) {
res = append(res, param)
}
}
return res
}

// IntoConfigFileStruct injects the current pipeline and params in the provided ConfigFileStruct object.
func (b *PipelineBuilderStage) IntoConfigFileStruct(cfs *ConfigFileStruct) *ConfigFileStruct {
cfs.Pipeline = b.GetStages()
Expand Down
5 changes: 5 additions & 0 deletions pkg/pipeline/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ type encodeNone struct {

type Encoder interface {
Encode(in config.GenericMap)
Update(config.StageParam)
}

// Encode encodes a flow before being stored
func (t *encodeNone) Encode(in config.GenericMap) {
t.prevRecord = in
}

func (t *encodeNone) Update(_ config.StageParam) {
log.Warn("Encode None, update not supported")
}

// NewEncodeNone create a new encode
func NewEncodeNone() (Encoder, error) {
log.Debugf("entering NewEncodeNone")
Expand Down
4 changes: 4 additions & 0 deletions pkg/pipeline/encode/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (r *encodeKafka) Encode(entry config.GenericMap) {
}
}

func (r *encodeKafka) Update(_ config.StageParam) {
log.Warn("Encode Kafka, update not supported")
}

// NewEncodeKafka create a new writer to kafka
func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
log.Debugf("entering NewEncodeKafka")
Expand Down
Loading

0 comments on commit 64d701b

Please sign in to comment.