Skip to content

Commit

Permalink
[core] Adding http metrics endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Tichák committed Dec 9, 2024
1 parent de4d866 commit 78908b9
Show file tree
Hide file tree
Showing 6 changed files with 421 additions and 2 deletions.
64 changes: 64 additions & 0 deletions common/ecsmetrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package ecsmetrics

import (
"fmt"
internalmetrics "runtime/metrics"
"time"

"github.com/AliceO2Group/Control/common/monitoring"
)

// endRequestChannel carries the stop handshake between StopGolangMetrics and
// the goroutine started by StartGolangMetrics: the stopper sends a request,
// the goroutine answers on the same channel once it has finished.
//
// It must be a real (non-nil) channel: on a nil channel the receive in the
// collector's select would never fire and StopGolangMetrics would block
// forever on its send.
var endRequestChannel = make(chan struct{})

// gather reads a fixed set of Go runtime metrics and packs them into a single
// monitoring.Metric named "golangmetrics", tagged with subsystem "ecs" and
// timestamped in Unix milliseconds.
//
// Uint64 and float64 samples are stored as values keyed by the runtime metric
// name. Histogram samples and samples of any other kind are not stored; a
// message is printed to stdout for them instead.
func gather() monitoring.Metric {
	names := []string{
		"/gc/heap/allocs:bytes",
		"/gc/heap/frees:bytes",
		"/memory/classes/heap/free:bytes",
		"/memory/classes/heap/objects:bytes",
		"/sched/goroutines:goroutines",
	}
	samples := make([]internalmetrics.Sample, len(names))
	for i, name := range names {
		samples[i].Name = name
	}

	// Fill in the Value of every sample in place.
	internalmetrics.Read(samples)

	metric := monitoring.Metric{
		Name:      "golangmetrics",
		Timestamp: time.Now().UnixMilli(),
	}
	metric.AddTag("subsystem", "ecs")

	for i := range samples {
		name, value := samples[i].Name, samples[i].Value
		switch kind := value.Kind(); kind {
		case internalmetrics.KindUint64:
			metric.AddValue(name, value.Uint64())
		case internalmetrics.KindFloat64:
			metric.AddValue(name, value.Float64())
		case internalmetrics.KindFloat64Histogram:
			fmt.Printf("Error: Histogram is not supported yet for metric [%s]", name)
		default:
			fmt.Printf("Unsupported kind %v for metric %s\n", kind, name)
		}
	}
	return metric
}

func StartGolangMetrics(period time.Duration) {
go func() {
for {
select {
case <-endRequestChannel:
endRequestChannel <- struct{}{}
return
default:
monitoring.Send(gather())
time.Sleep(period)
}
}
}()
}

// StopGolangMetrics asks the goroutine started by StartGolangMetrics to stop
// and waits for its acknowledgement. Both steps use endRequestChannel, so the
// call blocks until that goroutine has received the request and replied.
func StopGolangMetrics() {
// Request the stop.
endRequestChannel <- struct{}{}
// Wait for the collector goroutine to confirm it is exiting.
<-endRequestChannel
}
27 changes: 27 additions & 0 deletions common/monitoring/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package monitoring

// TagsType maps a tag name to its value for a single Metric.
type TagsType map[string]any

// ValuesType maps a value name to its measured value for a single Metric.
type ValuesType map[string]any

// Metric is one named measurement as it is serialised to JSON by the
// monitoring endpoint.
type Metric struct {
// Name identifies the metric.
Name string `json:"name"`
// Values holds the measured values keyed by value name; filled by AddValue.
Values ValuesType `json:"values"`
// Tags holds optional key/value labels; omitted from JSON when empty.
Tags TagsType `json:"tags,omitempty"`
// Timestamp of the measurement; the unit is chosen by the producer.
Timestamp int64 `json:"timestamp"`
}

// AddTag stores value under tagName in the metric's tags, creating the tag
// map on first use. An existing tag with the same name is overwritten.
func (metric *Metric) AddTag(tagName string, value any) {
	tags := metric.Tags
	if tags == nil {
		tags = TagsType{}
		metric.Tags = tags
	}
	tags[tagName] = value
}

// AddValue stores value under valueName in the metric's values, creating the
// value map on first use. An existing value with the same name is overwritten.
func (metric *Metric) AddValue(valueName string, value any) {
	values := metric.Values
	if values == nil {
		values = ValuesType{}
		metric.Values = values
	}
	values[valueName] = value
}
111 changes: 111 additions & 0 deletions common/monitoring/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package monitoring

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)

