From 49df53f1456c345b773a957da8e4807a0842b4db Mon Sep 17 00:00:00 2001 From: Tomas Neubauer Date: Tue, 5 Nov 2024 12:30:07 +0100 Subject: [PATCH 1/2] Ouix output plugin First implementation - no defaults --- plugins/outputs/all/quix.go | 5 ++ plugins/outputs/quix/config.go | 66 ++++++++++++++ plugins/outputs/quix/quix.go | 130 +++++++++++++++++++++++++++ plugins/outputs/quix/scram_client.go | 38 ++++++++ plugins/outputs/quix/tls_config.go | 22 +++++ 5 files changed, 261 insertions(+) create mode 100644 plugins/outputs/all/quix.go create mode 100644 plugins/outputs/quix/config.go create mode 100644 plugins/outputs/quix/quix.go create mode 100644 plugins/outputs/quix/scram_client.go create mode 100644 plugins/outputs/quix/tls_config.go diff --git a/plugins/outputs/all/quix.go b/plugins/outputs/all/quix.go new file mode 100644 index 0000000000000..97f559634a497 --- /dev/null +++ b/plugins/outputs/all/quix.go @@ -0,0 +1,5 @@ +//go:build !custom || outputs || outputs.quix + +package all + +import _ "github.com/influxdata/telegraf/plugins/outputs/quix" // register plugin diff --git a/plugins/outputs/quix/config.go b/plugins/outputs/quix/config.go new file mode 100644 index 0000000000000..a800f154bbbe4 --- /dev/null +++ b/plugins/outputs/quix/config.go @@ -0,0 +1,66 @@ +// config.go +package quix + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +// BrokerConfig holds the broker configuration fields from the Quix API response +type BrokerConfig struct { + BootstrapServers string `json:"bootstrap.servers"` + SaslMechanism string `json:"sasl.mechanism"` + SaslUsername string `json:"sasl.username"` + SaslPassword string `json:"sasl.password"` + SecurityProtocol string `json:"security.protocol"` + SSLCertBase64 string `json:"ssl.ca.cert"` +} + +// fetchBrokerConfig retrieves broker configuration from the Quix API +func (q *Quix) fetchBrokerConfig() (*BrokerConfig, []byte, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("%s/workspaces/%s/broker/librdkafka", q.APIURL, q.Workspace), nil) + if err != nil { + return nil, nil, err + } + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", q.AuthToken)) + req.Header.Set("Accept", "application/json") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body)) + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, nil, err + } + + var config BrokerConfig + if err := json.Unmarshal(data, &config); err != nil { + return nil, nil, err + } + + decodedCert, err := base64.StdEncoding.DecodeString(config.SSLCertBase64) + if err != nil { + return nil, nil, err + } + + q.Log.Infof("Fetched broker configuration from Quix API.") + return &config, decodedCert, nil +} + +// parseTimestampUnits parses the timestamp units for metrics serialization +func parseTimestampUnits(units string) (time.Duration, error) { + return time.ParseDuration(units) +} diff --git a/plugins/outputs/quix/quix.go b/plugins/outputs/quix/quix.go new file mode 100644 index 0000000000000..88c71a1a0b10a --- /dev/null +++ b/plugins/outputs/quix/quix.go @@ -0,0 +1,130 @@ +// quix.go +package quix + +import ( + "crypto/sha256" + "strings" + + "github.com/IBM/sarama" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" +) + +// Quix is the main struct for the Quix plugin +type Quix struct { + Brokers []string `toml:"brokers"` + Topic string `toml:"topic"` + Workspace string `toml:"workspace"` + AuthToken string `toml:"auth_token"` + APIURL string `toml:"api_url"` + TimestampUnits string `toml:"timestamp_units"` + + producer sarama.SyncProducer + Log telegraf.Logger + serializer serializers.Serializer +} + +// SampleConfig returns a sample configuration for the Quix plugin +func (q *Quix) SampleConfig() string { + return ` + ## Quix output plugin configuration + workspace = "your_workspace" + auth_token = "your_auth_token" + api_url = "https://portal-api.platform.quix.io" + topic = "telegraf_metrics" + data_format = "json" + timestamp_units = "1s" +` +} + +// Init initializes the Quix plugin and sets up the serializer +func (q *Quix) Init() error { + duration, err := parseTimestampUnits(q.TimestampUnits) + if err != nil { + return err + } + + q.serializer, err = serializers.NewSerializer(&serializers.Config{ + DataFormat: "json", + TimestampUnits: duration, + }) + if err != nil { + return err + } + + q.Log.Infof("Initializing Quix plugin.") + return nil +} + +// Connect establishes the connection to Kafka +func (q *Quix) Connect() error { + quixConfig, cert, err := q.fetchBrokerConfig() + if err != nil { + return err + } + + config := sarama.NewConfig() + config.Producer.Return.Successes = true + config.Net.SASL.Enable = true + config.Net.SASL.User = quixConfig.SaslUsername + config.Net.SASL.Password = quixConfig.SaslPassword + config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: sha256.New} + } + + tlsConfig, err := q.createTLSConfig(cert) + if err != nil { + return err + } + config.Net.TLS.Enable = true + config.Net.TLS.Config = tlsConfig + + producer, err := sarama.NewSyncProducer(strings.Split(quixConfig.BootstrapServers, ","), config) + if err != nil { + return err + } + q.producer = producer + q.Log.Infof("Connected to Quix.") + return nil +} + +// Write sends serialized metrics to Quix +func (q *Quix) Write(metrics []telegraf.Metric) error { + q.Log.Debugf("Sending metrics to Quix.") + for _, metric := range metrics { + serialized, err := q.serializer.Serialize(metric) + if err != nil { + q.Log.Errorf("Error serializing metric: %v", err) + continue + } + + msg := &sarama.ProducerMessage{ + Topic: q.Workspace + "-" + q.Topic, + Value: sarama.ByteEncoder(serialized), + Timestamp: metric.Time(), + Key: sarama.StringEncoder("telegraf"), + } + + if _, _, err = q.producer.SendMessage(msg); err != nil { + q.Log.Errorf("Error sending message to Kafka: %v", err) + } + } + q.Log.Debugf("Metrics sent to Quix.") + return nil +} + +// Close shuts down the Kafka producer +func (q *Quix) Close() error { + if q.producer != nil { + q.Log.Infof("Closing Quix producer connection.") + return q.producer.Close() + } + return nil +} + +// Initialize Quix plugin in Telegraf +func init() { + outputs.Add("quix", func() telegraf.Output { return &Quix{} }) +} diff --git a/plugins/outputs/quix/scram_client.go b/plugins/outputs/quix/scram_client.go new file mode 100644 index 0000000000000..ddd7b77381fa2 --- /dev/null +++ b/plugins/outputs/quix/scram_client.go @@ -0,0 +1,38 @@ +// scram_client.go +package quix + +import ( + "github.com/xdg-go/scram" +) + +// XDGSCRAMClient wraps the SCRAM client for SCRAM-SHA-256 authentication +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + HashGeneratorFcn scram.HashGeneratorFcn +} + +// Begin initializes the SCRAM client with username and password +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) error { + client, err := x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.Client = client + x.ClientConversation = client.NewConversation() + return nil +} + +// Step processes the server's challenge and returns the client's response +func (x *XDGSCRAMClient) Step(challenge string) (string, error) { + return x.ClientConversation.Step(challenge) +} + +// Done returns true if the SCRAM conversation is complete +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} + +// Define SHA256 and SHA512 hash generators +var SHA256 scram.HashGeneratorFcn = scram.SHA256 +var SHA512 scram.HashGeneratorFcn = scram.SHA512 diff --git a/plugins/outputs/quix/tls_config.go b/plugins/outputs/quix/tls_config.go new file mode 100644 index 0000000000000..924a0fac7ece5 --- /dev/null +++ b/plugins/outputs/quix/tls_config.go @@ -0,0 +1,22 @@ +// tls_config.go +package quix + +import ( + "crypto/tls" + "crypto/x509" + "fmt" +) + +// createTLSConfig sets up TLS configuration using the CA certificate +func (q *Quix) createTLSConfig(caCert []byte) (*tls.Config, error) { + certPool := x509.NewCertPool() + if ok := certPool.AppendCertsFromPEM(caCert); !ok { + return nil, fmt.Errorf("failed to append CA certificate") + } + + q.Log.Debugf("TLS configuration created with CA certificate.") + return &tls.Config{ + RootCAs: certPool, + InsecureSkipVerify: false, + }, nil +} From b7dcc9f952a50043ba707c735e22f8c183fd97c3 Mon Sep 17 00:00:00 2001 From: stereosky Date: Fri, 15 Nov 2024 17:28:41 +0000 Subject: [PATCH 2/2] docs(outputs.quix): Add README --- plugins/outputs/quix/README.md | 60 ++++++++++++++++++++++++++++++++ plugins/outputs/quix/sample.conf | 7 ++++ 2 files changed, 67 insertions(+) create mode 100644 plugins/outputs/quix/README.md create mode 100644 plugins/outputs/quix/sample.conf diff --git a/plugins/outputs/quix/README.md b/plugins/outputs/quix/README.md new file mode 100644 index 0000000000000..cdb7ce03e58a1 --- /dev/null +++ b/plugins/outputs/quix/README.md @@ -0,0 +1,60 @@ +# Quix Output Plugin + +This plugin writes metrics to a [Quix](https://quix.io/) endpoint. + +Please consult Quix's [official documentation](https://quix.io/docs/) for more details on the Quix platform architecture and concepts. + +⭐ Telegraf v1.32.2 🏷️ cloud, messaging 💻 all + +## Quix Authentication + +This plugin uses an SDK token for authentication with Quix. You can generate one in the settings under the `API and tokens` section by clicking on `SDK Token`. + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support additional global and plugin configuration settings. These settings are used to modify metrics, tags, and field or create aliases and configure ordering, etc. See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +[[outputs.quix]] + workspace = "your_workspace" + auth_token = "your_auth_token" + topic = "telegraf_metrics" + api_url = "https://portal-api.platform.quix.io" + data_format = "json" + timestamp_units = "1s" +``` + +For this output plugin to function correctly the following variables must be +configured. + +* workspace +* auth_token +* topic + +### workspace + +The workspace is the environment of your Quix project and is the `Workspace ID` or the `Environment ID` used to target your environment. It can be found in the settings under the `General settings` section. + +### auth_token + +The auth_token is the `SDK Token` used to authenticate against your Quix environment and is limited to that environment. It can be found in the settings under the `API and tokens` section. + +### topic + +The plugin will send data to this named topic. + +### api_url + +The Quix platform API URL. Defaults to `https://portal-api.platform.quix.io`. + +### data_format + +The data format for serializing the messages. Defaults to `json`. + +### timestamp_units + +The timestamp units for precision. Defaults to `1s` for one second. diff --git a/plugins/outputs/quix/sample.conf b/plugins/outputs/quix/sample.conf new file mode 100644 index 0000000000000..6c759c8879276 --- /dev/null +++ b/plugins/outputs/quix/sample.conf @@ -0,0 +1,7 @@ +[[outputs.quix]] + workspace = "your_workspace" + auth_token = "your_auth_token" + topic = "telegraf_metrics" + api_url = "https://portal-api.platform.quix.io" + data_format = "json" + timestamp_units = "1s"