Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Om94 - support index-type and sindex-type array-kind of stats #89

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/gobwas/glob"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

goversion "github.com/hashicorp/go-version"
)
Expand All @@ -37,17 +38,24 @@ const (
CTX_LATENCIES ContextType = "latencies"
)

const STORAGE_ENGINE = "storage-engine_"

// below constant represent the labels we send along with metrics to Prometheus or something
const METRIC_LABEL_CLUSTER_NAME = "cluster_name"
const METRIC_LABEL_SERVICE = "service"
const METRIC_LABEL_NS = "ns"
const METRIC_LABEL_SET = "set"
const METRIC_LABEL_LE = "le"
const METRIC_LABEL_DC_NAME = "dc"
const METRIC_LABEL_SINDEX = "sindex"
const METRIC_LABEL_USER = "user"
const (
METRIC_LABEL_CLUSTER_NAME = "cluster_name"
METRIC_LABEL_SERVICE = "service"
METRIC_LABEL_NS = "ns"
METRIC_LABEL_SET = "set"
METRIC_LABEL_LE = "le"
METRIC_LABEL_DC_NAME = "dc"
METRIC_LABEL_SINDEX = "sindex"
METRIC_LABEL_USER = "user"
)

// constants used to identify type of metrics
const (
STORAGE_ENGINE = "storage-engine"
INDEX_TYPE = "index-type"
SINDEX_TYPE = "sindex-type"
)

