From e6930178c8665af873ba8d29d6b6dc2b420858e2 Mon Sep 17 00:00:00 2001 From: Nadia Santalla Date: Mon, 25 Nov 2024 11:38:04 +0100 Subject: [PATCH] feat: k6runner: improve error handling for k6 output --- internal/k6runner/error.go | 50 ++++++++++++ internal/k6runner/error_test.go | 75 +++++++++++++++++ internal/k6runner/k6runner.go | 8 ++ internal/k6runner/local.go | 138 +++++++++++++++++++++++++++----- 4 files changed, 253 insertions(+), 18 deletions(-) create mode 100644 internal/k6runner/error.go create mode 100644 internal/k6runner/error_test.go diff --git a/internal/k6runner/error.go b/internal/k6runner/error.go new file mode 100644 index 000000000..bc2e66aea --- /dev/null +++ b/internal/k6runner/error.go @@ -0,0 +1,50 @@ +package k6runner + +import ( + "bytes" + "errors" + "strings" + + "github.com/go-logfmt/logfmt" + "golang.org/x/exp/maps" +) + +var ( + ErrStacktrace = errors.New("fatal error occurred while running the script") + ErrThrown = errors.New("uncaught error occurred while running the script") +) + +// errorFromLogs scans k6 logs for signs of significant errors that should be reported as such. +func errorFromLogs(logs []byte) error { + dec := logfmt.NewDecoder(bytes.NewReader(logs)) + + keyVals := make(map[string]string, 8) // Typically will be less than 8 fields. + + for dec.ScanRecord() { + maps.Clear(keyVals) + + for dec.ScanKeyval() { + // dec.Key and dec.Value values are not reusable across calls of ScanRecord, but that should be fine. + keyVals[string(dec.Key())] = string(dec.Value()) + } + + if dec.Err() != nil { + // Ignore errors. + continue + } + + // Stacktrace errors are often fatal, like syntax errors or accessing properties of undefined objects. + if keyVals["level"] == "error" && keyVals["source"] == "stacktrace" { + return ErrStacktrace + } + + // Uncaught exceptions and errors. + // These do not have an identifiable attribute (https://github.com/grafana/k6/issues/3842), so we rely in + // string matching. + if keyVals["level"] == "error" && strings.HasPrefix(keyVals["msg"], "Uncaught (in promise)") { + return ErrThrown + } + } + + return nil +} diff --git a/internal/k6runner/error_test.go b/internal/k6runner/error_test.go new file mode 100644 index 000000000..ebcfe0d96 --- /dev/null +++ b/internal/k6runner/error_test.go @@ -0,0 +1,75 @@ +package k6runner + +import ( + "errors" + "testing" +) + +func TestErrorFromLogs(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + logLines string + expect error + }{ + { + name: "console.error", + logLines: `time="2024-04-18T19:39:22+02:00" level=error msg="console error" source=console`, + expect: nil, + }, + { + name: "stack trace", + logLines: ` + time="2024-04-18T19:39:22+02:00" level=error msg="console error" source=console + level=error msg=foobar source=stacktrace + `, + expect: ErrStacktrace, + }, + { + name: "unsupported browser", + logLines: ` +time="2024-07-12T16:47:13+02:00" level=error msg="Uncaught (in promise) GoError: browser not found in registry. make sure to set browser type option in scenario definition in order to use the browser module\n\tat github.com/grafana/xk6-browser/browser.syncMapBrowser.func7 (native)\n\tat file:///tmp/roobre/browser.js:20:15(4)\n" executor=shared-iterations scenario=default + `, + expect: ErrThrown, + }, + { + name: "throw new Error()", + logLines: ` +time="2024-07-12T16:48:01+02:00" level=error msg="Uncaught (in promise) Error: foobar\n\tat file:///tmp/roobre/browser.js:22:8(8)\n" executor=shared-iterations scenario=ui + `, + expect: ErrThrown, + }, + { + name: "required filds in different lines", + logLines: ` + lever=error msg=something + source=stacktrace msg=something + `, + expect: nil, + }, + { + name: "badly formatted lines and stactrace", + //nolint:dupword // Duped + logLines: ` + something something + lever=error something something + source=stacktrace something something + level=error msg="missing a quote + level=error msg=foobar source=stacktrace + `, + expect: nil, // FIXME: Probably the parser should tolerate malformed lines. + }, + } { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + err := errorFromLogs([]byte(tc.logLines)) + if !errors.Is(err, tc.expect) { + t.Fatalf("Unexpected error: %v", err) + } + }) + } +} diff --git a/internal/k6runner/k6runner.go b/internal/k6runner/k6runner.go index 6b00fbb35..bc34d94b3 100644 --- a/internal/k6runner/k6runner.go +++ b/internal/k6runner/k6runner.go @@ -67,6 +67,14 @@ func CheckInfoFromSM(smc smmmodel.Check) CheckInfo { return ci } +// MarshalZerologObject implements zerolog.LogObjectMarshaler so it can be logged in a friendly way. +func (ci *CheckInfo) MarshalZerologObject(e *zerolog.Event) { + e.Str("type", ci.Type) + for k, v := range ci.Metadata { + e.Any(k, v) + } +} + // ErrNoTimeout is returned by [Runner] implementations if the supplied script has a timeout of zero. var ErrNoTimeout = errors.New("check has no timeout") diff --git a/internal/k6runner/local.go b/internal/k6runner/local.go index 9efa4d3b1..bbf04be2e 100644 --- a/internal/k6runner/local.go +++ b/internal/k6runner/local.go @@ -1,9 +1,12 @@ package k6runner import ( + "bufio" "bytes" "context" + "errors" "fmt" + "io" "os/exec" "time" @@ -25,6 +28,8 @@ func (r Local) WithLogger(logger *zerolog.Logger) Runner { } func (r Local) Run(ctx context.Context, script Script) (*RunResponse, error) { + logger := r.logger.With().Object("checkInfo", &script.CheckInfo).Logger() + afs := afero.Afero{Fs: r.fs} checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond @@ -39,7 +44,7 @@ func (r Local) Run(ctx context.Context, script Script) (*RunResponse, error) { defer func() { if err := r.fs.RemoveAll(workdir); err != nil { - r.logger.Error().Err(err).Str("severity", "critical").Msg("cannot remove temporary directory") + logger.Error().Err(err).Str("severity", "critical").Msg("cannot remove temporary directory") } }() @@ -115,36 +120,68 @@ func (r Local) Run(ctx context.Context, script Script) (*RunResponse, error) { cmd.Stderr = &stderr start := time.Now() + logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script") + err = cmd.Run() - r.logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script") + duration := time.Since(start) - if err := cmd.Run(); err != nil { - r.logger.Error().Err(err).Str("stdout", stdout.String()).Str("stderr", stderr.String()).Msg("k6 exited with error") - return nil, fmt.Errorf("cannot execute k6 script: %w", err) - } + // If context error is non-nil, incorporate it into err. + // This brings context to log lines and plays well with both errors.Is and errors.As. + err = errors.Join(err, ctx.Err()) - duration := time.Since(start) + if err != nil && !isUserError(err) { + logger.Error(). + Err(err). + Dur("duration", duration). + Msg("cannot run k6") - r.logger.Debug().Str("stdout", stdout.String()).Str("stderr", stderr.String()).Dur("duration", duration).Msg("ran k6 script") - r.logger.Info().Dur("duration", duration).Msg("ran k6 script") + dumpK6OutputStream(r.logger, zerolog.ErrorLevel, &stdout, "stream", "stdout") + dumpK6OutputStream(r.logger, zerolog.ErrorLevel, &stderr, "stream", "stderr") - var result RunResponse + logs, _ := afs.ReadFile(logsFn) + dumpK6OutputStream(r.logger, zerolog.InfoLevel, bytes.NewReader(logs), "stream", "logs") - result.Metrics, err = afs.ReadFile(metricsFn) + return nil, fmt.Errorf("executing k6 script: %w", err) + } + + // 256KiB is the maximum payload size for Loki. Set our limit slightly below that to avoid tripping the limit in + // case we inject some messages down the line. + const maxLogsSizeBytes = 255 * 1024 + + // Mimir can also ingest up to 256KiB, but that's JSON-encoded, not promhttp encoded. + // To be safe, we limit it to 100KiB promhttp-encoded, hoping than the more verbose json encoding overhead is less + // than 2.5x. + const maxMetricsSizeBytes = 100 * 1024 + + logs, truncated, err := readFileLimit(afs.Fs, logsFn, maxLogsSizeBytes) if err != nil { - r.logger.Error().Err(err).Str("filename", metricsFn).Msg("cannot read metrics file") - return nil, fmt.Errorf("cannot read metrics: %w", err) + return nil, fmt.Errorf("reading k6 logs: %w", err) + } + if truncated { + logger.Warn(). + Str("filename", logsFn). + Int("limitBytes", maxLogsSizeBytes). + Msg("Logs output larger than limit, truncating") + + // Leave a truncation notice at the end. + fmt.Fprintf(logs, `level=error msg="Log output truncated at %d bytes"`+"\n", maxLogsSizeBytes) } - result.Logs, err = afs.ReadFile(logsFn) + metrics, truncated, err := readFileLimit(afs.Fs, metricsFn, maxMetricsSizeBytes) if err != nil { - r.logger.Error().Err(err).Str("filename", logsFn).Msg("cannot read metrics file") - return nil, fmt.Errorf("cannot read logs: %w", err) + return nil, fmt.Errorf("reading k6 metrics: %w", err) } + if truncated { + logger.Warn(). + Str("filename", metricsFn). + Int("limitBytes", maxMetricsSizeBytes). + Msg("Metrics output larger than limit, truncating") - r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", result.Logs).Msg("k6 result") + // If we truncate metrics, also leave a truncation notice at the end of the logs. + fmt.Fprintf(logs, `level=error msg="Metrics output truncated at %d bytes"`+"\n", maxMetricsSizeBytes) + } - return &result, nil + return &RunResponse{Metrics: metrics.Bytes(), Logs: logs.Bytes()}, errors.Join(err, errorFromLogs(logs.Bytes())) } func mktemp(fs afero.Fs, dir, pattern string) (string, error) { @@ -157,3 +194,68 @@ func mktemp(fs afero.Fs, dir, pattern string) (string, error) { } return f.Name(), nil } + +func dumpK6OutputStream(logger *zerolog.Logger, lvl zerolog.Level, stream io.Reader, fields ...any) { + scanner := bufio.NewScanner(stream) + + for scanner.Scan() { + logger.WithLevel(lvl).Fields(fields).Str("line", scanner.Text()).Msg("k6 output") + } + + if err := scanner.Err(); err != nil { + logger.Error().Fields(fields).Err(err).Msg("reading k6 output") + } +} + +// isUserError returns whether we attribute this error to the user, i.e. to a combination of the k6 script contents and +// settings. This includes timeouts and exit codes returned by k6. +func isUserError(err error) bool { + if errors.Is(err, context.DeadlineExceeded) { + return true + } + + if exitErr := (&exec.ExitError{}); errors.As(err, &exitErr) && exitErr.ExitCode() < 127 { + // If this is an ExitError and the result code is < 127, this is a user error. + // https://github.com/grafana/k6/blob/v0.50.0/errext/exitcodes/codes.go + return true + } + + return false +} + +// readFileLimit reads up to limit bytes from the specified file using the specified FS. The limit respects newline +// boundaries: If the limit is reached, the portion between the last '\n' character and the limit will not be returned. +// A boolean is returned indicating whether the limit was reached. +func readFileLimit(f afero.Fs, name string, limit int64) (*bytes.Buffer, bool, error) { + file, err := f.Open(name) + if err != nil { + return nil, false, fmt.Errorf("opening file: %w", err) + } + defer file.Close() + + buf := &bytes.Buffer{} + copied, err := io.Copy(buf, io.LimitReader(file, limit)) + if err != nil && !errors.Is(err, io.EOF) { + return nil, false, fmt.Errorf("reading file: %w", err) + } + + if copied < limit { + // Copied less than budget, we haven't truncated anything. + return buf, false, nil + } + + peek := make([]byte, 1) + _, err = file.Read(peek) + if errors.Is(err, io.EOF) { + // Jackpot, file fit exactly within the limit. + return buf, false, nil + } + + // Rewind until last newline + lastNewline := bytes.LastIndexByte(buf.Bytes(), '\n') + if lastNewline != -1 { + buf.Truncate(lastNewline + 1) + } + + return buf, true, nil +}