Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Adding http metrics endpoint #641

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT))
GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping
SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut
TEST_DIRS := ./apricot/local ./common/gera ./common/utils/safeacks ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment
GO_TEST_DIRS := ./core/repos ./core/integration/dcs
GO_TEST_DIRS := ./core/repos ./core/integration/dcs ./common/monitoring

coverage:COVERAGE_PREFIX := ./coverage_results
coverage:GOTEST_COVERAGE_FILE := $(COVERAGE_PREFIX)/gotest.out
Expand Down
14 changes: 14 additions & 0 deletions common/ecsmetrics/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package ecsmetrics

import (
"time"

"github.com/AliceO2Group/Control/common/monitoring"
)

func NewMetric(name string) monitoring.Metric {
timestamp := time.Now()
metric := monitoring.Metric{Name: name, Timestamp: timestamp.UnixMilli()}
metric.AddTag("subsystem", "ECS")
return metric
}
73 changes: 73 additions & 0 deletions common/ecsmetrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package ecsmetrics

import (
internalmetrics "runtime/metrics"
"time"

"github.com/AliceO2Group/Control/common/logger"
"github.com/AliceO2Group/Control/common/monitoring"
"github.com/sirupsen/logrus"
)

var (
endRequestChannel chan struct{}
log = logger.New(logrus.StandardLogger(), "ecsmetrics")
)

func gather() monitoring.Metric {
samples := []internalmetrics.Sample{
{Name: "/gc/cycles/total:gc-cycles"},
{Name: "/memory/classes/other:bytes"},
{Name: "/memory/classes/total:bytes"},
{Name: "/sched/goroutines:goroutines"},
{Name: "/sync/mutex/wait/total:seconds"},
{Name: "/memory/classes/other:bytes"},
{Name: "/memory/classes/total:bytes"},
{Name: "/memory/classes/heap/free:bytes"},
{Name: "/memory/classes/heap/objects:bytes"},
{Name: "/memory/classes/heap/released:bytes"},
{Name: "/memory/classes/heap/stacks:bytes"},
{Name: "/memory/classes/heap/unused:bytes"},
}

// Collect metrics data
internalmetrics.Read(samples)

metric := NewMetric("golangruntimemetrics")

for _, sample := range samples {
switch sample.Value.Kind() {
case internalmetrics.KindUint64:
metric.AddValue(sample.Name, sample.Value.Uint64())
case internalmetrics.KindFloat64:
metric.AddValue(sample.Name, sample.Value.Float64())
case internalmetrics.KindFloat64Histogram:
log.Warning("Error: Histogram is not supported yet for metric [%s]", sample.Name)
continue
default:
log.Warning("Unsupported kind %v for metric %s\n", sample.Value.Kind(), sample.Name)
continue
}
}
return metric
}

func StartGolangMetrics(period time.Duration) {
go func() {
for {
select {
case <-endRequestChannel:
endRequestChannel <- struct{}{}
return
default:
monitoring.Send(gather())
time.Sleep(period)
}
}
}()
}

func StopGolangMetrics() {
endRequestChannel <- struct{}{}
<-endRequestChannel
}
27 changes: 27 additions & 0 deletions common/monitoring/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package monitoring

type (
TagsType map[string]any
ValuesType map[string]any
)

type Metric struct {
Name string `json:"name"`
Values ValuesType `json:"values"`
Tags TagsType `json:"tags,omitempty"`
Timestamp int64 `json:"timestamp"`
}

func (metric *Metric) AddTag(tagName string, value any) {
if metric.Tags == nil {
metric.Tags = make(TagsType)
}
metric.Tags[tagName] = value
}

func (metric *Metric) AddValue(valueName string, value any) {
if metric.Values == nil {
metric.Values = make(ValuesType)
}
metric.Values[valueName] = value
}
125 changes: 125 additions & 0 deletions common/monitoring/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package monitoring

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/AliceO2Group/Control/common/logger"
"github.com/sirupsen/logrus"
)

var (
server *http.Server
metricsLimit int = 1000000
metrics []Metric
// channel that is used to request end of metrics server, it sends notification when server ended.
// It needs to be read!!!
endChannel chan struct{}

// channel used to send metrics into the event loop
metricsChannel chan Metric

// channel for sending notifications to event loop that new http Request to report metrics arrived
metricsRequestChannel chan struct{}

// channel used to send metrics to be reported by http request from event loop
metricsToRequest chan []Metric

Log = logger.New(logrus.StandardLogger(), "metrics")
)

func initChannels(messageBufferSize int) {
endChannel = make(chan struct{})
metricsRequestChannel = make(chan struct{})
metricsChannel = make(chan Metric, 100)
metricsToRequest = make(chan []Metric)
metricsLimit = messageBufferSize
}

func closeChannels() {
close(endChannel)
close(metricsRequestChannel)
close(metricsChannel)
close(metricsToRequest)
}

func eventLoop() {
for {
select {
case <-metricsRequestChannel:
shallowCopyMetrics := metrics
metrics = make([]Metric, 0)
metricsToRequest <- shallowCopyMetrics

case metric := <-metricsChannel:
if len(metrics) < metricsLimit {
metrics = append(metrics, metric)
} else {
Log.Warn("too many metrics waiting to be scraped. Are you sure that metrics scraping is running?")
}

case <-endChannel:
endChannel <- struct{}{}
return
}
}
}

func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
metricsRequestChannel <- struct{}{}
metricsToConvert := <-metricsToRequest
if metricsToConvert == nil {
metricsToConvert = make([]Metric, 0)
}
json.NewEncoder(w).Encode(metricsToConvert)
}

func Send(metric Metric) {
metricsChannel <- metric
}

func handleFunc(endpointName string) {
// recover is here to correctly allow multiple Starts and Stops of server
defer func() {
recover()
}()

http.HandleFunc(endpointName, exportMetricsAndReset)
}

// \param port port where the scraping endpoint will be created
// \param endpointName name of the endpoint, which must start with a slash eg. "/internalmetrics"
// \param messageBufferSize size of buffer for messages where messages are kept between scraping request.
//
// If we attempt send more messages than the size of the buffer, these overflowing messages will be ignored and warning will be logged.
func Start(port uint16, endpointName string, messageBufferSize int) error {
if server != nil {
return nil
}

initChannels(messageBufferSize)

go eventLoop()

server := &http.Server{Addr: fmt.Sprintf(":%d", port)}
handleFunc(endpointName)
return server.ListenAndServe()
}

func Stop() {
if server == nil {
return
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
server.Shutdown(ctx)

endChannel <- struct{}{}
<-endChannel
server = nil
}
Loading