Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ/AMQP sink #117

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
352 changes: 352 additions & 0 deletions sinks/amqpSink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,352 @@
package sinks

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/url"
	"sync"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
	influx "github.com/influxdata/line-protocol/v2/lineprotocol"
	amqp "github.com/rabbitmq/amqp091-go"
	"golang.org/x/exp/slices"
)

// AmqpSinkConfig holds the JSON configuration of an AmqpSink.
type AmqpSinkConfig struct {
	// Common sink options: defines the JSON tags 'type' and
	// 'meta_as_tags' (string list).
	// See: metricSink.go
	defaultSinkConfig

	// Address and credentials of the AMQP server.
	Hostname string `json:"hostname"`
	Port     int    `json:"port"`
	Username string `json:"username,omitempty"`
	Password string `json:"password,omitempty"`

	// Name of the queue the metrics are published to.
	QueueName string `json:"queue_name"`

	// Maximum number of points sent to the server in a single request.
	// Intended default: 1000
	BatchSize int `json:"batch_size,omitempty"`

	// Time interval for delayed sending of metrics (JSON key 'flush_delay').
	// If the buffers are already filled before the end of this interval,
	// the metrics are sent without further delay.
	// Intended default: 1s
	// flushDelay holds the parsed duration.
	FlushInterval string `json:"flush_delay,omitempty"`
	flushDelay    time.Duration

	// Timeout for publish operations as duration string.
	// publishTimeout holds the parsed duration.
	PublishTimeout string `json:"publish_timeout,omitempty"`
	publishTimeout time.Duration
}

// AmqpSink is a metric sink that encodes metrics in influx line protocol
// and publishes them in batches to an AMQP queue.
type AmqpSink struct {
	// Embedded base sink: declares the elements 'name' and
	// 'meta_as_tags' (string to bool map!)
	sink
	// Configuration of this sink instance
	config AmqpSinkConfig

	// AMQP connection, the channel opened on it and the declared queue
	client  *amqp.Connection
	channel *amqp.Channel
	queue   amqp.Queue

	// Flush() may run in another goroutine (flush timer) and accesses the
	// influx line protocol encoder, so encoderLock has to protect both the
	// encoder and the number of records stored in it.
	encoderLock         sync.Mutex
	encoder             influx.Encoder
	numRecordsInEncoder int

	// Scratch list of tags and meta data tags which should be used as tags;
	// rebuilt for every metric in Write()
	extended_tag_list []key_value_pair

	// Timer to run Flush(). timerLock is taken when the timer is armed and
	// released when it fires or is stopped, so that only one timer is
	// running at a time.
	timerLock  sync.Mutex
	flushTimer *time.Timer

	// Tracks the asynchronous send operations started by Flush() so that
	// Close() can wait for them to finish
	sendWaitGroup sync.WaitGroup
}

// Implement functions required for Sink interface
// Write(...), Flush(), Close()
// See: metricSink.go

// Write encodes a single CCMetric in influx line protocol and appends it to
// the batch buffer of the sink.
//
// Depending on the configuration the buffer is flushed
//   - immediately, if no flush delay is configured,
//   - as soon as the configured batch size is reached, or
//   - later by the flush timer, which is armed here if it is not running.
//
// It returns an error if the metric could not be encoded or if a flush
// triggered by this call failed.
func (s *AmqpSink) Write(m lp.CCMetric) error {

	// Lock for encoder usage. The lock is released explicitly on every path
	// (not deferred) because Flush() acquires it again.
	s.encoderLock.Lock()

	// Encode measurement name
	s.encoder.StartLine(m.Name())

	// Collect tags and the meta data entries which should be used as tags.
	// The backing array of the list is reused between calls.
	s.extended_tag_list = s.extended_tag_list[:0]
	for key, value := range m.Tags() {
		s.extended_tag_list = append(
			s.extended_tag_list,
			key_value_pair{key: key, value: value},
		)
	}
	for _, key := range s.config.MetaAsTags {
		if value, ok := m.GetMeta(key); ok {
			s.extended_tag_list = append(
				s.extended_tag_list,
				key_value_pair{key: key, value: value},
			)
		}
	}

	// Encode tags (they must be in lexical order)
	slices.SortFunc(
		s.extended_tag_list,
		func(a key_value_pair, b key_value_pair) int {
			if a.key < b.key {
				return -1
			}
			if a.key > b.key {
				return +1
			}
			return 0
		},
	)
	for i := range s.extended_tag_list {
		s.encoder.AddTag(
			s.extended_tag_list[i].key,
			s.extended_tag_list[i].value,
		)
	}

	// Encode fields
	// NOTE(review): MustNewValue presumably panics for field values of a type
	// the line protocol cannot represent - confirm that CCMetric only ever
	// carries supported types.
	for key, value := range m.Fields() {
		s.encoder.AddField(key, influx.MustNewValue(value))
	}

	// Encode time stamp
	s.encoder.EndLine(m.Time())

	// Check for encoder errors
	if err := s.encoder.Err(); err != nil {
		// The encoder keeps reporting its first error until it is cleared or
		// reset. Clear it here, otherwise a single bad metric would make
		// every following Write() fail until the next Flush().
		s.encoder.ClearErr()

		// Unlock encoder usage
		s.encoderLock.Unlock()

		return fmt.Errorf("encoding failed: %w", err)
	}
	s.numRecordsInEncoder++

	// The cases are evaluated in order while the encoder lock is still held;
	// TryLock() is therefore only attempted if neither an immediate nor a
	// batch size triggered flush is due.
	switch {
	case s.config.flushDelay == 0:
		// Unlock encoder usage
		s.encoderLock.Unlock()

		// Directly flush if no flush delay is configured
		return s.Flush()

	case s.config.BatchSize > 0 && s.numRecordsInEncoder >= s.config.BatchSize:
		// Unlock encoder usage
		s.encoderLock.Unlock()

		// Stop flush timer. The timer lock is only released here if the timer
		// was stopped before it fired; otherwise the timer function does it.
		if s.flushTimer != nil {
			if ok := s.flushTimer.Stop(); ok {
				cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
				s.timerLock.Unlock()
			}
		}

		// Flush if batch size is reached
		return s.Flush()

	case s.timerLock.TryLock():
		// Setup flush timer when flush delay is configured
		// and no other timer is already running
		if s.flushTimer != nil {

			// Restarting existing flush timer
			cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
			s.flushTimer.Reset(s.config.flushDelay)
		} else {

			// Creating and starting flush timer
			cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
			s.flushTimer = time.AfterFunc(
				s.config.flushDelay,
				func() {
					// Allow the next Write() to arm the timer again
					defer s.timerLock.Unlock()
					cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
					if err := s.Flush(); err != nil {
						cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
					}
				})
		}
	}

	// Unlock encoder usage
	s.encoderLock.Unlock()
	return nil
}

