Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs.quix): Add plugin #16144

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/outputs/all/quix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.quix

package all

import _ "github.com/influxdata/telegraf/plugins/outputs/quix" // register plugin
60 changes: 60 additions & 0 deletions plugins/outputs/quix/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Quix Output Plugin

This plugin writes metrics to a [Quix](https://quix.io/) endpoint.

Please consult Quix's [official documentation](https://quix.io/docs/) for more details on the Quix platform architecture and concepts.

⭐ Telegraf v1.32.2 🏷️ cloud, messaging 💻 all

## Quix Authentication

This plugin uses an SDK token for authentication with Quix. You can generate one in the settings under the `API and tokens` section by clicking on `SDK Token`.

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support additional global and plugin configuration settings. These settings are used to modify metrics, tags, and field or create aliases and configure ordering, etc. See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Configuration

```toml @sample.conf
[[outputs.quix]]
workspace = "your_workspace"
auth_token = "your_auth_token"
topic = "telegraf_metrics"
api_url = "https://portal-api.platform.quix.io"
data_format = "json"
timestamp_units = "1s"
```

For this output plugin to function correctly the following variables must be
configured.

* workspace
* auth_token
* topic

### workspace

The workspace is the environment of your Quix project and is the `Workspace ID` or the `Environment ID` used to target your environment. It can be found in the settings under the `General settings` section.

### auth_token

The auth_token is the `SDK Token` used to authenticate against your Quix environment and is limited to that environment. It can be found in the settings under the `API and tokens` section.

### topic

The plugin will send data to this named topic.

### api_url

The Quix platform API URL. Defaults to `https://portal-api.platform.quix.io`.

### data_format

The data format for serializing the messages. Defaults to `json`.

### timestamp_units

The timestamp units for precision. Defaults to `1s` for one second.
66 changes: 66 additions & 0 deletions plugins/outputs/quix/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// config.go
package quix

import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)

// BrokerConfig holds the broker configuration fields from the Quix API response
type BrokerConfig struct {
BootstrapServers string `json:"bootstrap.servers"`
SaslMechanism string `json:"sasl.mechanism"`
SaslUsername string `json:"sasl.username"`
SaslPassword string `json:"sasl.password"`
SecurityProtocol string `json:"security.protocol"`
SSLCertBase64 string `json:"ssl.ca.cert"`
}

// fetchBrokerConfig retrieves broker configuration from the Quix API
func (q *Quix) fetchBrokerConfig() (*BrokerConfig, []byte, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("%s/workspaces/%s/broker/librdkafka", q.APIURL, q.Workspace), nil)
if err != nil {
return nil, nil, err
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", q.AuthToken))
req.Header.Set("Accept", "application/json")

client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body))
}

data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}

var config BrokerConfig
if err := json.Unmarshal(data, &config); err != nil {
return nil, nil, err
}

decodedCert, err := base64.StdEncoding.DecodeString(config.SSLCertBase64)
if err != nil {
return nil, nil, err
}

q.Log.Infof("Fetched broker configuration from Quix API.")
return &config, decodedCert, nil
}

// parseTimestampUnits parses the timestamp units for metrics serialization
func parseTimestampUnits(units string) (time.Duration, error) {
return time.ParseDuration(units)
}
130 changes: 130 additions & 0 deletions plugins/outputs/quix/quix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// quix.go
package quix

