Skip to content

Commit

Permalink
x-pack/filebeat/input/{cel,httpjson,http_endpoint,internal/httplog}: …
Browse files Browse the repository at this point in the history
…add debugging bread crumbs (elastic#38636)

When debugging issues in cel and httpjson the request trace log facility
can be very useful, but the separated nature of the log can add friction
since the only tie between the agent logs and the request trace logs is
the time stamp. This adds an additional log message to the agent logs at
DEBUG that marks the creation of a request transaction, noting the
transaction ID into the agent logs.

A similar addition is made to http_endpoint.
  • Loading branch information
efd6 authored Mar 29, 2024
1 parent 7648a2a commit dafec8d
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Parse more fields from Elasticsearch slowlogs {pull}38295[38295]
- Update CEL mito extensions to v1.10.0 to add keys/values helper. {pull}38504[38504]
- Add support for Active Directory an entity analytics provider. {pull}37919[37919]
- Add debugging breadcrumb to logs when writing request trace log. {pull}38636[38636]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,

const margin = 1e3 // 1OkB ought to be enough room for all the remainder of the trace details.
maxSize := cfg.Resource.Tracer.MaxSize * 1e6
trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger, max(0, maxSize-margin))
trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger, max(0, maxSize-margin), log)
c.Transport = trace
}

Expand Down
25 changes: 20 additions & 5 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"net"
"net/http"
"reflect"
"strconv"
"time"

"github.com/google/cel-go/cel"
"github.com/google/cel-go/checker/decls"
"github.com/google/cel-go/common/types"
"github.com/google/cel-go/common/types/ref"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/structpb"
Expand All @@ -42,10 +44,12 @@ var (
)

type handler struct {
metrics *inputMetrics
publisher stateless.Publisher
log *logp.Logger
validator apiValidator
metrics *inputMetrics
publisher stateless.Publisher
log *logp.Logger
validator apiValidator
txBaseID string // Random value to make transaction IDs unique.
txIDCounter *atomic.Uint64 // Transaction ID counter that is incremented for each request.

reqLogger *zap.Logger
host, scheme string
Expand Down Expand Up @@ -185,9 +189,11 @@ func (h *handler) logRequest(r *http.Request, status int, respBody []byte) {
zap.ByteString("http.response.body.content", respBody),
)
}
txID := h.nextTxID()
h.log.Debugw("new request trace transaction", "id", txID)
// Limit request logging body size to 10kiB.
const maxBodyLen = 10 * (1 << 10)
httplog.LogRequest(h.reqLogger, r, maxBodyLen, extra...)
httplog.LogRequest(h.reqLogger.With(zap.String("transaction.id", txID)), r, maxBodyLen, extra...)
if scheme != "" {
r.URL.Scheme = scheme
}
Expand All @@ -196,6 +202,15 @@ func (h *handler) logRequest(r *http.Request, status int, respBody []byte) {
}
}

func (h *handler) nextTxID() string {
count := h.txIDCounter.Inc()
return h.formatTxID(count)
}

func (h *handler) formatTxID(count uint64) string {
return h.txBaseID + "-" + strconv.FormatUint(count, 10)
}

func (h *handler) sendResponse(w http.ResponseWriter, status int, message string) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
Expand Down
15 changes: 14 additions & 1 deletion x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package http_endpoint
import (
"context"
"crypto/tls"
"encoding/base32"
"encoding/binary"
"errors"
"fmt"
"net"
Expand All @@ -18,6 +20,7 @@ import (

"github.com/rcrowley/go-metrics"
"go.elastic.co/ecszap"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -297,7 +300,10 @@ func (s *server) getErr() error {

func newHandler(ctx context.Context, c config, prg *program, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
h := &handler{
log: log,
log: log,
txBaseID: newID(),
txIDCounter: atomic.NewUint64(0),

publisher: pub,
metrics: metrics,
validator: apiValidator{
Expand Down Expand Up @@ -344,6 +350,13 @@ func newHandler(ctx context.Context, c config, prg *program, pub stateless.Publi
return h
}

// newID returns an ID derived from the current time.
func newID() string {
var data [8]byte
binary.LittleEndian.PutUint64(data[:], uint64(time.Now().UnixNano()))
return base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(data[:])
}

// inputMetrics handles the input's metric reporting.
type inputMetrics struct {
unregister func()
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func newNetHTTPClient(ctx context.Context, cfg *requestConfig, log *logp.Logger,
if maxSize < 0 {
maxSize = 0
}
netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger, maxSize)
netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger, maxSize, log)
}

if reg != nil {
Expand Down
17 changes: 11 additions & 6 deletions x-pack/filebeat/input/internal/httplog/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strconv"
"time"

"github.com/elastic/elastic-agent-libs/logp"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -31,24 +32,26 @@ const TraceIDKey = contextKey("trace.id")
type contextKey string

// NewLoggingRoundTripper returns a LoggingRoundTripper that logs requests and
// responses to the provided logger.
func NewLoggingRoundTripper(next http.RoundTripper, logger *zap.Logger, maxBodyLen int) *LoggingRoundTripper {
// responses to the provided logger. Transaction creation is logged to log.
func NewLoggingRoundTripper(next http.RoundTripper, logger *zap.Logger, maxBodyLen int, log *logp.Logger) *LoggingRoundTripper {
return &LoggingRoundTripper{
transport: next,
maxBodyLen: maxBodyLen,
logger: logger,
txLog: logger,
txBaseID: newID(),
txIDCounter: atomic.NewUint64(0),
log: log,
}
}

// LoggingRoundTripper is an http.RoundTripper that logs requests and responses.
type LoggingRoundTripper struct {
transport http.RoundTripper
maxBodyLen int // The maximum length of a body. Longer bodies will be truncated.
logger *zap.Logger // Destination logger.
txLog *zap.Logger // Destination logger.
txBaseID string // Random value to make transaction IDs unique.
txIDCounter *atomic.Uint64 // Transaction ID counter that is incremented for each request.
log *logp.Logger
}

// RoundTrip implements the http.RoundTripper interface, logging
Expand Down Expand Up @@ -80,8 +83,10 @@ type LoggingRoundTripper struct {
// event.original (the response without body from httputil.DumpResponse)
func (rt *LoggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// Create a child logger for this request.
log := rt.logger.With(
zap.String("transaction.id", rt.nextTxID()),
txID := rt.nextTxID()
rt.log.Debugw("new request trace transaction", "id", txID)
log := rt.txLog.With(
zap.String("transaction.id", txID),
)

if v := req.Context().Value(TraceIDKey); v != nil {
Expand Down

0 comments on commit dafec8d

Please sign in to comment.