Merge #131177
131177: cli: improve tsdump datadog upload r=aa-joshi a=aa-joshi

Previously, tsdump datadog upload was time consuming because upload requests to Datadog were issued synchronously. In addition, we were decoding raw data to TimeSeriesData and then converting it again to DatadogSeries. This change addresses both problems: we now decode timeseries data directly to DatadogSeries, and we have decoupled decoding from uploading so that both operations run concurrently.

Epic: CC-27740
Release note: None

Co-authored-by: Akshay Joshi <[email protected]>
craig[bot] and aa-joshi committed Sep 25, 2024
2 parents cbc681e + f2b50c6 commit a6c075a
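
A minimal sketch of the decoupling described in the commit message, assuming hypothetical helper names (decodeOne, uploadBatch); the actual implementation lives in the new pkg/cli/tsdump_upload.go:

package main

import (
	"fmt"
	"io"
)

// DatadogSeries stands in for the real type defined in pkg/cli.
type DatadogSeries struct {
	Metric string
}

// uploadConcurrently is a sketch, not the PR's code: a producer goroutine
// decodes records directly into DatadogSeries while the caller batches and
// uploads them, so decoding and uploading overlap instead of alternating.
func uploadConcurrently(
	decodeOne func() (*DatadogSeries, error),
	uploadBatch func([]DatadogSeries) error,
	batchSize int,
) error {
	seriesCh := make(chan DatadogSeries, batchSize)
	decodeErr := make(chan error, 1)

	go func() {
		defer close(seriesCh)
		for {
			s, err := decodeOne()
			if err == io.EOF {
				return // input exhausted
			}
			if err != nil {
				decodeErr <- err
				return
			}
			seriesCh <- *s
		}
	}()

	batch := make([]DatadogSeries, 0, batchSize)
	for s := range seriesCh {
		batch = append(batch, s)
		if len(batch) >= batchSize {
			if err := uploadBatch(batch); err != nil {
				return err
			}
			batch = batch[:0] // reuse the backing array between flushes
		}
	}
	if len(batch) > 0 {
		if err := uploadBatch(batch); err != nil {
			return err
		}
	}
	select {
	case err := <-decodeErr:
		return err
	default:
		return nil
	}
}

func main() {
	i := 0
	err := uploadConcurrently(
		func() (*DatadogSeries, error) { // fake decoder yielding 5 series
			if i >= 5 {
				return nil, io.EOF
			}
			i++
			return &DatadogSeries{Metric: "crdb.tsdump.example"}, nil
		},
		func(batch []DatadogSeries) error { // fake uploader
			fmt.Printf("uploading %d series\n", len(batch))
			return nil
		},
		2,
	)
	if err != nil {
		panic(err)
	}
}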
Showing 5 changed files with 399 additions and 268 deletions.
1 change: 1 addition & 0 deletions pkg/cli/BUILD.bazel
@@ -57,6 +57,7 @@ go_library(
"statement_diag.go",
"testutils.go",
"tsdump.go",
"tsdump_upload.go",
"userfile.go",
"zip.go",
"zip_cluster_wide.go",
5 changes: 4 additions & 1 deletion pkg/cli/testdata/tsdump/json
@@ -31,4 +31,7 @@ cr.node.admission.admitted.elastic.cpu 2 1.000000 1711130560
----
POST: https://example.com/data
DD-API-KEY: api-key
Body: {"series":[{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":17111304,"value":0},{"timestamp":17111304,"value":1},{"timestamp":17111304,"value":1},{"timestamp":17111305,"value":1}],"resources":null,"tags":["cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234","node_id:1"]},{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1}],"resources":null,"tags":["cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234","node_id:2"]}]}
Body: {"series":[{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":1711130470,"value":0},{"timestamp":1711130480,"value":1},{"timestamp":1711130490,"value":1},{"timestamp":1711130500,"value":1}],"resources":null,"tags":["node_id:1","cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234"]}]}
POST: https://example.com/data
DD-API-KEY: api-key
Body: {"series":[{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":1711130510,"value":1},{"timestamp":1711130520,"value":1},{"timestamp":1711130530,"value":1},{"timestamp":1711130540,"value":1},{"timestamp":1711130550,"value":1},{"timestamp":1711130560,"value":1}],"resources":null,"tags":["node_id:2","cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234"]}]}
273 changes: 9 additions & 264 deletions pkg/cli/tsdump.go
@@ -13,7 +13,6 @@ package cli
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/csv"
"encoding/gob"
@@ -28,17 +27,13 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cli/clierrorplus"
"github.com/cockroachdb/cockroach/pkg/cli/clisqlclient"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/ts/tsutil"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
Expand All @@ -63,21 +58,6 @@ var debugTimeSeriesDumpOpts = struct {
yaml: "/tmp/tsdump.yaml",
}

var (
// Each site in Datadog has a different host name. ddSiteToHostMap
// holds the mapping of site name to the host name.
ddSiteToHostMap = map[string]string{
"us1": "api.datadoghq.com",
"us3": "api.us3.datadoghq.com",
"us5": "api.us5.datadoghq.com",
"eu1": "api.datadoghq.eu",
"ap1": "api.ap1.datadoghq.com",
"us1-fed": "api.ddog-gov.com",
}

targetURLFormat = "https://%s/api/v2/series"
)

var debugTimeSeriesDumpCmd = &cobra.Command{
Use: "tsdump",
Short: "dump all the raw timeseries values in a cluster",
@@ -133,23 +113,24 @@ will then convert it to the --format requested in the current invocation.
doRequest,
)
case tsDumpDatadog:
w = makeDatadogWriter(
ctx,
var datadogWriter = makeDatadogWriter(
targetURL,
false, /* init */
false,
debugTimeSeriesDumpOpts.ddApiKey,
100, /* threshold */
100,
doDDRequest,
)
return datadogWriter.upload(args[0])

case tsDumpDatadogInit:
w = makeDatadogWriter(
ctx,
var datadogWriter = makeDatadogWriter(
targetURL,
true, /* init */
true,
debugTimeSeriesDumpOpts.ddApiKey,
100, /* threshold */
100,
doDDRequest,
)
return datadogWriter.upload(args[0])
case tsDumpOpenMetrics:
if debugTimeSeriesDumpOpts.targetURL != "" {
write := beginHttpRequestWithWritePipe(debugTimeSeriesDumpOpts.targetURL)
@@ -296,30 +277,6 @@ func doRequest(req *http.Request) error {
return nil
}

func doDDRequest(req *http.Request) error {
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
ddResp := DatadogResp{}
err = json.Unmarshal(respBytes, &ddResp)
if err != nil {
return err
}
if len(ddResp.Errors) > 0 {
return errors.Newf("tsdump: error response from datadog: %v", ddResp.Errors)
}
if resp.StatusCode > 299 {
return errors.Newf("tsdump: bad response status code: %+v", resp)
}
return nil
}

// beginHttpRequestWithWritePipe initiates an HTTP request to the
// `targetURL` argument and returns an `io.Writer` that pipes to the
// request body. This function will return while the request runs
@@ -349,218 +306,6 @@ type tsWriter interface {
Flush() error
}

// datadogWriter can convert our metrics to Datadog format and send
// them via HTTP to the public DD endpoint, assuming an API key is set
// in the CLI flags.
type datadogWriter struct {
sync.Once
targetURL string
buffer bytes.Buffer
series []DatadogSeries
uploadID string
init bool
apiKey string
// namePrefix sets the string to prepend to all metric names. The
// names are kept with `.` delimiters.
namePrefix string
doRequest func(req *http.Request) error
threshold int
}

func makeDatadogWriter(
ctx context.Context,
targetURL string,
init bool,
apiKey string,
threshold int,
doRequest func(req *http.Request) error,
) *datadogWriter {
return &datadogWriter{
targetURL: targetURL,
buffer: bytes.Buffer{},
uploadID: newTsdumpUploadID(),
init: init,
apiKey: apiKey,
namePrefix: "crdb.tsdump.", // Default pre-set prefix to distinguish these uploads.
doRequest: doRequest,
threshold: threshold,
}
}

var newTsdumpUploadID = func() string {
clusterTagValue := ""
if debugTimeSeriesDumpOpts.clusterLabel != "" {
clusterTagValue = debugTimeSeriesDumpOpts.clusterLabel
} else if serverCfg.ClusterName != "" {
clusterTagValue = serverCfg.ClusterName
} else {
clusterTagValue = fmt.Sprintf("cluster-debug-%d", timeutil.Now().Unix())
}
return newUploadID(clusterTagValue)
}

// DatadogPoint is a single metric point in Datadog format
type DatadogPoint struct {
// Timestamp must be in seconds since Unix epoch.
Timestamp int64 `json:"timestamp"`
Value float64 `json:"value"`
}

// DatadogSeries contains a JSON encoding of a single series object
// that can be sent to Datadog.
type DatadogSeries struct {
Metric string `json:"metric"`
Type int `json:"type"`
Points []DatadogPoint `json:"points"`
Resources []struct {
Name string `json:"name"`
Type string `json:"type"`
} `json:"resources"`
// In order to encode arbitrary key-value pairs, use a `:` delimited
// tag string like `cluster:dedicated`.
Tags []string `json:"tags"`
}

// DatadogSubmitMetrics is the top level JSON object that must be sent to Datadog.
// See: https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type DatadogSubmitMetrics struct {
Series []DatadogSeries `json:"series"`
}

const (
DatadogSeriesTypeUnknown = iota
DatadogSeriesTypeCounter
DatadogSeriesTypeRate
DatadogSeriesTypeGauge
)
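
// For reference: these iota values line up with Datadog's metric intake
// type enum (0 = unspecified, 1 = count, 2 = rate, 3 = gauge) in the
// submit-metrics API linked above.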

func (d *datadogWriter) Emit(data *tspb.TimeSeriesData) error {
series := &DatadogSeries{
// TODO(davidh): This is not correct. We should inspect metric metadata and set appropriately.
// The impact of not doing this is that the metric will be treated as a gauge by default.
Type: DatadogSeriesTypeUnknown,
Points: make([]DatadogPoint, len(data.Datapoints)),
}

name := data.Name

var tags []string
// Hardcoded values
tags = append(tags, "cluster_type:SELF_HOSTED")
tags = append(tags, "job:cockroachdb")
tags = append(tags, "region:local")

if debugTimeSeriesDumpOpts.clusterLabel != "" {
tags = append(tags, makeDDTag("cluster_label", debugTimeSeriesDumpOpts.clusterLabel))
}

tags = append(tags, makeDDTag(uploadIDTag, d.uploadID))

d.Do(func() {
fmt.Println("Upload ID:", d.uploadID)
})

sl := reCrStoreNode.FindStringSubmatch(data.Name)
if len(sl) != 0 {
storeNodeKey := sl[1]
if storeNodeKey == "node" {
storeNodeKey += "_id"
}
tags = append(tags, fmt.Sprintf("%s:%s", storeNodeKey, data.Source))
name = sl[2]
} else {
tags = append(tags, "node_id:0")
}

series.Tags = tags

series.Metric = d.namePrefix + name

// When running in init mode, we insert zeros with the current
// timestamp in order to populate Datadog's metrics list. Then the
// user can enable these metrics for historic ingest and load the
// full dataset. This should only be necessary once globally.
if d.init {
series.Points = []DatadogPoint{{
Value: 0,
Timestamp: timeutil.Now().Unix(),
}}
} else {
for i, ts := range data.Datapoints {
series.Points[i].Value = ts.Value
series.Points[i].Timestamp = ts.TimestampNanos / 1_000_000_000
}
}

// We append every series directly to the list. This isn't ideal
// because every series batch that `Emit` is called with will contain
// around 360 points and the same metric will repeat many, many times.
// This causes us to repeat the metadata collection here. Ideally, we
// can find the series object for this metric if it already exists
// and insert the points there.
d.series = append(d.series, *series)

// The limit of `100` is an experimentally set heuristic. It can
// probably be increased. This results in a payload that's generally
// below 2MB. DD's limit is 5MB.
if len(d.series) > d.threshold {
fmt.Printf(
"tsdump datadog upload: sending payload containing %d series including %s\n",
len(d.series),
d.series[0].Metric,
)
return d.Flush()
}
return nil
}
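
// Back-of-envelope for the 100-series threshold (assuming ~50 bytes per
// encoded point, a figure not stated in the source): 100 series × ~360
// points × ~50 B ≈ 1.8 MB of JSON per flush before gzip, which is why
// payloads stay generally below 2MB against Datadog's 5MB limit.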

func (d *datadogWriter) Flush() error {
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(&DatadogSubmitMetrics{Series: d.series})
if err != nil {
return err
}
var zipBuf bytes.Buffer
g := gzip.NewWriter(&zipBuf)
_, err = io.Copy(g, &buf)
if err != nil {
return err
}
err = g.Close()
if err != nil {
return err
}

retryOpts := base.DefaultRetryOptions()
retryOpts.MaxRetries = 3
for retry := retry.Start(retryOpts); retry.Next(); {
req, err := http.NewRequest("POST", d.targetURL, &zipBuf)
if err != nil {
return err
}
req.Header.Set("DD-API-KEY", d.apiKey)
req.Header.Set(server.ContentTypeHeader, "application/json")
req.Header.Set(httputil.ContentEncodingHeader, "gzip")

err = d.doRequest(req)
if err != nil {
fmt.Printf("retry attempt:%d tsdump: error while sending metrics to datadog: %v\n", retry.CurrentAttempt(), err)
continue
} else {
break
}
}

d.series = nil
return nil
}

type DatadogResp struct {
Errors []string `json:"errors"`
}

var _ tsWriter = &datadogWriter{}

type jsonWriter struct {
sync.Once
targetURL string
