From f2b50c6074d911dd2934cc7afad84e13d1f4cd0a Mon Sep 17 00:00:00 2001
From: Akshay Joshi
Date: Mon, 23 Sep 2024 12:15:34 +0530
Subject: [PATCH] cli: improve tsdump datadog upload

Previously, tsdump datadog upload was time-consuming because upload
requests were sent to Datadog synchronously. In addition, we decoded
the raw data into TimeSeriesData and then converted it again into
DatadogSeries.

This change addresses both problems. We now decode the timeseries data
directly into DatadogSeries, and we have decoupled decoding from
uploading so that the two operations run concurrently.

Epic: CC-27740
Release note: None
---
 pkg/cli/BUILD.bazel          |   1 +
 pkg/cli/testdata/tsdump/json |   5 +-
 pkg/cli/tsdump.go            | 265 +---------------------------
 pkg/cli/tsdump_test.go       |  47 ++++-
 pkg/cli/tsdump_upload.go     | 341 +++++++++++++++++++++++++++++++++++
 5 files changed, 395 insertions(+), 264 deletions(-)
 create mode 100644 pkg/cli/tsdump_upload.go

diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel
index 18000dadd93a..45d44298c1dc 100644
--- a/pkg/cli/BUILD.bazel
+++ b/pkg/cli/BUILD.bazel
@@ -57,6 +57,7 @@ go_library(
         "statement_diag.go",
         "testutils.go",
         "tsdump.go",
+        "tsdump_upload.go",
         "userfile.go",
         "zip.go",
         "zip_cluster_wide.go",
diff --git a/pkg/cli/testdata/tsdump/json b/pkg/cli/testdata/tsdump/json
index e7a6ee14264d..0b387b2ffd18 100644
--- a/pkg/cli/testdata/tsdump/json
+++ b/pkg/cli/testdata/tsdump/json
@@ -31,4 +31,7 @@ cr.node.admission.admitted.elastic.cpu 2 1.000000 1711130560
 ----
 POST: https://example.com/data
 DD-API-KEY: api-key
-Body: {"series":[{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":17111304,"value":0},{"timestamp":17111304,"value":1},{"timestamp":17111304,"value":1},{"timestamp":17111305,"value":1}],"resources":null,"tags":["cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234","node_id:1"]},{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1}],"resources":null,"tags":["cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234","node_id:2"]}]}
+Body: {"series":[{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":1711130470,"value":0},{"timestamp":1711130480,"value":1},{"timestamp":1711130490,"value":1},{"timestamp":1711130500,"value":1}],"resources":null,"tags":["node_id:1","cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234"]}]}
+POST: https://example.com/data
+DD-API-KEY: api-key
+Body: {"series":[{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":1711130510,"value":1},{"timestamp":1711130520,"value":1},{"timestamp":1711130530,"value":1},{"timestamp":1711130540,"value":1},{"timestamp":1711130550,"value":1},{"timestamp":1711130560,"value":1}],"resources":null,"tags":["node_id:2","cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234"]}]}
diff --git a/pkg/cli/tsdump.go b/pkg/cli/tsdump.go
index ec393b02f27a..4ce3deb54b09 100644
--- a/pkg/cli/tsdump.go
+++ b/pkg/cli/tsdump.go
@@ -13,7 +13,6 @@ package cli
 import (
 	"bufio"
 	"bytes"
-	"compress/gzip"
 	"context"
 	"encoding/csv"
 	"encoding/gob"
@@ -28,17 +27,13 @@ import
(
 	"sync"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/cli/clierrorplus"
 	"github.com/cockroachdb/cockroach/pkg/cli/clisqlclient"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/ts"
 	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
 	"github.com/cockroachdb/cockroach/pkg/ts/tsutil"
-	"github.com/cockroachdb/cockroach/pkg/util/httputil"
-	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/spf13/cobra"
@@ -63,21 +58,6 @@ var debugTimeSeriesDumpOpts = struct {
 	yaml: "/tmp/tsdump.yaml",
 }
 
-var (
-	// each site in datadog has a different host name. ddSiteToHostMap
-	// holds the mapping of site name to the host name.
-	ddSiteToHostMap = map[string]string{
-		"us1":     "api.datadoghq.com",
-		"us3":     "api.us3.datadoghq.com",
-		"us5":     "api.us5.datadoghq.com",
-		"eu1":     "api.datadoghq.eu",
-		"ap1":     "api.ap1.datadoghq.com",
-		"us1-fed": "api.ddog-gov.com",
-	}
-
-	targetURLFormat = "https://%s/api/v2/series"
-)
-
 var debugTimeSeriesDumpCmd = &cobra.Command{
 	Use:   "tsdump",
 	Short: "dump all the raw timeseries values in a cluster",
@@ -133,23 +113,24 @@ will then convert it to the --format requested in the current invocation.
 			doRequest,
 		)
 	case tsDumpDatadog:
-		w = makeDatadogWriter(
-			ctx,
+		datadogWriter := makeDatadogWriter(
 			targetURL,
 			false, /* init */
 			debugTimeSeriesDumpOpts.ddApiKey,
 			100, /* threshold */
 			doDDRequest,
 		)
+		return datadogWriter.upload(args[0])
+
 	case tsDumpDatadogInit:
-		w = makeDatadogWriter(
-			ctx,
+		datadogWriter := makeDatadogWriter(
 			targetURL,
 			true, /* init */
 			debugTimeSeriesDumpOpts.ddApiKey,
 			100, /* threshold */
 			doDDRequest,
 		)
+		return datadogWriter.upload(args[0])
 	case tsDumpOpenMetrics:
 		if debugTimeSeriesDumpOpts.targetURL != "" {
 			write := beginHttpRequestWithWritePipe(debugTimeSeriesDumpOpts.targetURL)
@@ -296,30 +277,6 @@ func doRequest(req *http.Request) error {
 	return nil
 }
 
-func doDDRequest(req *http.Request) error {
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	respBytes, err := io.ReadAll(resp.Body)
-	if err != nil {
-		return err
-	}
-	ddResp := DatadogResp{}
-	err = json.Unmarshal(respBytes, &ddResp)
-	if err != nil {
-		return err
-	}
-	if len(ddResp.Errors) > 0 {
-		return errors.Newf("tsdump: error response from datadog: %v", ddResp.Errors)
-	}
-	if resp.StatusCode > 299 {
-		return errors.Newf("tsdump: bad response status code: %+v", resp)
-	}
-	return nil
-}
-
 // beginHttpRequestWithWritePipe initiates an HTTP request to the
 // `targetURL` argument and returns an `io.Writer` that pipes to the
 // request body. This function will return while the request runs
@@ -349,218 +306,6 @@ type tsWriter interface {
 	Flush() error
 }
 
-// datadogWriter can convert our metrics to Datadog format and send
-// them via HTTP to the public DD endpoint, assuming an API key is set
-// in the CLI flags.
-type datadogWriter struct {
-	sync.Once
-	targetURL  string
-	buffer     bytes.Buffer
-	series     []DatadogSeries
-	uploadID   string
-	init       bool
-	apiKey     string
-	// namePrefix sets the string to prepend to all metric names. The
-	// names are kept with `.` delimiters.
- namePrefix string - doRequest func(req *http.Request) error - threshold int -} - -func makeDatadogWriter( - ctx context.Context, - targetURL string, - init bool, - apiKey string, - threshold int, - doRequest func(req *http.Request) error, -) *datadogWriter { - return &datadogWriter{ - targetURL: targetURL, - buffer: bytes.Buffer{}, - uploadID: newTsdumpUploadID(), - init: init, - apiKey: apiKey, - namePrefix: "crdb.tsdump.", // Default pre-set prefix to distinguish these uploads. - doRequest: doRequest, - threshold: threshold, - } -} - -var newTsdumpUploadID = func() string { - clusterTagValue := "" - if debugTimeSeriesDumpOpts.clusterLabel != "" { - clusterTagValue = debugTimeSeriesDumpOpts.clusterLabel - } else if serverCfg.ClusterName != "" { - clusterTagValue = serverCfg.ClusterName - } else { - clusterTagValue = fmt.Sprintf("cluster-debug-%d", timeutil.Now().Unix()) - } - return newUploadID(clusterTagValue) -} - -// DatadogPoint is a single metric point in Datadog format -type DatadogPoint struct { - // Timestamp must be in seconds since Unix epoch. - Timestamp int64 `json:"timestamp"` - Value float64 `json:"value"` -} - -// DatadogSeries contains a JSON encoding of a single series object -// that can be send to Datadog. -type DatadogSeries struct { - Metric string `json:"metric"` - Type int `json:"type"` - Points []DatadogPoint `json:"points"` - Resources []struct { - Name string `json:"name"` - Type string `json:"type"` - } `json:"resources"` - // In order to encode arbitrary key-value pairs, use a `:` delimited - // tag string like `cluster:dedicated`. - Tags []string `json:"tags"` -} - -// DatadogSubmitMetrics is the top level JSON object that must be sent to Datadog. -// See: https://docs.datadoghq.com/api/latest/metrics/#submit-metrics -type DatadogSubmitMetrics struct { - Series []DatadogSeries `json:"series"` -} - -const ( - DatadogSeriesTypeUnknown = iota - DatadogSeriesTypeCounter - DatadogSeriesTypeRate - DatadogSeriesTypeGauge -) - -func (d *datadogWriter) Emit(data *tspb.TimeSeriesData) error { - series := &DatadogSeries{ - // TODO(davidh): This is not correct. We should inspect metric metadata and set appropriately. - // The impact of not doing this is that the metric will be treated as a gauge by default. - Type: DatadogSeriesTypeUnknown, - Points: make([]DatadogPoint, len(data.Datapoints)), - } - - name := data.Name - - var tags []string - // Hardcoded values - tags = append(tags, "cluster_type:SELF_HOSTED") - tags = append(tags, "job:cockroachdb") - tags = append(tags, "region:local") - - if debugTimeSeriesDumpOpts.clusterLabel != "" { - tags = append(tags, makeDDTag("cluster_label", debugTimeSeriesDumpOpts.clusterLabel)) - } - - tags = append(tags, makeDDTag(uploadIDTag, d.uploadID)) - - d.Do(func() { - fmt.Println("Upload ID:", d.uploadID) - }) - - sl := reCrStoreNode.FindStringSubmatch(data.Name) - if len(sl) != 0 { - storeNodeKey := sl[1] - if storeNodeKey == "node" { - storeNodeKey += "_id" - } - tags = append(tags, fmt.Sprintf("%s:%s", storeNodeKey, data.Source)) - name = sl[2] - } else { - tags = append(tags, "node_id:0") - } - - series.Tags = tags - - series.Metric = d.namePrefix + name - - // When running in init mode, we insert zeros with the current - // timestamp in order to populate Datadog's metrics list. Then the - // user can enable these metrics for historic ingest and load the - // full dataset. This should only be necessary once globally. 
-	if d.init {
-		series.Points = []DatadogPoint{{
-			Value:     0,
-			Timestamp: timeutil.Now().Unix(),
-		}}
-	} else {
-		for i, ts := range data.Datapoints {
-			series.Points[i].Value = ts.Value
-			series.Points[i].Timestamp = ts.TimestampNanos / 1_000_000_000
-		}
-	}
-
-	// We append every series directly to the list. This isn't ideal
-	// because every series batch that `Emit` is called with will contain
-	// around 360 points and the same metric will repeat many many times.
-	// This causes us to repeat the metadata collection here. Ideally, we
-	// can find the series object for this metric if it already exists
-	// and insert the points there.
-	d.series = append(d.series, *series)
-
-	// The limit of `100` is an experimentally set heuristic. It can
-	// probably be increased. This results in a payload that's generally
-	// below 2MB. DD's limit is 5MB.
-	if len(d.series) > d.threshold {
-		fmt.Printf(
-			"tsdump datadog upload: sending payload containing %d series including %s\n",
-			len(d.series),
-			d.series[0].Metric,
-		)
-		return d.Flush()
-	}
-	return nil
-}
-
-func (d *datadogWriter) Flush() error {
-	var buf bytes.Buffer
-	err := json.NewEncoder(&buf).Encode(&DatadogSubmitMetrics{Series: d.series})
-	if err != nil {
-		return err
-	}
-	var zipBuf bytes.Buffer
-	g := gzip.NewWriter(&zipBuf)
-	_, err = io.Copy(g, &buf)
-	if err != nil {
-		return err
-	}
-	err = g.Close()
-	if err != nil {
-		return err
-	}
-
-	retryOpts := base.DefaultRetryOptions()
-	retryOpts.MaxRetries = 3
-	for retry := retry.Start(retryOpts); retry.Next(); {
-		req, err := http.NewRequest("POST", d.targetURL, &zipBuf)
-		if err != nil {
-			return err
-		}
-		req.Header.Set("DD-API-KEY", d.apiKey)
-		req.Header.Set(server.ContentTypeHeader, "application/json")
-		req.Header.Set(httputil.ContentEncodingHeader, "gzip")
-
-		err = d.doRequest(req)
-		if err != nil {
-			fmt.Printf("retry attempt:%d tsdump: error while sending metrics to datadog: %v\n", retry.CurrentAttempt(), err)
-			continue
-		} else {
-			break
-		}
-	}
-
-	d.series = nil
-	return nil
-}
-
-type DatadogResp struct {
-	Errors []string `json:"errors"`
-}
-
-var _ tsWriter = &datadogWriter{}
-
 type jsonWriter struct {
 	sync.Once
 	targetURL string
diff --git a/pkg/cli/tsdump_test.go b/pkg/cli/tsdump_test.go
index aedc262f0283..4cad55fb95bc 100644
--- a/pkg/cli/tsdump_test.go
+++ b/pkg/cli/tsdump_test.go
@@ -13,7 +13,6 @@ package cli
 import (
 	"bytes"
 	"compress/gzip"
-	"context"
 	"fmt"
 	"io"
 	"math/rand"
@@ -149,6 +148,48 @@ func parseTSInput(t *testing.T, input string, w tsWriter) {
 	require.NoError(t, err)
 }
 
+func parseDDInput(t *testing.T, input string, w *datadogWriter) {
+	var data *DatadogSeries
+	var source, storeNodeKey string
+
+	for _, s := range strings.Split(input, "\n") {
+		nameValueTimestamp := strings.Split(s, " ")
+		sl := reCrStoreNode.FindStringSubmatch(nameValueTimestamp[0])
+		if len(sl) != 0 {
+			storeNodeKey = sl[1]
+			if storeNodeKey == "node" {
+				storeNodeKey += "_id"
+			}
+		}
+		metricName := sl[2]
+
+		// Advance to a new struct anytime the metric name or source changes.
+		if data == nil ||
+			data.Metric != metricName ||
+			source != nameValueTimestamp[1] {
+			if data != nil {
+				err := w.emitDataDogMetrics([]DatadogSeries{*data})
+				require.NoError(t, err)
+			}
+			data = &DatadogSeries{
+				Metric: metricName,
+			}
+			source = nameValueTimestamp[1]
+			data.Tags = append(data.Tags, fmt.Sprintf("%s:%s", storeNodeKey, nameValueTimestamp[1]))
+		}
+		value, err := strconv.ParseFloat(nameValueTimestamp[2], 64)
+		require.NoError(t, err)
+		ts, err :=
strconv.ParseInt(nameValueTimestamp[3], 10, 64)
+		require.NoError(t, err)
+		data.Points = append(data.Points, DatadogPoint{
+			Value:     value,
+			Timestamp: ts,
+		})
+	}
+	err := w.emitDataDogMetrics([]DatadogSeries{*data})
+	require.NoError(t, err)
+}
+
 func TestTsDumpFormatsDataDriven(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -165,12 +206,12 @@ func TestTsDumpFormatsDataDriven(t *testing.T) {
 			var testReqs []*http.Request
 			var series int
 			d.ScanArgs(t, "series-threshold", &series)
-			w = makeDatadogWriter(context.Background(), "https://example.com/data", false, "api-key", series, func(req *http.Request) error {
+			ddwriter := makeDatadogWriter("https://example.com/data", false, "api-key", series, func(req *http.Request) error {
 				testReqs = append(testReqs, req)
 				return nil
 			})
 
-			parseTSInput(t, d.Input, w)
+			parseDDInput(t, d.Input, ddwriter)
 
 			out := strings.Builder{}
 			for _, tr := range testReqs {
diff --git a/pkg/cli/tsdump_upload.go b/pkg/cli/tsdump_upload.go
new file mode 100644
index 000000000000..cf04583f13af
--- /dev/null
+++ b/pkg/cli/tsdump_upload.go
@@ -0,0 +1,341 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package cli
+
+import (
+	"bytes"
+	"compress/gzip"
+	"encoding/gob"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"sync"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/ts"
+	"github.com/cockroachdb/cockroach/pkg/util/httputil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
+)
+
+const (
+	DatadogSeriesTypeUnknown = iota
+	DatadogSeriesTypeCounter
+	DatadogSeriesTypeRate
+	DatadogSeriesTypeGauge
+)
+
+var (
+	// Each site in Datadog has a different host name. ddSiteToHostMap
+	// holds the mapping of site name to host name.
+	ddSiteToHostMap = map[string]string{
+		"us1":     "api.datadoghq.com",
+		"us3":     "api.us3.datadoghq.com",
+		"us5":     "api.us5.datadoghq.com",
+		"eu1":     "api.datadoghq.eu",
+		"ap1":     "api.ap1.datadoghq.com",
+		"us1-fed": "api.ddog-gov.com",
+	}
+
+	targetURLFormat = "https://%s/api/v2/series"
+)
+
+// DatadogPoint is a single metric point in Datadog format.
+type DatadogPoint struct {
+	// Timestamp must be in seconds since Unix epoch.
+	Timestamp int64   `json:"timestamp"`
+	Value     float64 `json:"value"`
+}
+
+// DatadogSeries contains a JSON encoding of a single series object
+// that can be sent to Datadog.
+type DatadogSeries struct {
+	Metric    string         `json:"metric"`
+	Type      int            `json:"type"`
+	Points    []DatadogPoint `json:"points"`
+	Resources []struct {
+		Name string `json:"name"`
+		Type string `json:"type"`
+	} `json:"resources"`
+	// In order to encode arbitrary key-value pairs, use a `:` delimited
+	// tag string like `cluster:dedicated`.
+	Tags []string `json:"tags"`
+}
+
+// DatadogSubmitMetrics is the top-level JSON object that must be sent to Datadog.
+// See: https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +type DatadogSubmitMetrics struct { + Series []DatadogSeries `json:"series"` +} + +type DatadogResp struct { + Errors []string `json:"errors"` +} + +var newTsdumpUploadID = func() string { + clusterTagValue := "" + if debugTimeSeriesDumpOpts.clusterLabel != "" { + clusterTagValue = debugTimeSeriesDumpOpts.clusterLabel + } else if serverCfg.ClusterName != "" { + clusterTagValue = serverCfg.ClusterName + } else { + clusterTagValue = fmt.Sprintf("cluster-debug-%d", timeutil.Now().Unix()) + } + return newUploadID(clusterTagValue) +} + +// datadogWriter can convert our metrics to Datadog format and send +// them via HTTP to the public DD endpoint, assuming an API key is set +// in the CLI flags. +type datadogWriter struct { + sync.Once + targetURL string + uploadID string + init bool + apiKey string + // namePrefix sets the string to prepend to all metric names. The + // names are kept with `.` delimiters. + namePrefix string + doRequest func(req *http.Request) error + threshold int +} + +func makeDatadogWriter( + targetURL string, + init bool, + apiKey string, + threshold int, + doRequest func(req *http.Request) error, +) *datadogWriter { + return &datadogWriter{ + targetURL: targetURL, + uploadID: newTsdumpUploadID(), + init: init, + apiKey: apiKey, + namePrefix: "crdb.tsdump.", // Default pre-set prefix to distinguish these uploads. + doRequest: doRequest, + threshold: threshold, + } +} + +func doDDRequest(req *http.Request) error { + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + respBytes, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + ddResp := DatadogResp{} + err = json.Unmarshal(respBytes, &ddResp) + if err != nil { + return err + } + if len(ddResp.Errors) > 0 { + return errors.Newf("tsdump: error response from datadog: %v", ddResp.Errors) + } + if resp.StatusCode > 299 { + return errors.Newf("tsdump: bad response status code: %+v", resp) + } + return nil +} + +func dump(kv *roachpb.KeyValue) (*DatadogSeries, error) { + name, source, _, _, err := ts.DecodeDataKey(kv.Key) + if err != nil { + return nil, err + } + var idata roachpb.InternalTimeSeriesData + if err := kv.Value.GetProto(&idata); err != nil { + return nil, err + } + + series := &DatadogSeries{ + Metric: name, + Tags: []string{}, + Type: DatadogSeriesTypeUnknown, + Points: make([]DatadogPoint, idata.SampleCount()), + } + + sl := reCrStoreNode.FindStringSubmatch(name) + if len(sl) != 0 { + storeNodeKey := sl[1] + if storeNodeKey == "node" { + storeNodeKey += "_id" + } + series.Tags = append(series.Tags, fmt.Sprintf("%s:%s", storeNodeKey, source)) + series.Metric = sl[2] + } else { + series.Tags = append(series.Tags, "node_id:0") + } + + for i := 0; i < idata.SampleCount(); i++ { + if idata.IsColumnar() { + series.Points[i].Timestamp = idata.TimestampForOffset(idata.Offset[i]) / 1_000_000_000 + series.Points[i].Value = idata.Last[i] + } else { + series.Points[i].Timestamp = idata.TimestampForOffset(idata.Samples[i].Offset) / 1_000_000_000 + series.Points[i].Value = idata.Samples[i].Sum + } + + } + return series, nil +} + +func (d *datadogWriter) emitDataDogMetrics(data []DatadogSeries) error { + var tags []string + // Hardcoded values + tags = append(tags, "cluster_type:SELF_HOSTED") + tags = append(tags, "job:cockroachdb") + tags = append(tags, "region:local") + + if debugTimeSeriesDumpOpts.clusterLabel != "" { + tags = append(tags, makeDDTag("cluster_label", 
debugTimeSeriesDumpOpts.clusterLabel))
+	}
+
+	tags = append(tags, makeDDTag(uploadIDTag, d.uploadID))
+
+	d.Do(func() {
+		fmt.Println("Upload ID:", d.uploadID)
+	})
+
+	for i := 0; i < len(data); i++ {
+		data[i].Tags = append(data[i].Tags, tags...)
+		data[i].Metric = d.namePrefix + data[i].Metric
+	}
+
+	// When running in init mode, we insert zeros with the current
+	// timestamp in order to populate Datadog's metrics list. Then the
+	// user can enable these metrics for historic ingest and load the
+	// full dataset. This should only be necessary once globally.
+	if d.init {
+		for i := 0; i < len(data); i++ {
+			data[i].Points = []DatadogPoint{{
+				Value:     0,
+				Timestamp: timeutil.Now().Unix(),
+			}}
+		}
+	}
+
+	fmt.Printf(
+		"tsdump datadog upload: sending payload containing %d series including %s\n",
+		len(data),
+		data[0].Metric,
+	)
+
+	return d.flush(data)
+}
+
+func (d *datadogWriter) flush(data []DatadogSeries) error {
+	var buf bytes.Buffer
+	err := json.NewEncoder(&buf).Encode(&DatadogSubmitMetrics{Series: data})
+	if err != nil {
+		return err
+	}
+	var zipBuf bytes.Buffer
+	g := gzip.NewWriter(&zipBuf)
+	_, err = io.Copy(g, &buf)
+	if err != nil {
+		return err
+	}
+	err = g.Close()
+	if err != nil {
+		return err
+	}
+
+	retryOpts := base.DefaultRetryOptions()
+	retryOpts.MaxRetries = 5
+	for retry := retry.Start(retryOpts); retry.Next(); {
+		req, reqErr := http.NewRequest("POST", d.targetURL, bytes.NewReader(zipBuf.Bytes()))
+		if reqErr != nil {
+			return reqErr
+		}
+		req.Header.Set("DD-API-KEY", d.apiKey)
+		req.Header.Set(server.ContentTypeHeader, "application/json")
+		req.Header.Set(httputil.ContentEncodingHeader, "gzip")
+
+		err = d.doRequest(req)
+		if err == nil {
+			return nil
+		}
+	}
+	return err
+}
+
+func (d *datadogWriter) upload(fileName string) error {
+	f, err := os.Open(fileName)
+	if err != nil {
+		return err
+	}
+
+	dec := gob.NewDecoder(f)
+	gob.Register(&roachpb.KeyValue{})
+	decodeOne := func() ([]DatadogSeries, error) {
+		var ddSeries []DatadogSeries
+
+		for i := 0; i < d.threshold; i++ {
+			var v roachpb.KeyValue
+			err := dec.Decode(&v)
+			if err != nil {
+				return ddSeries, err
+			}
+
+			datadogSeries, err := dump(&v)
+			if err != nil {
+				return nil, err
+			}
+			ddSeries = append(ddSeries, *datadogSeries)
+		}
+
+		return ddSeries, nil
+	}
+
+	var wg sync.WaitGroup
+	ch := make(chan []DatadogSeries, 4000)
+
+	for i := 0; i < 1000; i++ {
+		go func() {
+			for data := range ch {
+				err := d.emitDataDogMetrics(data)
+				if err != nil {
+					// Log and keep draining the channel so that the
+					// wg.Wait() call below can still complete.
+					fmt.Printf("retries exhausted for datadog upload containing series %s with error %v\n", data[0].Metric, err)
+				}
+				wg.Done()
+			}
+		}()
+	}
+
+	for {
+		data, err := decodeOne()
+		if len(data) != 0 {
+			wg.Add(1)
+			ch <- data
+		}
+		if err == io.EOF {
+			break
+		} else if err != nil {
+			return err
+		}
+	}
+
+	wg.Wait()
+	close(ch)
+	return nil
+}
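
For reviewers: below is a minimal, self-contained sketch of the
producer/consumer shape that upload() uses, with a fixed pool of
uploader goroutines draining a buffered channel while the decoder keeps
producing. This is a toy, not code from this patch; batch, emit, and
makeDecoder are illustrative stand-ins for []DatadogSeries,
emitDataDogMetrics, and decodeOne.

package main

import (
	"fmt"
	"io"
	"sync"
)

// batch stands in for a []DatadogSeries payload.
type batch []int

// emit stands in for emitDataDogMetrics: it "uploads" one batch.
func emit(b batch) error {
	fmt.Printf("uploading batch of %d points\n", len(b))
	return nil
}

// makeDecoder returns a decodeOne-style function: each call yields the
// next fixed-size batch, returning io.EOF once the stream is exhausted.
func makeDecoder(total, size int) func() (batch, error) {
	next := 0
	return func() (batch, error) {
		if next >= total {
			return nil, io.EOF
		}
		end := next + size
		if end > total {
			end = total
		}
		b := make(batch, 0, end-next)
		for i := next; i < end; i++ {
			b = append(b, i)
		}
		next = end
		return b, nil
	}
}

func main() {
	decodeOne := makeDecoder(25, 10)

	var wg sync.WaitGroup
	ch := make(chan batch, 8)

	// Consumers: a small pool of uploader goroutines drains the channel,
	// so slow uploads overlap with decoding instead of serializing it.
	for i := 0; i < 4; i++ {
		go func() {
			for b := range ch {
				if err := emit(b); err != nil {
					fmt.Printf("upload failed: %v\n", err)
				}
				wg.Done()
			}
		}()
	}

	// Producer: decode batches and hand them off without waiting for the
	// upload of the previous batch to finish.
	for {
		b, err := decodeOne()
		if len(b) != 0 {
			wg.Add(1)
			ch <- b
		}
		if err == io.EOF {
			break
		}
	}

	wg.Wait()
	close(ch)
}

The channel buffer bounds how far decoding can run ahead of uploading,
and the pool size bounds the number of concurrent requests; upload()
above makes the same trade with a 4000-slot channel and 1000 workers.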