Skip to content

Commit

Permalink
feat: Add the RDS performance and generic metrics option
Browse files Browse the repository at this point in the history
  • Loading branch information
ZPascal committed Aug 15, 2024
1 parent 5903ed6 commit 6f80f8e
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 61 deletions.
12 changes: 12 additions & 0 deletions plugins/inputs/aliyuncms/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# public_key_id = ""
# role_name = ""

## Aliyun Metric services
## Specify which metric services to capture from Aliyun, choose from:
## * cms - Cloud Monitor service (default settings)
## * rds - Relational Database service
# metric_services = ["cms"]

## Specify ali cloud regions to be queried for metric and object discovery
## If not set, all supported regions (see below) would be covered, it can
## provide a significant load on API, so the recommendation here is to
Expand Down Expand Up @@ -133,6 +139,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Or you can specify several dimensions at once:
# dimensions = '[{"instanceId": "p-example"},{"instanceId": "q-example"}]'

## Specify which metric service to capture from Aliyun, choose from:
## * cms - Cloud Monitor service (default settings)
## * rds - Relational Database service
# service = 'rds'

## Tag Query Path
## The following tags added by default:
## * regionId (if discovery enabled)
Expand Down Expand Up @@ -167,6 +178,7 @@ Plugin Configuration utilizes [preset metric items references][2]
- `project` must be a preset project value
- `names` must be preset metric names
- `dimensions` must be preset dimension values
- `service` must be a supported Aliyun metric service value (`cms` or `rds`)

[2]: https://www.alibabacloud.com/help/doc-detail/28619.htm?spm=a2c63.p38356.a3.2.389f233d0kPJn0