// Package-level state shared by the HTTP server, the event loop and Send.
var (
	// server is the HTTP server exposing the metrics endpoint; nil while
	// monitoring is not running.
	server *http.Server

	// metrics accumulates everything received by the event loop since the
	// last export.
	metrics []Metric

	// metricsChannel carries metrics from Send into the event loop.
	metricsChannel chan Metric

	// metricsRequestChannel notifies the event loop that an HTTP request
	// asking for the collected metrics has arrived.
	metricsRequestChannel chan struct{}

	// metricsToRequest carries the collected metrics from the event loop
	// back to the HTTP handler that asked for them.
	metricsToRequest chan []Metric

	// endChannel is used to request the end of the event loop; the loop
	// answers on the same channel when it has ended, and that answer has
	// to be read by the requester.
	endChannel chan struct{}
)

func initChannels(messageBufferSize int) {
endChannel = make(chan struct{})
metricsRequestChannel = make(chan struct{})
metricsChannel = make(chan Metric, messageBufferSize)
metricsToRequest = make(chan []Metric)
}

// closeChannels closes all four package-level channels created by
// initChannels. Within the visible code it is only called from the test
// cleanup, after the event loop has confirmed that it ended.
func closeChannels() {
close(endChannel)
close(metricsRequestChannel)
close(metricsChannel)
close(metricsToRequest)
}

func eventLoop() {
for {
select {
case <-metricsRequestChannel:
shallowCopyMetrics := metrics
metrics = make([]Metric, 0)
metricsToRequest <- shallowCopyMetrics

case metric := <-metricsChannel:
metrics = append(metrics, metric)

case <-endChannel:
endChannel <- struct{}{}
return
}
}
}

