From bbfe73675524993d9b5eda6ad32c99da9c8756bc Mon Sep 17 00:00:00 2001 From: Will Moss Date: Wed, 4 Jan 2023 15:17:18 -0800 Subject: [PATCH] Allow overriding the device id regex per metric This allows the regex used to extract the device id to be overriden per metric. This required a little reorganizing of where the device id is parsed from the topic (and therefore how it's passed between functions). The default parsing remains the same. --- pkg/config/config.go | 39 +++++++++++++++---------- pkg/metrics/collector.go | 10 ++++--- pkg/metrics/extractor.go | 12 ++++---- pkg/metrics/extractor_test.go | 55 ++++++++++++++++++++++++++--------- pkg/metrics/ingest.go | 13 +++------ pkg/metrics/parser.go | 14 +++++---- pkg/metrics/parser_test.go | 49 ++++++++++++++++++++----------- 7 files changed, 123 insertions(+), 69 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index c28a40e..46ac156 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -126,7 +126,7 @@ type MetricPerTopicConfig struct { MetricNameRegex *Regexp `yaml:"metric_name_regex"` // Default } -// Metrics Config is a mapping between a metric send on mqtt to a prometheus metric +// Metrics Config is a mapping between a metric sent on mqtt and a prometheus metric type MetricConfig struct { PrometheusName string `yaml:"prom_name"` MQTTName string `yaml:"mqtt_name"` @@ -137,6 +137,7 @@ type MetricConfig struct { ConstantLabels map[string]string `yaml:"const_labels"` StringValueMapping *StringValueMappingConfig `yaml:"string_value_mapping"` MQTTValueScale float64 `yaml:"mqtt_value_scale"` + DeviceIDRegex *Regexp `yaml:"device_id_regex"` } // StringValueMappingConfig defines the mapping from string to float @@ -184,13 +185,7 @@ func LoadConfig(configFile string) (Config, error) { if cfg.MQTT.DeviceIDRegex == nil { cfg.MQTT.DeviceIDRegex = MQTTConfigDefaults.DeviceIDRegex } - var validRegex bool - for _, name := range cfg.MQTT.DeviceIDRegex.RegEx().SubexpNames() { - if name == DeviceIDRegexGroup { - validRegex = true - } - } - if !validRegex { + if !validDeviceIDRegexp(cfg.MQTT.DeviceIDRegex, DeviceIDRegexGroup) { return Config{}, fmt.Errorf("device id regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, DeviceIDRegexGroup) } @@ -198,6 +193,17 @@ func LoadConfig(configFile string) (Config, error) { return Config{}, fmt.Errorf("only one of object_per_topic_config and metric_per_topic_config can be specified") } + // Check DeviceIDRegex on each MetricConfig and populate with the default if it's not set + for idx, config := range cfg.Metrics { + if config.DeviceIDRegex != nil { + if !validDeviceIDRegexp(config.DeviceIDRegex, DeviceIDRegexGroup) { + return Config{}, fmt.Errorf("device id regex %q does not contain required regex group %q", config.DeviceIDRegex.pattern, DeviceIDRegexGroup) + } + } else { + cfg.Metrics[idx].DeviceIDRegex = cfg.MQTT.DeviceIDRegex + } + } + if cfg.MQTT.ObjectPerTopicConfig == nil && cfg.MQTT.MetricPerTopicConfig == nil { cfg.MQTT.ObjectPerTopicConfig = &ObjectPerTopicConfig{ Encoding: EncodingJSON, @@ -205,16 +211,19 @@ func LoadConfig(configFile string) (Config, error) { } if cfg.MQTT.MetricPerTopicConfig != nil { - validRegex = false - for _, name := range cfg.MQTT.MetricPerTopicConfig.MetricNameRegex.RegEx().SubexpNames() { - if name == MetricNameRegexGroup { - validRegex = true - } - } - if !validRegex { + if !validDeviceIDRegexp(cfg.MQTT.MetricPerTopicConfig.MetricNameRegex, MetricNameRegexGroup) { return Config{}, fmt.Errorf("metric name regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, MetricNameRegexGroup) } } return cfg, nil } + +func validDeviceIDRegexp(regexp *Regexp, requiredGroup string) bool { + for _, name := range regexp.RegEx().SubexpNames() { + if name == requiredGroup { + return true + } + } + return false +} diff --git a/pkg/metrics/collector.go b/pkg/metrics/collector.go index 188957a..025de2a 100644 --- a/pkg/metrics/collector.go +++ b/pkg/metrics/collector.go @@ -12,7 +12,7 @@ import ( type Collector interface { prometheus.Collector - Observe(deviceID string, collection MetricCollection) + Observe(collection MetricCollection) } type MemoryCachedCollector struct { @@ -27,6 +27,7 @@ type Metric struct { ValueType prometheus.ValueType IngestTime time.Time Topic string + DeviceID string } type CacheItem struct { @@ -48,13 +49,14 @@ func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricC } } -func (c *MemoryCachedCollector) Observe(deviceID string, collection MetricCollection) { +func (c *MemoryCachedCollector) Observe(collection MetricCollection) { for _, m := range collection { + // TODO: Now that DeviceID is part of Metric, perhaps we don't need the CacheItem any more? item := CacheItem{ - DeviceID: deviceID, + DeviceID: m.DeviceID, Metric: m, } - c.cache.Set(fmt.Sprintf("%s-%s", deviceID, m.Description.String()), item, gocache.DefaultExpiration) + c.cache.Set(fmt.Sprintf("%s-%s", m.DeviceID, m.Description.String()), item, gocache.DefaultExpiration) } } diff --git a/pkg/metrics/extractor.go b/pkg/metrics/extractor.go index 1f2ed47..101b665 100644 --- a/pkg/metrics/extractor.go +++ b/pkg/metrics/extractor.go @@ -7,10 +7,10 @@ import ( gojsonq "github.com/thedevsaddam/gojsonq/v2" ) -type Extractor func(topic string, payload []byte, deviceID string) (MetricCollection, error) +type Extractor func(topic string, payload []byte) (MetricCollection, error) func NewJSONObjectExtractor(p Parser) Extractor { - return func(topic string, payload []byte, deviceID string) (MetricCollection, error) { + return func(topic string, payload []byte) (MetricCollection, error) { var mc MetricCollection parsed := gojsonq.New(gojsonq.SetSeparator(p.separator)).FromString(string(payload)) @@ -22,7 +22,7 @@ func NewJSONObjectExtractor(p Parser) Extractor { } // Find a valid metrics config - config, found := p.findMetricConfig(path, deviceID) + config, deviceId, found := p.findMetricConfig(path, topic) if !found { continue } @@ -32,6 +32,7 @@ func NewJSONObjectExtractor(p Parser) Extractor { return nil, fmt.Errorf("failed to parse valid metric value: %w", err) } m.Topic = topic + m.DeviceID = deviceId mc = append(mc, m) } return mc, nil @@ -39,14 +40,14 @@ func NewJSONObjectExtractor(p Parser) Extractor { } func NewMetricPerTopicExtractor(p Parser, metricNameRegex *config.Regexp) Extractor { - return func(topic string, payload []byte, deviceID string) (MetricCollection, error) { + return func(topic string, payload []byte) (MetricCollection, error) { metricName := metricNameRegex.GroupValue(topic, config.MetricNameRegexGroup) if metricName == "" { return nil, fmt.Errorf("failed to find valid metric in topic path") } // Find a valid metrics config - config, found := p.findMetricConfig(metricName, deviceID) + config, deviceId, found := p.findMetricConfig(metricName, topic) if !found { return nil, nil } @@ -56,6 +57,7 @@ func NewMetricPerTopicExtractor(p Parser, metricNameRegex *config.Regexp) Extrac return nil, fmt.Errorf("failed to parse metric: %w", err) } m.Topic = topic + m.DeviceID = deviceId return MetricCollection{m}, nil } } diff --git a/pkg/metrics/extractor_test.go b/pkg/metrics/extractor_test.go index 4bdfe7c..f9090c1 100644 --- a/pkg/metrics/extractor_test.go +++ b/pkg/metrics/extractor_test.go @@ -15,7 +15,6 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) { } type args struct { metricPath string - deviceID string value string } tests := []struct { @@ -37,13 +36,13 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) { PrometheusName: "temperature", MQTTName: "SDS0X1.PM2.5", ValueType: "gauge", + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ - metricPath: "topic", - deviceID: "dht22", + metricPath: "devices/dht22", value: "{\"SDS0X1\":{\"PM2\":{\"5\":4.9}}}", }, want: Metric{ @@ -51,7 +50,8 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) { ValueType: prometheus.GaugeValue, Value: 4.9, IngestTime: testNow(), - Topic: "topic", + Topic: "devices/dht22", + DeviceID: "dht22", }, }, { name: "string value with dots in path", @@ -63,13 +63,13 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) { PrometheusName: "temperature", MQTTName: "SDS0X1->PM2.5", ValueType: "gauge", + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ - metricPath: "topic", - deviceID: "dht22", + metricPath: "devices/dht22", value: "{\"SDS0X1\":{\"PM2.5\":4.9,\"PM10\":8.5}}", }, want: Metric{ @@ -77,7 +77,8 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) { ValueType: prometheus.GaugeValue, Value: 4.9, IngestTime: testNow(), - Topic: "topic", + Topic: "devices/dht22", + DeviceID: "dht22", }, }, { name: "metric matching SensorNameFilter", @@ -90,13 +91,13 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) { MQTTName: "temperature", ValueType: "gauge", SensorNameFilter: *config.MustNewRegexp(".*22$"), + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ - metricPath: "topic", - deviceID: "dht22", + metricPath: "devices/dht22", value: "{\"temperature\": 8.5}", }, want: Metric{ @@ -104,7 +105,8 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) { ValueType: prometheus.GaugeValue, Value: 8.5, IngestTime: testNow(), - Topic: "topic", + Topic: "devices/dht22", + DeviceID: "dht22", }, }, { name: "metric not matching SensorNameFilter", @@ -117,17 +119,44 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) { MQTTName: "temperature", ValueType: "gauge", SensorNameFilter: *config.MustNewRegexp(".*fail$"), + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ - metricPath: "topic", - deviceID: "dht22", + metricPath: "devices/dht22", value: "{\"temperature\": 8.5}", }, want: Metric{}, noValue: true, + }, { + name: "custom deviceID regex", + separator: ".", + fields: fields{ + map[string][]config.MetricConfig{ + "temperature": []config.MetricConfig{ + { + PrometheusName: "temperature", + MQTTName: "temperature", + ValueType: "gauge", + DeviceIDRegex: config.MustNewRegexp("(.*/)?(?P.*)/SENSOR"), + }, + }, + }, + }, + args: args{ + metricPath: "devices/dht22/SENSOR", + value: "{\"temperature\": 8.5}", + }, + want: Metric{ + Description: prometheus.NewDesc("temperature", "", []string{"sensor", "topic"}, nil), + ValueType: prometheus.GaugeValue, + Value: 8.5, + IngestTime: testNow(), + Topic: "devices/dht22/SENSOR", + DeviceID: "dht22", + }, }, } for _, tt := range tests { @@ -138,7 +167,7 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) { } extractor := NewJSONObjectExtractor(p) - got, err := extractor(tt.args.metricPath, []byte(tt.args.value), tt.args.deviceID) + got, err := extractor(tt.args.metricPath, []byte(tt.args.value)) if (err != nil) != tt.wantErr { t.Errorf("parseMetric() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/metrics/ingest.go b/pkg/metrics/ingest.go index 7b4e8cd..2ca9f52 100644 --- a/pkg/metrics/ingest.go +++ b/pkg/metrics/ingest.go @@ -2,9 +2,10 @@ package metrics import ( "fmt" + "go.uber.org/zap" - "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/hikhvar/mqtt2prometheus/pkg/config" ) @@ -28,12 +29,11 @@ func NewIngest(collector Collector, extractor Extractor, deviceIDRegex *config.R } func (i *Ingest) store(topic string, payload []byte) error { - deviceID := i.deviceID(topic) - mc, err := i.extractor(topic, payload, deviceID) + mc, err := i.extractor(topic, payload) if err != nil { return fmt.Errorf("failed to extract metric values from topic: %w", err) } - i.collector.Observe(deviceID, mc) + i.collector.Observe(mc) return nil } @@ -49,8 +49,3 @@ func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHand i.CountSuccess(m.Topic()) } } - -// deviceID uses the configured DeviceIDRegex to extract the device ID from the given mqtt topic path. -func (i *Ingest) deviceID(topic string) string { - return i.deviceIDRegex.GroupValue(topic, config.DeviceIDRegexGroup) -} diff --git a/pkg/metrics/parser.go b/pkg/metrics/parser.go index 2482239..3b9d5b4 100644 --- a/pkg/metrics/parser.go +++ b/pkg/metrics/parser.go @@ -34,15 +34,17 @@ func (p *Parser) config() map[string][]config.MetricConfig { return p.metricConfigs } -// validMetric returns config matching the metric and deviceID -// Second return value indicates if config was found. -func (p *Parser) findMetricConfig(metric string, deviceID string) (config.MetricConfig, bool) { +// validMetric returns config matching the metric and the deviceID extracted from the topic +// Third return value indicates if config was found. +func (p *Parser) findMetricConfig(metric string, topic string) (config.MetricConfig, string, bool) { for _, c := range p.metricConfigs[metric] { - if c.SensorNameFilter.Match(deviceID) { - return c, true + // use DeviceIDRegex to extract the device ID from the given mqtt topic path + deviceId := c.DeviceIDRegex.GroupValue(topic, config.DeviceIDRegexGroup) + if c.SensorNameFilter.Match(deviceId) { + return c, deviceId, true } } - return config.MetricConfig{}, false + return config.MetricConfig{}, "", false } // parseMetric parses the given value according to the given deviceID and metricPath. The config allows to diff --git a/pkg/metrics/parser_test.go b/pkg/metrics/parser_test.go index d23df61..7413e18 100644 --- a/pkg/metrics/parser_test.go +++ b/pkg/metrics/parser_test.go @@ -16,7 +16,7 @@ func TestParser_parseMetric(t *testing.T) { } type args struct { metricPath string - deviceID string + topic string value interface{} } tests := []struct { @@ -35,13 +35,14 @@ func TestParser_parseMetric(t *testing.T) { PrometheusName: "temperature", ValueType: "gauge", OmitTimestamp: true, + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "temperature", - deviceID: "dht22", + topic: "devices/home/dht22", value: 12.6, }, want: Metric{ @@ -60,13 +61,14 @@ func TestParser_parseMetric(t *testing.T) { { PrometheusName: "temperature", ValueType: "gauge", + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "temperature", - deviceID: "dht22", + topic: "devices/home/dht22", value: "12.6", }, want: Metric{ @@ -86,13 +88,14 @@ func TestParser_parseMetric(t *testing.T) { PrometheusName: "temperature", ValueType: "gauge", MQTTValueScale: 0.01, + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "temperature", - deviceID: "dht22", + topic: "devices/home/dht22", value: "12.6", }, want: Metric{ @@ -111,13 +114,14 @@ func TestParser_parseMetric(t *testing.T) { { PrometheusName: "temperature", ValueType: "gauge", + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "temperature", - deviceID: "dht22", + topic: "devices/home/dht22", value: "12.6.5", }, wantErr: true, @@ -130,13 +134,14 @@ func TestParser_parseMetric(t *testing.T) { { PrometheusName: "temperature", ValueType: "gauge", + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "temperature", - deviceID: "dht22", + topic: "devices/home/dht22", value: 12.6, }, want: Metric{ @@ -156,13 +161,14 @@ func TestParser_parseMetric(t *testing.T) { PrometheusName: "humidity", ValueType: "gauge", MQTTValueScale: 0.01, + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "humidity", - deviceID: "dht22", + topic: "devices/home/dht22", value: 12.6, }, want: Metric{ @@ -182,13 +188,14 @@ func TestParser_parseMetric(t *testing.T) { PrometheusName: "humidity", ValueType: "gauge", MQTTValueScale: -2, + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "humidity", - deviceID: "dht22", + topic: "devices/home/dht22", value: 12.6, }, want: Metric{ @@ -207,13 +214,14 @@ func TestParser_parseMetric(t *testing.T) { { PrometheusName: "enabled", ValueType: "gauge", + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "enabled", - deviceID: "dht22", + topic: "devices/home/dht22", value: true, }, want: Metric{ @@ -233,13 +241,14 @@ func TestParser_parseMetric(t *testing.T) { PrometheusName: "enabled", ValueType: "gauge", MQTTValueScale: 0.5, + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "enabled", - deviceID: "dht22", + topic: "devices/home/dht22", value: true, }, want: Metric{ @@ -258,13 +267,14 @@ func TestParser_parseMetric(t *testing.T) { { PrometheusName: "enabled", ValueType: "gauge", + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "enabled", - deviceID: "dht22", + topic: "devices/home/dht22", value: false, }, want: Metric{ @@ -289,13 +299,14 @@ func TestParser_parseMetric(t *testing.T) { "bar": 2, }, }, + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "enabled", - deviceID: "dht22", + topic: "devices/home/dht22", value: "foo", }, want: Metric{ @@ -321,13 +332,14 @@ func TestParser_parseMetric(t *testing.T) { "bar": 2, }, }, + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "enabled", - deviceID: "dht22", + topic: "devices/home/dht22", value: "asd", }, want: Metric{ @@ -352,13 +364,14 @@ func TestParser_parseMetric(t *testing.T) { "bar": 2, }, }, + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "enabled", - deviceID: "dht22", + topic: "devices/home/dht22", value: "asd", }, wantErr: true, @@ -378,13 +391,14 @@ func TestParser_parseMetric(t *testing.T) { "bar": 2, }, }, + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "enabled1", - deviceID: "dht22", + topic: "devices/home/dht22", value: "asd", }, wantErr: true, @@ -404,13 +418,14 @@ func TestParser_parseMetric(t *testing.T) { "bar": 2, }, }, + DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex, }, }, }, }, args: args{ metricPath: "enabled", - deviceID: "dht22", + topic: "devices/home/dht22", value: []int{3}, }, wantErr: true, @@ -423,7 +438,7 @@ func TestParser_parseMetric(t *testing.T) { } // Find a valid metrics config - config, found := p.findMetricConfig(tt.args.metricPath, tt.args.deviceID) + config, _, found := p.findMetricConfig(tt.args.metricPath, tt.args.topic) if !found { if !tt.wantErr { t.Errorf("MetricConfig not found")