Expand Down
188 changes: 142 additions & 46 deletions plugins/inputs/aliyuncms/aliyuncms.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/jmespath/go-jmespath"
"reflect"
"slices"
"strconv"
"strings"
"sync"
Expand All @@ -14,8 +17,7 @@ import (
"github.com/aliyun/alibaba-cloud-sdk-go/sdk"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials/providers"
"github.com/aliyun/alibaba-cloud-sdk-go/services/cms"
"github.com/jmespath/go-jmespath"

"github.com/aliyun/alibaba-cloud-sdk-go/services/rds"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
Expand All @@ -27,8 +29,8 @@ import (
var sampleConfig string

type (
// AliyunCMS is aliyun cms config info.
AliyunCMS struct {
// AliyunMetrics holds the plugin configuration and runtime state for gathering metrics from Aliyun services (CMS and RDS).
AliyunMetrics struct {
AccessKeyID string `toml:"access_key_id"`
AccessKeySecret string `toml:"access_key_secret"`
AccessKeyStsToken string `toml:"access_key_sts_token"`
Expand All @@ -38,6 +40,7 @@ type (
PublicKeyID string `toml:"public_key_id"`
RoleName string `toml:"role_name"`

MetricServices []string `toml:"metric_services"`
Regions []string `toml:"regions"`
DiscoveryInterval config.Duration `toml:"discovery_interval"`
Period config.Duration `toml:"period"`
Expand All @@ -48,7 +51,9 @@ type (

Log telegraf.Logger `toml:"-"`

client aliyuncmsClient
cmsClient aliyuncmsClient
rdsClient aliyunrdsClient

windowStart time.Time
windowEnd time.Time
dt *discoveryTool
Expand All @@ -61,14 +66,15 @@ type (
Metric struct {
ObjectsFilter string `toml:"objects_filter"`
MetricNames []string `toml:"names"`
Service string `toml:"service"`
Dimensions string `toml:"dimensions"` //String representation of JSON dimensions
TagsQueryPath []string `toml:"tag_query_path"`
AllowDataPointWODiscoveryData bool `toml:"allow_dps_without_discovery"` //Allow data points without discovery data (if no discovery data found)

dtLock sync.Mutex //Guard for discoveryTags & dimensions
discoveryTags map[string]map[string]string //Internal data structure that can enrich metrics with tags
dimensionsUdObj map[string]string
dimensionsUdArr []map[string]string //Parsed Dimesnsions JSON string (unmarshalled)
dimensionsUdArr []map[string]string //Parsed Dimensions JSON string (unmarshalled)
requestDimensions []map[string]string //this is the actual dimensions list that would be used in API request
requestDimensionsStr string //String representation of the above

Expand All @@ -82,6 +88,10 @@ type (
// aliyuncmsClient is the subset of the Aliyun CMS SDK client that this plugin calls.
// Only DescribeMetricList is needed; the plugin holds its CMS client behind this interface.
// NOTE(review): presumably declared as an interface so tests can substitute a mock client — confirm.
aliyuncmsClient interface {
DescribeMetricList(request *cms.DescribeMetricListRequest) (response *cms.DescribeMetricListResponse, err error)
}

// aliyunrdsClient is the subset of the Aliyun RDS SDK client that this plugin calls.
// Only DescribeDBInstancePerformance is needed; it is used to fetch per-instance
// performance values when a metric is configured with the "rds" service.
// NOTE(review): presumably declared as an interface so tests can substitute a mock client — confirm.
aliyunrdsClient interface {
DescribeDBInstancePerformance(request *rds.DescribeDBInstancePerformanceRequest) (response *rds.DescribeDBInstancePerformanceResponse, err error)
}
)

// https://www.alibabacloud.com/help/doc-detail/40654.htm?gclid=Cj0KCQjw4dr0BRCxARIsAKUNjWTAMfyVUn_Y3OevFBV3CMaazrhq0URHsgE7c0m0SeMQRKlhlsJGgIEaAviyEALw_wcB
Expand Down Expand Up @@ -109,12 +119,12 @@ var aliyunRegionList = []string{
"me-east-1",
}

func (*AliyunCMS) SampleConfig() string {
func (*AliyunMetrics) SampleConfig() string {
return sampleConfig
}

// Init performs checks of plugin inputs and initializes internals
func (s *AliyunCMS) Init() error {
func (s *AliyunMetrics) Init() error {
if s.Project == "" {
return errors.New("project is not set")
}
Expand Down Expand Up @@ -144,11 +154,18 @@ func (s *AliyunCMS) Init() error {
if err != nil {
return fmt.Errorf("failed to retrieve credential: %w", err)
}
s.client, err = cms.NewClientWithOptions("", sdk.NewConfig(), credential)
s.cmsClient, err = cms.NewClientWithOptions("", sdk.NewConfig(), credential)
if err != nil {
return fmt.Errorf("failed to create cms client: %w", err)
}

if s.Project == "acs_rds_dashboard" && slices.Contains(s.MetricServices, "rds") {
s.rdsClient, err = rds.NewClientWithOptions("", sdk.NewConfig(), credential)
if err != nil {
return fmt.Errorf("failed to create rds client: %w", err)
}
}

//check metrics dimensions consistency
for i := range s.Metrics {
metric := s.Metrics[i]
Expand Down Expand Up @@ -179,6 +196,12 @@ func (s *AliyunCMS) Init() error {
len(s.Regions), strings.Join(s.Regions, ","))
}

//Check metric services
if len(s.MetricServices) == 0 {
s.MetricServices = []string{"cms"}
s.Log.Info("'metric_services' is not set. Metrics will be queried from the cms service")
}

//Init discovery...
if s.dt == nil { //Support for tests
s.dt, err = newDiscoveryTool(s.Regions, s.Project, s.Log, credential, int(float32(s.RateLimit)*0.2), time.Duration(s.DiscoveryInterval))
Expand All @@ -202,12 +225,11 @@ func (s *AliyunCMS) Init() error {
if s.Project == "acs_oss" {
s.dimensionKey = "BucketName"
}

return nil
}

// Start plugin discovery loop, metrics are gathered through Gather
func (s *AliyunCMS) Start(telegraf.Accumulator) error {
func (s *AliyunMetrics) Start(telegraf.Accumulator) error {
//Start periodic discovery process
if s.dt != nil {
s.dt.start()
Expand All @@ -217,7 +239,7 @@ func (s *AliyunCMS) Start(telegraf.Accumulator) error {
}

// Gather implements the telegraf.Input interface
func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error {
func (s *AliyunMetrics) Gather(acc telegraf.Accumulator) error {
s.updateWindow(time.Now())

// limit concurrency or we can easily exhaust user connection limit
Expand All @@ -243,13 +265,13 @@ func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error {
}

// Stop - stops the plugin discovery loop
func (s *AliyunCMS) Stop() {
func (s *AliyunMetrics) Stop() {
if s.dt != nil {
s.dt.stop()
}
}

func (s *AliyunCMS) updateWindow(relativeTo time.Time) {
func (s *AliyunMetrics) updateWindow(relativeTo time.Time) {
//https://help.aliyun.com/document_detail/51936.html?spm=a2c4g.11186623.6.701.54025679zh6wiR
//The start and end times are executed in the mode of
//opening left and closing right, and startTime cannot be equal
Expand All @@ -269,36 +291,107 @@ func (s *AliyunCMS) updateWindow(relativeTo time.Time) {
}

// Gather given metric and emit error
func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, metric *Metric) error {
func (s *AliyunMetrics) gatherMetric(acc telegraf.Accumulator, metricName string, metric *Metric) error {
for _, region := range s.Regions {
req := cms.CreateDescribeMetricListRequest()
req.Period = strconv.FormatInt(int64(time.Duration(s.Period).Seconds()), 10)
req.MetricName = metricName
req.Length = "10000"
req.Namespace = s.Project
req.EndTime = strconv.FormatInt(s.windowEnd.Unix()*1000, 10)
req.StartTime = strconv.FormatInt(s.windowStart.Unix()*1000, 10)
req.Dimensions = metric.requestDimensionsStr
req.RegionId = region

for more := true; more; {
resp, err := s.client.DescribeMetricList(req)
if err != nil {
return fmt.Errorf("failed to query metricName list: %w", err)
}
if resp.Code != "200" {
s.Log.Errorf("failed to query metricName list: %v", resp.Message)
break
}

var datapoints []map[string]interface{}
if err := json.Unmarshal([]byte(resp.Datapoints), &datapoints); err != nil {
return fmt.Errorf("failed to decode response datapoints: %w", err)
}
var respCms cms.DescribeMetricListResponse
reqCms := cms.CreateDescribeMetricListRequest()

if s.rdsClient != nil && metric.Service == "rds" {
for _, instanceID := range metric.requestDimensions {
req := rds.CreateDescribeDBInstancePerformanceRequest()
req.DBInstanceId = instanceID["instanceId"]
req.Key = metricName
startTime := s.windowStart.UTC()
req.StartTime = fmt.Sprintf("%d-%02d-%02dT%02d:%02dZ", startTime.Year(), startTime.Month(),
startTime.Day(), startTime.Hour(), startTime.Minute())
endTime := s.windowEnd.UTC()
req.EndTime = fmt.Sprintf("%d-%02d-%02dT%02d:%02dZ", endTime.Year(), endTime.Month(),
endTime.Day(), endTime.Hour(), endTime.Minute())
req.RegionId = region

resp, err := s.rdsClient.DescribeDBInstancePerformance(req)

if err != nil {
return fmt.Errorf("failed to get the database instance performance metrics: %w", err)
}
if resp.GetHttpStatus() != 200 {
s.Log.Errorf("failed to get the database instance performance metrics: %v", resp.BaseResponse.GetHttpContentString())
break
}

for _, performanceKey := range resp.PerformanceKeys.PerformanceKey {
for _, performanceValue := range performanceKey.Values.PerformanceValue {
parsedTime, err := time.Parse(time.RFC3339, performanceValue.Date)
if err != nil {
return fmt.Errorf("failed to parse response performance time datapoints: %w", err)
}

if strings.Contains(performanceValue.Value, "&") {
performanceKeys := strings.Split(performanceKey.ValueFormat, "&")
performanceValues := strings.Split(performanceValue.Value, "&")

for i, value := range performanceValues {
valueAsFloat, err := strconv.ParseFloat(value, 32)
if err != nil {
return fmt.Errorf("failed to convert the performance value string to an float: %w", err)
}
datapoints = append(datapoints,
map[string]interface{}{
"instanceId": instanceID["instanceId"],
performanceKeys[i]: valueAsFloat,
"timestamp": parsedTime.Unix(),
})
}
} else {
valueAsFloat, err := strconv.ParseFloat(performanceValue.Value, 32)
if err != nil {
return fmt.Errorf("failed to convert the performance value string to an float: %w", err)
}
datapoints = append(datapoints,
map[string]interface{}{
"instanceId": instanceID["instanceId"],
performanceKey.ValueFormat: valueAsFloat,
"timestamp": parsedTime.Unix(),
})
}
}
}

if len(datapoints) == 0 {
s.Log.Debugf("No rds performance metrics returned from RDS, response msg: %s", resp.GetHttpContentString())
break
}
}
} else {
reqCms.Period = strconv.FormatInt(int64(time.Duration(s.Period).Seconds()), 10)
reqCms.MetricName = metricName
reqCms.Length = "10000"
reqCms.Namespace = s.Project
reqCms.EndTime = strconv.FormatInt(s.windowEnd.Unix()*1000, 10)
reqCms.StartTime = strconv.FormatInt(s.windowStart.Unix()*1000, 10)
reqCms.Dimensions = metric.requestDimensionsStr
reqCms.RegionId = region

respCms, err := s.cmsClient.DescribeMetricList(reqCms)

if len(datapoints) == 0 {
s.Log.Debugf("No metrics returned from CMS, response msg: %s", resp.Message)
break
if err != nil {
return fmt.Errorf("failed to query metricName list: %w", err)
}
if respCms.Code != "200" {
s.Log.Errorf("failed to query metricName list: %v", respCms.Message)
break
}

if err := json.Unmarshal([]byte(respCms.Datapoints), &datapoints); err != nil {
return fmt.Errorf("failed to decode response datapoints: %w", err)
}

if len(datapoints) == 0 {
s.Log.Debugf("No metrics returned from CMS, response msg: %s", respCms.Message)
break
}
}

NextDataPoint:
Expand Down Expand Up @@ -326,16 +419,19 @@ func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, me
case "userId":
tags[key] = value.(string)
case "timestamp":
datapointTime = int64(value.(float64)) / 1000
if reflect.TypeOf(value).String() == "int64" {
datapointTime = value.(int64)
} else {
datapointTime = int64(value.(float64)) / 1000
}
default:
fields[formatField(metricName, key)] = value
}
}
acc.AddFields(s.measurement, fields, tags, time.Unix(datapointTime, 0))
}

req.NextToken = resp.NextToken
more = req.NextToken != ""
reqCms.NextToken = respCms.NextToken
more = reqCms.NextToken != ""
}
}
return nil
Expand Down Expand Up @@ -372,7 +468,7 @@ func parseTag(tagSpec string, data interface{}) (tagKey string, tagValue string,
return tagKey, tagValue, nil
}

func (s *AliyunCMS) prepareTagsAndDimensions(metric *Metric) {
func (s *AliyunMetrics) prepareTagsAndDimensions(metric *Metric) {
var (
newData bool
defaultTags = []string{"RegionId:RegionId"}
Expand Down Expand Up @@ -495,7 +591,7 @@ func snakeCase(s string) string {

func init() {
inputs.Add("aliyuncms", func() telegraf.Input {
return &AliyunCMS{
return &AliyunMetrics{
RateLimit: 200,
DiscoveryInterval: config.Duration(time.Minute),
dimensionKey: "instanceId",
Expand Down
Loading

0 comments on commit 6f80f8e

Please sign in to comment.