Skip to content

Commit

Permalink
feat: k6runner: improve error handling for k6 output
Browse files Browse the repository at this point in the history
  • Loading branch information
nadiamoe committed Nov 25, 2024
1 parent 63bad77 commit 856e47e
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 18 deletions.
50 changes: 50 additions & 0 deletions internal/k6runner/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package k6runner

import (
"bytes"
"errors"
"strings"

"github.com/go-logfmt/logfmt"
"golang.org/x/exp/maps"
)

var (
// ErrStacktrace is returned by errorFromLogs when the k6 logs contain an error-level record whose source is
// "stacktrace".
ErrStacktrace = errors.New("fatal error occurred while running the script")
// ErrThrown is returned by errorFromLogs when the k6 logs contain an error-level record whose message starts
// with "Uncaught (in promise)".
ErrThrown = errors.New("uncaught error occurred while running the script")
)

// errorFromLogs inspects logfmt-encoded k6 logs and reports the first record that signals a significant script
// failure.
//
// It returns ErrStacktrace for an error-level record whose source is "stacktrace", ErrThrown for an error-level
// record whose message starts with "Uncaught (in promise)", and nil when no such record is found.
func errorFromLogs(logs []byte) error {
	decoder := logfmt.NewDecoder(bytes.NewReader(logs))

	// A single map is reused for every record; records typically carry fewer than 8 fields.
	fields := make(map[string]string, 8)

	for decoder.ScanRecord() {
		maps.Clear(fields)

		for decoder.ScanKeyval() {
			// Key and Value are converted to strings (i.e. copied) because the decoder's slices are not
			// reusable across calls of ScanRecord.
			fields[string(decoder.Key())] = string(decoder.Value())
		}

		if decoder.Err() != nil {
			// Decoding errors are deliberately not reported.
			// NOTE(review): go-logfmt's Decoder appears to stop scanning once an error has been recorded, so
			// this likely ends the loop instead of skipping a single line — confirm.
			continue
		}

		// Only error-level records are of interest.
		if fields["level"] != "error" {
			continue
		}

		switch {
		case fields["source"] == "stacktrace":
			// Stacktrace errors are often fatal, like syntax errors or accessing properties of undefined
			// objects.
			return ErrStacktrace
		case strings.HasPrefix(fields["msg"], "Uncaught (in promise)"):
			// Uncaught exceptions and errors do not have an identifiable attribute
			// (https://github.com/grafana/k6/issues/3842), so we rely on string matching.
			return ErrThrown
		}
	}

	return nil
}
75 changes: 75 additions & 0 deletions internal/k6runner/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package k6runner

import (
"errors"
"testing"
)

// TestErrorFromLogs feeds logfmt-formatted k6 log excerpts to errorFromLogs and checks, using errors.Is, that the
// returned error matches the expected sentinel, or is nil when nothing should be reported.
func TestErrorFromLogs(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
logLines string
expect error
}{
{
// An error-level record coming from the console is not reported.
name: "console.error",
logLines: `time="2024-04-18T19:39:22+02:00" level=error msg="console error" source=console`,
expect: nil,
},
{
// An error-level record with source=stacktrace is reported even when preceded by unrelated records.
name: "stack trace",
logLines: `
time="2024-04-18T19:39:22+02:00" level=error msg="console error" source=console
level=error msg=foobar source=stacktrace
`,
expect: ErrStacktrace,
},
{
// An error-level message starting with "Uncaught (in promise)" is reported as a thrown error.
name: "unsupported browser",
logLines: `
time="2024-07-12T16:47:13+02:00" level=error msg="Uncaught (in promise) GoError: browser not found in registry. make sure to set browser type option in scenario definition in order to use the browser module\n\tat github.com/grafana/xk6-browser/browser.syncMapBrowser.func7 (native)\n\tat file:///tmp/roobre/browser.js:20:15(4)\n" executor=shared-iterations scenario=default
`,
expect: ErrThrown,
},
{
// Same prefix match as above, with a different message body.
name: "throw new Error()",
logLines: `
time="2024-07-12T16:48:01+02:00" level=error msg="Uncaught (in promise) Error: foobar\n\tat file:///tmp/roobre/browser.js:22:8(8)\n" executor=shared-iterations scenario=ui
`,
expect: ErrThrown,
},
{
// No single record carries both level=error and source=stacktrace, so nothing is reported.
name: "required filds in different lines",
logLines: `
lever=error msg=something
source=stacktrace msg=something
`,
expect: nil,
},
{
// Malformed lines followed by a well-formed stacktrace record. The current expectation is nil; see the
// FIXME on the expect field.
name: "badly formatted lines and stactrace",
//nolint:dupword // Duped
logLines: `
something something
lever=error something something
source=stacktrace something something
level=error msg="missing a quote
level=error msg=foobar source=stacktrace
`,
expect: nil, // FIXME: Probably the parser should tolerate malformed lines.
},
} {
// Copy the range variable so that the parallel subtest closure below captures this iteration's value
// (required with per-loop range variable semantics, i.e. before Go 1.22).
tc := tc

t.Run(tc.name, func(t *testing.T) {
t.Parallel()

err := errorFromLogs([]byte(tc.logLines))
// errors.Is also covers the nil expectation: with a nil target it only matches a nil error.
if !errors.Is(err, tc.expect) {
t.Fatalf("Unexpected error: %v", err)
}
})
}
}
8 changes: 8 additions & 0 deletions internal/k6runner/k6runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ func CheckInfoFromSM(smc smmmodel.Check) CheckInfo {
return ci
}