// Flush publishes all metrics currently stored in the encoder to the AMQP
// queue.
//
// The encoder buffer is copied and reset while holding the encoder lock; the
// copy is then published asynchronously in its own goroutine. Publish errors
// are therefore only logged, and Flush itself always returns nil.
func (s *AmqpSink) Flush() error {

	// Lock for encoder usage
	// Own lock for as short as possible: the time it takes to clone the buffer.
	s.encoderLock.Lock()

	buf := slices.Clone(s.encoder.Bytes())
	numRecordsInBuf := s.numRecordsInEncoder
	s.encoder.Reset()
	s.numRecordsInEncoder = 0

	// Unlock encoder usage
	s.encoderLock.Unlock()

	// Nothing buffered, nothing to send
	if len(buf) == 0 {
		return nil
	}

	cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")

	// Asynchronous send of the encoded metrics.
	// Close() waits on sendWaitGroup for these goroutines to finish.
	s.sendWaitGroup.Add(1)
	go func() {
		defer s.sendWaitGroup.Done()

		// Limit the publish operation by the publish timeout (not by the
		// flush delay, which is 0 when metrics are flushed directly and would
		// yield an already expired context).
		ctx := context.Background()
		if s.config.publishTimeout > 0 {
			var cancel context.CancelFunc
			ctx, cancel = context.WithTimeout(ctx, s.config.publishTimeout)
			defer cancel()
		}

		// Publish to the default exchange with the queue name as routing key
		err := s.channel.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
			ContentType: "text/plain",
			Body:        buf,
		})
		if err != nil {
			cclog.ComponentError(s.name, err.Error())
		}
	}()
	return nil
}

// Close stops the flush timer, publishes all metrics that are still
// buffered, waits for outstanding send operations and then closes the AMQP
// channel and connection. Errors are logged. Calling Close again after the
// connection has been released does not touch the connection a second time.
func (s *AmqpSink) Close() {

	cclog.ComponentDebug(s.name, "CLOSE")

	// Stop existing timer and immediately flush.
	// The timer lock is only released here if the timer was stopped before
	// it fired; otherwise the timer function releases it.
	if s.flushTimer != nil {
		if ok := s.flushTimer.Stop(); ok {
			s.timerLock.Unlock()
		}
	}

	// Flush
	if err := s.Flush(); err != nil {
		cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
	}

	// Wait for send operations to finish
	s.sendWaitGroup.Wait()

	// Close the channel before the connection it was opened on
	if s.channel != nil {
		if err := s.channel.Close(); err != nil {
			cclog.ComponentError(s.name, "Close():", "Closing channel failed:", err)
		}
		s.channel = nil
	}
	if s.client != nil {
		if err := s.client.Close(); err != nil {
			cclog.ComponentError(s.name, "Close():", "Closing connection failed:", err)
		}
		s.client = nil
	}
}

