Skip to content

Commit

Permalink
Refactor metrics (#5)
Browse files Browse the repository at this point in the history
* Refactor metrics

Signed-off-by: Daniel González Lopes <[email protected]>

* Increase request failed metric

Signed-off-by: Daniel González Lopes <[email protected]>

* Rm req_failed

Signed-off-by: Daniel González Lopes <[email protected]>

* Add response callback

Signed-off-by: Daniel González Lopes <[email protected]>

* Change version

Signed-off-by: Daniel González Lopes <[email protected]>
  • Loading branch information
dgzlopes authored Oct 27, 2021
1 parent 9901424 commit 1cdf79d
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 104 deletions.
27 changes: 16 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export default function () {
]
}]);
check(res, {
'is status 200': (r) => r.status_code === 200,
'is status 200': (r) => r.status === 200,
});
sleep(1)
}
Expand Down Expand Up @@ -84,15 +84,20 @@ default ✓ [======================================] 10 VUs 10s
✓ is status 200
checks......................: 100.00% ✓ 90 ✗ 0
data_received...............: 0 B 0 B/s
data_sent...................: 6.2 kB 596 B/s
iteration_duration..........: avg=1.14s min=1.13s med=1.13s max=1.26s p(90)=1.23s p(95)=1.24s
iterations..................: 90 8.624555/s
remote_write_num_series.....: 90 8.624555/s
remote_write_req_duration...: avg=146.62ms min=129ms med=132ms max=260ms p(90)=231ms p(95)=242.1ms
remote_write_reqs...........: 90 8.624555/s
vus.........................: 10 min=10 max=10
vus_max.....................: 10 min=10 max=10
checks.....................: 100.00% ✓ 90 ✗ 0
data_received..............: 46 kB 4.4 kB/s
data_sent..................: 24 kB 2.3 kB/s
http_req_blocked...........: avg=7.52ms min=290ns med=380ns max=68.08ms p(90)=67.2ms p(95)=67.7ms
http_req_connecting........: avg=1.88ms min=0s med=0s max=18.27ms p(90)=15.25ms p(95)=17.11ms
http_req_duration..........: avg=136.88ms min=131.24ms med=135.66ms max=215.49ms p(90)=139.2ms p(95)=140.72ms
http_req_receiving.........: avg=42.9µs min=22µs med=40.65µs max=86.74µs p(90)=57.7µs p(95)=64.26µs
http_req_sending...........: avg=68.74µs min=38.42µs med=61.17µs max=144.06µs p(90)=102.71µs p(95)=113.75µs
http_req_tls_handshaking...: avg=3.8ms min=0s med=0s max=35.93ms p(90)=32.92ms p(95)=34.2ms
http_req_waiting...........: avg=136.76ms min=131.07ms med=135.56ms max=215.35ms p(90)=139.09ms p(95)=140.63ms
http_reqs..................: 90 8.650581/s
iteration_duration.........: avg=1.14s min=1.13s med=1.13s max=1.28s p(90)=1.2s p(95)=1.2s
iterations.................: 90 8.650581/s
vus........................: 10 min=10 max=10
vus_max....................: 10 min=10 max=10
```
Inspect examples folder for more details.
11 changes: 2 additions & 9 deletions examples/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,7 @@ import remote from 'k6/x/remotewrite';

export let options = {
vus: 10,
duration: '5m',
ext: {
loadimpact: {
projectID: 3183896,
// Test runs with the same name groups test runs together
name: "Prometheus Remote Write Client"
}
}
duration: '10s',
};

const client = new remote.Client({
Expand All @@ -28,7 +21,7 @@ export default function () {
]
}]);
check(res, {
'is status 200': (r) => r.status_code === 200,
'is status 200': (r) => r.status === 200,
});
sleep(1)
}
2 changes: 1 addition & 1 deletion examples/benchtool.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export default function () {
"samples": generateEmptySamples(1000)
}]);
check(res, {
'is status 200': (r) => r.status_code === 200,
'is status 200': (r) => r.status === 200,
});
sleep(0.15);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/full_read_write_example.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export function write_scenario() {
);

check(res, {
'write worked': (r) => r.status_code === 200,
'write worked': (r) => r.status === 200,
}) || fail(JSON.stringify(res));
}

Expand Down
2 changes: 1 addition & 1 deletion examples/full_write_example.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export function write_scenario() {
);

check(res, {
'write worked': (r) => r.status_code === 200,
'write worked': (r) => r.status === 200,
}) || fail(JSON.stringify(res));
}

Expand Down
4 changes: 2 additions & 2 deletions examples/two_scenarios.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export function increaseCounter() {
},
]);
check(res, {
'is status 200': (r) => r.status_code === 200,
'is status 200': (r) => r.status === 200,
});
sleep(10);
}
Expand All @@ -61,7 +61,7 @@ export function sendBatch() {
}
]);
check(res, {
'is status 200': (r) => r.status_code === 200,
'is status 200': (r) => r.status === 200,
});
sleep(1)
}
105 changes: 37 additions & 68 deletions remote_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package remotewrite
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"log"
"net/http"
"net/url"
"time"

"github.com/golang/protobuf/proto"
Expand All @@ -17,9 +17,7 @@ import (
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/netext"
"go.k6.io/k6/stats"
"go.k6.io/k6/lib/netext/httpext"
)

// Register the extension on module initialization, available to
Expand All @@ -28,7 +26,7 @@ func init() {
modules.Register("k6/x/remotewrite", new(RemoteWrite))
}

// RemoteWrite is the k6 extension for interacting with Kubernetes jobs.
// RemoteWrite is the k6 extension for interacting Prometheus Remote Write endpoints.
type RemoteWrite struct {
}

Expand All @@ -51,7 +49,7 @@ func (r *RemoteWrite) XClient(ctxPtr *context.Context, config Config) interface{
log.Fatal(fmt.Errorf("url is required"))
}
if config.UserAgent == "" {
config.UserAgent = "k6-remote-write/0.0.1"
config.UserAgent = "k6-remote-write/0.0.2"
}
if config.Timeout == "" {
config.Timeout = "10s"
Expand All @@ -75,7 +73,7 @@ type Timeseries struct {
} `json:"samples"`
}

func (c *Client) Store(ctx context.Context, ts []Timeseries) (http.Response, error) {
func (c *Client) Store(ctx context.Context, ts []Timeseries) (httpext.Response, error) {
var batch []prompb.TimeSeries
for _, t := range ts {
batch = append(batch, FromTimeseriesToPrometheusTimeseries(t))
Expand All @@ -84,102 +82,73 @@ func (c *Client) Store(ctx context.Context, ts []Timeseries) (http.Response, err
// Required for k6 metrics
state := lib.GetState(ctx)
if state == nil {
return http.Response{}, errors.New("State is nil")
return *httpext.NewResponse(ctx), errors.New("State is nil")
}

now := time.Now()
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
Metric: RemoteWriteNumSeries,
Time: now,
Value: float64(len(batch)),
})

req := prompb.WriteRequest{
Timeseries: batch,
}

data, err := proto.Marshal(&req)
if err != nil {
return http.Response{}, errors.Wrap(err, "failed to marshal remote-write request")
return *httpext.NewResponse(ctx), errors.Wrap(err, "failed to marshal remote-write request")
}

compressed := snappy.Encode(nil, data)

res, err := c.send(ctx, state, compressed)
if err != nil {
return http.Response{}, errors.Wrap(err, "remote-write request failed")
return *httpext.NewResponse(ctx), errors.Wrap(err, "remote-write request failed")
}

return res, nil
}

// send sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
// and encoded bytes
func (c *Client) send(ctx context.Context, state *lib.State, req []byte) (http.Response, error) {
httpReq, err := http.NewRequest("POST", c.cfg.Url, bytes.NewReader(req))
func (c *Client) send(ctx context.Context, state *lib.State, req []byte) (httpext.Response, error) {
httpResp := httpext.NewResponse(ctx)
r, err := http.NewRequest("POST", c.cfg.Url, nil)
if err != nil {
return http.Response{}, err
return *httpResp, err
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("User-Agent", c.cfg.UserAgent)
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
r.Header.Add("Content-Encoding", "snappy")
r.Header.Set("Content-Type", "application/x-protobuf")
r.Header.Set("User-Agent", c.cfg.UserAgent)
r.Header.Set("X-Prometheus-Remote-Write-Version", "0.0.2")
if c.cfg.TenantName != "" {
httpReq.Header.Set("X-Scope-OrgID", c.cfg.TenantName)
r.Header.Set("X-Scope-OrgID", c.cfg.TenantName)
}

duration, err := str2duration.ParseDuration(c.cfg.Timeout)
if err != nil {
return http.Response{}, err
return *httpResp, err
}
ctx, cancel := context.WithTimeout(ctx, duration)
defer cancel()

httpReq = httpReq.WithContext(ctx)
now := time.Now()

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
Metric: RemoteWriteReqs,
Time: now,
Value: float64(1),
u, err := url.Parse(c.cfg.Url)
if err != nil {
return *httpResp, err
}

url, _ := httpext.NewURL(c.cfg.Url, u.Host+u.Path)
response, err := httpext.MakeRequest(ctx, &httpext.ParsedHTTPRequest{
URL: &url,
Req: r,
Body: bytes.NewBuffer(req),
Throw: state.Options.Throw.Bool,
Redirects: state.Options.MaxRedirects,
Timeout: duration,
ResponseCallback: ResponseCallback,
})

simpleNetTrail := netext.NetTrail{
BytesWritten: int64(binary.Size(req)),
StartTime: now.Add(-time.Minute),
EndTime: now,
Samples: []stats.Sample{
{
Time: now,
Metric: metrics.DataSent,
Value: float64(binary.Size(req)),
},
},
}
stats.PushIfNotDone(ctx, state.Samples, &simpleNetTrail)

start := time.Now()
httpResp, err := c.client.Do(httpReq)
elapsed := time.Since(start)
if err != nil {
return http.Response{}, err
return *httpResp, err
}

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
Metric: RemoteWriteReqDuration,
Time: now,
Value: float64(elapsed.Milliseconds()),
})

if httpResp.StatusCode != http.StatusOK {
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
Metric: RemoteWriteReqFailed,
Time: now,
Value: float64(1),
})
}
return *response, err
}

return *httpResp, err
func ResponseCallback(n int) bool {
return n == 200
}

func FromTimeseriesToPrometheusTimeseries(ts Timeseries) prompb.TimeSeries {
Expand Down
11 changes: 0 additions & 11 deletions stats.go

This file was deleted.

0 comments on commit 1cdf79d

Please sign in to comment.