Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: k6runner: improve error handling for k6 output #1063

Merged
merged 1 commit into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions internal/k6runner/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package k6runner

import (
"bytes"
"errors"
"strings"

"github.com/go-logfmt/logfmt"
"golang.org/x/exp/maps"
)

var (
ErrStacktrace = errors.New("fatal error occurred while running the script")
ErrThrown = errors.New("uncaught error occurred while running the script")
)

// errorFromLogs scans k6 logs for signs of significant errors that should be reported as such.
func errorFromLogs(logs []byte) error {
dec := logfmt.NewDecoder(bytes.NewReader(logs))

keyVals := make(map[string]string, 8) // Typically will be less than 8 fields.

for dec.ScanRecord() {
maps.Clear(keyVals)

for dec.ScanKeyval() {
// dec.Key and dec.Value values are not reusable across calls of ScanRecord, but that should be fine.
keyVals[string(dec.Key())] = string(dec.Value())
}

if dec.Err() != nil {
// Ignore errors.
continue
}

// Stacktrace errors are often fatal, like syntax errors or accessing properties of undefined objects.
if keyVals["level"] == "error" && keyVals["source"] == "stacktrace" {
return ErrStacktrace
}

// Uncaught exceptions and errors.
// These do not have an identifiable attribute (https://github.com/grafana/k6/issues/3842), so we rely in
// string matching.
if keyVals["level"] == "error" && strings.HasPrefix(keyVals["msg"], "Uncaught (in promise)") {
return ErrThrown
}
}

return nil
}
75 changes: 75 additions & 0 deletions internal/k6runner/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package k6runner

import (
"errors"
"testing"
)

func TestErrorFromLogs(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
logLines string
expect error
}{
{
name: "console.error",
logLines: `time="2024-04-18T19:39:22+02:00" level=error msg="console error" source=console`,
expect: nil,
},
{
name: "stack trace",
logLines: `
time="2024-04-18T19:39:22+02:00" level=error msg="console error" source=console
level=error msg=foobar source=stacktrace
`,
expect: ErrStacktrace,
},
{
name: "unsupported browser",
logLines: `
time="2024-07-12T16:47:13+02:00" level=error msg="Uncaught (in promise) GoError: browser not found in registry. make sure to set browser type option in scenario definition in order to use the browser module\n\tat github.com/grafana/xk6-browser/browser.syncMapBrowser.func7 (native)\n\tat file:///tmp/roobre/browser.js:20:15(4)\n" executor=shared-iterations scenario=default
`,
expect: ErrThrown,
},
{
name: "throw new Error()",
logLines: `
time="2024-07-12T16:48:01+02:00" level=error msg="Uncaught (in promise) Error: foobar\n\tat file:///tmp/roobre/browser.js:22:8(8)\n" executor=shared-iterations scenario=ui
`,
expect: ErrThrown,
},
{
name: "required filds in different lines",
logLines: `
lever=error msg=something
source=stacktrace msg=something
`,
expect: nil,
},
{
name: "badly formatted lines and stactrace",
//nolint:dupword // Duped
logLines: `
something something
lever=error something something
source=stacktrace something something
level=error msg="missing a quote
level=error msg=foobar source=stacktrace
`,
expect: nil, // FIXME: Probably the parser should tolerate malformed lines.
},
} {
tc := tc

t.Run(tc.name, func(t *testing.T) {
t.Parallel()

err := errorFromLogs([]byte(tc.logLines))
if !errors.Is(err, tc.expect) {
t.Fatalf("Unexpected error: %v", err)
}
})
}
}
8 changes: 8 additions & 0 deletions internal/k6runner/k6runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ func CheckInfoFromSM(smc smmmodel.Check) CheckInfo {
return ci
}

// MarshalZerologObject implements zerolog.LogObjectMarshaler so it can be logged in a friendly way.
func (ci *CheckInfo) MarshalZerologObject(e *zerolog.Event) {
e.Str("type", ci.Type)
for k, v := range ci.Metadata {
e.Any(k, v)
}
}

// ErrNoTimeout is returned by [Runner] implementations if the supplied script has a timeout of zero.
var ErrNoTimeout = errors.New("check has no timeout")

Expand Down
138 changes: 120 additions & 18 deletions internal/k6runner/local.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package k6runner

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os/exec"
"time"

Expand All @@ -25,6 +28,8 @@ func (r Local) WithLogger(logger *zerolog.Logger) Runner {
}