// exportMetricsAndReset is the HTTP handler of the metrics endpoint. It asks
// the event loop for everything collected so far (which also resets the
// collection) and writes it to the client as a JSON array followed by a
// newline. An empty collection is reported as "[]" rather than "null".
//
// If the metrics cannot be encoded (Values and Tags hold arbitrary values,
// so encoding can fail) the client receives a 500 response instead of an
// empty 200. The metrics handed over by the event loop are dropped in that
// case.
func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) {
	metricsRequestChannel <- struct{}{}
	metricsToConvert := <-metricsToRequest
	if metricsToConvert == nil {
		// A nil slice would be encoded as JSON null.
		metricsToConvert = make([]Metric, 0)
	}

	payload, err := json.Marshal(metricsToConvert)
	if err != nil {
		http.Error(w, "encoding metrics: "+err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	// A write error means the client went away; there is nobody to report it to.
	_, _ = w.Write(append(payload, '\n'))
}

// Send hands metric over to the event loop through metricsChannel.
//
// The call blocks while the channel cannot accept the metric, i.e. when its
// buffer (sized by initChannels) is full and the event loop has not received
// yet.
// NOTE(review): metricsChannel is only created by initChannels; calling Send
// before that happens would block on a nil channel — confirm callers start
// monitoring first.
func Send(metric Metric) {
metricsChannel <- metric
}

// handleFunc registers exportMetricsAndReset for endpointName via
// http.HandleFunc. Any panic raised by the registration is swallowed, which
// is what allows the server to be started and stopped multiple times.
func handleFunc(endpointName string) {
	defer func() { _ = recover() }()

	http.HandleFunc(endpointName, exportMetricsAndReset)
}

// Start initialises the monitoring channels, launches the event loop and
// serves the metrics endpoint over HTTP.
//
//   - port is the TCP port the HTTP server listens on (all interfaces).
//   - endpointName is the URL path under which the metrics are exported.
//   - messageBufferSize is the capacity of the channel used by Send.
//
// Start blocks while the server is running and returns the error reported by
// ListenAndServe. If a server is already registered it does nothing and
// returns nil.
func Start(port uint16, endpointName string, messageBufferSize int) error {
	if server != nil {
		return nil
	}

	initChannels(messageBufferSize)

	go eventLoop()

	// Store the server in the package-level variable (a ":=" here would only
	// create a shadowing local), otherwise Stop never sees it and can neither
	// shut the server down nor end the event loop. It is published only after
	// the channels exist, because Stop uses them.
	srv := &http.Server{Addr: fmt.Sprintf(":%d", port)}
	server = srv

	handleFunc(endpointName)
	// Serve through the local handle: Stop resets the package variable.
	return srv.ListenAndServe()
}

// Stop shuts the HTTP server down, giving it at most five seconds, then ends
// the event loop and waits for its confirmation. Afterwards the package is
// back in the "not running" state. Calling Stop while no server is
// registered is a no-op.
func Stop() {
	if server == nil {
		return
	}

	shutdownCtx, release := context.WithTimeout(context.Background(), 5*time.Second)
	defer release()
	_ = server.Shutdown(shutdownCtx)

	// Ask the event loop to finish and wait until it confirms.
	endChannel <- struct{}{}
	<-endChannel

	server = nil
}
167 changes: 167 additions & 0 deletions common/monitoring/monitoring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package monitoring

import (
"encoding/json"
"fmt"
"net/http"
"testing"
"time"
)

// TestSimpleStartStop starts the monitoring server in the background, gives
// it a moment to come up and stops it again.
func TestSimpleStartStop(t *testing.T) {
	const startupGrace = 100 * time.Millisecond

	go Start(1234, "/random", 100)
	time.Sleep(startupGrace)

	Stop()
}

// TestStartMultipleStop checks that calling Stop twice after a single Start
// neither blocks nor panics.
func TestStartMultipleStop(t *testing.T) {
	const startupGrace = 100 * time.Millisecond

	go Start(1234, "/random", 100)
	time.Sleep(startupGrace)

	for i := 0; i < 2; i++ {
		Stop()
	}
}

// cleaningUpAfterTest tears down what initTest set up: it stops the event
// loop, waits for its confirmation, closes all channels and empties the
// collected metrics.
func cleaningUpAfterTest() {
	// Stop handshake with the event loop.
	endChannel <- struct{}{}
	<-endChannel

	closeChannels()
	metrics = []Metric{}
}

// initTest prepares the package for a test that talks to the event loop
// directly, without the HTTP server: it creates the channels and starts the
// event loop in the background.
func initTest() {
// We need the metrics channel to be unbuffered (blocking) so that Send only
// returns once the event loop has received the metric and the test does not
// end too quickly.
initChannels(0)
go eventLoop()
}

// testFunction is a decorator that properly initialises the monitoring
// package before a higher-level test and cleans up after it.
//
// The cleanup is deferred so that it also runs when testToRun ends early via
// t.Fatal/t.FailNow or panics; otherwise the event loop goroutine would be
// left running for the following tests.
func testFunction(t *testing.T, testToRun func(*testing.T)) {
	initTest()
	defer cleaningUpAfterTest()

	testToRun(t)
}

// TestSendingSingleMetric sends one metric and checks that the event loop
// stored exactly that metric.
func TestSendingSingleMetric(t *testing.T) {
	testFunction(t, func(t *testing.T) {
		metric := Metric{Name: "test"}
		Send(metric)

		// NOTE(review): Send returns as soon as the event loop has received
		// the metric, while the append happens in the event loop goroutine;
		// reading metrics here is not synchronised with it — confirm with
		// the race detector.
		//
		// Fatalf (not Error) so that the index access below can never run on
		// an empty slice and panic.
		if len(metrics) != 1 {
			t.Fatalf("wrong number of metrics %d, should be 1", len(metrics))
		}

		if metrics[0].Name != "test" {
			t.Errorf("Got wrong name %s in stored metric", metrics[0].Name)
		}
	})
}

// TestExportingMetrics sends one metric and checks that requesting an export
// from the event loop returns exactly that metric.
func TestExportingMetrics(t *testing.T) {
	testFunction(t, func(t *testing.T) {
		metric := Metric{Name: "test"}
		Send(metric)

		// Same request/response exchange the HTTP handler performs.
		metricsRequestChannel <- struct{}{}
		exported := <-metricsToRequest

		// Fatalf (not Errorf) so that the index access below can never run
		// on an empty slice and panic.
		if len(exported) != 1 {
			t.Fatalf("Got wrong amount of metrics %d, expected 1", len(exported))
		}

		if exported[0].Name != "test" {
			t.Errorf("Got wrong name of metric %s, expected test", exported[0].Name)
		}
	})
}

// TestHttpRun is an end-to-end test: it starts the HTTP server, sends one
// metric, scrapes the endpoint and checks that the metric survives the JSON
// round trip.
func TestHttpRun(t *testing.T) {
	go Start(12345, "/metrics", 10)
	defer Stop()

	// Give the server goroutine time to create the channels and listen.
	time.Sleep(time.Second)

	metric := Metric{Name: "test"}
	metric.Timestamp = 10
	metric.AddTag("tag1", 42)
	metric.AddValue("value1", 11)
	Send(metric)

	// Bounded request so a hung server fails the test instead of stalling it.
	client := http.Client{Timeout: 5 * time.Second}
	response, err := client.Get("http://localhost:12345/metrics")
	if err != nil {
		t.Fatalf("Failed to GET metrics at port 12345: %v", err)
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		t.Fatalf("Got status %d from metrics endpoint, expected %d", response.StatusCode, http.StatusOK)
	}

	var receivedMetrics []Metric
	if err = json.NewDecoder(response.Body).Decode(&receivedMetrics); err != nil {
		t.Fatalf("Failed to decoded Metric: %v", err)
	}

	// Guard the index access below.
	if len(receivedMetrics) == 0 {
		t.Fatal("Got no metrics, expected 1")
	}
	receivedMetric := receivedMetrics[0]

	if receivedMetric.Name != "test" {
		t.Errorf("Got wrong name of metric %s, expected test", receivedMetric.Name)
	}

	if receivedMetric.Timestamp != 10 {
		t.Errorf("Got wrong timestamp of metric %d, expected 10", receivedMetric.Timestamp)
	}

	if len(receivedMetric.Tags) != 1 {
		t.Errorf("Got wrong number of tags %d, expected 1", len(receivedMetric.Tags))
	}

	// JSON numbers decode into float64 when the target is `any`. The
	// two-value assertion reports a failure instead of panicking when the
	// entry is missing or has another type.
	if tag, ok := receivedMetric.Tags["tag1"].(float64); !ok || tag != 42 {
		t.Error("Failed to retreive tags: tag1 with value 42")
	}

	if len(receivedMetric.Values) != 1 {
		t.Errorf("Got wrong number of values %d, expected 1", len(receivedMetric.Values))
	}

	if value, ok := receivedMetric.Values["value1"].(float64); !ok || value != 11 {
		t.Error("Failed to retreive tags: value1 with value 11")
	}
}

// BenchmarkSendingMetrics measures the cost of Send while the event loop is
// running.
//
// This benchmark cannot be run for too long: nothing scrapes the endpoint, so
// without the helper goroutine below the collected metrics would fill the
// whole RAM.
//
// Results recorded by the original author:
// goos: linux
// goarch: amd64
// pkg: github.com/AliceO2Group/Control/common/monitoring
// cpu: 11th Gen Intel(R) Core(TM) i9-11900H @ 2.50GHz
// BenchmarkSendingMetrics-16
//
// 123365481 192.6 ns/op
// PASS
// ok github.com/AliceO2Group/Control/common/monitoring 44.686s
func BenchmarkSendingMetrics(b *testing.B) {
	// Start blocks while the server is serving, so it has to run in its own
	// goroutine; give it a moment to create the channels and the event loop.
	go Start(12345, "/metrics", 100)
	time.Sleep(100 * time.Millisecond)

	// This goroutine keeps clearing the collected metrics so RAM is not
	// exhausted. It goes through the event loop's request channel instead of
	// reading the metrics slice directly, and it has its own stop signal so
	// it does not compete with the event loop for endChannel.
	done := make(chan struct{})
	drained := make(chan struct{})
	go func() {
		defer close(drained)

		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				metricsRequestChannel <- struct{}{}
				<-metricsToRequest
			}
		}
	}()

	// Stop the helper first and wait for it, so it is never left blocked on
	// an event loop that Stop has already ended.
	defer func() {
		close(done)
		<-drained
		Stop()
	}()

	metric := Metric{Name: "testname", Timestamp: 12345}
	metric.AddValue("value", 42)
	metric.AddTag("tag", 40)

	// Keep the setup above out of the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		Send(metric)
	}

	fmt.Println("")
}
2 changes: 2 additions & 0 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func setDefaults() error {
viper.SetDefault("kafkaEndpoints", []string{"localhost:9092"})
viper.SetDefault("enableKafka", true)
viper.SetDefault("logAllIL", false)
viper.SetDefault("metricsEndpoint", "8086/metrics")
return nil
}

Expand Down Expand Up @@ -198,6 +199,7 @@ func setFlags() error {
pflag.StringSlice("kafkaEndpoints", viper.GetStringSlice("kafkaEndpoints"), "List of Kafka endpoints to connect to (default: localhost:9092)")
pflag.Bool("enableKafka", viper.GetBool("enableKafka"), "Turn on the kafka messaging")
pflag.Bool("logAllIL", viper.GetBool("logAllIL"), "Send all the logs into IL, including Debug and Trace messages")
pflag.String("metricsEndpoint", viper.GetString("metricsEndpoint"), "Http endpoint from which metrics can be scraped: [port/endpoint]")

pflag.Parse()
return viper.BindPFlags(pflag.CommandLine)
Expand Down
Loading

0 comments on commit 78908b9

Please sign in to comment.