Skip to content

Commit

Permalink
Merge branch 'MERGE_92_94' into OM94
Browse files Browse the repository at this point in the history
  • Loading branch information
mphanias authored Jul 25, 2023
2 parents dbe90a3 + 34e3397 commit 29ae1ed
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 96 deletions.
19 changes: 19 additions & 0 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/gobwas/glob"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

goversion "github.com/hashicorp/go-version"
)
Expand Down Expand Up @@ -431,3 +432,21 @@ func getMetricType(pContext ContextType, pRawMetricName string) metricType {

return mtCounter
}

// This is a common utility, used by all the watchers to push metric to prometheus
func pushToPrometheus(asMetric AerospikeStat, pv float64, labels []string, labelValues []string,
ch chan<- prometheus.Metric) {

if asMetric.isAllowed {
// handle any panic from prometheus, this may occur when prom encounters a config/stat with special characters
defer func() {
if r := recover(); r != nil {
log.Tracef("%s recovered from panic while handling stat %s", string(asMetric.context), asMetric.name)
}
}()

desc, valueType := asMetric.makePromMetric(labels...)
ch <- prometheus.MustNewConstMetric(desc, valueType, pv, labelValues...)

}
}
19 changes: 11 additions & 8 deletions observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,13 @@ func (o *Observer) refresh(ch chan<- prometheus.Metric) (map[string]string, erro
infoKeys = append(infoKeys, keys...)
}
}
// append infoKey "build" - this is removed from WatcherLatencies to avoid forced watcher sequence during refresh
infoKeys = append(infoKeys, "build")

