Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow overriding the device id regex per metric #116

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type MetricPerTopicConfig struct {
MetricNameRegex *Regexp `yaml:"metric_name_regex"` // Default
}

// Metrics Config is a mapping between a metric send on mqtt to a prometheus metric
// Metrics Config is a mapping between a metric sent on mqtt and a prometheus metric
type MetricConfig struct {
PrometheusName string `yaml:"prom_name"`
MQTTName string `yaml:"mqtt_name"`
Expand All @@ -137,6 +137,7 @@ type MetricConfig struct {
ConstantLabels map[string]string `yaml:"const_labels"`
StringValueMapping *StringValueMappingConfig `yaml:"string_value_mapping"`
MQTTValueScale float64 `yaml:"mqtt_value_scale"`
DeviceIDRegex *Regexp `yaml:"device_id_regex"`
}

// StringValueMappingConfig defines the mapping from string to float
Expand Down Expand Up @@ -184,37 +185,45 @@ func LoadConfig(configFile string) (Config, error) {
if cfg.MQTT.DeviceIDRegex == nil {
cfg.MQTT.DeviceIDRegex = MQTTConfigDefaults.DeviceIDRegex
}
var validRegex bool
for _, name := range cfg.MQTT.DeviceIDRegex.RegEx().SubexpNames() {
if name == DeviceIDRegexGroup {
validRegex = true
}
}
if !validRegex {
if !validDeviceIDRegexp(cfg.MQTT.DeviceIDRegex, DeviceIDRegexGroup) {
return Config{}, fmt.Errorf("device id regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, DeviceIDRegexGroup)
}

if cfg.MQTT.ObjectPerTopicConfig != nil && cfg.MQTT.MetricPerTopicConfig != nil {
return Config{}, fmt.Errorf("only one of object_per_topic_config and metric_per_topic_config can be specified")
}

// Check DeviceIDRegex on each MetricConfig and populate with the default if it's not set
for idx, config := range cfg.Metrics {
if config.DeviceIDRegex != nil {
if !validDeviceIDRegexp(config.DeviceIDRegex, DeviceIDRegexGroup) {
return Config{}, fmt.Errorf("device id regex %q does not contain required regex group %q", config.DeviceIDRegex.pattern, DeviceIDRegexGroup)
}
} else {
cfg.Metrics[idx].DeviceIDRegex = cfg.MQTT.DeviceIDRegex
}
}

if cfg.MQTT.ObjectPerTopicConfig == nil && cfg.MQTT.MetricPerTopicConfig == nil {
cfg.MQTT.ObjectPerTopicConfig = &ObjectPerTopicConfig{
Encoding: EncodingJSON,
}
}

if cfg.MQTT.MetricPerTopicConfig != nil {
validRegex = false
for _, name := range cfg.MQTT.MetricPerTopicConfig.MetricNameRegex.RegEx().SubexpNames() {
if name == MetricNameRegexGroup {
validRegex = true
}
}
if !validRegex {
if !validDeviceIDRegexp(cfg.MQTT.MetricPerTopicConfig.MetricNameRegex, MetricNameRegexGroup) {
return Config{}, fmt.Errorf("metric name regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, MetricNameRegexGroup)
}
}

return cfg, nil
}

func validDeviceIDRegexp(regexp *Regexp, requiredGroup string) bool {
for _, name := range regexp.RegEx().SubexpNames() {
if name == requiredGroup {
return true
}
}
return false
}
10 changes: 6 additions & 4 deletions pkg/metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type Collector interface {
prometheus.Collector
Observe(deviceID string, collection MetricCollection)
Observe(collection MetricCollection)
}

type MemoryCachedCollector struct {
Expand All @@ -27,6 +27,7 @@ type Metric struct {
ValueType prometheus.ValueType
IngestTime time.Time
Topic string
DeviceID string
}

type CacheItem struct {
Expand All @@ -48,13 +49,14 @@ func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricC
}
}

func (c *MemoryCachedCollector) Observe(deviceID string, collection MetricCollection) {
func (c *MemoryCachedCollector) Observe(collection MetricCollection) {
for _, m := range collection {
// TODO: Now that DeviceID is part of Metric, perhaps we don't need the CacheItem any more?
item := CacheItem{
DeviceID: deviceID,
DeviceID: m.DeviceID,
Metric: m,
}
c.cache.Set(fmt.Sprintf("%s-%s", deviceID, m.Description.String()), item, gocache.DefaultExpiration)
c.cache.Set(fmt.Sprintf("%s-%s", m.DeviceID, m.Description.String()), item, gocache.DefaultExpiration)
}
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/metrics/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
gojsonq "github.com/thedevsaddam/gojsonq/v2"
)

type Extractor func(topic string, payload []byte, deviceID string) (MetricCollection, error)
type Extractor func(topic string, payload []byte) (MetricCollection, error)

func NewJSONObjectExtractor(p Parser) Extractor {
return func(topic string, payload []byte, deviceID string) (MetricCollection, error) {
return func(topic string, payload []byte) (MetricCollection, error) {
var mc MetricCollection
parsed := gojsonq.New(gojsonq.SetSeparator(p.separator)).FromString(string(payload))

Expand All @@ -22,7 +22,7 @@ func NewJSONObjectExtractor(p Parser) Extractor {
}

// Find a valid metrics config
config, found := p.findMetricConfig(path, deviceID)
config, deviceId, found := p.findMetricConfig(path, topic)
if !found {
continue
}
Expand All @@ -32,21 +32,22 @@ func NewJSONObjectExtractor(p Parser) Extractor {
return nil, fmt.Errorf("failed to parse valid metric value: %w", err)
}
m.Topic = topic
m.DeviceID = deviceId
mc = append(mc, m)
}
return mc, nil
}
}

func NewMetricPerTopicExtractor(p Parser, metricNameRegex *config.Regexp) Extractor {
return func(topic string, payload []byte, deviceID string) (MetricCollection, error) {
return func(topic string, payload []byte) (MetricCollection, error) {
metricName := metricNameRegex.GroupValue(topic, config.MetricNameRegexGroup)
if metricName == "" {
return nil, fmt.Errorf("failed to find valid metric in topic path")
}

// Find a valid metrics config
config, found := p.findMetricConfig(metricName, deviceID)
config, deviceId, found := p.findMetricConfig(metricName, topic)
if !found {
return nil, nil
}
Expand All @@ -56,6 +57,7 @@ func NewMetricPerTopicExtractor(p Parser, metricNameRegex *config.Regexp) Extrac
return nil, fmt.Errorf("failed to parse metric: %w", err)
}
m.Topic = topic
m.DeviceID = deviceId
return MetricCollection{m}, nil
}
}
55 changes: 42 additions & 13 deletions pkg/metrics/extractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) {
}
type args struct {
metricPath string
deviceID string
value string
}
tests := []struct {
Expand All @@ -37,21 +36,22 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) {
PrometheusName: "temperature",
MQTTName: "SDS0X1.PM2.5",
ValueType: "gauge",
DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex,
},
},
},
},
args: args{
metricPath: "topic",
deviceID: "dht22",
metricPath: "devices/dht22",
value: "{\"SDS0X1\":{\"PM2\":{\"5\":4.9}}}",
},
want: Metric{
Description: prometheus.NewDesc("temperature", "", []string{"sensor", "topic"}, nil),
ValueType: prometheus.GaugeValue,
Value: 4.9,
IngestTime: testNow(),
Topic: "topic",
Topic: "devices/dht22",
DeviceID: "dht22",
},
}, {
name: "string value with dots in path",
Expand All @@ -63,21 +63,22 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) {
PrometheusName: "temperature",
MQTTName: "SDS0X1->PM2.5",
ValueType: "gauge",
DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex,
},
},
},
},
args: args{
metricPath: "topic",
deviceID: "dht22",
metricPath: "devices/dht22",
value: "{\"SDS0X1\":{\"PM2.5\":4.9,\"PM10\":8.5}}",
},
want: Metric{
Description: prometheus.NewDesc("temperature", "", []string{"sensor", "topic"}, nil),
ValueType: prometheus.GaugeValue,
Value: 4.9,
IngestTime: testNow(),
Topic: "topic",
Topic: "devices/dht22",
DeviceID: "dht22",
},
}, {
name: "metric matching SensorNameFilter",
Expand All @@ -90,21 +91,22 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) {
MQTTName: "temperature",
ValueType: "gauge",
SensorNameFilter: *config.MustNewRegexp(".*22$"),
DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex,
},
},
},
},
args: args{
metricPath: "topic",
deviceID: "dht22",
metricPath: "devices/dht22",
value: "{\"temperature\": 8.5}",
},
want: Metric{
Description: prometheus.NewDesc("temperature", "", []string{"sensor", "topic"}, nil),
ValueType: prometheus.GaugeValue,
Value: 8.5,
IngestTime: testNow(),
Topic: "topic",
Topic: "devices/dht22",
DeviceID: "dht22",
},
}, {
name: "metric not matching SensorNameFilter",
Expand All @@ -117,17 +119,44 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) {
MQTTName: "temperature",
ValueType: "gauge",
SensorNameFilter: *config.MustNewRegexp(".*fail$"),
DeviceIDRegex: config.MQTTConfigDefaults.DeviceIDRegex,
},
},
},
},
args: args{
metricPath: "topic",
deviceID: "dht22",
metricPath: "devices/dht22",
value: "{\"temperature\": 8.5}",
},
want: Metric{},
noValue: true,
}, {
name: "custom deviceID regex",
separator: ".",
fields: fields{
map[string][]config.MetricConfig{
"temperature": []config.MetricConfig{
{
PrometheusName: "temperature",
MQTTName: "temperature",
ValueType: "gauge",
DeviceIDRegex: config.MustNewRegexp("(.*/)?(?P<deviceid>.*)/SENSOR"),
},
},
},
},
args: args{
metricPath: "devices/dht22/SENSOR",
value: "{\"temperature\": 8.5}",
},
want: Metric{
Description: prometheus.NewDesc("temperature", "", []string{"sensor", "topic"}, nil),
ValueType: prometheus.GaugeValue,
Value: 8.5,
IngestTime: testNow(),
Topic: "devices/dht22/SENSOR",
DeviceID: "dht22",
},
},
}
for _, tt := range tests {
Expand All @@ -138,7 +167,7 @@ func TestNewJSONObjectExtractor_parseMetric(t *testing.T) {
}
extractor := NewJSONObjectExtractor(p)

got, err := extractor(tt.args.metricPath, []byte(tt.args.value), tt.args.deviceID)
got, err := extractor(tt.args.metricPath, []byte(tt.args.value))
if (err != nil) != tt.wantErr {
t.Errorf("parseMetric() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
13 changes: 4 additions & 9 deletions pkg/metrics/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package metrics

import (
"fmt"

"go.uber.org/zap"

"github.com/eclipse/paho.mqtt.golang"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
)

Expand All @@ -28,12 +29,11 @@ func NewIngest(collector Collector, extractor Extractor, deviceIDRegex *config.R
}

func (i *Ingest) store(topic string, payload []byte) error {
deviceID := i.deviceID(topic)
mc, err := i.extractor(topic, payload, deviceID)
mc, err := i.extractor(topic, payload)
if err != nil {
return fmt.Errorf("failed to extract metric values from topic: %w", err)
}
i.collector.Observe(deviceID, mc)
i.collector.Observe(mc)
return nil
}

Expand All @@ -49,8 +49,3 @@ func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHand
i.CountSuccess(m.Topic())
}
}

// deviceID uses the configured DeviceIDRegex to extract the device ID from the given mqtt topic path.
func (i *Ingest) deviceID(topic string) string {
return i.deviceIDRegex.GroupValue(topic, config.DeviceIDRegexGroup)
}
14 changes: 8 additions & 6 deletions pkg/metrics/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ func (p *Parser) config() map[string][]config.MetricConfig {
return p.metricConfigs
}

// validMetric returns config matching the metric and deviceID
// Second return value indicates if config was found.
func (p *Parser) findMetricConfig(metric string, deviceID string) (config.MetricConfig, bool) {
// validMetric returns config matching the metric and the deviceID extracted from the topic
// Third return value indicates if config was found.
func (p *Parser) findMetricConfig(metric string, topic string) (config.MetricConfig, string, bool) {
for _, c := range p.metricConfigs[metric] {
if c.SensorNameFilter.Match(deviceID) {
return c, true
// use DeviceIDRegex to extract the device ID from the given mqtt topic path
deviceId := c.DeviceIDRegex.GroupValue(topic, config.DeviceIDRegexGroup)
if c.SensorNameFilter.Match(deviceId) {
return c, deviceId, true
}
}
return config.MetricConfig{}, false
return config.MetricConfig{}, "", false
}

// parseMetric parses the given value according to the given deviceID and metricPath. The config allows to
Expand Down
Loading