import (
"crypto/sha256"
"strings"

"github.com/IBM/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

// Quix is the main struct for the Quix plugin
type Quix struct {
Brokers []string `toml:"brokers"`
Topic string `toml:"topic"`
Workspace string `toml:"workspace"`
AuthToken string `toml:"auth_token"`
APIURL string `toml:"api_url"`
TimestampUnits string `toml:"timestamp_units"`

producer sarama.SyncProducer
Log telegraf.Logger
serializer serializers.Serializer
}

// SampleConfig returns a sample configuration for the Quix plugin
func (q *Quix) SampleConfig() string {
return `
## Quix output plugin configuration
workspace = "your_workspace"
auth_token = "your_auth_token"
api_url = "https://portal-api.platform.quix.io"
topic = "telegraf_metrics"
data_format = "json"
timestamp_units = "1s"
`
}

// Init initializes the Quix plugin and sets up the serializer
func (q *Quix) Init() error {
duration, err := parseTimestampUnits(q.TimestampUnits)
if err != nil {
return err
}

q.serializer, err = serializers.NewSerializer(&serializers.Config{
DataFormat: "json",
TimestampUnits: duration,
})
if err != nil {
return err
}

q.Log.Infof("Initializing Quix plugin.")
return nil
}

// Connect establishes the connection to Kafka
func (q *Quix) Connect() error {
quixConfig, cert, err := q.fetchBrokerConfig()
if err != nil {
return err
}

config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Net.SASL.Enable = true
config.Net.SASL.User = quixConfig.SaslUsername
config.Net.SASL.Password = quixConfig.SaslPassword
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: sha256.New}
}

tlsConfig, err := q.createTLSConfig(cert)
if err != nil {
return err
}
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig

producer, err := sarama.NewSyncProducer(strings.Split(quixConfig.BootstrapServers, ","), config)
if err != nil {
return err
}
q.producer = producer
q.Log.Infof("Connected to Quix.")
return nil
}

// Write sends serialized metrics to Quix
func (q *Quix) Write(metrics []telegraf.Metric) error {
q.Log.Debugf("Sending metrics to Quix.")
for _, metric := range metrics {
serialized, err := q.serializer.Serialize(metric)
if err != nil {
q.Log.Errorf("Error serializing metric: %v", err)
continue
}

msg := &sarama.ProducerMessage{
Topic: q.Workspace + "-" + q.Topic,
Value: sarama.ByteEncoder(serialized),
Timestamp: metric.Time(),
Key: sarama.StringEncoder("telegraf"),
}

if _, _, err = q.producer.SendMessage(msg); err != nil {
q.Log.Errorf("Error sending message to Kafka: %v", err)
}
}
q.Log.Debugf("Metrics sent to Quix.")
return nil
}

// Close shuts down the Kafka producer
func (q *Quix) Close() error {
if q.producer != nil {
q.Log.Infof("Closing Quix producer connection.")
return q.producer.Close()
}
return nil
}

// Initialize Quix plugin in Telegraf
func init() {
outputs.Add("quix", func() telegraf.Output { return &Quix{} })
}
7 changes: 7 additions & 0 deletions plugins/outputs/quix/sample.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[[outputs.quix]]
workspace = "your_workspace"
auth_token = "your_auth_token"
topic = "telegraf_metrics"
api_url = "https://portal-api.platform.quix.io"
data_format = "json"
timestamp_units = "1s"
38 changes: 38 additions & 0 deletions plugins/outputs/quix/scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// scram_client.go
package quix

import (
"github.com/xdg-go/scram"
)

// XDGSCRAMClient wraps the SCRAM client for SCRAM-SHA-256 authentication
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
HashGeneratorFcn scram.HashGeneratorFcn
}

// Begin initializes the SCRAM client with username and password
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) error {
client, err := x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.Client = client
x.ClientConversation = client.NewConversation()
return nil
}

// Step processes the server's challenge and returns the client's response
func (x *XDGSCRAMClient) Step(challenge string) (string, error) {
return x.ClientConversation.Step(challenge)
}

// Done returns true if the SCRAM conversation is complete
func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}

// Define SHA256 and SHA512 hash generators
var SHA256 scram.HashGeneratorFcn = scram.SHA256
var SHA512 scram.HashGeneratorFcn = scram.SHA512
22 changes: 22 additions & 0 deletions plugins/outputs/quix/tls_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// tls_config.go
package quix

import (
"crypto/tls"
"crypto/x509"
"fmt"
)

// createTLSConfig sets up TLS configuration using the CA certificate
func (q *Quix) createTLSConfig(caCert []byte) (*tls.Config, error) {
certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("failed to append CA certificate")
}

q.Log.Debugf("TLS configuration created with CA certificate.")
return &tls.Config{
RootCAs: certPool,
InsecureSkipVerify: false,
}, nil
}
Loading