diff --git a/integrations/grafana/m3coordinator_dashboard.json b/integrations/grafana/m3coordinator_dashboard.json index 2ee99ef848..434683bfe6 100644 --- a/integrations/grafana/m3coordinator_dashboard.json +++ b/integrations/grafana/m3coordinator_dashboard.json @@ -89,7 +89,7 @@ "steppedLine": false, "targets": [ { - "expr": "rate(coordinator_fetch_success{source=\"remote\"}[1m])", + "expr": "rate(coordinator_fetch_success{}[1m])", "format": "time_series", "intervalFactor": 1, "legendFormat": "queries_per_second", @@ -175,7 +175,7 @@ "steppedLine": false, "targets": [ { - "expr": "sum(rate(coordinator_fetch_errors{source=\"remote\"}[1m])) by (code)", + "expr": "sum(rate(coordinator_fetch_errors{}[1m])) by (code)", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{code}}", @@ -274,7 +274,7 @@ "steppedLine": false, "targets": [ { - "expr": "rate(coordinator_write_success{source=\"remote\"}[1m])", + "expr": "sum(irate(coordinator_m3db_client_write_success{}[1m]))", "format": "time_series", "intervalFactor": 1, "legendFormat": "write_per_second", @@ -285,7 +285,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Writes Per Second", + "title": "Remote Writes Per Second", "tooltip": { "shared": true, "sort": 0, @@ -360,7 +360,14 @@ "steppedLine": false, "targets": [ { - "expr": "sum(rate(coordinator_write_errors{source=\"remote\"}[1m])) by (code)", + "expr": "sum(rate(coordinator_write_success{}[1m]))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "2XX", + "refId": "B" + }, + { + "expr": "sum(rate(coordinator_write_errors{}[1m])) by (code)", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{code}}", @@ -371,7 +378,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Write Errors Per Second", + "title": "Remote Write Batch Requests Per Second", "tooltip": { "shared": true, "sort": 0, @@ -407,6 +414,114 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + 
"bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "description": "This is the measurement of the difference between the server's relative time now() and the datapoint's timestamp after it is persisted (or fails) to be written.", + "fill": 1, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 18 + }, + "id": 17, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "paceLength": 10, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, sum(rate(coordinator_ingest_latency_bucket[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "p99 latency", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(coordinator_ingest_latency_bucket[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "p95 latency", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.75, sum(rate(coordinator_ingest_latency_bucket[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "p75 latency", + "refId": "C" + }, + { + "expr": "histogram_quantile(0.5, sum(rate(coordinator_ingest_latency_bucket[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "p50 latency", + "refId": "D" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Write Ingest Latency (Relative to datapoint timestamp)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + 
"logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "refresh": false, diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 550a587f79..01c966b084 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -48,40 +48,24 @@ import ( "github.com/m3db/m3/src/metrics/rules" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" - "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/serialize" xsync "github.com/m3db/m3/src/x/sync" ) const ( - instanceID = "downsampler_local" - placementKVKey = "/placement" - replicationFactor = 1 - defaultStorageFlushConcurrency = 20000 - defaultOpenTimeout = 10 * time.Second - minBufferPast = 5 * time.Second - maxBufferPast = 10 * time.Minute - defaultBufferPastTimedMetricFactor = 0.1 - defaultBufferFutureTimedMetric = time.Minute + instanceID = "downsampler_local" + placementKVKey = "/placement" + replicationFactor = 1 + defaultStorageFlushConcurrency = 20000 + defaultOpenTimeout = 10 * time.Second + defaultBufferFutureTimedMetric = time.Minute ) var ( - numShards = runtime.NumCPU() - defaultBufferForPastTimedMetricFn = func(r time.Duration) time.Duration { - value := time.Duration(defaultBufferPastTimedMetricFactor * float64(r)) - - // Clamp minBufferPast <= value <= maxBufferPast. 
// bufferPastLimits is the step table consulted by bufferForPastTimedMetric.
// Entries are listed in ascending upperBound order. An entry's bufferPast
// applies to every tile duration that is greater than or equal to that
// entry's upperBound and strictly less than the next entry's upperBound.
var bufferPastLimits = []struct {
	upperBound time.Duration
	bufferPast time.Duration
}{
	// Also used for tile durations below zero, because the lookup is seeded
	// with the first entry's bufferPast.
	{upperBound: 0, bufferPast: 15 * time.Second},
	{upperBound: 30 * time.Second, bufferPast: 30 * time.Second},
	// The last entry covers every tile duration from one minute upwards.
	{upperBound: time.Minute, bufferPast: time.Minute},
}

// bufferForPastTimedMetric maps a tile duration onto a buffer-past duration
// using the bufferPastLimits step table.
//
// With the current table the result is 15s for tile < 30s (including zero and
// negative durations), 30s for 30s <= tile < 1m, and 1m for tile >= 1m.
func bufferForPastTimedMetric(tile time.Duration) time.Duration {
	// Start from the first entry so that a tile below every threshold still
	// yields a value.
	chosen := bufferPastLimits[0].bufferPast
	for i := 0; i < len(bufferPastLimits); i++ {
		if tile < bufferPastLimits[i].upperBound {
			// tile falls short of this step; keep the previous step's value.
			break
		}
		chosen = bufferPastLimits[i].bufferPast
	}
	return chosen
}
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package downsample + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBufferForPastTimedMetric(t *testing.T) { + tests := []struct { + value time.Duration + expected time.Duration + }{ + {value: -1 * time.Second, expected: 15 * time.Second}, + {value: 0, expected: 15 * time.Second}, + {value: 1 * time.Second, expected: 15 * time.Second}, + {value: 29 * time.Second, expected: 15 * time.Second}, + {value: 30 * time.Second, expected: 30 * time.Second}, + {value: 59 * time.Second, expected: 30 * time.Second}, + {value: 60 * time.Second, expected: time.Minute}, + {value: 59 * time.Minute, expected: time.Minute}, + {value: 61 * time.Minute, expected: time.Minute}, + } + for _, test := range tests { + t.Run(fmt.Sprintf("test_value_%v", test.value), func(t *testing.T) { + result := bufferForPastTimedMetric(test.value) + require.Equal(t, test.expected, result) + }) + } +}