// info request for first set of info keys
rawMetrics, err := o.requestInfo(retryCount, infoKeys)
// info request for first set of info keys, this retrives configs from server
// from namespaces,server/node-stats, xdr
// if for any context (like jobs, latencies etc.,) no configs, they are not sent to server
passOneOutput, err := o.requestInfo(retryCount, infoKeys)
if err != nil {
return nil, err
}
Expand All @@ -242,25 +246,24 @@ func (o *Observer) refresh(ch chan<- prometheus.Metric) (map[string]string, erro
infoKeys = []string{ikClusterName, ikService, ikBuild}
watcherInfoKeys := make([][]string, len(o.watchers))
for i, c := range o.watchers {
if keys := c.passTwoKeys(rawMetrics); len(keys) > 0 {
if keys := c.passTwoKeys(passOneOutput); len(keys) > 0 {
infoKeys = append(infoKeys, keys...)
watcherInfoKeys[i] = keys
}
}

// info request for second set of info keys
nRawMetrics, err := o.requestInfo(retryCount, infoKeys)
// info request for second set of info keys, this retrieves all the stats from server
rawMetrics, err := o.requestInfo(retryCount, infoKeys)
if err != nil {
return rawMetrics, err
return passOneOutput, err
}

rawMetrics = nRawMetrics

// sanitize the utf8 strings before sending them to watchers
for k, v := range rawMetrics {
rawMetrics[k] = sanitizeUTF8(v)
}

// sanitize the utf8 strings before sending them to watchers
for i, c := range o.watchers {
if err := c.refresh(o, watcherInfoKeys[i], rawMetrics, ch); err != nil {
return rawMetrics, err
Expand Down
3 changes: 2 additions & 1 deletion watcher_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ type LatencyWatcher struct {
func (lw *LatencyWatcher) describe(ch chan<- *prometheus.Desc) {}

func (lw *LatencyWatcher) passOneKeys() []string {
return []string{"build"}
// return []string{"build"}
return nil
}

func (lw *LatencyWatcher) passTwoKeys(rawMetrics map[string]string) (latencyCommands []string) {
Expand Down
63 changes: 45 additions & 18 deletions watcher_namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@ var regexToExtractArrayStats = map[string]string{
SINDEX_TYPE: "sindex\\-type\\.(?P<type>mount)\\[(?P<idx>\\d+)\\]\\.(?P<metric>.+)",
}

const (
KEY_NS_METADATA = "namespaces"
)

type NamespaceWatcher struct {
namespaceStats map[string]AerospikeStat
}

func (nw *NamespaceWatcher) describe(ch chan<- *prometheus.Desc) {}

func (nw *NamespaceWatcher) passOneKeys() []string {
return []string{"namespaces"}
// we are sending key "namespaces", server returns all the configs and stats in single call, unlike node-stats, xdr
return []string{KEY_NS_METADATA}
}

func (nw *NamespaceWatcher) passTwoKeys(rawMetrics map[string]string) []string {
s := rawMetrics["namespaces"]
s := rawMetrics[KEY_NS_METADATA]
list := strings.Split(s, ";")

log.Tracef("namespaces:%s", s)
Expand Down Expand Up @@ -67,23 +72,45 @@ func (nw *NamespaceWatcher) refresh(ott *Observer, infoKeys []string, rawMetrics
nw.handleArrayStats(nsName, stat, pv, stats, deviceType, rawMetrics, ch)
} else {
asMetric, exists := nw.namespaceStats[stat]
// to find regular metric or storage-engine metric, we split stat [using: seDynamicExtractor RegEx]
// after splitting, a storage-engine stat has 4 elements other stats have 3 elements
match := seDynamicExtractor.FindStringSubmatch(stat)

// holds the labels, values and stat holds the object by normal-stat/storage-engine-stat
var labels []string
var labelValues []string
var asMetric AerospikeStat

// process storage engine stat
constructedStatname := ""

if len(match) == 4 {
statType := match[1]
statIndex := match[2]
statName := match[3]

constructedStatname = STORAGE_ENGINE + statType + "_" + statName
deviceOrFileName := stats["storage-engine."+statType+"["+statIndex+"]"]

labels = []string{METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE, METRIC_LABEL_NS, statType + "_index", statType}
labelValues = []string{rawMetrics[ikClusterName], rawMetrics[ikService], nsName, statIndex, deviceOrFileName}

} else { // regular stat (i.e. non-storage-engine related)
constructedStatname = stat
labels = []string{METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE, METRIC_LABEL_NS}
labelValues = []string{rawMetrics[ikClusterName], rawMetrics[ikService], nsName}

}

asMetric, exists := nw.namespaceStats[constructedStatname]

if !exists {
asMetric = newAerospikeStat(CTX_NAMESPACE, constructedStatname)
nw.namespaceStats[constructedStatname] = asMetric
}

if !exists {
asMetric = newAerospikeStat(CTX_NAMESPACE, stat)
nw.namespaceStats[stat] = asMetric
}

defer func() {
// recover from panic if one occures. Set err to nil otherwise.
if recover() != nil {
log.Warnf("namespace-stats: recovered from panic while handling stat %s in %s", stat, nsName)
}
}()

if asMetric.isAllowed {
desc, valueType := asMetric.makePromMetric(METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE, METRIC_LABEL_NS)
ch <- prometheus.MustNewConstMetric(desc, valueType, pv, rawMetrics[ikClusterName], rawMetrics[ikService], nsName)
}
if asMetric.isAllowed {
pushToPrometheus(asMetric, pv, labels, labelValues, ch)
}
}
}
Expand Down
49 changes: 33 additions & 16 deletions watcher_node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
log "github.com/sirupsen/logrus"
)

const (
KEY_SERVICE_CONFIG = "get-config:context=service"
KEY_SERVICE_STATISTICS = "statistics"
)

type StatsWatcher struct {
nodeMetrics map[string]AerospikeStat
}
Expand All @@ -17,21 +22,41 @@ func (sw *StatsWatcher) passOneKeys() []string {
}

func (sw *StatsWatcher) passTwoKeys(rawMetrics map[string]string) []string {
return []string{"statistics"}
// we need to fetch both configs and stat
return []string{KEY_SERVICE_CONFIG, KEY_SERVICE_STATISTICS}
}

// All (allowed/blocked) node stats. Based on the config.Aerospike.NodeMetricsAllowlist, config.Aerospike.NodeMetricsBlocklist.
// var nodeMetrics = make(map[string]AerospikeStat)

func (sw *StatsWatcher) refresh(o *Observer, infoKeys []string, rawMetrics map[string]string, ch chan<- prometheus.Metric) error {

log.Tracef("node-stats:%s", rawMetrics["statistics"])

if sw.nodeMetrics == nil {
sw.nodeMetrics = make(map[string]AerospikeStat)
}

stats := parseStats(rawMetrics["statistics"], ";")
nodeConfigs := rawMetrics[KEY_SERVICE_CONFIG]
nodeStats := rawMetrics[KEY_SERVICE_STATISTICS]
log.Tracef("node-configs:%s", nodeConfigs)
log.Tracef("node-stats:%s", nodeStats)

clusterName := rawMetrics[ikClusterName]
service := rawMetrics[ikService]

// we are sending configs and stats in same refresh call, as both are being sent to prom, instead of doing prom-push in 2 functions
// handle configs
sw.handleRefresh(o, nodeConfigs, clusterName, service, ch)

// handle stats
sw.handleRefresh(o, nodeStats, clusterName, service, ch)

return nil
}

func (sw *StatsWatcher) handleRefresh(o *Observer, nodeRawMetrics string, clusterName string, service string,
ch chan<- prometheus.Metric) {

stats := parseStats(nodeRawMetrics, ";")

for stat, value := range stats {
pv, err := tryConvert(value)
Expand All @@ -45,18 +70,10 @@ func (sw *StatsWatcher) refresh(o *Observer, infoKeys []string, rawMetrics map[s
sw.nodeMetrics[stat] = asMetric
}

// handle any panic from prometheus, this may occur when prom encounters a config/stat with special characters
defer func() {
if r := recover(); r != nil {
log.Tracef("node-stats: recovered from panic while handling stat %s in %s", stat, gService)
}
}()
labels := []string{METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE}
labelsValues := []string{clusterName, service}

if asMetric.isAllowed {
desc, valueType := asMetric.makePromMetric(METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE)
ch <- prometheus.MustNewConstMetric(desc, valueType, pv, rawMetrics[ikClusterName], rawMetrics[ikService])
}
}
pushToPrometheus(asMetric, pv, labels, labelsValues, ch)

return nil
}
}
28 changes: 17 additions & 11 deletions watcher_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func (sw *SetWatcher) refresh(o *Observer, infoKeys []string, rawMetrics map[str
}

for i := range setStats {
clusterName := rawMetrics[ikClusterName]
service := rawMetrics[ikService]

stats := parseStats(setStats[i], ":")
for stat, value := range stats {
Expand All @@ -48,17 +50,21 @@ func (sw *SetWatcher) refresh(o *Observer, infoKeys []string, rawMetrics map[str
sw.setMetrics[stat] = asMetric
}

// handle any panic from prometheus, this may occur when prom encounters a config/stat with special characters
defer func() {
if r := recover(); r != nil {
log.Tracef("set-stats: recovered from panic while handling stat %s in %s", stat, stats["set"])
}
}()

if asMetric.isAllowed {
desc, valueType := asMetric.makePromMetric(METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE, METRIC_LABEL_NS, METRIC_LABEL_SET)
ch <- prometheus.MustNewConstMetric(desc, valueType, pv, rawMetrics[ikClusterName], rawMetrics[ikService], stats["ns"], stats["set"])
}
labels := []string{METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE, METRIC_LABEL_NS, METRIC_LABEL_SET}
labelsValues := []string{clusterName, service, stats["ns"], stats["set"]}
pushToPrometheus(asMetric, pv, labels, labelsValues, ch)

// // handle any panic from prometheus, this may occur when prom encounters a config/stat with special characters
// defer func() {
// if r := recover(); r != nil {
// log.Tracef("set-stats: recovered from panic while handling stat %s in %s", stat, stats["set"])
// }
// }()

// if asMetric.isAllowed {
// desc, valueType := asMetric.makePromMetric(METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE, METRIC_LABEL_NS, METRIC_LABEL_SET)
// ch <- prometheus.MustNewConstMetric(desc, valueType, pv, rawMetrics[ikClusterName], rawMetrics[ikService], stats["ns"], stats["set"])
// }

}

Expand Down
30 changes: 19 additions & 11 deletions watcher_sindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func (siw *SindexWatcher) refresh(o *Observer, infoKeys []string, rawMetrics map
sindexName := sindexInfoKeySplit[1]
log.Tracef("sindex-stats:%s:%s:%s", nsName, sindexName, rawMetrics[sindex])

clusterName := rawMetrics[ikClusterName]
service := rawMetrics[ikService]

stats := parseStats(rawMetrics[sindex], ";")
for stat, value := range stats {
pv, err := tryConvert(value)
Expand All @@ -77,17 +80,22 @@ func (siw *SindexWatcher) refresh(o *Observer, infoKeys []string, rawMetrics map
asMetric = newAerospikeStat(CTX_SINDEX, stat)
siw.sindexMetrics[stat] = asMetric
}
// handle any panic from prometheus, this may occur when prom encounters a config/stat with special characters
defer func() {
if r := recover(); r != nil {
log.Tracef("sindex-stats: recovered from panic while handling stat %s in %s", stat, sindexName)
}
}()

if asMetric.isAllowed {
desc, valueType := asMetric.makePromMetric(METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE, METRIC_LABEL_NS, METRIC_LABEL_SINDEX)
ch <- prometheus.MustNewConstMetric(desc, valueType, pv, rawMetrics[ikClusterName], rawMetrics[ikService], nsName, sindexName)
}

labels := []string{METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE, METRIC_LABEL_NS, METRIC_LABEL_SINDEX}
labelsValues := []string{clusterName, service, nsName, sindexName}
pushToPrometheus(asMetric, pv, labels, labelsValues, ch)

// // handle any panic from prometheus, this may occur when prom encounters a config/stat with special characters
// defer func() {
// if r := recover(); r != nil {
// log.Tracef("sindex-stats: recovered from panic while handling stat %s in %s", stat, sindexName)
// }
// }()

// if asMetric.isAllowed {
// desc, valueType := asMetric.makePromMetric(METRIC_LABEL_CLUSTER_NAME, METRIC_LABEL_SERVICE, METRIC_LABEL_NS, METRIC_LABEL_SINDEX)
// ch <- prometheus.MustNewConstMetric(desc, valueType, pv, rawMetrics[ikClusterName], rawMetrics[ikService], nsName, sindexName)
// }

}

Expand Down
Loading

0 comments on commit 29ae1ed

Please sign in to comment.