diff --git a/receivers/appMetricReceiver.go b/receivers/appMetricReceiver.go new file mode 100644 index 0000000..deeef24 --- /dev/null +++ b/receivers/appMetricReceiver.go @@ -0,0 +1,164 @@ +package receivers + +import ( + "bufio" + "encoding/json" + "fmt" + "net" + "os" + "sync" + + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + influx "github.com/influxdata/line-protocol" +) + +// SampleReceiver configuration: receiver type, listen address, port +type AppMetricReceiverConfig struct { + Type string `json:"type"` + SocketFile string `json:"socket_file"` +} + +type AppMetricReceiver struct { + receiver + config AppMetricReceiverConfig + + // Storage for static information + meta map[string]string + // Use in case of own go routine + done chan bool + wg sync.WaitGroup + // Influx stuff + handler *influx.MetricHandler + parser *influx.Parser + // WaitGroup for individual connections + connWg sync.WaitGroup + listener net.Listener +} + +func (r *AppMetricReceiver) newConnection(conn net.Conn) { + //defer conn.Close() + //defer wg.Done() + + buffer, err := bufio.NewReader(conn).ReadBytes('\n') + if err != nil { + conn.Close() + return + } + + metrics, err := r.parser.Parse(buffer) + if err != nil { + cclog.ComponentError(r.name, "failed to parse received metrics") + return + } + for _, m := range metrics { + y := lp.FromInfluxMetric(m) + for k, v := range r.meta { + y.AddMeta(k, v) + } + if r.sink != nil { + r.sink <- y + } + } + + r.newConnection(conn) + +} + +func (r *AppMetricReceiver) newAccepter(listenSocket net.Listener) { +accept_loop: + for { + select { + case <-r.done: + break accept_loop + default: + conn, err := listenSocket.Accept() + if err == nil { + r.connWg.Add(1) + go func() { + r.newConnection(conn) + r.connWg.Done() + }() + } + } + } + r.wg.Done() +} + +// Implement functions required for Receiver interface +// Start(), Close() +// See: metricReceiver.go + +func (r *AppMetricReceiver) Start() { + var err error = nil + cclog.ComponentDebug(r.name, "START") + + r.listener, err = net.Listen("unix", r.config.SocketFile) + if err != nil { + cclog.ComponentError(r.name, "failed to listen at socket", r.config.SocketFile) + } + if _, err := os.Stat(r.config.SocketFile); err != nil { + cclog.ComponentError(r.name, "failed to create socket", r.config.SocketFile) + } + + r.done = make(chan bool) + r.wg.Add(1) + go r.newAccepter(r.listener) + +} + +// Close receiver: close network connection, close files, close libraries, ... +func (r *AppMetricReceiver) Close() { + cclog.ComponentDebug(r.name, "CLOSE") + + if _, err := os.Stat(r.config.SocketFile); err == nil { + if err := os.RemoveAll(r.config.SocketFile); err != nil { + cclog.ComponentError(r.name, "Failed to remove UNIX socket", r.config.SocketFile) + } + } + + // in case of own go routine, send the signal and wait + r.listener.Close() + r.done <- true + close(r.done) + r.connWg.Wait() + r.wg.Wait() +} + +// New function to create a new instance of the receiver +// Initialize the receiver by giving it a name and reading in the config JSON +func NewAppMetricReceiver(name string, config json.RawMessage) (Receiver, error) { + r := new(AppMetricReceiver) + + // Set name of SampleReceiver + // The name should be chosen in such a way that different instances of SampleReceiver can be distinguished + r.name = fmt.Sprintf("AppMetricReceiver(%s)", name) + + // Set static information + r.meta = map[string]string{"source": r.name} + + // Set defaults in r.config + // Allow overwriting these defaults by reading config JSON + r.config.SocketFile = "/tmp/cc.sock" + + // Read the sample receiver specific JSON config + if len(config) > 0 { + err := json.Unmarshal(config, &r.config) + if err != nil { + cclog.ComponentError(r.name, "Error reading config:", err.Error()) + return nil, err + } + } + if len(r.config.SocketFile) == 0 { + cclog.ComponentError(r.name, "Invalid socket_file setting:", r.config.SocketFile) + return nil, fmt.Errorf("invalid socket_file setting: %s", r.config.SocketFile) + } + + // Check that all required fields in the configuration are set + // Use 'if len(r.config.Option) > 0' for strings + r.handler = influx.NewMetricHandler() + r.parser = influx.NewParser(r.handler) + r.parser.SetTimeFunc(DefaultTime) + + return r, nil +} diff --git a/receivers/appMetricReceiver.md b/receivers/appMetricReceiver.md new file mode 100644 index 0000000..88940af --- /dev/null +++ b/receivers/appMetricReceiver.md @@ -0,0 +1,23 @@ +## `appmetrics` receiver + +The `appmetrics` receiver can be used to submit metrics from an application into the monitoring system. It listens for incoming connections on a UNIX socket. + +### Configuration structure + +```json +{ + "": { + "type": "appmetrics", + "socket_file" : "/tmp/cc.sock", + } +} +``` + +- `type`: makes the receiver a `appmetrics` receiver +- `socket_file`: Listen UNIX socket + +### Inputs from applications + +Applcations can connect to the `appmetrics` socket and provide metric in the [InfluxDB line protocol](https://github.com/influxdata/line-protocol). It is currently not possible to submit meta information as the Influx line protocol does not know them. + + diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index 31853fe..71fc18e 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -11,9 +11,10 @@ import ( ) var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){ - "ipmi": NewIPMIReceiver, - "nats": NewNatsReceiver, - "redfish": NewRedfishReceiver, + "ipmi": NewIPMIReceiver, + "nats": NewNatsReceiver, + "redfish": NewRedfishReceiver, + "appmetrics": NewAppMetricReceiver, } type receiveManager struct {