func makeMetric(namespace, name string, t metricType, constLabels map[string]string, labels ...string) promMetric {
promDesc := prometheus.NewDesc(
Expand Down Expand Up @@ -424,3 +432,21 @@ func getMetricType(pContext ContextType, pRawMetricName string) metricType {

return mtCounter
}

// This is a common utility, used by all the watchers to push metric to prometheus
func pushToPrometheus(asMetric AerospikeStat, pv float64, labels []string, labelValues []string,
ch chan<- prometheus.Metric) {

if asMetric.isAllowed {
// handle any panic from prometheus, this may occur when prom encounters a config/stat with special characters
defer func() {
if r := recover(); r != nil {
log.Tracef("%s recovered from panic while handling stat %s", string(asMetric.context), asMetric.name)
}
}()

desc, valueType := asMetric.makePromMetric(labels...)
ch <- prometheus.MustNewConstMetric(desc, valueType, pv, labelValues...)

}
}
17 changes: 1 addition & 16 deletions gauge_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,16 @@ package main

import (
"fmt"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetGaugesNotEmpty(t *testing.T) {

// this is to force-reload the config in the NamespaceWatcher, this is a check on this param in NamespaceWatcher implementation
os.Setenv(TESTCASE_MODE, TESTCASE_MODE_TRUE)

fmt.Println("initializing GaugeMetrics ... TestGetGaugesNotEmpty")

// Initialize and validate Gauge config
gaugeList := new(GaugeStats)

initGaugeStats(METRICS_CONFIG_FILE, gaugeList)

nslist := gaugeList.NamespaceStats
Expand All @@ -27,10 +21,6 @@ func TestGetGaugesNotEmpty(t *testing.T) {
}

func TestGetGaugesCounts(t *testing.T) {

// this is to force-reload the config in the NamespaceWatcher, this is a check on this param in NamespaceWatcher implementation
os.Setenv(TESTCASE_MODE, TESTCASE_MODE_TRUE)

fmt.Println("initializing GaugeMetrics ... TestGetGaugesCounts")

// Initialize and validate Gauge config
Expand All @@ -56,9 +46,7 @@ func TestGetGaugesCounts(t *testing.T) {
}

func TestIsAGaugeTrue(t *testing.T) {

// this is to force-reload the config in the NamespaceWatcher, this is a check on this param in NamespaceWatcher implementation
os.Setenv(TESTCASE_MODE, TESTCASE_MODE_TRUE)
fmt.Println("initializing GaugeMetrics ... TestIsAGaugeTrue")

// Initialize and validate Gauge config
gaugeList := new(GaugeStats)
Expand Down Expand Up @@ -86,9 +74,6 @@ func TestIsAGaugeTrue(t *testing.T) {

func TestNoGaugeExists(t *testing.T) {

// this is to force-reload the config in the NamespaceWatcher, this is a check on this param in NamespaceWatcher implementation
os.Setenv(TESTCASE_MODE, TESTCASE_MODE_TRUE)

fmt.Println("initializing GaugeMetrics ... TestNoGaugeExists")

// Initialize and validate Gauge config
Expand Down
47 changes: 42 additions & 5 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"sort"
"strconv"
"strings"

Expand All @@ -13,13 +14,9 @@ var LABELS_APE_TOML = "tests/labels_ape.toml"
var NS_ALLOWLIST_APE_TOML = "tests/ns_allowlist_ape.toml"
var NS_BLOCKLIST_APE_TOML = "tests/ns_blocklist_ape.toml"

var TESTCASE_MODE = "TESTCASE_MODE"
var TESTCASE_MODE_TRUE = "true"
var TESTCASE_MODE_FALSE = "false"

var METRICS_CONFIG_FILE = "gauge_stats_list.toml"

// var g_ns_metric_allow_list = []string{"aerospike_namespace_master_objects", "aerospike_namespace_memory_used_bytes"}
var MOCK_TEST_DATA_FILE = "tests/mock_test_data.txt"

func extractNamespaceFromLabel(label string) string {
// [name:"cluster_name" value:"" name:"ns" value:"bar" name:"service" value:"" ]
Expand All @@ -30,6 +27,14 @@ func extractNamespaceFromLabel(label string) string {
return nsFromLabel
}

func stringifyLabel(label string) string {
labelReplacerFunc := strings.NewReplacer(".", "_", "-", "_", " ", "_", " ", "_", "[", "_", "]", "_", "\"", "_", ":", "_", "/", "_")
hypenReplacerFunc := strings.NewReplacer("_", "")

// return labelReplacerFunc.Replace(label)
return hypenReplacerFunc.Replace(labelReplacerFunc.Replace(label))
}

func extractLabelNameValueFromFullLabel(fullLabel string, reqName string) string {

// Example Given Original: [name:"cluster_name" value:"null" name:"service" value:"172.17.0.3:3000" ]
Expand Down Expand Up @@ -256,3 +261,35 @@ func extractOperationFromMetric(metricName string) string {

return operationName
}

func copyConfigLabels() map[string]string {
cfgLabels := config.AeroProm.MetricLabels

returnLabelMap := make(map[string]string)
for key, value := range cfgLabels {
returnLabelMap[key] = value
}

return returnLabelMap
}

func createLabelByNames(labelsMap map[string]string) string {

arr_label_names := []string{}
createdLabelString := ""

for key := range labelsMap {
arr_label_names = append(arr_label_names, strings.TrimSpace(key))
}

// sort the keys
sort.Strings(arr_label_names)

for idx := range arr_label_names {
keyName := arr_label_names[idx]
value := labelsMap[keyName]
createdLabelString = createdLabelString + constructLabelElement(keyName, strings.TrimSpace(value))
}

return strings.TrimSpace(createdLabelString)
}
618 changes: 440 additions & 178 deletions mock_data_generator_test.go

Large diffs are not rendered by default.

19 changes: 11 additions & 8 deletions observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,13 @@ func (o *Observer) refresh(ch chan<- prometheus.Metric) (map[string]string, erro
infoKeys = append(infoKeys, keys...)
}
}
// append infoKey "build" - this is removed from WatcherLatencies to avoid forced watcher sequence during refresh
infoKeys = append(infoKeys, "build")

// info request for first set of info keys
rawMetrics, err := o.requestInfo(retryCount, infoKeys)
// info request for first set of info keys, this retrives configs from server
// from namespaces,server/node-stats, xdr
// if for any context (like jobs, latencies etc.,) no configs, they are not sent to server
passOneOutput, err := o.requestInfo(retryCount, infoKeys)
if err != nil {
return nil, err
}
Expand All @@ -242,25 +246,24 @@ func (o *Observer) refresh(ch chan<- prometheus.Metric) (map[string]string, erro
infoKeys = []string{ikClusterName, ikService, ikBuild}
watcherInfoKeys := make([][]string, len(o.watchers))
for i, c := range o.watchers {
if keys := c.passTwoKeys(rawMetrics); len(keys) > 0 {
if keys := c.passTwoKeys(passOneOutput); len(keys) > 0 {
infoKeys = append(infoKeys, keys...)
watcherInfoKeys[i] = keys
}
}

// info request for second set of info keys
nRawMetrics, err := o.requestInfo(retryCount, infoKeys)
// info request for second set of info keys, this retrieves all the stats from server
rawMetrics, err := o.requestInfo(retryCount, infoKeys)
if err != nil {
return rawMetrics, err
return passOneOutput, err
}

rawMetrics = nRawMetrics

// sanitize the utf8 strings before sending them to watchers
for k, v := range rawMetrics {
rawMetrics[k] = sanitizeUTF8(v)
}

// sanitize the utf8 strings before sending them to watchers
for i, c := range o.watchers {
if err := c.refresh(o, watcherInfoKeys[i], rawMetrics, ch); err != nil {
return rawMetrics, err
Expand Down
56 changes: 56 additions & 0 deletions tests/mock_test_data.txt

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion watcher_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ type LatencyWatcher struct {
func (lw *LatencyWatcher) describe(ch chan<- *prometheus.Desc) {}

func (lw *LatencyWatcher) passOneKeys() []string {
return []string{"build"}
// return []string{"build"}
return nil
}

func (lw *LatencyWatcher) passTwoKeys(rawMetrics map[string]string) (latencyCommands []string) {
Expand Down
42 changes: 21 additions & 21 deletions watcher_latency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"fmt"
"os"
"strings"
"testing"
"time"
Expand All @@ -23,30 +22,30 @@ func TestLatencies_PassOneKeys(t *testing.T) {
}

func TestLatencies_PassTwoKeys(t *testing.T) {
fmt.Println("initializing config ... TestLatencies_PassTwoKeys")

os.Setenv(TESTCASE_MODE, TESTCASE_MODE_TRUE)
mas := new(MockAerospikeServer)
mas.initialize()

fmt.Println("initializing config ... TestLatencies_PassTwoKeys")
// Initialize and validate config
config = new(Config)
initConfig(DEFAULT_APE_TOML, config)
config.validateAndUpdate()

watcher := new(LatencyWatcher)

rawMetrics := getRawMetrics()
rawMetrics := mas.fetchRawMetrics()
// simulate, as if we are sending requestInfo to AS and get the Latencies, these are coming from mock-data-generator
outputs := watcher.passTwoKeys(rawMetrics)

fmt.Println("TestLatencies_PassTwoKeys: outputs: ", outputs)

// output is not nil
assert.NotNil(t, outputs, "build details returns are: ", outputs)
assert.Equal(t, outputs, []string{"latencies:"})
}

func TestLatencies_RefreshDefault(t *testing.T) {
os.Setenv(TESTCASE_MODE, TESTCASE_MODE_TRUE)

fmt.Println("initializing config ... TestLatencies_RefreshDefault")
// Initialize and validate config
config = new(Config)
Expand All @@ -56,13 +55,14 @@ func TestLatencies_RefreshDefault(t *testing.T) {
// run the test-case logic
latencies_runTestCase(t)

os.Setenv(TESTCASE_MODE, TESTCASE_MODE_FALSE)
}

func TestLatencies_RefreshWithLabelsConfig(t *testing.T) {
os.Setenv(TESTCASE_MODE, TESTCASE_MODE_TRUE)

fmt.Println("initializing config ... TestLatencies_RefreshWithLabelsConfig")

mas := new(MockAerospikeServer)
mas.initialize()

// Initialize and validate config
config = new(Config)
initConfig(LABELS_APE_TOML, config)
Expand All @@ -71,15 +71,13 @@ func TestLatencies_RefreshWithLabelsConfig(t *testing.T) {
watcher := new(LatencyWatcher)

gaugeStatHandler = new(GaugeStats)

initGaugeStats(METRICS_CONFIG_FILE, gaugeStatHandler)
rawMetrics := getRawMetrics()
rawMetrics := mas.fetchRawMetrics()

lObserver := &Observer{}
ch := make(chan prometheus.Metric, 1000)
latenciesInfoKeys := watcher.passTwoKeys(rawMetrics)

watcher.passTwoKeys(rawMetrics)
err := watcher.refresh(lObserver, latenciesInfoKeys, rawMetrics, ch)

if err != nil {
Expand Down Expand Up @@ -123,20 +121,22 @@ func TestLatencies_RefreshWithLabelsConfig(t *testing.T) {
* complete logic to call watcher, generate-mock data and asset is part of this function
*/
func latencies_runTestCase(t *testing.T) {
watcher := new(LatencyWatcher)

gaugeStatHandler = new(GaugeStats)
// mock server
mas := new(MockAerospikeServer)
mas.initialize()

latdg := new(MockLatencyPromMetricGenerator)

gaugeStatHandler = new(GaugeStats)
initGaugeStats(METRICS_CONFIG_FILE, gaugeStatHandler)
rawMetrics := getRawMetrics()
rawMetrics := mas.fetchRawMetrics()

watcher := new(LatencyWatcher)
lObserver := &Observer{}
ch := make(chan prometheus.Metric, 1000)
latenciesInfoKeys := watcher.passTwoKeys(rawMetrics)
ch := make(chan prometheus.Metric, 10000)

fmt.Println(" processing latencies with InfoKeys: ", latenciesInfoKeys)

watcher.passTwoKeys(rawMetrics)
latenciesInfoKeys := watcher.passTwoKeys(rawMetrics)
err := watcher.refresh(lObserver, latenciesInfoKeys, rawMetrics, ch)

if err != nil {
Expand Down Expand Up @@ -201,7 +201,7 @@ func latencies_runTestCase(t *testing.T) {
// we have only 1 service in our mock-data, however loop thru service array
for _, keyValue := range arrServices {

lExpectedMetricNamedValues, lExpectedMetricLabels := createLatencysWatcherExpectedOutputs(keyValue)
lExpectedMetricNamedValues, lExpectedMetricLabels := latdg.createLatencysWatcherExpectedOutputs(mas, keyValue)

for key := range lOutputValues {
expectedValues := lExpectedMetricNamedValues[key]
Expand Down
Loading
Loading