From 54ca5f8d0fdcc2afef79ff67a1f9243293a0a983 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Michal=20Tich=C3=A1k?=
Date: Mon, 9 Dec 2024 17:53:43 +0100
Subject: [PATCH 1/2] [core] Adding http metrics endpoint

---
 Makefile                             |   2 +-
 common/ecsmetrics/metric.go          |  14 ++
 common/ecsmetrics/metrics.go         |  73 ++++++++++
 common/monitoring/metric.go          |  27 ++++
 common/monitoring/monitoring.go      | 125 ++++++++++++++++++
 common/monitoring/monitoring_test.go | 190 +++++++++++++++++++++++++++
 core/config.go                       |   4 +
 core/core.go                         |  53 +++++++-
 8 files changed, 485 insertions(+), 3 deletions(-)
 create mode 100644 common/ecsmetrics/metric.go
 create mode 100644 common/ecsmetrics/metrics.go
 create mode 100644 common/monitoring/metric.go
 create mode 100644 common/monitoring/monitoring.go
 create mode 100644 common/monitoring/monitoring_test.go

diff --git a/Makefile b/Makefile
index 4fcb0f8f..c87c21b6 100644
--- a/Makefile
+++ b/Makefile
@@ -71,7 +71,7 @@ INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT))
 GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping
 SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut
 TEST_DIRS := ./apricot/local ./common/gera ./common/utils/safeacks ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment
-GO_TEST_DIRS := ./core/repos ./core/integration/dcs
+GO_TEST_DIRS := ./core/repos ./core/integration/dcs ./common/monitoring
 
 coverage:COVERAGE_PREFIX := ./coverage_results
 coverage:GOTEST_COVERAGE_FILE := $(COVERAGE_PREFIX)/gotest.out
diff --git a/common/ecsmetrics/metric.go b/common/ecsmetrics/metric.go
new file mode 100644
index 00000000..fda30f87
--- /dev/null
+++ b/common/ecsmetrics/metric.go
@@ -0,0 +1,14 @@
+package ecsmetrics
+
+import (
+    "time"
+
+    "github.com/AliceO2Group/Control/common/monitoring"
+)
+
+func NewMetric(name string) monitoring.Metric {
+    timestamp := time.Now()
+    metric := monitoring.Metric{Name: name, Timestamp: timestamp.UnixMilli()}
+    metric.AddTag("subsystem", "ECS")
+    return metric
+}
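
The helper above only pre-tags a Metric with the ECS subsystem and a timestamp; emitting it still goes through the monitoring package. A minimal sketch of the intended call pattern — the metric name, value and tag below are made up for illustration, and monitoring.Start is assumed to have been called elsewhere:

    package main

    import (
        "github.com/AliceO2Group/Control/common/ecsmetrics"
        "github.com/AliceO2Group/Control/common/monitoring"
    )

    func main() {
        // Build a metric that already carries the "subsystem: ECS" tag and a timestamp.
        metric := ecsmetrics.NewMetric("environment.transition") // hypothetical metric name
        metric.AddValue("duration_ms", 1250)                     // hypothetical value
        metric.AddTag("environment", "2oDvieFrVTi")              // hypothetical tag

        // Hand it to the monitoring event loop; it is buffered there until the next scrape.
        monitoring.Send(metric)
    }
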
diff --git a/common/ecsmetrics/metrics.go b/common/ecsmetrics/metrics.go
new file mode 100644
index 00000000..f9d5f018
--- /dev/null
+++ b/common/ecsmetrics/metrics.go
@@ -0,0 +1,73 @@
+package ecsmetrics
+
+import (
+    internalmetrics "runtime/metrics"
+    "time"
+
+    "github.com/AliceO2Group/Control/common/logger"
+    "github.com/AliceO2Group/Control/common/monitoring"
+    "github.com/sirupsen/logrus"
+)
+
+var (
+    endRequestChannel = make(chan struct{})
+    log               = logger.New(logrus.StandardLogger(), "ecsmetrics")
+)
+
+func gather() monitoring.Metric {
+    samples := []internalmetrics.Sample{
+        {Name: "/gc/cycles/total:gc-cycles"},
+        {Name: "/memory/classes/other:bytes"},
+        {Name: "/memory/classes/total:bytes"},
+        {Name: "/sched/goroutines:goroutines"},
+        {Name: "/sync/mutex/wait/total:seconds"},
+        {Name: "/memory/classes/other:bytes"},
+        {Name: "/memory/classes/total:bytes"},
+        {Name: "/memory/classes/heap/free:bytes"},
+        {Name: "/memory/classes/heap/objects:bytes"},
+        {Name: "/memory/classes/heap/released:bytes"},
+        {Name: "/memory/classes/heap/stacks:bytes"},
+        {Name: "/memory/classes/heap/unused:bytes"},
+    }
+
+    // Collect metrics data
+    internalmetrics.Read(samples)
+
+    metric := NewMetric("golangruntimemetrics")
+
+    for _, sample := range samples {
+        switch sample.Value.Kind() {
+        case internalmetrics.KindUint64:
+            metric.AddValue(sample.Name, sample.Value.Uint64())
+        case internalmetrics.KindFloat64:
+            metric.AddValue(sample.Name, sample.Value.Float64())
+        case internalmetrics.KindFloat64Histogram:
+            log.Warningf("histograms are not supported yet, skipping metric [%s]", sample.Name)
+            continue
+        default:
+            log.Warningf("unsupported kind %v for metric %s", sample.Value.Kind(), sample.Name)
+            continue
+        }
+    }
+    return metric
+}
+
+func StartGolangMetrics(period time.Duration) {
+    go func() {
+        for {
+            select {
+            case <-endRequestChannel:
+                endRequestChannel <- struct{}{}
+                return
+            default:
+                monitoring.Send(gather())
+                time.Sleep(period)
+            }
+        }
+    }()
+}
+
+func StopGolangMetrics() {
+    endRequestChannel <- struct{}{}
+    <-endRequestChannel
+}
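
gather() above builds on the standard library's runtime/metrics API: each sample is requested by name and then read according to its value kind. A standalone sketch of that stdlib pattern, reduced to a single counter that is also in the list above:

    package main

    import (
        "fmt"
        "runtime/metrics"
    )

    func main() {
        // Sample one of the counters also read by gather().
        samples := []metrics.Sample{{Name: "/sched/goroutines:goroutines"}}
        metrics.Read(samples)

        // Each sample reports its kind; the goroutine count is a KindUint64.
        if samples[0].Value.Kind() == metrics.KindUint64 {
            fmt.Println("goroutines:", samples[0].Value.Uint64())
        }
    }
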
diff --git a/common/monitoring/metric.go b/common/monitoring/metric.go
new file mode 100644
index 00000000..89173323
--- /dev/null
+++ b/common/monitoring/metric.go
@@ -0,0 +1,27 @@
+package monitoring
+
+type (
+    TagsType   map[string]any
+    ValuesType map[string]any
+)
+
+type Metric struct {
+    Name      string     `json:"name"`
+    Values    ValuesType `json:"values"`
+    Tags      TagsType   `json:"tags,omitempty"`
+    Timestamp int64      `json:"timestamp"`
+}
+
+func (metric *Metric) AddTag(tagName string, value any) {
+    if metric.Tags == nil {
+        metric.Tags = make(TagsType)
+    }
+    metric.Tags[tagName] = value
+}
+
+func (metric *Metric) AddValue(valueName string, value any) {
+    if metric.Values == nil {
+        metric.Values = make(ValuesType)
+    }
+    metric.Values[valueName] = value
+}
diff --git a/common/monitoring/monitoring.go b/common/monitoring/monitoring.go
new file mode 100644
index 00000000..a6eae11b
--- /dev/null
+++ b/common/monitoring/monitoring.go
@@ -0,0 +1,125 @@
+package monitoring
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "net/http"
+    "time"
+
+    "github.com/AliceO2Group/Control/common/logger"
+    "github.com/sirupsen/logrus"
+)
+
+var (
+    server       *http.Server
+    metricsLimit int = 1000000
+    metrics      []Metric
+    // channel used to request shutdown of the metrics server; it reports back when the server has ended.
+    // It must always be read after sending to it!
+    endChannel chan struct{}
+
+    // channel used to send metrics into the event loop
+    metricsChannel chan Metric
+
+    // channel used to notify the event loop that a new http request to report metrics has arrived
+    metricsRequestChannel chan struct{}
+
+    // channel used to hand the gathered metrics from the event loop to the http request handler
+    metricsToRequest chan []Metric
+
+    Log = logger.New(logrus.StandardLogger(), "metrics")
+)
+
+func initChannels(messageBufferSize int) {
+    endChannel = make(chan struct{})
+    metricsRequestChannel = make(chan struct{})
+    metricsChannel = make(chan Metric, 100)
+    metricsToRequest = make(chan []Metric)
+    metricsLimit = messageBufferSize
+}
+
+func closeChannels() {
+    close(endChannel)
+    close(metricsRequestChannel)
+    close(metricsChannel)
+    close(metricsToRequest)
+}
+
+func eventLoop() {
+    for {
+        select {
+        case <-metricsRequestChannel:
+            shallowCopyMetrics := metrics
+            metrics = make([]Metric, 0)
+            metricsToRequest <- shallowCopyMetrics
+
+        case metric := <-metricsChannel:
+            if len(metrics) < metricsLimit {
+                metrics = append(metrics, metric)
+            } else {
+                Log.Warn("too many metrics waiting to be scraped. Are you sure that metrics scraping is running?")
+            }
+
+        case <-endChannel:
+            endChannel <- struct{}{}
+            return
+        }
+    }
+}
+
+func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) {
+    w.Header().Set("Content-Type", "application/json")
+    metricsRequestChannel <- struct{}{}
+    metricsToConvert := <-metricsToRequest
+    if metricsToConvert == nil {
+        metricsToConvert = make([]Metric, 0)
+    }
+    json.NewEncoder(w).Encode(metricsToConvert)
+}
+
+func Send(metric Metric) {
+    metricsChannel <- metric
+}
+
+func handleFunc(endpointName string) {
+    // recover is here to correctly allow multiple Starts and Stops of the server,
+    // since registering the same endpoint twice makes http.HandleFunc panic
+    defer func() {
+        recover()
+    }()
+
+    http.HandleFunc(endpointName, exportMetricsAndReset)
+}
+
+// \param port port where the scraping endpoint will be created
+// \param endpointName name of the endpoint, which must start with a slash, e.g. "/internalmetrics"
+// \param messageBufferSize size of the buffer where metrics are kept between scraping requests.
+//
+// If we attempt to send more metrics than the size of the buffer, the overflowing metrics will be ignored and a warning will be logged.
+func Start(port uint16, endpointName string, messageBufferSize int) error {
+    if server != nil {
+        return nil
+    }
+
+    initChannels(messageBufferSize)
+
+    go eventLoop()
+
+    server = &http.Server{Addr: fmt.Sprintf(":%d", port)}
+    handleFunc(endpointName)
+    return server.ListenAndServe()
+}
+
+func Stop() {
+    if server == nil {
+        return
+    }
+
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+    server.Shutdown(ctx)
+
+    endChannel <- struct{}{}
+    <-endChannel
+    server = nil
+}
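
Taken together, the intended call pattern for this package is: Start the HTTP endpoint in its own goroutine (ListenAndServe blocks and returns http.ErrServerClosed after Stop), Send metrics from anywhere in the process, let an external collector scrape the JSON array, and Stop on shutdown. A minimal sketch with an arbitrarily chosen port, endpoint name and buffer size:

    package main

    import (
        "time"

        "github.com/AliceO2Group/Control/common/monitoring"
    )

    func main() {
        // ListenAndServe blocks, so Start runs in its own goroutine.
        go monitoring.Start(9090, "/custommetrics", 1000) // hypothetical port/endpoint/buffer size

        metric := monitoring.Metric{Name: "example", Timestamp: time.Now().UnixMilli()}
        metric.AddValue("counter", 1)
        monitoring.Send(metric)

        // A scraper would now GET http://<host>:9090/custommetrics and receive a JSON
        // array of Metric objects; each scrape drains the buffer.
        time.Sleep(time.Second)
        monitoring.Stop()
    }

Since every scrape resets the buffer, the endpoint is meant to be polled by a single collector; a second concurrent scraper would see only the metrics accumulated since the previous request.
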
diff --git a/common/monitoring/monitoring_test.go b/common/monitoring/monitoring_test.go
new file mode 100644
index 00000000..4d3a3d95
--- /dev/null
+++ b/common/monitoring/monitoring_test.go
@@ -0,0 +1,190 @@
+package monitoring
+
+import (
+    "encoding/json"
+    "fmt"
+    "net/http"
+    "testing"
+    "time"
+)
+
+func TestSimpleStartStop(t *testing.T) {
+    go Start(1234, "/random", 100)
+    time.Sleep(time.Millisecond * 100)
+    Stop()
+}
+
+func TestStartMultipleStop(t *testing.T) {
+    go Start(1234, "/random", 100)
+    time.Sleep(time.Millisecond * 100)
+    Stop()
+    Stop()
+}
+
+func cleaningUpAfterTest() {
+    endChannel <- struct{}{}
+    <-endChannel
+    closeChannels()
+    metrics = make([]Metric, 0)
+}
+
+func initTest() {
+    initChannels(100)
+    // we need the metrics channel to block so we don't end too quickly
+    metricsChannel = make(chan Metric)
+    go eventLoop()
+}
+
+// decorator function that properly inits and cleans up after a higher-level test of the monitoring package
+func testFunction(t *testing.T, testToRun func(*testing.T)) {
+    initTest()
+    testToRun(t)
+    cleaningUpAfterTest()
+}
+
+func TestSendingSingleMetric(t *testing.T) {
+    testFunction(t, func(t *testing.T) {
+        metric := Metric{Name: "test"}
+        Send(metric)
+        if len(metrics) != 1 {
+            t.Error("wrong number of metrics, should be 1")
+        }
+
+        if metrics[0].Name != "test" {
+            t.Errorf("Got wrong name %s in stored metric", metrics[0].Name)
+        }
+    })
+}
+
+func TestExportingMetrics(t *testing.T) {
+    testFunction(t, func(t *testing.T) {
+        metric := Metric{Name: "test"}
+        Send(metric)
+
+        metricsRequestChannel <- struct{}{}
+        metrics := <-metricsToRequest
+
+        if len(metrics) != 1 {
+            t.Errorf("Got wrong amount of metrics %d, expected 1", len(metrics))
+        }
+
+        if metrics[0].Name != "test" {
+            t.Errorf("Got wrong name of metric %s, expected test", metrics[0].Name)
+        }
+    })
+}
+
+func TestBufferLimit(t *testing.T) {
+    testFunction(t, func(t *testing.T) {
+        metricsLimit = 1
+        metric := Metric{Name: "test"}
+        metric.Timestamp = 10
+        metric.AddTag("tag1", 42)
+        metric.AddValue("value1", 11)
+        Send(metric)
+
+        if len(metrics) != 1 {
+            t.Errorf("Metrics length is %d, but should be 1 after sending the first metric", len(metrics))
+        }
+
+        Send(metric)
+        time.Sleep(100 * time.Millisecond)
+
+        if len(metrics) != 1 {
+            t.Errorf("Metrics length is %d, but should be 1 after sending the second metric", len(metrics))
+        }
+    })
+}
+
+func TestHttpRun(t *testing.T) {
+    go Start(12345, "/metrics", 10)
+    defer Stop()
+
+    time.Sleep(time.Second)
+
+    metric := Metric{Name: "test"}
+    metric.Timestamp = 10
+    metric.AddTag("tag1", 42)
+    metric.AddValue("value1", 11)
+    Send(metric)
+
+    response, err := http.Get("http://localhost:12345/metrics")
+    if err != nil {
+        t.Fatalf("Failed to GET metrics at port 12345: %v", err)
+    }
+    decoder := json.NewDecoder(response.Body)
+    var receivedMetrics []Metric
+    if err = decoder.Decode(&receivedMetrics); err != nil {
+        t.Fatalf("Failed to decode Metric: %v", err)
+    }
+
+    receivedMetric := receivedMetrics[0]
+
+    if receivedMetric.Name != "test" {
+        t.Errorf("Got wrong name of metric %s, expected test", receivedMetric.Name)
+    }
+
+    if receivedMetric.Timestamp != 10 {
+        t.Errorf("Got wrong timestamp of metric %d, expected 10", receivedMetric.Timestamp)
+    }
+
+    if len(receivedMetric.Tags) != 1 {
+        t.Errorf("Got wrong number of tags %d, expected 1", len(receivedMetric.Tags))
+    }
+
+    if receivedMetric.Tags["tag1"].(float64) != 42 {
+        t.Error("Failed to retrieve tag1 with value 42")
+    }
+
+    if len(receivedMetric.Values) != 1 {
+        t.Errorf("Got wrong number of values %d, expected 1", len(receivedMetric.Values))
+    }
+
+    if receivedMetric.Values["value1"].(float64) != 11 {
+        t.Error("Failed to retrieve value1 with value 11")
+    }
+}
+
+// This benchmark cannot be run for too long as it will fill the whole RAM even with
+// the clearing goroutine below. Results:
+// goos: linux
+// goarch: amd64
+// pkg: github.com/AliceO2Group/Control/common/monitoring
+// cpu: 11th Gen Intel(R) Core(TM) i9-11900H @ 2.50GHz
+// BenchmarkSendingMetrics-16
+//
+//    123365481    192.6 ns/op
+// PASS
+// ok  github.com/AliceO2Group/Control/common/monitoring  44.686s
+func BenchmarkSendingMetrics(b *testing.B) {
+    go Start(12345, "/metrics", 100)
+    time.Sleep(100 * time.Millisecond) // give Start time to initialize channels and the listener
+
+    // this goroutine keeps clearing gathered metrics so that RAM is not exhausted
+    go func() {
+        for {
+            select {
+            case <-endChannel:
+                endChannel <- struct{}{}
+                return
+            default:
+                if len(metrics) >= 10000000 {
+                    metricsRequestChannel <- struct{}{}
+                    <-metricsToRequest
+                }
+            }
+            time.Sleep(100 * time.Millisecond)
+        }
+    }()
+
+    defer Stop()
+
+    metric := Metric{Name: "testname", Timestamp: 12345}
+    metric.AddValue("value", 42)
+    metric.AddTag("tag", 40)
+
+    for i := 0; i < b.N; i++ {
+        Send(metric)
+    }
+
+    fmt.Println("")
+}
diff --git a/core/config.go b/core/config.go
index a6a891f0..e772f694 100644
--- a/core/config.go
+++ b/core/config.go
@@ -128,6 +128,8 @@ func setDefaults() error {
 	viper.SetDefault("kafkaEndpoints", []string{"localhost:9092"})
 	viper.SetDefault("enableKafka", true)
 	viper.SetDefault("logAllIL", false)
+	viper.SetDefault("metricsEndpoint", "8086/metrics")
+	viper.SetDefault("metricsBufferSize", 10000)
 
 	return nil
 }
@@ -198,6 +200,8 @@ func setFlags() error {
 	pflag.StringSlice("kafkaEndpoints", viper.GetStringSlice("kafkaEndpoints"), "List of Kafka endpoints to connect to (default: localhost:9092)")
 	pflag.Bool("enableKafka", viper.GetBool("enableKafka"), "Turn on the kafka messaging")
 	pflag.Bool("logAllIL", viper.GetBool("logAllIL"), "Send all the logs into IL, including Debug and Trace messages")
+	pflag.String("metricsEndpoint", viper.GetString("metricsEndpoint"), "HTTP endpoint from which metrics can be scraped: [port/endpoint]")
+	pflag.Int("metricsBufferSize", viper.GetInt("metricsBufferSize"), "Limit for how many metrics can be stored in the buffer between scraping requests")
 
 	pflag.Parse()
 	return viper.BindPFlags(pflag.CommandLine)
diff --git a/core/core.go b/core/core.go
index e0db7224..90358508 100644
--- a/core/core.go
+++ b/core/core.go
@@ -26,12 +26,19 @@ package core
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net"
+	"net/http"
+	"regexp"
+	"strconv"
 	"syscall"
+	"time"
 
+	"github.com/AliceO2Group/Control/common/ecsmetrics"
 	"github.com/AliceO2Group/Control/common/event/topic"
 	"github.com/AliceO2Group/Control/common/logger/infologger"
+	"github.com/AliceO2Group/Control/common/monitoring"
 	pb "github.com/AliceO2Group/Control/common/protos"
 	"github.com/AliceO2Group/Control/core/the"
 
@@ -44,8 +51,45 @@
 
 var log = logger.New(logrus.StandardLogger(), "core")
 
-const fileLimitWant = 65536
-const fileLimitMin = 8192
+const (
+	fileLimitWant = 65536
+	fileLimitMin  = 8192
+)
+
+func parseMetricsEndpoint(metricsEndpoint string) (error, uint16, string) {
+	pattern := `(^[0-9]{4,5})\/([a-zA-Z]+)`
+	re := regexp.MustCompile(pattern)
+	matches := re.FindStringSubmatch(metricsEndpoint)
+
+	if matches != nil {
+		port, err := strconv.ParseUint(matches[1], 10, 16)
+		if err != nil {
+			return err, 0, ""
+		}
+		return nil, uint16(port), matches[2]
+	} else {
+		return errors.New("failed to parse metrics endpoint: " + metricsEndpoint), 0, ""
+	}
+}
+
+func runMetrics() {
+	log.Info("Starting metrics endpoint")
+	metricsEndpoint := viper.GetString("metricsEndpoint")
+	err, port, endpoint := parseMetricsEndpoint(metricsEndpoint)
+	if err != nil {
+		log.WithField("error", err).Error("Failed to parse metrics endpoint")
+		return
+	}
+
+	go func() {
+		if err := monitoring.Start(port, fmt.Sprintf("/%s", endpoint), viper.GetInt("metricsBufferSize")); err != nil && err != http.ErrServerClosed {
+			ecsmetrics.StopGolangMetrics()
+			log.Errorf("failed to run metrics on port %d and endpoint: %s", port, endpoint)
+		}
+	}()
+
+	ecsmetrics.StartGolangMetrics(10 * time.Second)
+}
 
 // Run is the entry point for this scheduler.
 // TODO: refactor Config to reflect our specific requirements
@@ -99,6 +143,11 @@ func Run() error {
 	// Plugins need to start after taskman is running, because taskman provides the FID
 	integration.PluginsInstance().InitAll(state.taskman.GetFrameworkID())
 
+	runMetrics()
+	defer ecsmetrics.StopGolangMetrics()
+	defer monitoring.Stop()
+
+
 	log.Infof("Everything initiated Listening on control port: %d", viper.GetInt("controlPort"))
 	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", viper.GetInt("controlPort")))
 	if err != nil {
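
With the defaults above, the whole metrics configuration is a single "port/endpoint" string ("8086/metrics"), which runMetrics() splits with parseMetricsEndpoint before handing the parts to monitoring.Start. A small sketch of the expected mapping, written as if it lived inside package core since the parser is unexported; the helper function name is hypothetical:

    package core

    import "fmt"

    // exampleMetricsEndpoint illustrates how the default "8086/metrics" setting
    // is expected to be split: port 8086 and endpoint name "metrics", which
    // runMetrics() then exposes as "/metrics".
    func exampleMetricsEndpoint() {
        err, port, endpoint := parseMetricsEndpoint("8086/metrics")
        if err != nil {
            fmt.Println("parse error:", err)
            return
        }
        fmt.Printf("scrape at http://localhost:%d/%s\n", port, endpoint) // http://localhost:8086/metrics
    }
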
From 3919a84d4a6daeef21fea55cc9c78cabfd76b6b5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Michal=20Tich=C3=A1k?=
Date: Thu, 12 Dec 2024 11:51:52 +0100
Subject: [PATCH 2/2] fixup! [core] Adding http metrics endpoint

---
 core/core.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/core.go b/core/core.go
index 90358508..d1c708cc 100644
--- a/core/core.go
+++ b/core/core.go
@@ -147,7 +147,7 @@ func Run() error {
 	defer ecsmetrics.StopGolangMetrics()
 	defer monitoring.Stop()
 
-	log.Infof("Everything initiated Listening on control port: %d", viper.GetInt("controlPort"))
+	log.WithField("level", infologger.IL_Devel).Infof("Everything initiated and listening on control port: %d", viper.GetInt("controlPort"))
 	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", viper.GetInt("controlPort")))
 	if err != nil {