func (r Local) Run(ctx context.Context, script Script) (*RunResponse, error) {
logger := r.logger.With().Object("checkInfo", &script.CheckInfo).Logger()

afs := afero.Afero{Fs: r.fs}

checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond
Expand All @@ -39,7 +44,7 @@ func (r Local) Run(ctx context.Context, script Script) (*RunResponse, error) {

defer func() {
if err := r.fs.RemoveAll(workdir); err != nil {
r.logger.Error().Err(err).Str("severity", "critical").Msg("cannot remove temporary directory")
logger.Error().Err(err).Str("severity", "critical").Msg("cannot remove temporary directory")
}
}()

Expand Down Expand Up @@ -115,36 +120,68 @@ func (r Local) Run(ctx context.Context, script Script) (*RunResponse, error) {
cmd.Stderr = &stderr

start := time.Now()
logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script")
err = cmd.Run()

r.logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script")
duration := time.Since(start)

if err := cmd.Run(); err != nil {
r.logger.Error().Err(err).Str("stdout", stdout.String()).Str("stderr", stderr.String()).Msg("k6 exited with error")
return nil, fmt.Errorf("cannot execute k6 script: %w", err)
}
// If context error is non-nil, incorporate it into err.
// This brings context to log lines and plays well with both errors.Is and errors.As.
err = errors.Join(err, ctx.Err())

duration := time.Since(start)
if err != nil && !isUserError(err) {
logger.Error().
Err(err).
Dur("duration", duration).
Msg("cannot run k6")

r.logger.Debug().Str("stdout", stdout.String()).Str("stderr", stderr.String()).Dur("duration", duration).Msg("ran k6 script")
r.logger.Info().Dur("duration", duration).Msg("ran k6 script")
dumpK6OutputStream(r.logger, zerolog.ErrorLevel, &stdout, "stream", "stdout")
dumpK6OutputStream(r.logger, zerolog.ErrorLevel, &stderr, "stream", "stderr")

var result RunResponse
logs, _ := afs.ReadFile(logsFn)
dumpK6OutputStream(r.logger, zerolog.InfoLevel, bytes.NewReader(logs), "stream", "logs")
ka3de marked this conversation as resolved.
Show resolved Hide resolved

result.Metrics, err = afs.ReadFile(metricsFn)
return nil, fmt.Errorf("executing k6 script: %w", err)
}

// 256KiB is the maximum payload size for Loki. Set our limit slightly below that to avoid tripping the limit in
// case we inject some messages down the line.
const maxLogsSizeBytes = 255 * 1024

// Mimir can also ingest up to 256KiB, but that's JSON-encoded, not promhttp encoded.
// To be safe, we limit it to 100KiB promhttp-encoded, hoping than the more verbose json encoding overhead is less
// than 2.5x.
const maxMetricsSizeBytes = 100 * 1024

logs, truncated, err := readFileLimit(afs.Fs, logsFn, maxLogsSizeBytes)
if err != nil {
r.logger.Error().Err(err).Str("filename", metricsFn).Msg("cannot read metrics file")
return nil, fmt.Errorf("cannot read metrics: %w", err)
return nil, fmt.Errorf("reading k6 logs: %w", err)
}
if truncated {
logger.Warn().
Str("filename", logsFn).
Int("limitBytes", maxLogsSizeBytes).
Msg("Logs output larger than limit, truncating")

// Leave a truncation notice at the end.
fmt.Fprintf(logs, `level=error msg="Log output truncated at %d bytes"`+"\n", maxLogsSizeBytes)
}

result.Logs, err = afs.ReadFile(logsFn)
metrics, truncated, err := readFileLimit(afs.Fs, metricsFn, maxMetricsSizeBytes)
if err != nil {
r.logger.Error().Err(err).Str("filename", logsFn).Msg("cannot read metrics file")
return nil, fmt.Errorf("cannot read logs: %w", err)
return nil, fmt.Errorf("reading k6 metrics: %w", err)
}
if truncated {
logger.Warn().
Str("filename", metricsFn).
Int("limitBytes", maxMetricsSizeBytes).
Msg("Metrics output larger than limit, truncating")

r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", result.Logs).Msg("k6 result")
// If we truncate metrics, also leave a truncation notice at the end of the logs.
fmt.Fprintf(logs, `level=error msg="Metrics output truncated at %d bytes"`+"\n", maxMetricsSizeBytes)
}

return &result, nil
return &RunResponse{Metrics: metrics.Bytes(), Logs: logs.Bytes()}, errors.Join(err, errorFromLogs(logs.Bytes()))
}

func mktemp(fs afero.Fs, dir, pattern string) (string, error) {
Expand All @@ -157,3 +194,68 @@ func mktemp(fs afero.Fs, dir, pattern string) (string, error) {
}
return f.Name(), nil
}

func dumpK6OutputStream(logger *zerolog.Logger, lvl zerolog.Level, stream io.Reader, fields ...any) {
scanner := bufio.NewScanner(stream)

for scanner.Scan() {
logger.WithLevel(lvl).Fields(fields).Str("line", scanner.Text()).Msg("k6 output")
}

if err := scanner.Err(); err != nil {
logger.Error().Fields(fields).Err(err).Msg("reading k6 output")
}
}

// isUserError returns whether we attribute this error to the user, i.e. to a combination of the k6 script contents and
// settings. This includes timeouts and exit codes returned by k6.
func isUserError(err error) bool {
if errors.Is(err, context.DeadlineExceeded) {
return true
}

if exitErr := (&exec.ExitError{}); errors.As(err, &exitErr) && exitErr.ExitCode() < 127 {
// If this is an ExitError and the result code is < 127, this is a user error.
// https://github.com/grafana/k6/blob/v0.50.0/errext/exitcodes/codes.go
return true
}

return false
}

// readFileLimit reads up to limit bytes from the specified file using the specified FS. The limit respects newline
// boundaries: If the limit is reached, the portion between the last '\n' character and the limit will not be returned.
// A boolean is returned indicating whether the limit was reached.
func readFileLimit(f afero.Fs, name string, limit int64) (*bytes.Buffer, bool, error) {
file, err := f.Open(name)
if err != nil {
return nil, false, fmt.Errorf("opening file: %w", err)
}
defer file.Close()

buf := &bytes.Buffer{}
copied, err := io.Copy(buf, io.LimitReader(file, limit))
if err != nil && !errors.Is(err, io.EOF) {
return nil, false, fmt.Errorf("reading file: %w", err)
}

if copied < limit {
// Copied less than budget, we haven't truncated anything.
return buf, false, nil
}

peek := make([]byte, 1)
_, err = file.Read(peek)
if errors.Is(err, io.EOF) {
// Jackpot, file fit exactly within the limit.
return buf, false, nil
}

// Rewind until last newline
lastNewline := bytes.LastIndexByte(buf.Bytes(), '\n')
if lastNewline != -1 {
buf.Truncate(lastNewline + 1)
}

return buf, true, nil
}
Loading