// MarshalZerologObject implements zerolog.LogObjectMarshaler so that a CheckInfo can be logged as a structured
// object: the check type is written under the "type" key, and every Metadata entry is added under its own key.
func (ci *CheckInfo) MarshalZerologObject(e *zerolog.Event) {
	e.Str("type", ci.Type)

	for key, value := range ci.Metadata {
		e.Any(key, value)
	}
}

// ErrNoTimeout is returned by [Runner] implementations if the supplied script has a timeout of zero.
var ErrNoTimeout = errors.New("check has no timeout")

Expand Down
138 changes: 120 additions & 18 deletions internal/k6runner/local.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package k6runner

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os/exec"
"time"

Expand All @@ -25,6 +28,8 @@ func (r Local) WithLogger(logger *zerolog.Logger) Runner {
}

func (r Local) Run(ctx context.Context, script Script) (*RunResponse, error) {
logger := r.logger.With().Object("checkInfo", &script.CheckInfo).Logger()

afs := afero.Afero{Fs: r.fs}

checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond
Expand All @@ -39,7 +44,7 @@ func (r Local) Run(ctx context.Context, script Script) (*RunResponse, error) {

defer func() {
if err := r.fs.RemoveAll(workdir); err != nil {
r.logger.Error().Err(err).Str("severity", "critical").Msg("cannot remove temporary directory")
logger.Error().Err(err).Str("severity", "critical").Msg("cannot remove temporary directory")
}
}()

Expand Down Expand Up @@ -115,36 +120,68 @@ func (r Local) Run(ctx context.Context, script Script) (*RunResponse, error) {
cmd.Stderr = &stderr

start := time.Now()
logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script")
err = cmd.Run()

r.logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script")
duration := time.Since(start)

if err := cmd.Run(); err != nil {
r.logger.Error().Err(err).Str("stdout", stdout.String()).Str("stderr", stderr.String()).Msg("k6 exited with error")
return nil, fmt.Errorf("cannot execute k6 script: %w", err)
}
// If context error is non-nil, incorporate it into err.
// This brings context to log lines and plays well with both errors.Is and errors.As.
err = errors.Join(err, ctx.Err())

duration := time.Since(start)
if err != nil && !isUserError(err) {
logger.Error().
Err(err).
Dur("duration", duration).
Msg("cannot run k6")

r.logger.Debug().Str("stdout", stdout.String()).Str("stderr", stderr.String()).Dur("duration", duration).Msg("ran k6 script")
r.logger.Info().Dur("duration", duration).Msg("ran k6 script")
dumpK6OutputStream(r.logger, zerolog.ErrorLevel, &stdout, "stream", "stdout")
dumpK6OutputStream(r.logger, zerolog.ErrorLevel, &stderr, "stream", "stderr")

var result RunResponse
logs, _ := afs.ReadFile(logsFn)
dumpK6OutputStream(r.logger, zerolog.InfoLevel, bytes.NewReader(logs), "stream", "logs")

result.Metrics, err = afs.ReadFile(metricsFn)
return nil, fmt.Errorf("executing k6 script: %w", err)
}

// 256KiB is the maximum payload size for Loki. Set our limit slightly below that to avoid tripping the limit in
// case we inject some messages down the line.
const maxLogsSizeBytes = 255 * 1024

// Mimir can also ingest up to 256KiB, but that's JSON-encoded, not promhttp-encoded. To be safe, we limit it
// to 100KiB promhttp-encoded, hoping that the more verbose JSON encoding overhead is less than 2.5x.
const maxMetricsSizeBytes = 100 * 1024

logs, truncated, err := readFileLimit(afs.Fs, logsFn, maxLogsSizeBytes)
if err != nil {
r.logger.Error().Err(err).Str("filename", metricsFn).Msg("cannot read metrics file")
return nil, fmt.Errorf("cannot read metrics: %w", err)
return nil, fmt.Errorf("reading k6 logs: %w", err)
}
if truncated {
logger.Warn().
Str("filename", logsFn).
Int("limitBytes", maxLogsSizeBytes).
Msg("Logs output larger than limit, truncating")

// Leave a truncation notice at the end.
fmt.Fprintf(logs, `level=error msg="Log output truncated at %d bytes"`+"\n", maxLogsSizeBytes)
}

result.Logs, err = afs.ReadFile(logsFn)
metrics, truncated, err := readFileLimit(afs.Fs, metricsFn, maxMetricsSizeBytes)
if err != nil {
r.logger.Error().Err(err).Str("filename", logsFn).Msg("cannot read metrics file")
return nil, fmt.Errorf("cannot read logs: %w", err)
return nil, fmt.Errorf("reading k6 metrics: %w", err)
}
if truncated {
logger.Warn().
Str("filename", metricsFn).
Int("limitBytes", maxMetricsSizeBytes).
Msg("Metrics output larger than limit, truncating")

r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", result.Logs).Msg("k6 result")
// If we truncate metrics, also leave a truncation notice at the end of the logs.
fmt.Fprintf(logs, `level=error msg="Metrics output truncated at %d bytes"`+"\n", maxMetricsSizeBytes)
}

return &result, nil
return &RunResponse{Metrics: metrics.Bytes(), Logs: logs.Bytes()}, errors.Join(err, errorFromLogs(logs.Bytes()))
}

func mktemp(fs afero.Fs, dir, pattern string) (string, error) {
Expand All @@ -157,3 +194,68 @@ func mktemp(fs afero.Fs, dir, pattern string) (string, error) {
}
return f.Name(), nil
}

// dumpK6OutputStream logs the contents of stream one line at a time. Every line becomes its own "k6 output" event
// at level lvl, carrying the supplied fields plus the line text under the "line" key. If scanning the stream fails,
// the failure is logged at error level together with the same fields.
//
// NOTE(review): bufio.Scanner is used with its default buffer, so a line longer than bufio.MaxScanTokenSize ends
// the scan and is reported through the error path — confirm this is acceptable for k6 output.
func dumpK6OutputStream(logger *zerolog.Logger, lvl zerolog.Level, stream io.Reader, fields ...any) {
	lines := bufio.NewScanner(stream)

	for lines.Scan() {
		event := logger.WithLevel(lvl).Fields(fields)
		event.Str("line", lines.Text()).Msg("k6 output")
	}

	err := lines.Err()
	if err == nil {
		return
	}

	logger.Error().Fields(fields).Err(err).Msg("reading k6 output")
}

// isUserError reports whether err is attributed to the user, that is, to the combination of the k6 script contents
// and its settings rather than to the runner itself.
//
// Two kinds of errors qualify: anything wrapping context.DeadlineExceeded (the script timed out), and an
// *exec.ExitError whose exit code is below 127, the range k6 uses for its own exit codes
// (https://github.com/grafana/k6/blob/v0.50.0/errext/exitcodes/codes.go).
//
// NOTE(review): ExitCode() returns -1 when the process was terminated by a signal or has not exited, and -1 also
// satisfies the "< 127" check — confirm that treating those cases as user errors is intended.
func isUserError(err error) bool {
	var exitErr *exec.ExitError

	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return true
	case errors.As(err, &exitErr):
		return exitErr.ExitCode() < 127
	default:
		return false
	}
}

// readFileLimit reads up to limit bytes from the file called name in the filesystem f.
//
// If the file holds more than limit bytes, the returned buffer is cut back so that it ends right after the last
// '\n' found within the first limit bytes, i.e. the trailing partial line is dropped, and the returned boolean is
// true. If those first limit bytes contain no newline at all, they are returned as they are. A file that fits within
// limit bytes is returned whole and the boolean is false.
//
// A non-nil error is returned if the file cannot be opened or if reading it fails; the buffer is nil in that case.
func readFileLimit(f afero.Fs, name string, limit int64) (*bytes.Buffer, bool, error) {
	file, err := f.Open(name)
	if err != nil {
		return nil, false, fmt.Errorf("opening file: %w", err)
	}
	defer file.Close()

	buf := &bytes.Buffer{}
	copied, err := io.Copy(buf, io.LimitReader(file, limit))
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, false, fmt.Errorf("reading file: %w", err)
	}

	if copied < limit {
		// Copied less than budget, we haven't truncated anything.
		return buf, false, nil
	}

	// The whole budget was consumed: probe for one more byte to tell "file fits exactly" apart from "file is
	// larger than the limit". io.ReadFull is used instead of a bare Read so that:
	//   - a reader returning (0, nil) is retried instead of being mistaken for more data,
	//   - a reader returning the byte together with io.EOF is still seen as having more data,
	//   - io.EOF is reported only when there really is nothing left.
	var peek [1]byte
	_, err = io.ReadFull(file, peek[:])
	if errors.Is(err, io.EOF) {
		// Jackpot, file fit exactly within the limit.
		return buf, false, nil
	}
	if err != nil {
		// A genuine read failure must not be reported as a truncation.
		return nil, false, fmt.Errorf("reading file: %w", err)
	}

	// Rewind until last newline so no partial line is returned.
	if lastNewline := bytes.LastIndexByte(buf.Bytes(), '\n'); lastNewline != -1 {
		buf.Truncate(lastNewline + 1)
	}

	return buf, true, nil
}

0 comments on commit 856e47e

Please sign in to comment.