// NewAmqpSink creates a new AmqpSink.
//
// name is used to build the sink name, so that different instances of
// AmqpSink can be distinguished in log messages. config is the JSON
// configuration of the sink (see AmqpSinkConfig); unknown keys are rejected.
// Options missing in config keep their defaults: hostname "localhost",
// port 5672, batch size 1000, flush delay "1s" and publish timeout "4s".
//
// The function connects to the AMQP server, opens a channel and declares the
// configured queue. It returns (nil, error) if the configuration is invalid
// or any of these steps fails.
func NewAmqpSink(name string, config json.RawMessage) (Sink, error) {
	s := new(AmqpSink)

	// Set name of the sink
	s.name = fmt.Sprintf("AmqpSink(%s)", name) // Always specify a name here

	// Set defaults in s.config
	// Allow overwriting these defaults by reading config JSON
	s.config.Hostname = "localhost"
	s.config.Port = 5672 // standard AMQP port
	s.config.BatchSize = 1000
	s.config.FlushInterval = "1s"
	s.config.PublishTimeout = "4s"

	// Read in the config JSON
	if len(config) > 0 {
		d := json.NewDecoder(bytes.NewReader(config))
		d.DisallowUnknownFields()
		if err := d.Decode(&s.config); err != nil {
			cclog.ComponentError(s.name, "Error reading config:", err.Error())
			return nil, err
		}
	}

	// Create lookup map to use meta infos as tags in the output metric
	s.meta_as_tags = make(map[string]bool, len(s.config.MetaAsTags))
	for _, k := range s.config.MetaAsTags {
		s.meta_as_tags[k] = true
	}

	// Convert the duration strings of the configuration
	publishTimeout, err := time.ParseDuration(s.config.PublishTimeout)
	if err != nil {
		err = fmt.Errorf("failed to parse duration for PublishTimeout %q: %w", s.config.PublishTimeout, err)
		cclog.ComponentError(s.name, err.Error())
		return nil, err
	}
	s.config.publishTimeout = publishTimeout

	flushDelay, err := time.ParseDuration(s.config.FlushInterval)
	if err != nil {
		err = fmt.Errorf("failed to parse duration for FlushInterval %q: %w", s.config.FlushInterval, err)
		cclog.ComponentError(s.name, err.Error())
		return nil, err
	}
	s.config.flushDelay = flushDelay

	// Build the server URL. Using url.URL escapes special characters in the
	// user name and password instead of pasting them into the URL verbatim.
	serverURL := url.URL{
		Scheme: "amqp",
		Host:   net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port)),
	}
	if len(s.config.Username) > 0 {
		if len(s.config.Password) > 0 {
			serverURL.User = url.UserPassword(s.config.Username, s.config.Password)
		} else {
			serverURL.User = url.User(s.config.Username)
		}
	}

	// Establish connection to the server.
	// The error messages only name the host, never the URL with credentials.
	c, err := amqp.Dial(serverURL.String())
	if err != nil {
		return nil, fmt.Errorf("connecting to AMQP server %s failed: %w", serverURL.Host, err)
	}
	s.client = c

	ch, err := c.Channel()
	if err != nil {
		// Best-effort cleanup, the channel error is the one worth reporting
		s.client.Close()
		return nil, fmt.Errorf("opening channel on AMQP server %s failed: %w", serverURL.Host, err)
	}
	s.channel = ch

	q, err := ch.QueueDeclare(
		s.config.QueueName, // name
		false,              // durable
		false,              // delete when unused
		false,              // exclusive
		false,              // no-wait
		nil,                // arguments
	)
	if err != nil {
		// Best-effort cleanup, the declare error is the one worth reporting
		s.channel.Close()
		s.client.Close()
		return nil, fmt.Errorf("declaring queue %q failed: %w", s.config.QueueName, err)
	}
	s.queue = q

	return s, nil
}
33 changes: 33 additions & 0 deletions sinks/amqpSink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
## `amqp` sink

The `amqp` sink publishes all metrics to a RabbitMQ (AMQP 0-9-1) server. Metrics are published to the default exchange, using the queue name from the configuration file as the routing key.

### Configuration structure

```json
{
"<name>": {
"type": "amqp",
"queue_name" : "myqueue",
"batch_size" : 1000,
"flush_delay": "4s",
"publish_timeout": "1s",
"hostname": "dbhost.example.com",
"port": 5672,
"username": "exampleuser",
"password" : "examplepw",
"meta_as_tags" : []
}
}
```

- `type`: makes the sink an `amqp` sink, also `rabbitmq` is allowed as alias
- `queue_name`: All metrics are published to this queue
- `hostname`: Hostname of the RabbitMQ server (default `localhost`)
- `port`: Port number of the RabbitMQ server
- `username`: Username for basic authentication
- `password`: Password for basic authentication
- `meta_as_tags`: List of meta information keys that are added as tags to the published metrics (optional)
- `publish_timeout`: Timeout for each publish operation (default `4s`)
- `flush_delay`: Time interval during which incoming metrics are collected into a single batch before they are sent (default `1s`)
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay (default: `1000`)
2 changes: 2 additions & 0 deletions sinks/sinkManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
"influxdb": NewInfluxSink,
"influxasync": NewInfluxAsyncSink,
"http": NewHttpSink,
"amqp": NewAmqpSink,
"rabbitmq": NewAmqpSink,
}

// Metric collector manager data structure
Expand Down
Loading