From 952a5661c83e15505e2f6dc31e538efd1b6e57db Mon Sep 17 00:00:00 2001 From: Niklas Ciecior Date: Thu, 18 Jan 2024 13:22:48 +0100 Subject: [PATCH 1/7] Add variable support --- src/datasource.ts | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/datasource.ts b/src/datasource.ts index 072fa78..4da9e87 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -1,9 +1,18 @@ -import { DataSourceInstanceSettings } from '@grafana/data'; -import { DataSourceWithBackend } from '@grafana/runtime'; +import { DataSourceInstanceSettings, ScopedVars } from '@grafana/data'; +import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime'; import { MqttDataSourceOptions, MqttQuery } from './types'; export class DataSource extends DataSourceWithBackend { constructor(instanceSettings: DataSourceInstanceSettings) { super(instanceSettings); } + + applyTemplateVariables(query: MqttQuery, scopedVars: ScopedVars): Record { + const resolvedQuery: MqttQuery = { + ...query, + topic: getTemplateSrv().replace(query.topic, scopedVars), + }; + + return resolvedQuery; + } } From 932b03bfc6acc217f907e62e3883d6d315b717f9 Mon Sep 17 00:00:00 2001 From: Niklas Ciecior Date: Wed, 24 Jan 2024 11:38:22 +0100 Subject: [PATCH 2/7] Add support for + and # --- pkg/mqtt/client.go | 24 +++++++++++++++++------- pkg/mqtt/topic.go | 8 ++++++++ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index bd36c87..1f15278 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -71,12 +71,13 @@ func (c *client) IsConnected() bool { return c.client.IsConnectionOpen() } -func (c *client) HandleMessage(_ paho.Client, msg paho.Message) { +func (c *client) HandleMessage(topic string, payload []byte) { message := Message{ Timestamp: time.Now(), - Value: msg.Payload(), + Value: payload, } - c.topics.AddMessage(msg.Topic(), message) + + c.topics.AddMessage(topic, message) } func (c *client) GetTopic(reqPath string) (*Topic, bool) { @@ -104,9 +105,16 @@ func (c *client) Subscribe(reqPath string) *Topic { return t } - log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", t.Path) - if token := c.client.Subscribe(t.Path, 0, c.HandleMessage); token.Wait() && token.Error() != nil { - log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", t.Path, "error", token.Error()) + log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topicPath) + + topic := resolveTopic(t.Path) + + if token := c.client.Subscribe(topic, 0, func(_ paho.Client, m paho.Message) { + // by wrapping HandleMessage we can directly get the correct topicPath for the incoming topic + // and don't need to regex it against + and #. + c.HandleMessage(topicPath, []byte(m.Payload())) + }); token.Wait() && token.Error() != nil { + log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topicPath, "error", token.Error()) } c.topics.Store(t) return t @@ -126,7 +134,9 @@ func (c *client) Unsubscribe(reqPath string) { } log.DefaultLogger.Debug("Unsubscribing from MQTT topic", "topic", t.Path) - if token := c.client.Unsubscribe(t.Path); token.Wait() && token.Error() != nil { + + topic := resolveTopic(t.Path) + if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil { log.DefaultLogger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", token.Error()) } } diff --git a/pkg/mqtt/topic.go b/pkg/mqtt/topic.go index a254454..8e7db7f 100644 --- a/pkg/mqtt/topic.go +++ b/pkg/mqtt/topic.go @@ -2,6 +2,7 @@ package mqtt import ( "path" + "strings" "sync" "time" @@ -97,3 +98,10 @@ func (tm *TopicMap) Store(t *Topic) { func (tm *TopicMap) Delete(key string) { tm.Map.Delete(key) } + +// replace all __PLUS__ with + and one __HASH__ with # +// Question: Why does grafana not allow + and # in query? +func resolveTopic(topic string) string { + resolvedTopic := strings.ReplaceAll(topic, "__PLUS__", "+") + return strings.Replace(resolvedTopic, "__HASH__", "#", -1) +} From cd1728c22473cd2e9914dc2a09a9d3c821e4dc3b Mon Sep 17 00:00:00 2001 From: Niklas Ciecior Date: Wed, 24 Jan 2024 17:27:43 +0100 Subject: [PATCH 3/7] Add support for + and # in the query editor --- src/datasource.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/datasource.ts b/src/datasource.ts index 4da9e87..b5d0b65 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -8,9 +8,12 @@ export class DataSource extends DataSourceWithBackend { + let resolvedTopic = getTemplateSrv().replace(query.topic, scopedVars); + resolvedTopic = resolvedTopic.replace(/\+/gi, '__PLUS__'); + resolvedTopic = resolvedTopic.replace(/\#/gi, '__HASH__'); const resolvedQuery: MqttQuery = { ...query, - topic: getTemplateSrv().replace(query.topic, scopedVars), + topic: resolvedTopic, }; return resolvedQuery; From 25a853ee745f86a88fbf0ceffa1eeeb2f52c3e75 Mon Sep 17 00:00:00 2001 From: Niklas Ciecior Date: Tue, 30 Jan 2024 13:40:09 +0100 Subject: [PATCH 4/7] add poc for publishing --- pkg/mqtt/client.go | 51 +++++++++++++++++++++++++++++++++++ pkg/mqtt/topic.go | 10 ++++--- pkg/plugin/datasource_test.go | 8 ++++-- pkg/plugin/query.go | 23 +++++++++++----- 4 files changed, 80 insertions(+), 12 deletions(-) diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index 1f15278..d4a62f9 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -1,6 +1,8 @@ package mqtt import ( + "encoding/json" + "errors" "fmt" "math/rand" "path" @@ -15,6 +17,7 @@ type Client interface { GetTopic(string) (*Topic, bool) IsConnected() bool Subscribe(string) *Topic + Publish(string, map[string]any, string) (json.RawMessage, error) Unsubscribe(string) Dispose() } @@ -141,6 +144,54 @@ func (c *client) Unsubscribe(reqPath string) { } } +func (c *client) Publish(topic string, payload map[string]any, responseTopic string) (json.RawMessage, error) { + var response json.RawMessage + var err error + + if responseTopic == "" { + return nil, errors.New("response topic should not be empty") + } + + done := make(chan struct{}) + tokenSub := c.client.Subscribe(responseTopic, 2, func(c paho.Client, m paho.Message) { + response = m.Payload() + done <- struct{}{} + }) + + if !tokenSub.WaitTimeout(time.Second) && tokenSub.Error() != nil { + err = errors.Join(err, tokenSub.Error()) + return response, err + } + + defer c.client.Unsubscribe(responseTopic) + + data, errMarshal := json.Marshal(&payload) + if errMarshal != nil { + err = errors.Join(err, errMarshal) + return response, err + } + + token := c.client.Publish(topic, 2, false, data) + + if token.Error() != nil { + err = errors.Join(err, token.Error()) + return response, err + } + + if !token.WaitTimeout(time.Second) { + err = errors.Join(err, errors.New("publish timeout")) + return response, err + } + + select { + case <-done: + case <-time.After(time.Second): + err = errors.Join(err, errors.New("subscribe timeout")) + } + + return response, err +} + func (c *client) Dispose() { log.DefaultLogger.Info("MQTT Disconnecting") c.client.Disconnect(250) diff --git a/pkg/mqtt/topic.go b/pkg/mqtt/topic.go index 8e7db7f..091b344 100644 --- a/pkg/mqtt/topic.go +++ b/pkg/mqtt/topic.go @@ -16,10 +16,12 @@ type Message struct { // Topic represents a MQTT topic. type Topic struct { - Path string `json:"topic"` - Interval time.Duration - Messages []Message - framer *framer + Path string `json:"topic"` + Payload map[string]any `json:"payload,omitempty"` + ResponsePath string `json:"response,omitempty"` + Interval time.Duration + Messages []Message + framer *framer } // Key returns the key for the topic. diff --git a/pkg/plugin/datasource_test.go b/pkg/plugin/datasource_test.go index 96f8b65..93587e8 100644 --- a/pkg/plugin/datasource_test.go +++ b/pkg/plugin/datasource_test.go @@ -2,6 +2,7 @@ package plugin_test import ( "context" + "encoding/json" "testing" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -60,5 +61,8 @@ func (c *fakeMQTTClient) IsSubscribed(_ string) bool { } func (c *fakeMQTTClient) Subscribe(_ string) *mqtt.Topic { return nil } -func (c *fakeMQTTClient) Unsubscribe(_ string) {} -func (c *fakeMQTTClient) Dispose() {} +func (c *fakeMQTTClient) Publish(string, map[string]any, string) (json.RawMessage, error) { + return json.RawMessage{}, nil +} +func (c *fakeMQTTClient) Unsubscribe(_ string) {} +func (c *fakeMQTTClient) Dispose() {} diff --git a/pkg/plugin/query.go b/pkg/plugin/query.go index af8afc7..2e8b439 100644 --- a/pkg/plugin/query.go +++ b/pkg/plugin/query.go @@ -38,13 +38,24 @@ func (ds *MQTTDatasource) query(query backend.DataQuery) backend.DataResponse { return response } - t.Interval = query.Interval + // Subscribe + if len(t.Payload) == 0 { + t.Interval = query.Interval - frame := data.NewFrame("") - frame.SetMeta(&data.FrameMeta{ - Channel: path.Join(ds.channelPrefix, t.Key()), - }) + frame := data.NewFrame("") + frame.SetMeta(&data.FrameMeta{ + Channel: path.Join(ds.channelPrefix, t.Key()), + }) - response.Frames = append(response.Frames, frame) + response.Frames = append(response.Frames, frame) + return response + } + + // Publish + resp, err := ds.Client.Publish(t.Path, t.Payload, t.ResponsePath) + + field := data.NewField("Body", data.Labels{}, []json.RawMessage{resp}) + response.Frames = append(response.Frames, data.NewFrame("Response", field)) + response.Error = err return response } From 03174e7b10fbf4959a1691e5f74008f88cf234e1 Mon Sep 17 00:00:00 2001 From: Niklas Ciecior Date: Mon, 12 Feb 2024 10:52:02 +0100 Subject: [PATCH 5/7] Make response optional --- pkg/mqtt/client.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index d4a62f9..b6e3d4b 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -147,24 +147,24 @@ func (c *client) Unsubscribe(reqPath string) { func (c *client) Publish(topic string, payload map[string]any, responseTopic string) (json.RawMessage, error) { var response json.RawMessage var err error + done := make(chan struct{}, 1) - if responseTopic == "" { - return nil, errors.New("response topic should not be empty") - } + if responseTopic != "" { + tokenSub := c.client.Subscribe(responseTopic, 2, func(c paho.Client, m paho.Message) { + response = m.Payload() + done <- struct{}{} + }) - done := make(chan struct{}) - tokenSub := c.client.Subscribe(responseTopic, 2, func(c paho.Client, m paho.Message) { - response = m.Payload() - done <- struct{}{} - }) + if !tokenSub.WaitTimeout(time.Second) && tokenSub.Error() != nil { + err = errors.Join(err, tokenSub.Error()) + return response, err + } - if !tokenSub.WaitTimeout(time.Second) && tokenSub.Error() != nil { - err = errors.Join(err, tokenSub.Error()) - return response, err + defer c.client.Unsubscribe(responseTopic) + } else { + done <- struct{}{} } - defer c.client.Unsubscribe(responseTopic) - data, errMarshal := json.Marshal(&payload) if errMarshal != nil { err = errors.Join(err, errMarshal) From 733263f2c2f9aae5e52f6cf49d5b8b1bfa31c462 Mon Sep 17 00:00:00 2001 From: Niklas Ciecior Date: Wed, 10 Sep 2025 17:04:11 +0200 Subject: [PATCH 6/7] Make publishing opt in --- pkg/mqtt/client.go | 17 +++++++++-------- pkg/plugin/datasource.go | 14 ++++++++------ pkg/plugin/datasource_test.go | 4 ++-- pkg/plugin/query.go | 2 +- src/ConfigEditor.tsx | 11 +++++++++++ src/types.ts | 1 + 6 files changed, 32 insertions(+), 17 deletions(-) diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index dc05ea4..04d46d4 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -25,14 +25,15 @@ type Client interface { } type Options struct { - URI string `json:"uri"` - Username string `json:"username"` - Password string `json:"password"` - ClientID string `json:"clientID"` - TLSCACert string `json:"tlsCACert"` - TLSClientCert string `json:"tlsClientCert"` - TLSClientKey string `json:"tlsClientKey"` - TLSSkipVerify bool `json:"tlsSkipVerify"` + URI string `json:"uri"` + Username string `json:"username"` + Password string `json:"password"` + ClientID string `json:"clientID"` + TLSCACert string `json:"tlsCACert"` + TLSClientCert string `json:"tlsClientCert"` + TLSClientKey string `json:"tlsClientKey"` + TLSSkipVerify bool `json:"tlsSkipVerify"` + EnablePublishing bool `json:"enablePublishing"` } type client struct { diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index 83de2a4..e19b9d4 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -32,19 +32,21 @@ func NewMQTTInstance(_ context.Context, s backend.DataSourceInstanceSettings) (i return nil, err } - return NewMQTTDatasource(client, s.UID), nil + return NewMQTTDatasource(client, s.UID, settings.EnablePublishing), nil } type MQTTDatasource struct { - Client mqtt.Client - channelPrefix string + Client mqtt.Client + channelPrefix string + enablePublishing bool } // NewMQTTDatasource creates a new datasource instance. -func NewMQTTDatasource(client mqtt.Client, uid string) *MQTTDatasource { +func NewMQTTDatasource(client mqtt.Client, uid string, enablePublishing bool) *MQTTDatasource { return &MQTTDatasource{ - Client: client, - channelPrefix: path.Join("ds", uid), + Client: client, + channelPrefix: path.Join("ds", uid), + enablePublishing: enablePublishing, } } diff --git a/pkg/plugin/datasource_test.go b/pkg/plugin/datasource_test.go index 93587e8..7da5db9 100644 --- a/pkg/plugin/datasource_test.go +++ b/pkg/plugin/datasource_test.go @@ -16,7 +16,7 @@ func TestCheckHealthHandler(t *testing.T) { ds := plugin.NewMQTTDatasource(&fakeMQTTClient{ connected: true, subscribed: false, - }, "xyz") + }, "xyz", false) res, _ := ds.CheckHealth( context.Background(), @@ -31,7 +31,7 @@ func TestCheckHealthHandler(t *testing.T) { ds := plugin.NewMQTTDatasource(&fakeMQTTClient{ connected: false, subscribed: false, - }, "xyz") + }, "xyz", false) res, _ := ds.CheckHealth( context.Background(), diff --git a/pkg/plugin/query.go b/pkg/plugin/query.go index 0181501..750d2c4 100644 --- a/pkg/plugin/query.go +++ b/pkg/plugin/query.go @@ -40,7 +40,7 @@ func (ds *MQTTDatasource) query(query backend.DataQuery) backend.DataResponse { } // Subscribe - if len(t.Payload) == 0 { + if len(t.Payload) == 0 || !ds.enablePublishing { t.Interval = query.Interval frame := data.NewFrame("") diff --git a/src/ConfigEditor.tsx b/src/ConfigEditor.tsx index 9984aab..b0d63f2 100644 --- a/src/ConfigEditor.tsx +++ b/src/ConfigEditor.tsx @@ -119,6 +119,17 @@ export const ConfigEditor = (props: DataSourcePluginOptionsEditorProps ) : null} + + + + + + + + ); }; diff --git a/src/types.ts b/src/types.ts index fc3113d..816384b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -14,6 +14,7 @@ export interface MqttDataSourceOptions extends DataSourceJsonData { tlsAuth: boolean; tlsAuthWithCACert: boolean; tlsSkipVerify: boolean; + enablePublishing: boolean; } export interface MqttSecureJsonData { From 18f554fb6f6ab4d9734ec38c51ebe80cc4fedb78 Mon Sep 17 00:00:00 2001 From: Niklas Ciecior Date: Thu, 11 Sep 2025 17:48:34 +0200 Subject: [PATCH 7/7] remove not used parameter --- pkg/plugin/query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/plugin/query.go b/pkg/plugin/query.go index e76d337..46ceba4 100644 --- a/pkg/plugin/query.go +++ b/pkg/plugin/query.go @@ -11,7 +11,7 @@ import ( "github.com/grafana/mqtt-datasource/pkg/mqtt" ) -func (ds *MQTTDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { +func (ds *MQTTDatasource) QueryData(_ context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { response := backend.NewQueryDataResponse() for _, q := range req.Queries {