diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel
index 18000dadd93a..45d44298c1dc 100644
--- a/pkg/cli/BUILD.bazel
+++ b/pkg/cli/BUILD.bazel
@@ -57,6 +57,7 @@ go_library(
         "statement_diag.go",
         "testutils.go",
         "tsdump.go",
+        "tsdump_upload.go",
         "userfile.go",
         "zip.go",
         "zip_cluster_wide.go",
diff --git a/pkg/cli/testdata/tsdump/json b/pkg/cli/testdata/tsdump/json
index e7a6ee14264d..0b387b2ffd18 100644
--- a/pkg/cli/testdata/tsdump/json
+++ b/pkg/cli/testdata/tsdump/json
@@ -31,4 +31,7 @@ cr.node.admission.admitted.elastic.cpu 2 1.000000 1711130560
 ----
 POST: https://example.com/data
 DD-API-KEY: api-key
-Body: {"series":[{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":17111304,"value":0},{"timestamp":17111304,"value":1},{"timestamp":17111304,"value":1},{"timestamp":17111305,"value":1}],"resources":null,"tags":["cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234","node_id:1"]},{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1},{"timestamp":17111305,"value":1}],"resources":null,"tags":["cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234","node_id:2"]}]}
+Body: {"series":[{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":1711130470,"value":0},{"timestamp":1711130480,"value":1},{"timestamp":1711130490,"value":1},{"timestamp":1711130500,"value":1}],"resources":null,"tags":["node_id:1","cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234"]}]}
+POST: https://example.com/data
+DD-API-KEY: api-key
+Body: {"series":[{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,"points":[{"timestamp":1711130510,"value":1},{"timestamp":1711130520,"value":1},{"timestamp":1711130530,"value":1},{"timestamp":1711130540,"value":1},{"timestamp":1711130550,"value":1},{"timestamp":1711130560,"value":1}],"resources":null,"tags":["node_id:2","cluster_type:SELF_HOSTED","job:cockroachdb","region:local","cluster_label:test-cluster","upload_id:test-cluster-1234"]}]}
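The two expected request bodies above are produced by JSON-encoding one DatadogSubmitMetrics batch per POST. A minimal, self-contained sketch of that encoding, using trimmed copies of the struct types added in tsdump_upload.go further down (the Resources field is omitted here, which is why this output lacks the "resources":null key):

package main

import (
	"encoding/json"
	"fmt"
)

type DatadogPoint struct {
	Timestamp int64   `json:"timestamp"`
	Value     float64 `json:"value"`
}

type DatadogSeries struct {
	Metric string         `json:"metric"`
	Type   int            `json:"type"`
	Points []DatadogPoint `json:"points"`
	Tags   []string       `json:"tags"`
}

type DatadogSubmitMetrics struct {
	Series []DatadogSeries `json:"series"`
}

func main() {
	payload := DatadogSubmitMetrics{Series: []DatadogSeries{{
		Metric: "crdb.tsdump.admission.admitted.elastic.cpu",
		Points: []DatadogPoint{{Timestamp: 1711130470, Value: 0}},
		Tags:   []string{"node_id:1", "cluster_label:test-cluster"},
	}}}
	b, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// {"series":[{"metric":"crdb.tsdump.admission.admitted.elastic.cpu","type":0,
	//   "points":[{"timestamp":1711130470,"value":0}],"tags":["node_id:1","cluster_label:test-cluster"]}]}
}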
diff --git a/pkg/cli/tsdump.go b/pkg/cli/tsdump.go
index ec393b02f27a..4ce3deb54b09 100644
--- a/pkg/cli/tsdump.go
+++ b/pkg/cli/tsdump.go
@@ -13,7 +13,6 @@ package cli
 import (
 	"bufio"
 	"bytes"
-	"compress/gzip"
 	"context"
 	"encoding/csv"
 	"encoding/gob"
@@ -28,17 +27,13 @@ import (
 	"sync"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/cli/clierrorplus"
 	"github.com/cockroachdb/cockroach/pkg/cli/clisqlclient"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/ts"
 	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
 	"github.com/cockroachdb/cockroach/pkg/ts/tsutil"
-	"github.com/cockroachdb/cockroach/pkg/util/httputil"
-	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/spf13/cobra"
@@ -63,21 +58,6 @@ var debugTimeSeriesDumpOpts = struct {
 	yaml: "/tmp/tsdump.yaml",
 }
 
-var (
-	// each site in datadog has a different host name. ddSiteToHostMap
-	// holds the mapping of site name to the host name.
-	ddSiteToHostMap = map[string]string{
-		"us1":     "api.datadoghq.com",
-		"us3":     "api.us3.datadoghq.com",
-		"us5":     "api.us5.datadoghq.com",
-		"eu1":     "api.datadoghq.eu",
-		"ap1":     "api.ap1.datadoghq.com",
-		"us1-fed": "api.ddog-gov.com",
-	}
-
-	targetURLFormat = "https://%s/api/v2/series"
-)
-
 var debugTimeSeriesDumpCmd = &cobra.Command{
 	Use:   "tsdump",
 	Short: "dump all the raw timeseries values in a cluster",
@@ -133,23 +113,24 @@ will then convert it to the --format requested in the current invocation.
 			doRequest,
 		)
 	case tsDumpDatadog:
-		w = makeDatadogWriter(
-			ctx,
+		datadogWriter := makeDatadogWriter(
 			targetURL,
 			false, /* init */
 			debugTimeSeriesDumpOpts.ddApiKey,
 			100, /* threshold */
 			doDDRequest,
 		)
+		return datadogWriter.upload(args[0])
+
 	case tsDumpDatadogInit:
-		w = makeDatadogWriter(
-			ctx,
+		datadogWriter := makeDatadogWriter(
 			targetURL,
 			true, /* init */
 			debugTimeSeriesDumpOpts.ddApiKey,
 			100, /* threshold */
 			doDDRequest,
 		)
+		return datadogWriter.upload(args[0])
 	case tsDumpOpenMetrics:
 		if debugTimeSeriesDumpOpts.targetURL != "" {
 			write := beginHttpRequestWithWritePipe(debugTimeSeriesDumpOpts.targetURL)
@@ -296,30 +277,6 @@ func doRequest(req *http.Request) error {
 	return nil
 }
 
-func doDDRequest(req *http.Request) error {
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	respBytes, err := io.ReadAll(resp.Body)
-	if err != nil {
-		return err
-	}
-	ddResp := DatadogResp{}
-	err = json.Unmarshal(respBytes, &ddResp)
-	if err != nil {
-		return err
-	}
-	if len(ddResp.Errors) > 0 {
-		return errors.Newf("tsdump: error response from datadog: %v", ddResp.Errors)
-	}
-	if resp.StatusCode > 299 {
-		return errors.Newf("tsdump: bad response status code: %+v", resp)
-	}
-	return nil
-}
-
 // beginHttpRequestWithWritePipe initiates an HTTP request to the
 // `targetURL` argument and returns an `io.Writer` that pipes to the
 // request body. This function will return while the request runs
@@ -349,218 +306,6 @@ type tsWriter interface {
 	Flush() error
 }
 
-// datadogWriter can convert our metrics to Datadog format and send
-// them via HTTP to the public DD endpoint, assuming an API key is set
-// in the CLI flags.
-type datadogWriter struct {
-	sync.Once
-	targetURL string
-	buffer    bytes.Buffer
-	series    []DatadogSeries
-	uploadID  string
-	init      bool
-	apiKey    string
-	// namePrefix sets the string to prepend to all metric names. The
-	// names are kept with `.` delimiters.
-	namePrefix string
-	doRequest  func(req *http.Request) error
-	threshold  int
-}
-
-func makeDatadogWriter(
-	ctx context.Context,
-	targetURL string,
-	init bool,
-	apiKey string,
-	threshold int,
-	doRequest func(req *http.Request) error,
-) *datadogWriter {
-	return &datadogWriter{
-		targetURL:  targetURL,
-		buffer:     bytes.Buffer{},
-		uploadID:   newTsdumpUploadID(),
-		init:       init,
-		apiKey:     apiKey,
-		namePrefix: "crdb.tsdump.", // Default pre-set prefix to distinguish these uploads.
-		doRequest:  doRequest,
-		threshold:  threshold,
-	}
-}
-
-var newTsdumpUploadID = func() string {
-	clusterTagValue := ""
-	if debugTimeSeriesDumpOpts.clusterLabel != "" {
-		clusterTagValue = debugTimeSeriesDumpOpts.clusterLabel
-	} else if serverCfg.ClusterName != "" {
-		clusterTagValue = serverCfg.ClusterName
-	} else {
-		clusterTagValue = fmt.Sprintf("cluster-debug-%d", timeutil.Now().Unix())
-	}
-	return newUploadID(clusterTagValue)
-}
-
-// DatadogPoint is a single metric point in Datadog format
-type DatadogPoint struct {
-	// Timestamp must be in seconds since Unix epoch.
-	Timestamp int64   `json:"timestamp"`
-	Value     float64 `json:"value"`
-}
-
-// DatadogSeries contains a JSON encoding of a single series object
-// that can be send to Datadog.
-type DatadogSeries struct {
-	Metric    string         `json:"metric"`
-	Type      int            `json:"type"`
-	Points    []DatadogPoint `json:"points"`
-	Resources []struct {
-		Name string `json:"name"`
-		Type string `json:"type"`
-	} `json:"resources"`
-	// In order to encode arbitrary key-value pairs, use a `:` delimited
-	// tag string like `cluster:dedicated`.
-	Tags []string `json:"tags"`
-}
-
-// DatadogSubmitMetrics is the top level JSON object that must be sent to Datadog.
-// See: https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
-type DatadogSubmitMetrics struct {
-	Series []DatadogSeries `json:"series"`
-}
-
-const (
-	DatadogSeriesTypeUnknown = iota
-	DatadogSeriesTypeCounter
-	DatadogSeriesTypeRate
-	DatadogSeriesTypeGauge
-)
-
-func (d *datadogWriter) Emit(data *tspb.TimeSeriesData) error {
-	series := &DatadogSeries{
-		// TODO(davidh): This is not correct. We should inspect metric metadata and set appropriately.
-		// The impact of not doing this is that the metric will be treated as a gauge by default.
-		Type:   DatadogSeriesTypeUnknown,
-		Points: make([]DatadogPoint, len(data.Datapoints)),
-	}
-
-	name := data.Name
-
-	var tags []string
-	// Hardcoded values
-	tags = append(tags, "cluster_type:SELF_HOSTED")
-	tags = append(tags, "job:cockroachdb")
-	tags = append(tags, "region:local")
-
-	if debugTimeSeriesDumpOpts.clusterLabel != "" {
-		tags = append(tags, makeDDTag("cluster_label", debugTimeSeriesDumpOpts.clusterLabel))
-	}
-
-	tags = append(tags, makeDDTag(uploadIDTag, d.uploadID))
-
-	d.Do(func() {
-		fmt.Println("Upload ID:", d.uploadID)
-	})
-
-	sl := reCrStoreNode.FindStringSubmatch(data.Name)
-	if len(sl) != 0 {
-		storeNodeKey := sl[1]
-		if storeNodeKey == "node" {
-			storeNodeKey += "_id"
-		}
-		tags = append(tags, fmt.Sprintf("%s:%s", storeNodeKey, data.Source))
-		name = sl[2]
-	} else {
-		tags = append(tags, "node_id:0")
-	}
-
-	series.Tags = tags
-
-	series.Metric = d.namePrefix + name
-
-	// When running in init mode, we insert zeros with the current
-	// timestamp in order to populate Datadog's metrics list. Then the
-	// user can enable these metrics for historic ingest and load the
-	// full dataset. This should only be necessary once globally.
-	if d.init {
-		series.Points = []DatadogPoint{{
-			Value:     0,
-			Timestamp: timeutil.Now().Unix(),
-		}}
-	} else {
-		for i, ts := range data.Datapoints {
-			series.Points[i].Value = ts.Value
-			series.Points[i].Timestamp = ts.TimestampNanos / 1_000_000_000
-		}
-	}
-
-	// We append every series directly to the list. This isn't ideal
-	// because every series batch that `Emit` is called with will contain
-	// around 360 points and the same metric will repeat many many times.
-	// This causes us to repeat the metadata collection here. Ideally, we
-	// can find the series object for this metric if it already exists
-	// and insert the points there.
-	d.series = append(d.series, *series)
-
-	// The limit of `100` is an experimentally set heuristic. It can
-	// probably be increased. This results in a payload that's generally
-	// below 2MB. DD's limit is 5MB.
-	if len(d.series) > d.threshold {
-		fmt.Printf(
-			"tsdump datadog upload: sending payload containing %d series including %s\n",
-			len(d.series),
-			d.series[0].Metric,
-		)
-		return d.Flush()
-	}
-	return nil
-}
-
-func (d *datadogWriter) Flush() error {
-	var buf bytes.Buffer
-	err := json.NewEncoder(&buf).Encode(&DatadogSubmitMetrics{Series: d.series})
-	if err != nil {
-		return err
-	}
-	var zipBuf bytes.Buffer
-	g := gzip.NewWriter(&zipBuf)
-	_, err = io.Copy(g, &buf)
-	if err != nil {
-		return err
-	}
-	err = g.Close()
-	if err != nil {
-		return err
-	}
-
-	retryOpts := base.DefaultRetryOptions()
-	retryOpts.MaxRetries = 3
-	for retry := retry.Start(retryOpts); retry.Next(); {
-		req, err := http.NewRequest("POST", d.targetURL, &zipBuf)
-		if err != nil {
-			return err
-		}
-		req.Header.Set("DD-API-KEY", d.apiKey)
-		req.Header.Set(server.ContentTypeHeader, "application/json")
-		req.Header.Set(httputil.ContentEncodingHeader, "gzip")
-
-		err = d.doRequest(req)
-		if err != nil {
-			fmt.Printf("retry attempt:%d tsdump: error while sending metrics to datadog: %v\n", retry.CurrentAttempt(), err)
-			continue
-		} else {
-			break
-		}
-	}
-
-	d.series = nil
-	return nil
-}
-
-type DatadogResp struct {
-	Errors []string `json:"errors"`
-}
-
-var _ tsWriter = &datadogWriter{}
-
 type jsonWriter struct {
 	sync.Once
 	targetURL string
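Both the removed Flush above and its replacement in tsdump_upload.go further down gzip the encoded payload and mark the request with DD-API-KEY, Content-Type, and Content-Encoding headers (via the server and httputil constants; the literal names are used here). A self-contained sketch of that request shape with a placeholder URL and key, nothing is actually sent; note that the compressed bytes are wrapped in a fresh reader, which is what lets a retry re-send the same body:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"net/http"
)

func main() {
	// Compress the JSON payload once, up front.
	var zipBuf bytes.Buffer
	g := gzip.NewWriter(&zipBuf)
	if _, err := g.Write([]byte(`{"series":[]}`)); err != nil {
		panic(err)
	}
	if err := g.Close(); err != nil { // Close flushes the gzip trailer.
		panic(err)
	}
	payload := zipBuf.Bytes()

	// Each retry attempt must wrap the bytes in a fresh reader; reusing a
	// single bytes.Buffer would leave retries with a drained, empty body.
	req, err := http.NewRequest("POST", "https://example.com/data", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	req.Header.Set("DD-API-KEY", "api-key")
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Content-Encoding", "gzip")
	fmt.Println("would send", req.ContentLength, "compressed bytes")
}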
diff --git a/pkg/cli/tsdump_test.go b/pkg/cli/tsdump_test.go
index aedc262f0283..4cad55fb95bc 100644
--- a/pkg/cli/tsdump_test.go
+++ b/pkg/cli/tsdump_test.go
@@ -13,7 +13,6 @@ package cli
 import (
 	"bytes"
 	"compress/gzip"
-	"context"
 	"fmt"
 	"io"
 	"math/rand"
@@ -149,6 +148,48 @@ func parseTSInput(t *testing.T, input string, w tsWriter) {
 	require.NoError(t, err)
 }
 
+func parseDDInput(t *testing.T, input string, w *datadogWriter) {
+	var data *DatadogSeries
+	var source, storeNodeKey string
+
+	for _, s := range strings.Split(input, "\n") {
+		nameValueTimestamp := strings.Split(s, " ")
+		sl := reCrStoreNode.FindStringSubmatch(nameValueTimestamp[0])
+		// Test inputs are always cr.node.* or cr.store.* metrics.
+		require.NotEmpty(t, sl)
+		storeNodeKey = sl[1]
+		if storeNodeKey == "node" {
+			storeNodeKey += "_id"
+		}
+		metricName := sl[2]
+
+		// Advance to a new struct anytime the name or source changes.
+		if data == nil ||
+			data.Metric != metricName ||
+			source != nameValueTimestamp[1] {
+			if data != nil {
+				err := w.emitDataDogMetrics([]DatadogSeries{*data})
+				require.NoError(t, err)
+			}
+			data = &DatadogSeries{
+				Metric: metricName,
+			}
+			source = nameValueTimestamp[1]
+			data.Tags = append(data.Tags, fmt.Sprintf("%s:%s", storeNodeKey, nameValueTimestamp[1]))
+		}
+		value, err := strconv.ParseFloat(nameValueTimestamp[2], 64)
+		require.NoError(t, err)
+		ts, err := strconv.ParseInt(nameValueTimestamp[3], 10, 64)
+		require.NoError(t, err)
+		data.Points = append(data.Points, DatadogPoint{
+			Value:     value,
+			Timestamp: ts,
+		})
+	}
+	err := w.emitDataDogMetrics([]DatadogSeries{*data})
+	require.NoError(t, err)
+}
+
 func TestTsDumpFormatsDataDriven(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -165,12 +206,12 @@ func TestTsDumpFormatsDataDriven(t *testing.T) {
 			var testReqs []*http.Request
 			var series int
 			d.ScanArgs(t, "series-threshold", &series)
-			w = makeDatadogWriter(context.Background(), "https://example.com/data", false, "api-key", series, func(req *http.Request) error {
+			ddwriter := makeDatadogWriter("https://example.com/data", false, "api-key", series, func(req *http.Request) error {
				testReqs = append(testReqs, req)
 				return nil
 			})
 
-			parseTSInput(t, d.Input, w)
+			parseDDInput(t, d.Input, ddwriter)
 
 			out := strings.Builder{}
 			for _, tr := range testReqs {
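parseDDInput reads the same four-column test input as parseTSInput: a metric name, a source (node or store ID), a value, and a timestamp in seconds. A standalone sketch of the per-line parse; the regexp here is only an assumed stand-in for reCrStoreNode, which is defined elsewhere in this package:

package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// Assumed equivalent of reCrStoreNode, for illustration only.
var crStoreNode = regexp.MustCompile(`^cr\.(node|store)\.(.*)$`)

func main() {
	line := "cr.node.admission.admitted.elastic.cpu 2 1.000000 1711130560"
	fields := strings.Split(line, " ")
	sl := crStoreNode.FindStringSubmatch(fields[0])
	value, _ := strconv.ParseFloat(fields[2], 64)
	ts, _ := strconv.ParseInt(fields[3], 10, 64)
	fmt.Printf("metric=%s %s_id=%s value=%v ts=%d\n", sl[2], sl[1], fields[1], value, ts)
	// metric=admission.admitted.elastic.cpu node_id=2 value=1 ts=1711130560
}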
diff --git a/pkg/cli/tsdump_upload.go b/pkg/cli/tsdump_upload.go
new file mode 100644
index 000000000000..cf04583f13af
--- /dev/null
+++ b/pkg/cli/tsdump_upload.go
@@ -0,0 +1,354 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package cli
+
+import (
+	"bytes"
+	"compress/gzip"
+	"encoding/gob"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"sync"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/ts"
+	"github.com/cockroachdb/cockroach/pkg/util/httputil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
+)
+
+const (
+	DatadogSeriesTypeUnknown = iota
+	DatadogSeriesTypeCounter
+	DatadogSeriesTypeRate
+	DatadogSeriesTypeGauge
+)
+
+var (
+	// Each site in Datadog has a different host name. ddSiteToHostMap
+	// holds the mapping of site name to host name.
+	ddSiteToHostMap = map[string]string{
+		"us1":     "api.datadoghq.com",
+		"us3":     "api.us3.datadoghq.com",
+		"us5":     "api.us5.datadoghq.com",
+		"eu1":     "api.datadoghq.eu",
+		"ap1":     "api.ap1.datadoghq.com",
+		"us1-fed": "api.ddog-gov.com",
+	}
+
+	targetURLFormat = "https://%s/api/v2/series"
+)
+
+// DatadogPoint is a single metric point in Datadog format.
+type DatadogPoint struct {
+	// Timestamp must be in seconds since Unix epoch.
+	Timestamp int64   `json:"timestamp"`
+	Value     float64 `json:"value"`
+}
+
+// DatadogSeries contains a JSON encoding of a single series object
+// that can be sent to Datadog.
+type DatadogSeries struct {
+	Metric    string         `json:"metric"`
+	Type      int            `json:"type"`
+	Points    []DatadogPoint `json:"points"`
+	Resources []struct {
+		Name string `json:"name"`
+		Type string `json:"type"`
+	} `json:"resources"`
+	// In order to encode arbitrary key-value pairs, use a `:` delimited
+	// tag string like `cluster:dedicated`.
+	Tags []string `json:"tags"`
+}
+
+// DatadogSubmitMetrics is the top-level JSON object that must be sent to Datadog.
+// See: https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
+type DatadogSubmitMetrics struct {
+	Series []DatadogSeries `json:"series"`
+}
+
+type DatadogResp struct {
+	Errors []string `json:"errors"`
+}
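+
+// newTsdumpUploadID chooses the cluster tag embedded in the upload_id tag:
+// the cluster label CLI option if set, else the server's cluster name, else
+// a generated "cluster-debug-<unix seconds>" value.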
+var newTsdumpUploadID = func() string {
+	clusterTagValue := ""
+	if debugTimeSeriesDumpOpts.clusterLabel != "" {
+		clusterTagValue = debugTimeSeriesDumpOpts.clusterLabel
+	} else if serverCfg.ClusterName != "" {
+		clusterTagValue = serverCfg.ClusterName
+	} else {
+		clusterTagValue = fmt.Sprintf("cluster-debug-%d", timeutil.Now().Unix())
+	}
+	return newUploadID(clusterTagValue)
+}
+
+// datadogWriter can convert our metrics to Datadog format and send
+// them via HTTP to the public DD endpoint, assuming an API key is set
+// in the CLI flags.
+type datadogWriter struct {
+	sync.Once
+	targetURL string
+	uploadID  string
+	init      bool
+	apiKey    string
+	// namePrefix sets the string to prepend to all metric names. The
+	// names are kept with `.` delimiters.
+	namePrefix string
+	doRequest  func(req *http.Request) error
+	threshold  int
+}
+
+func makeDatadogWriter(
+	targetURL string,
+	init bool,
+	apiKey string,
+	threshold int,
+	doRequest func(req *http.Request) error,
+) *datadogWriter {
+	return &datadogWriter{
+		targetURL:  targetURL,
+		uploadID:   newTsdumpUploadID(),
+		init:       init,
+		apiKey:     apiKey,
+		namePrefix: "crdb.tsdump.", // Default pre-set prefix to distinguish these uploads.
+		doRequest:  doRequest,
+		threshold:  threshold,
+	}
+}
+
+func doDDRequest(req *http.Request) error {
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	respBytes, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return err
+	}
+	ddResp := DatadogResp{}
+	err = json.Unmarshal(respBytes, &ddResp)
+	if err != nil {
+		return err
+	}
+	if len(ddResp.Errors) > 0 {
+		return errors.Newf("tsdump: error response from datadog: %v", ddResp.Errors)
+	}
+	if resp.StatusCode > 299 {
+		return errors.Newf("tsdump: bad response status code: %+v", resp)
+	}
+	return nil
+}
+
+// dump decodes a single tsdump KV pair into a DatadogSeries, stripping
+// the cr.{node,store}. metric prefix and tagging the series with its
+// source node or store ID.
+func dump(kv *roachpb.KeyValue) (*DatadogSeries, error) {
+	name, source, _, _, err := ts.DecodeDataKey(kv.Key)
+	if err != nil {
+		return nil, err
+	}
+	var idata roachpb.InternalTimeSeriesData
+	if err := kv.Value.GetProto(&idata); err != nil {
+		return nil, err
+	}
+
+	series := &DatadogSeries{
+		Metric: name,
+		Tags:   []string{},
+		Type:   DatadogSeriesTypeUnknown,
+		Points: make([]DatadogPoint, idata.SampleCount()),
+	}
+
+	sl := reCrStoreNode.FindStringSubmatch(name)
+	if len(sl) != 0 {
+		storeNodeKey := sl[1]
+		if storeNodeKey == "node" {
+			storeNodeKey += "_id"
+		}
+		series.Tags = append(series.Tags, fmt.Sprintf("%s:%s", storeNodeKey, source))
+		series.Metric = sl[2]
+	} else {
+		series.Tags = append(series.Tags, "node_id:0")
+	}
+
+	for i := 0; i < idata.SampleCount(); i++ {
+		if idata.IsColumnar() {
+			series.Points[i].Timestamp = idata.TimestampForOffset(idata.Offset[i]) / 1_000_000_000
+			series.Points[i].Value = idata.Last[i]
+		} else {
+			series.Points[i].Timestamp = idata.TimestampForOffset(idata.Samples[i].Offset) / 1_000_000_000
+			series.Points[i].Value = idata.Samples[i].Sum
+		}
+	}
+	return series, nil
+}
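+
+// emitDataDogMetrics stamps every series in the batch with the shared tag
+// set (cluster_type, job, region, cluster_label, upload_id) and the
+// namePrefix, then hands the batch to flush.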
+func (d *datadogWriter) emitDataDogMetrics(data []DatadogSeries) error {
+	var tags []string
+	// Hardcoded values
+	tags = append(tags, "cluster_type:SELF_HOSTED")
+	tags = append(tags, "job:cockroachdb")
+	tags = append(tags, "region:local")
+
+	if debugTimeSeriesDumpOpts.clusterLabel != "" {
+		tags = append(tags, makeDDTag("cluster_label", debugTimeSeriesDumpOpts.clusterLabel))
+	}
+
+	tags = append(tags, makeDDTag(uploadIDTag, d.uploadID))
+
+	d.Do(func() {
+		fmt.Println("Upload ID:", d.uploadID)
+	})
+
+	for i := 0; i < len(data); i++ {
+		data[i].Tags = append(data[i].Tags, tags...)
+		data[i].Metric = d.namePrefix + data[i].Metric
+	}
+
+	// When running in init mode, we insert zeros with the current
+	// timestamp in order to populate Datadog's metrics list. Then the
+	// user can enable these metrics for historic ingest and load the
+	// full dataset. This should only be necessary once globally.
+	if d.init {
+		for i := 0; i < len(data); i++ {
+			data[i].Points = []DatadogPoint{{
+				Value:     0,
+				Timestamp: timeutil.Now().Unix(),
+			}}
+		}
+	}
+
+	fmt.Printf(
+		"tsdump datadog upload: sending payload containing %d series including %s\n",
+		len(data),
+		data[0].Metric,
+	)
+
+	return d.flush(data)
+}
+
+func (d *datadogWriter) flush(data []DatadogSeries) error {
+	var buf bytes.Buffer
+	err := json.NewEncoder(&buf).Encode(&DatadogSubmitMetrics{Series: data})
+	if err != nil {
+		return err
+	}
+	var zipBuf bytes.Buffer
+	g := gzip.NewWriter(&zipBuf)
+	_, err = io.Copy(g, &buf)
+	if err != nil {
+		return err
+	}
+	err = g.Close()
+	if err != nil {
+		return err
+	}
+
+	retryOpts := base.DefaultRetryOptions()
+	retryOpts.MaxRetries = 5
+	for retry := retry.Start(retryOpts); retry.Next(); {
+		// Wrap the compressed payload in a fresh reader on every attempt:
+		// a bytes.Buffer is drained by the first request, so reusing it
+		// would make every retry send an empty body.
+		req, err := http.NewRequest("POST", d.targetURL, bytes.NewReader(zipBuf.Bytes()))
+		if err != nil {
+			return err
+		}
+		req.Header.Set("DD-API-KEY", d.apiKey)
+		req.Header.Set(server.ContentTypeHeader, "application/json")
+		req.Header.Set(httputil.ContentEncodingHeader, "gzip")
+
+		if err = d.doRequest(req); err == nil {
+			return nil
+		}
+	}
+	return errors.Newf("tsdump: retries exhausted while sending metrics to datadog")
+}
+
+func (d *datadogWriter) upload(fileName string) error {
+	f, err := os.Open(fileName)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	dec := gob.NewDecoder(f)
+	gob.Register(&roachpb.KeyValue{})
+	decodeOne := func() ([]DatadogSeries, error) {
+		var ddSeries []DatadogSeries
+
+		for i := 0; i < d.threshold; i++ {
+			var v roachpb.KeyValue
+			err := dec.Decode(&v)
+			if err != nil {
+				return ddSeries, err
+			}
+
+			datadogSeries, err := dump(&v)
+			if err != nil {
+				return nil, err
+			}
+			ddSeries = append(ddSeries, *datadogSeries)
+		}
+
+		return ddSeries, nil
+	}
+
+	var wg sync.WaitGroup
+	ch := make(chan []DatadogSeries, 4000)
+
+	// Fan the decoded batches out to a fixed pool of upload workers.
+	for i := 0; i < 1000; i++ {
+		go func() {
+			for data := range ch {
+				err := d.emitDataDogMetrics(data)
+				if err != nil {
+					fmt.Printf("retries exhausted for datadog upload containing series %s with error %v\n", data[0].Metric, err)
+					wg.Done()
+					return
+				}
+				wg.Done()
+			}
+		}()
+	}
+
+	for {
+		data, err := decodeOne()
+		if err != nil && err != io.EOF {
+			return err
+		}
+		if len(data) != 0 {
+			wg.Add(1)
+			ch <- data
+		}
+		if err == io.EOF {
+			break
+		}
+	}
+
+	wg.Wait()
+	close(ch)
+	return nil
+}
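The upload path above amounts to a decode-side producer feeding a fixed pool of uploader goroutines. A minimal, runnable sketch of that fan-out pattern, with toy int batches standing in for []DatadogSeries (the real code uses 1000 workers and a 4000-batch channel buffer):

package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan []int, 8) // upload buffers up to 4000 batches
	var wg sync.WaitGroup

	for i := 0; i < 4; i++ { // upload starts 1000 workers
		go func() {
			for batch := range ch {
				fmt.Println("uploading batch of", len(batch)) // stands in for emitDataDogMetrics
				wg.Done()
			}
		}()
	}

	for i := 0; i < 3; i++ { // stands in for decodeOne looping until io.EOF
		wg.Add(1)
		ch <- []int{i, i + 1}
	}

	wg.Wait() // every queued batch has been processed
	close(ch) // then release the workers, matching upload's shutdown order
}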