diff --git a/internal/pkg/action/dispatcher.go b/internal/pkg/action/dispatcher.go index 59fb51a3c..ce149d281 100644 --- a/internal/pkg/action/dispatcher.go +++ b/internal/pkg/action/dispatcher.go @@ -15,9 +15,9 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" - "golang.org/x/time/rate" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" + "golang.org/x/time/rate" ) // Sub is an action subscription that will give a single agent all of it's actions. @@ -84,7 +84,7 @@ func (d *Dispatcher) Subscribe(agentID string, seqNo sqn.SeqNo) *Sub { sz := len(d.subs) d.mx.Unlock() - log.Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher") + zerolog.Ctx(context.TODO()).Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher") return &sub } @@ -101,7 +101,7 @@ func (d *Dispatcher) Unsubscribe(sub *Sub) { sz := len(d.subs) d.mx.Unlock() - log.Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher") + zerolog.Ctx(context.TODO()).Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher") } // process gathers actions from the monitor and dispatches them to the corresponding subscriptions. @@ -114,14 +114,14 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) { var action model.Action err := hit.Unmarshal(&action) if err != nil { - log.Error().Err(err).Msg("Failed to unmarshal action document") + zerolog.Ctx(ctx).Error().Err(err).Msg("Failed to unmarshal action document") break } numAgents := len(action.Agents) for i, agentID := range action.Agents { arr := agentActions[agentID] actionNoAgents := action - actionNoAgents.StartTime = offsetStartTime(action.StartTime, action.RolloutDurationSeconds, i, numAgents) + actionNoAgents.StartTime = offsetStartTime(ctx, action.StartTime, action.RolloutDurationSeconds, i, numAgents) actionNoAgents.Agents = nil arr = append(arr, actionNoAgents) agentActions[agentID] = arr @@ -130,7 +130,7 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) { for agentID, actions := range agentActions { if err := d.limit.Wait(ctx); err != nil { - log.Error().Err(err).Msg("action dispatcher rate limit error") + zerolog.Ctx(ctx).Error().Err(err).Msg("action dispatcher rate limit error") return } d.dispatch(ctx, agentID, actions) @@ -139,14 +139,14 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) { // offsetStartTime will return a new start time between start:start+dur based on index i and the total number of agents // As we expect i < total the latest return time will always be < start+dur -func offsetStartTime(start string, dur int64, i, total int) string { +func offsetStartTime(ctx context.Context, start string, dur int64, i, total int) string { if start == "" { return "" } startTS, err := time.Parse(time.RFC3339, start) if err != nil { - log.Error().Err(err).Msg("unable to parse start_time string") + zerolog.Ctx(ctx).Error().Err(err).Msg("unable to parse start_time string") return "" } d := time.Second * time.Duration(dur) @@ -164,10 +164,10 @@ func (d *Dispatcher) getSub(agentID string) (Sub, bool) { // dispatch passes the actions into the subscription channel as a non-blocking operation. // It may drop actions that will be re-sent to the agent on its next check in. 
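The dispatcher hunks above replace the package-global github.com/rs/zerolog/log logger with a logger carried on the context.Context, falling back to context.TODO() where no context is available yet. A minimal sketch of that pattern, separate from this diff (names here are illustrative):

package main

import (
	"context"
	"os"

	"github.com/rs/zerolog"
)

func main() {
	// Attach a logger to the context once, near the top of the call tree.
	logger := zerolog.New(os.Stderr).With().Timestamp().Logger()
	ctx := logger.WithContext(context.Background())
	doWork(ctx)
}

// doWork retrieves the logger anywhere the context is available.
func doWork(ctx context.Context) {
	// zerolog.Ctx never returns nil: with no logger attached it yields a
	// disabled logger, or zerolog.DefaultContextLogger when that is set.
	zerolog.Ctx(ctx).Info().Str("component", "example").Msg("doing work")
}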
-func (d *Dispatcher) dispatch(_ context.Context, agentID string, acdocs []model.Action) { +func (d *Dispatcher) dispatch(ctx context.Context, agentID string, acdocs []model.Action) { sub, ok := d.getSub(agentID) if !ok { - log.Debug().Str(logger.AgentID, agentID).Msg("Agent is not currently connected. Not dispatching actions.") + zerolog.Ctx(ctx).Debug().Str(logger.AgentID, agentID).Msg("Agent is not currently connected. Not dispatching actions.") return } select { diff --git a/internal/pkg/action/dispatcher_test.go b/internal/pkg/action/dispatcher_test.go index 6fd152eb2..559a11ac0 100644 --- a/internal/pkg/action/dispatcher_test.go +++ b/internal/pkg/action/dispatcher_test.go @@ -366,7 +366,7 @@ func Test_offsetStartTime(t *testing.T) { }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := offsetStartTime(tt.start, tt.dur, tt.i, tt.total) + r := offsetStartTime(context.Background(), tt.start, tt.dur, tt.i, tt.total) assert.Equal(t, tt.result, r) }) } diff --git a/internal/pkg/action/token_resolver.go b/internal/pkg/action/token_resolver.go index a3625fb3f..88b4a9888 100644 --- a/internal/pkg/action/token_resolver.go +++ b/internal/pkg/action/token_resolver.go @@ -11,7 +11,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/dl" lru "github.com/hashicorp/golang-lru/v2" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) const cacheSize = 5000 @@ -43,7 +43,7 @@ func (r *TokenResolver) Resolve(ctx context.Context, token string) (int64, error return 0, dl.ErrNotFound } if v, ok := r.cache.Get(token); ok { - log.Debug().Str("token", token).Int64("seqno", v).Msg("Found token cached") + zerolog.Ctx(ctx).Debug().Str("token", token).Int64("seqno", v).Msg("Found token cached") return v, nil } diff --git a/internal/pkg/api/handleFileDelivery.go b/internal/pkg/api/handleFileDelivery.go index 62c6b63db..45babb8f0 100644 --- a/internal/pkg/api/handleFileDelivery.go +++ b/internal/pkg/api/handleFileDelivery.go @@ -5,6 +5,7 @@ package api import ( + "context" "errors" "net/http" "strconv" @@ -16,7 +17,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/go-elasticsearch/v8" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" ) type FileDeliveryT struct { @@ -28,7 +28,7 @@ type FileDeliveryT struct { } func NewFileDeliveryT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *FileDeliveryT { - log.Info(). + zerolog.Ctx(context.TODO()).Info(). Interface("limits", cfg.Limits.ArtifactLimit). Int64("maxFileSize", maxFileSize). Msg("upload limits") diff --git a/internal/pkg/api/handleUpload.go b/internal/pkg/api/handleUpload.go index 0d5115c80..f5ce0f49d 100644 --- a/internal/pkg/api/handleUpload.go +++ b/internal/pkg/api/handleUpload.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/go-elasticsearch/v8" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "go.elastic.co/apm/v2" ) @@ -54,7 +53,7 @@ type UploadT struct { } func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *UploadT { - log.Info(). + zerolog.Ctx(context.TODO()).Info(). Interface("limits", cfg.Limits.ArtifactLimit). Int64("maxFileSize", maxFileSize). 
Msg("upload limits") diff --git a/internal/pkg/api/metrics.go b/internal/pkg/api/metrics.go index f3b7bf566..54c050be5 100644 --- a/internal/pkg/api/metrics.go +++ b/internal/pkg/api/metrics.go @@ -16,7 +16,7 @@ import ( "github.com/elastic/elastic-agent-system-metrics/report" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" apmprometheus "go.elastic.co/apm/module/apmprometheus/v2" "go.elastic.co/apm/v2" @@ -55,7 +55,7 @@ var ( func init() { err := report.SetupMetrics(logger.NewZapStub("instance-metrics"), build.ServiceName, version.DefaultVersion) if err != nil { - log.Error().Err(err).Msg("unable to initialize metrics") + zerolog.Ctx(context.TODO()).Error().Err(err).Msg("unable to initialize metrics") } registry = newMetricsRegistry("http_server") diff --git a/internal/pkg/api/metrics_integration_test.go b/internal/pkg/api/metrics_integration_test.go index 999a72ea2..df090d9b1 100644 --- a/internal/pkg/api/metrics_integration_test.go +++ b/internal/pkg/api/metrics_integration_test.go @@ -13,6 +13,8 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/build" "github.com/elastic/fleet-server/v7/internal/pkg/config" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" + "github.com/stretchr/testify/require" ) @@ -29,6 +31,7 @@ func TestMetricsEndpoints(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = testlog.SetLogger(t).WithContext(ctx) srv, err := InitMetrics(ctx, cfg, bi, nil) require.NoError(t, err, "unable to start metrics server") diff --git a/internal/pkg/api/server.go b/internal/pkg/api/server.go index 0501213f8..4b3e4e9da 100644 --- a/internal/pkg/api/server.go +++ b/internal/pkg/api/server.go @@ -21,7 +21,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/policy" "go.elastic.co/apm/v2" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) type server struct { @@ -71,7 +71,7 @@ func (s *server) Run(ctx context.Context) error { IdleTimeout: idle, MaxHeaderBytes: mhbz, BaseContext: func(net.Listener) context.Context { return ctx }, - ErrorLog: errLogger(), + ErrorLog: errLogger(ctx), ConnState: diagConn, } @@ -82,13 +82,13 @@ func (s *server) Run(ctx context.Context) error { go func() { select { case <-ctx.Done(): - log.Debug().Msg("force server close on ctx.Done()") + zerolog.Ctx(ctx).Debug().Msg("force server close on ctx.Done()") err := srv.Close() if err != nil { - log.Error().Err(err).Msg("error while closing server") + zerolog.Ctx(ctx).Error().Err(err).Msg("error while closing server") } case <-forceCh: - log.Debug().Msg("go routine forced closed on exit") + zerolog.Ctx(ctx).Debug().Msg("go routine forced closed on exit") } }() @@ -102,7 +102,7 @@ func (s *server) Run(ctx context.Context) error { defer func() { err := ln.Close() if err != nil { - log.Warn().Err(err).Msg("server.Run: error while closing listener.") + zerolog.Ctx(ctx).Warn().Err(err).Msg("server.Run: error while closing listener.") } }() @@ -127,15 +127,15 @@ func (s *server) Run(ctx context.Context) error { ln = tls.NewListener(ln, srv.TLSConfig) } else { - log.Warn().Msg("Exposed over insecure HTTP; enablement of TLS is strongly recommended") + zerolog.Ctx(ctx).Warn().Msg("Exposed over insecure HTTP; enablement of TLS is strongly recommended") } errCh := make(chan error) baseCtx, cancel := context.WithCancel(ctx) defer cancel() - go func(_ context.Context, errCh chan error, ln net.Listener) { - 
log.Info().Msgf("Listening on %s", s.addr) + go func(ctx context.Context, errCh chan error, ln net.Listener) { + zerolog.Ctx(ctx).Info().Msgf("Listening on %s", s.addr) if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { errCh <- err } @@ -157,7 +157,7 @@ func diagConn(c net.Conn, s http.ConnState) { return } - log.Trace(). + zerolog.Ctx(context.TODO()).Trace(). Str("local", c.LocalAddr().String()). Str("remote", c.RemoteAddr().String()). Str("state", s.String()). @@ -173,31 +173,33 @@ func diagConn(c net.Conn, s http.ConnState) { } } -func wrapConnLimitter(_ context.Context, ln net.Listener, cfg *config.Server) net.Listener { +func wrapConnLimitter(ctx context.Context, ln net.Listener, cfg *config.Server) net.Listener { hardLimit := cfg.Limits.MaxConnections if hardLimit != 0 { - log.Info(). + zerolog.Ctx(ctx).Info(). Int("hardConnLimit", hardLimit). Msg("server hard connection limiter installed") ln = limit.Listener(ln, hardLimit) } else { - log.Info().Msg("server hard connection limiter disabled") + zerolog.Ctx(ctx).Info().Msg("server hard connection limiter disabled") } return ln } type stubLogger struct { + log zerolog.Logger } func (s *stubLogger) Write(p []byte) (n int, err error) { - log.Error().Bytes(logger.ECSMessage, p).Send() + s.log.Error().Bytes(logger.ECSMessage, p).Send() return len(p), nil } -func errLogger() *slog.Logger { - stub := &stubLogger{} +func errLogger(ctx context.Context) *slog.Logger { + log := zerolog.Ctx(ctx) + stub := &stubLogger{*log} return slog.New(stub, "", 0) } diff --git a/internal/pkg/apikey/apikey_integration_test.go b/internal/pkg/apikey/apikey_integration_test.go index d96aa8b54..e0a29c71f 100644 --- a/internal/pkg/apikey/apikey_integration_test.go +++ b/internal/pkg/apikey/apikey_integration_test.go @@ -11,6 +11,8 @@ import ( "errors" "testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" + "github.com/elastic/go-elasticsearch/v8" "github.com/gofrs/uuid" "github.com/google/go-cmp/cmp" @@ -32,6 +34,7 @@ const testFleetRoles = ` func TestRead_existingKey(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) cfg := elasticsearch.Config{ Username: "elastic", @@ -83,6 +86,7 @@ func TestRead_existingKey(t *testing.T) { func TestRead_noKey(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) cfg := elasticsearch.Config{ Username: "elastic", @@ -114,6 +118,7 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) cfg := elasticsearch.Config{ Username: "elastic", diff --git a/internal/pkg/bulk/bulk_integration_test.go b/internal/pkg/bulk/bulk_integration_test.go index b4c7f5a3d..1f742ee04 100644 --- a/internal/pkg/bulk/bulk_integration_test.go +++ b/internal/pkg/bulk/bulk_integration_test.go @@ -16,12 +16,11 @@ import ( "testing" "github.com/elastic/fleet-server/v7/internal/pkg/es" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" "github.com/google/go-cmp/cmp" ) -// NOTE attempting to use testing/log here will cause the race detector to fail - func TestBulkCreate(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() @@ -94,6 +93,7 @@ func TestBulkCreate(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { + ctx := 
testlog.SetLogger(t).WithContext(ctx) sample := NewRandomSample() sampleData := sample.marshal(t) @@ -164,6 +164,7 @@ func TestBulkCreateBody(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) _, err := bulker.Create(ctx, index, "", test.Body) if !EqualElastic(test.Err, err) { t.Fatalf("expected: %v, got: %v", test.Err, err) @@ -178,6 +179,7 @@ func TestBulkCreateBody(t *testing.T) { func TestBulkIndex(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := SetupIndexWithBulk(ctx, t, testPolicy, WithFlushThresholdCount(1)) @@ -201,6 +203,7 @@ func TestBulkIndex(t *testing.T) { func TestBulkUpdate(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := SetupIndexWithBulk(ctx, t, testPolicy) @@ -239,6 +242,7 @@ func TestBulkUpdate(t *testing.T) { func TestBulkSearch(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := SetupIndexWithBulk(ctx, t, testPolicy) @@ -281,6 +285,7 @@ func TestBulkSearch(t *testing.T) { func TestBulkDelete(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := SetupIndexWithBulk(ctx, t, testPolicy) @@ -318,6 +323,7 @@ func benchmarkCreate(n int, b *testing.B) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(b).WithContext(ctx) index, bulker := SetupIndexWithBulk(ctx, b, testPolicy, WithFlushThresholdCount(n)) @@ -373,6 +379,7 @@ func benchmarkCRUD(n int, b *testing.B) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(b).WithContext(ctx) index, bulker := SetupIndexWithBulk(ctx, b, testPolicy, WithFlushThresholdCount(n)) diff --git a/internal/pkg/bulk/bulk_test.go b/internal/pkg/bulk/bulk_test.go index 6cd5eb67a..7c493f3d6 100644 --- a/internal/pkg/bulk/bulk_test.go +++ b/internal/pkg/bulk/bulk_test.go @@ -17,7 +17,6 @@ import ( "time" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" - "github.com/rs/zerolog/log" ) // TODO: @@ -255,6 +254,7 @@ func TestCancelCtx(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { ctx, cancelF := context.WithCancel(context.Background()) + ctx = testlog.SetLogger(t).WithContext(ctx) var wg sync.WaitGroup wg.Add(1) @@ -274,12 +274,11 @@ func TestCancelCtx(t *testing.T) { func benchmarkMockBulk(b *testing.B, samples [][]byte) { b.ReportAllocs() - log.Logger = testlog.SetLogger(b) - mock := &mockBulkTransport{} ctx, cancelF := context.WithCancel(context.Background()) defer cancelF() + ctx = testlog.SetLogger(b).WithContext(ctx) n := len(samples) bulker := NewBulker(mock, nil, WithFlushThresholdCount(n)) diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index d2a2da559..c6bd8f359 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -18,7 +18,7 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" "go.elastic.co/apm/v2" "golang.org/x/sync/semaphore" ) @@ -170,7 +170,7 @@ func blkToQueueType(blk *bulkT) queueType { func (b *Bulker) Run(ctx context.Context) error { var err error - log.Info().Interface("opts", &b.opts).Msg("Run bulker with options") 
+ zerolog.Ctx(ctx).Info().Interface("opts", &b.opts).Msg("Run bulker with options") // Create timer in stopped state timer := time.NewTimer(b.opts.flushInterval) @@ -242,7 +242,7 @@ func (b *Bulker) Run(ctx context.Context) error { // Threshold test, short circuit timer on pending count if itemCnt >= b.opts.flushThresholdCnt || byteCnt >= b.opts.flushThresholdSz { - log.Trace(). + zerolog.Ctx(ctx).Trace(). Str("mod", kModBulk). Int("itemCnt", itemCnt). Int("byteCnt", byteCnt). @@ -254,7 +254,7 @@ func (b *Bulker) Run(ctx context.Context) error { } case <-timer.C: - log.Trace(). + zerolog.Ctx(ctx).Trace(). Str("mod", kModBulk). Int("itemCnt", itemCnt). Int("byteCnt", byteCnt). @@ -272,7 +272,7 @@ func (b *Bulker) Run(ctx context.Context) error { func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue queueT) error { start := time.Now() - log.Trace(). + zerolog.Ctx(ctx).Trace(). Str("mod", kModBulk). Int("cnt", queue.cnt). Int("szPending", queue.pending). @@ -283,7 +283,7 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu return err } - log.Trace(). + zerolog.Ctx(ctx).Trace(). Str("mod", kModBulk). Int("cnt", queue.cnt). Dur("tdiff", time.Since(start)). @@ -321,7 +321,7 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu apm.CaptureError(ctx, err).Send() } - log.Trace(). + zerolog.Ctx(ctx).Trace(). Err(err). Str("mod", kModBulk). Int("cnt", queue.cnt). @@ -408,7 +408,7 @@ func (b *Bulker) dispatch(ctx context.Context, blk *bulkT) respT { select { case b.ch <- blk: case <-ctx.Done(): - log.Error(). + zerolog.Ctx(ctx).Error(). Err(ctx.Err()). Str("mod", kModBulk). Str("action", blk.action.String()). @@ -421,7 +421,7 @@ func (b *Bulker) dispatch(ctx context.Context, blk *bulkT) respT { // Wait for response select { case resp := <-blk.ch: - log.Trace(). + zerolog.Ctx(ctx).Trace(). Err(resp.err). Str("mod", kModBulk). Str("action", blk.action.String()). @@ -431,7 +431,7 @@ func (b *Bulker) dispatch(ctx context.Context, blk *bulkT) respT { return resp case <-ctx.Done(): - log.Error(). + zerolog.Ctx(ctx).Error(). Err(ctx.Err()). Str("mod", kModBulk). Str("action", blk.action.String()). diff --git a/internal/pkg/bulk/helpers.go b/internal/pkg/bulk/helpers.go index 4eaa91c20..ff2922879 100644 --- a/internal/pkg/bulk/helpers.go +++ b/internal/pkg/bulk/helpers.go @@ -10,7 +10,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) type UpdateFields map[string]interface{} @@ -27,7 +27,11 @@ func (u UpdateFields) Marshal() ([]byte, error) { // Attempt to interpret the response as an elastic error, // otherwise return generic elastic error. 
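The *_test.go hunks above stop assigning to the global log.Logger and instead attach a per-test logger to the context that each test passes around. The repository's internal testing/log package provides the real helper; a rough, self-contained equivalent of what testlog.SetLogger(t).WithContext(ctx) achieves could look like this (testWriter and newTestContext are illustrative names, not part of this diff):

package example

import (
	"context"
	"testing"

	"github.com/rs/zerolog"
)

// testWriter routes zerolog output through t.Log so messages are
// attributed to the running test and shown on failure or with -v.
type testWriter struct{ t *testing.T }

func (w testWriter) Write(p []byte) (int, error) {
	w.t.Log(string(p))
	return len(p), nil
}

func newTestContext(t *testing.T) context.Context {
	logger := zerolog.New(testWriter{t: t})
	return logger.WithContext(context.Background())
}

func TestExample(t *testing.T) {
	ctx := newTestContext(t)
	zerolog.Ctx(ctx).Debug().Msg("logged via the test's own logger")
}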
-func parseError(res *esapi.Response) error { +func parseError(res *esapi.Response, log *zerolog.Logger) error { + if log == nil { + l := zerolog.Nop() + log = &l + } var e struct { Err json.RawMessage `json:"error"` diff --git a/internal/pkg/bulk/opApiKey.go b/internal/pkg/bulk/opApiKey.go index 377cd804f..ca3a50d16 100644 --- a/internal/pkg/bulk/opApiKey.go +++ b/internal/pkg/bulk/opApiKey.go @@ -12,7 +12,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/apikey" "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" "go.elastic.co/apm/v2" ) @@ -125,7 +125,7 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { metaMap := make(map[string]interface{}) dec := json.NewDecoder(bytes.NewReader(content)) if err := dec.Decode(&metaMap); err != nil { - log.Error(). + zerolog.Ctx(ctx).Error(). Err(err). Str("mod", kModBulk). Msg("Failed to unmarshal api key update meta map") @@ -134,7 +134,7 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { var req *apiKeyUpdateRequest if err := dec.Decode(&req); err != nil { - log.Error(). + zerolog.Ctx(ctx).Error(). Err(err). Str("mod", kModBulk). Str("request", string(content)). @@ -176,8 +176,8 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { idsPerBatch := b.getIDsCountPerBatch(len(role), maxKeySize) ids := idsPerRole[hash] if idsPerBatch <= 0 { - log.Error().Str("error.message", "request too large").Msg("No API Key ID could fit request size for bulk update") - log.Debug(). + zerolog.Ctx(ctx).Error().Str("error.message", "request too large").Msg("No API Key ID could fit request size for bulk update") + zerolog.Ctx(ctx).Debug(). RawJSON("role", role). Strs("ids", ids). Msg("IDs could not fit into a message") @@ -217,18 +217,18 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { res, err := req.Do(ctx, b.es) if err != nil { - log.Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch") + zerolog.Ctx(ctx).Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch") return err } if res.Body != nil { defer res.Body.Close() } if res.IsError() { - log.Error().Str("error.message", res.String()).Msg("Error in bulk API Key update result to Elasticsearch") - return parseError(res) + zerolog.Ctx(ctx).Error().Str("error.message", res.String()).Msg("Error in bulk API Key update result to Elasticsearch") + return parseError(res, zerolog.Ctx(ctx)) } - log.Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("API Keys updated.") + zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("API Keys updated.") responses[responseIdx] = res.StatusCode for _, id := range idsInBatch { diff --git a/internal/pkg/bulk/opBulk.go b/internal/pkg/bulk/opBulk.go index 9d2afd2d3..58ffc6728 100644 --- a/internal/pkg/bulk/opBulk.go +++ b/internal/pkg/bulk/opBulk.go @@ -13,7 +13,7 @@ import ( "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/mailru/easyjson" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" "go.elastic.co/apm/v2" "github.com/elastic/fleet-server/v7/internal/pkg/es" @@ -209,7 +209,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { res, err := req.Do(ctx, b.es) if err != nil { - log.Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do") + zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do") return err } @@ -218,8 +218,8 @@ func (b *Bulker) 
flushBulk(ctx context.Context, queue queueT) error { } if res.IsError() { - log.Error().Str("mod", kModBulk).Str("error.message", res.String()).Msg("Fail BulkRequest result") - return parseError(res) + zerolog.Ctx(ctx).Error().Str("mod", kModBulk).Str("error.message", res.String()).Msg("Fail BulkRequest result") + return parseError(res, zerolog.Ctx(ctx)) } // Reuse buffer @@ -227,7 +227,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { bodySz, err := buf.ReadFrom(res.Body) if err != nil { - log.Error(). + zerolog.Ctx(ctx).Error(). Err(err). Str("mod", kModBulk). Msg("Response error") @@ -240,7 +240,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { // TODO: We're loosing information abut the errors, we should check a way // to return the full error ES returns if err = easyjson.Unmarshal(buf.Bytes(), &blk); err != nil { - log.Err(err). + zerolog.Ctx(ctx).Error().Err(err). Str("mod", kModBulk). Msg("flushBulk failed, could not unmarshal ES response") return fmt.Errorf("flushBulk failed, could not unmarshal ES response: %w", err) @@ -248,10 +248,10 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { if blk.HasErrors { // We lack information to properly correlate this error with what has failed. // Thus, for now it'd be more noise than information outside an investigation. - log.Debug().Err(errors.New(buf.String())).Msg("Bulk call: Es returned an error") + zerolog.Ctx(ctx).Debug().Err(errors.New(buf.String())).Msg("Bulk call: Es returned an error") } - log.Trace(). + zerolog.Ctx(ctx).Trace(). Err(err). Bool("refresh", queue.ty == kQueueRefreshBulk). Str("mod", kModBulk). diff --git a/internal/pkg/bulk/opMulti_integration_test.go b/internal/pkg/bulk/opMulti_integration_test.go index 556761887..6390ac28c 100644 --- a/internal/pkg/bulk/opMulti_integration_test.go +++ b/internal/pkg/bulk/opMulti_integration_test.go @@ -13,18 +13,17 @@ import ( "time" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" - "github.com/rs/zerolog/log" ) // This runs a series of CRUD operations through elastic. // Not a particularly useful benchmark, but gives some idea of memory overhead. func benchmarkMultiUpdate(n int, b *testing.B) { - log.Logger = testlog.SetLogger(b) b.ReportAllocs() ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(b).WithContext(ctx) index, bulker := SetupIndexWithBulk(ctx, b, testPolicy, WithFlushThresholdCount(n), WithFlushInterval(time.Millisecond*10)) diff --git a/internal/pkg/bulk/opMulti_test.go b/internal/pkg/bulk/opMulti_test.go index b05289518..5d080eac1 100644 --- a/internal/pkg/bulk/opMulti_test.go +++ b/internal/pkg/bulk/opMulti_test.go @@ -10,15 +10,12 @@ import ( "testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" - "github.com/rs/zerolog/log" ) const payload = `{"_id" : "1", "_index" : "test"}` // Test throughput of creating multiOps func BenchmarkMultiUpdateMock(b *testing.B) { - log.Logger = testlog.SetLogger(b) - // Allocate, but don't run. Stub the client. 
bulker := NewBulker(nil, nil) defer close(bulker.ch) @@ -54,7 +51,7 @@ func BenchmarkMultiUpdateMock(b *testing.B) { for _, n := range benchmarks { b.Run(strconv.Itoa(n), func(b *testing.B) { b.ReportAllocs() - ctx := context.Background() + ctx := testlog.SetLogger(b).WithContext(context.Background()) for i := 0; i < b.N; i++ { if _, err := bulker.MUpdate(ctx, ops[:n]); err != nil { b.Fatal(err) diff --git a/internal/pkg/bulk/opRead.go b/internal/pkg/bulk/opRead.go index 2e43df535..4c4240e52 100644 --- a/internal/pkg/bulk/opRead.go +++ b/internal/pkg/bulk/opRead.go @@ -12,7 +12,7 @@ import ( "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/mailru/easyjson" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" "go.elastic.co/apm/v2" ) @@ -101,7 +101,7 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error { res, err := req.Do(ctx, b.es) if err != nil { - log.Warn().Err(err).Str("mod", kModBulk).Msg("bulker.flushRead: Error sending mget request to Elasticsearch") + zerolog.Ctx(ctx).Warn().Err(err).Str("mod", kModBulk).Msg("bulker.flushRead: Error sending mget request to Elasticsearch") return err } @@ -110,8 +110,8 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error { } if res.IsError() { - log.Warn().Str("mod", kModBulk).Str("error.message", res.String()).Msg("bulker.flushRead: Error in mget request result to Elasticsearch") - return parseError(res) + zerolog.Ctx(ctx).Warn().Str("mod", kModBulk).Str("error.message", res.String()).Msg("bulker.flushRead: Error in mget request result to Elasticsearch") + return parseError(res, zerolog.Ctx(ctx)) } // Reuse buffer @@ -119,7 +119,7 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error { bodySz, err := buf.ReadFrom(res.Body) if err != nil { - log.Error().Err(err).Str("mod", kModBulk).Msg("Response error") + zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Response error") } // prealloc slice @@ -127,11 +127,11 @@ func (b *Bulker) flushRead(ctx context.Context, queue queueT) error { blk.Items = make([]MgetResponseItem, 0, queueCnt) if err = easyjson.Unmarshal(buf.Bytes(), &blk); err != nil { - log.Error().Err(err).Str("mod", kModBulk).Msg("Unmarshal error") + zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Unmarshal error") return err } - log.Trace(). + zerolog.Ctx(ctx).Trace(). Err(err). Bool("refresh", refresh). Str("mod", kModBulk). 
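Alongside the context-aware logging, parseError in bulk/helpers.go now takes the logger as an explicit parameter and guards against nil with zerolog.Nop(); the flush paths above call it as parseError(res, zerolog.Ctx(ctx)). A small sketch of that defensive signature in isolation (reportStatus is an illustrative name, not a function in this diff):

package example

import "github.com/rs/zerolog"

// reportStatus accepts an optional logger; passing nil is safe because it
// falls back to a no-op logger, the same guard parseError uses above.
func reportStatus(status int, log *zerolog.Logger) {
	if log == nil {
		l := zerolog.Nop()
		log = &l
	}
	log.Debug().Int("status", status).Msg("handled response status")
}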
diff --git a/internal/pkg/bulk/opSearch.go b/internal/pkg/bulk/opSearch.go index 7cdf5f570..16f9a8f58 100644 --- a/internal/pkg/bulk/opSearch.go +++ b/internal/pkg/bulk/opSearch.go @@ -15,7 +15,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/sqn" "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/mailru/easyjson" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" "go.elastic.co/apm/v2" ) @@ -169,8 +169,8 @@ func (b *Bulker) flushSearch(ctx context.Context, queue queueT) error { } if res.IsError() { - log.Warn().Str("mod", kModBulk).Str("error.message", res.String()).Msg("bulker.flushSearch: Fail writeMsearchBody") - return parseError(res) + zerolog.Ctx(ctx).Warn().Str("mod", kModBulk).Str("error.message", res.String()).Msg("bulker.flushSearch: Fail writeMsearchBody") + return parseError(res, zerolog.Ctx(ctx)) } // Reuse buffer @@ -178,7 +178,7 @@ func (b *Bulker) flushSearch(ctx context.Context, queue queueT) error { bodySz, err := buf.ReadFrom(res.Body) if err != nil { - log.Error().Err(err).Str("mod", kModBulk).Msg("MsearchResponse error") + zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("MsearchResponse error") return err } @@ -187,11 +187,11 @@ func (b *Bulker) flushSearch(ctx context.Context, queue queueT) error { blk.Responses = make([]MsearchResponseItem, 0, queueCnt) if err = easyjson.Unmarshal(buf.Bytes(), &blk); err != nil { - log.Error().Err(err).Str("mod", kModBulk).Msg("Unmarshal error") + zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Unmarshal error") return err } - log.Trace(). + zerolog.Ctx(ctx).Trace(). Err(err). Str("mod", kModBulk). Dur("rtt", time.Since(start)). diff --git a/internal/pkg/cache/cache.go b/internal/pkg/cache/cache.go index d0128bc2f..ade556957 100644 --- a/internal/pkg/cache/cache.go +++ b/internal/pkg/cache/cache.go @@ -6,12 +6,13 @@ package cache import ( + "context" "fmt" "math/rand" "sync" "time" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" "github.com/elastic/fleet-server/v7/internal/pkg/config" @@ -105,7 +106,7 @@ func (c *CacheT) SetAction(action model.Action) { cost := len(action.ActionID) + len(action.Type) ttl := c.cfg.ActionTTL ok := c.cache.SetWithTTL(scopedKey, v, int64(cost), ttl) - log.Trace(). + zerolog.Ctx(context.TODO()).Trace(). Bool("ok", ok). Str("id", action.ActionID). Int("cost", cost). @@ -120,6 +121,7 @@ func (c *CacheT) GetAction(id string) (model.Action, bool) { c.mut.RLock() defer c.mut.RUnlock() + log := zerolog.Ctx(context.TODO()) scopedKey := "action:" + id if v, ok := c.cache.Get(scopedKey); ok { log.Trace().Str("id", id).Msg("Action cache HIT") @@ -166,7 +168,7 @@ func (c *CacheT) SetAPIKey(key APIKey, enabled bool) { cost := len(scopedKey) + len(val) ok := c.cache.SetWithTTL(scopedKey, val, int64(cost), ttl) - log.Trace(). + zerolog.Ctx(context.TODO()).Trace(). Bool("ok", ok). Bool("enabled", enabled). Str("key", key.ID). 
@@ -180,6 +182,7 @@ func (c *CacheT) ValidAPIKey(key APIKey) bool { c.mut.RLock() defer c.mut.RUnlock() + log := zerolog.Ctx(context.TODO()) scopedKey := "api:" + key.ID v, ok := c.cache.Get(scopedKey) if ok { @@ -203,6 +206,7 @@ func (c *CacheT) GetEnrollmentAPIKey(id string) (model.EnrollmentAPIKey, bool) { c.mut.RLock() defer c.mut.RUnlock() + log := zerolog.Ctx(context.TODO()) scopedKey := "record:" + id if v, ok := c.cache.Get(scopedKey); ok { log.Trace().Str("id", id).Msg("Enrollment cache HIT") @@ -227,7 +231,7 @@ func (c *CacheT) SetEnrollmentAPIKey(id string, key model.EnrollmentAPIKey, cost scopedKey := "record:" + id ttl := c.cfg.EnrollKeyTTL ok := c.cache.SetWithTTL(scopedKey, key, cost, ttl) - log.Trace(). + zerolog.Ctx(context.TODO()).Trace(). Bool("ok", ok). Str("id", id). Int64("cost", cost). @@ -243,6 +247,7 @@ func (c *CacheT) GetArtifact(ident, sha2 string) (model.Artifact, bool) { c.mut.RLock() defer c.mut.RUnlock() + log := zerolog.Ctx(context.TODO()) scopedKey := makeArtifactKey(ident, sha2) if v, ok := c.cache.Get(scopedKey); ok { log.Trace().Str("key", scopedKey).Msg("Artifact cache HIT") @@ -270,7 +275,7 @@ func (c *CacheT) SetArtifact(artifact model.Artifact) { ttl := c.cfg.ArtifactTTL ok := c.cache.SetWithTTL(scopedKey, artifact, cost, ttl) - log.Trace(). + zerolog.Ctx(context.TODO()).Trace(). Bool("ok", ok). Str("key", scopedKey). Int64("cost", cost). @@ -287,7 +292,7 @@ func (c *CacheT) SetUpload(id string, info file.Info) { // cache cost for other entries use bytes as the unit. Add up the string lengths and the size of the int64s in the upload.Info struct, as a manual 'sizeof' cost := int64(len(info.ID) + len(info.DocID) + len(info.ActionID) + len(info.AgentID) + len(info.Source) + len(info.Status) + 8*4) ok := c.cache.SetWithTTL(scopedKey, info, cost, ttl) - log.Trace(). + zerolog.Ctx(context.TODO()).Trace(). Bool("ok", ok). Str("id", id). Int64("cost", cost). @@ -298,6 +303,7 @@ func (c *CacheT) GetUpload(id string) (file.Info, bool) { //nolint:dupl // a lit c.mut.RLock() defer c.mut.RUnlock() + log := zerolog.Ctx(context.TODO()) scopedKey := "upload:" + id if v, ok := c.cache.Get(scopedKey); ok { log.Trace().Str("id", id).Msg("upload info cache HIT") @@ -320,7 +326,7 @@ func (c *CacheT) SetPGPKey(id string, p []byte) { scopedKey := "pgp:" + id ttl := 30 * time.Minute // @todo: add to configurable ok := c.cache.SetWithTTL(scopedKey, p, int64(len(p)), ttl) - log.Trace(). + zerolog.Ctx(context.TODO()).Trace(). Bool("ok", ok). Str("id", id). Int("cost", len(p)). 
@@ -332,6 +338,7 @@ func (c *CacheT) GetPGPKey(id string) ([]byte, bool) { c.mut.RLock() defer c.mut.RUnlock() + log := zerolog.Ctx(context.TODO()) scopedKey := "pgp:" + id if v, ok := c.cache.Get(scopedKey); ok { log.Trace().Str("id", id).Msg("PGP key cache HIT") diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index 8eb67cba7..cab3caf3f 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -15,7 +15,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) const defaultFlushInterval = 10 * time.Second @@ -139,7 +139,7 @@ LOOP: select { case <-tick.C: if err = bc.flush(ctx); err != nil { - log.Error().Err(err).Msg("Eat bulk checkin error; Keep on truckin'") + zerolog.Ctx(ctx).Error().Err(err).Msg("Eat bulk checkin error; Keep on truckin'") } case <-ctx.Done(): @@ -251,7 +251,7 @@ func (bc *Bulk) flush(ctx context.Context) error { _, err = bc.bulker.MUpdate(ctx, updates, opts...) - log.Trace(). + zerolog.Ctx(ctx).Trace(). Err(err). Dur("rtt", time.Since(start)). Int("cnt", len(updates)). diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index 8e5f38fa2..fd5b497b4 100644 --- a/internal/pkg/checkin/bulk_test.go +++ b/internal/pkg/checkin/bulk_test.go @@ -19,7 +19,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/rs/xid" - "github.com/rs/zerolog/log" "github.com/stretchr/testify/mock" ) @@ -180,7 +179,7 @@ func TestBulkSimple(t *testing.T) { for _, c := range cases { t.Run(c.desc, func(t *testing.T) { - log.Logger = testlog.SetLogger(t) + ctx := testlog.SetLogger(t).WithContext(context.Background()) mockBulk := ftesting.NewMockBulk() mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() bc := NewBulk(mockBulk) @@ -189,7 +188,7 @@ func TestBulkSimple(t *testing.T) { t.Fatal(err) } - if err := bc.flush(context.Background()); err != nil { + if err := bc.flush(ctx); err != nil { t.Fatal(err) } @@ -207,7 +206,7 @@ func validateTimestamp(tb testing.TB, start time.Time, ts string) { } func benchmarkBulk(n int, flush bool, b *testing.B) { - log.Logger = testlog.SetLogger(b) + ctx := testlog.SetLogger(b).WithContext(context.Background()) b.ReportAllocs() mockBulk := ftesting.NewMockBulk() @@ -231,7 +230,7 @@ func benchmarkBulk(n int, flush bool, b *testing.B) { } if flush { - err := bc.flush(context.Background()) + err := bc.flush(ctx) if err != nil { b.Fatal(err) } diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 50bde2c29..00c5c9cef 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -6,6 +6,7 @@ package config import ( + "context" "errors" "sync" @@ -14,7 +15,7 @@ import ( "github.com/elastic/go-ucfg/flag" "github.com/elastic/go-ucfg/yaml" "github.com/gofrs/uuid" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) // DefaultOptions defaults options used to read the configuration @@ -85,7 +86,7 @@ func (c *Config) LoadServerLimits() error { defer c.m.Unlock() err := c.Validate() if err != nil { - log.Error().Msgf("failed to validate while calculating limits, %s", err.Error()) + zerolog.Ctx(context.TODO()).Error().Err(err).Msgf("failed to validate while calculating limits") return err } @@ -198,7 +199,7 @@ func (c *Config) Redact() *Config { func checkDeprecatedOptions(deprecatedOpts map[string]string, c *ucfg.Config) { for opt, message 
:= range deprecatedOpts { if c.HasField(opt) { - log.Warn().Msg(message) + zerolog.Ctx(context.TODO()).Warn().Msg(message) } } } diff --git a/internal/pkg/config/env_defaults.go b/internal/pkg/config/env_defaults.go index b93be0219..35c9fad88 100644 --- a/internal/pkg/config/env_defaults.go +++ b/internal/pkg/config/env_defaults.go @@ -5,6 +5,7 @@ package config import ( + "context" "embed" "fmt" "io" @@ -16,7 +17,7 @@ import ( "github.com/elastic/go-ucfg/yaml" "github.com/pbnjay/memory" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) const ( @@ -255,6 +256,7 @@ func loadLimitsForAgents(agentLimit int) *envLimits { if agentLimit == 0 { return defaultEnvLimits() } + log := zerolog.Ctx(context.TODO()) for _, l := range defaults { // get nearest limits for configured agent numbers if l.Agents.Min < agentLimit && agentLimit <= l.Agents.Max { diff --git a/internal/pkg/config/env_defaults_test.go b/internal/pkg/config/env_defaults_test.go index 00322ffa3..92cb718df 100644 --- a/internal/pkg/config/env_defaults_test.go +++ b/internal/pkg/config/env_defaults_test.go @@ -12,7 +12,7 @@ import ( testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" "github.com/stretchr/testify/require" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) func TestLoadLimits(t *testing.T) { @@ -31,7 +31,8 @@ func TestLoadLimits(t *testing.T) { for _, tc := range testCases { t.Run(tc.Name, func(t *testing.T) { - log.Logger = testlog.SetLogger(t) + log := testlog.SetLogger(t) + zerolog.DefaultContextLogger = &log l := loadLimitsForAgents(tc.ConfiguredAgentLimit) require.Equal(t, tc.ExpectedAgentLimit, l.Agents.Max) diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 7261e590a..692292fa2 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -15,7 +15,6 @@ import ( "time" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "go.elastic.co/apm/v2" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" @@ -47,8 +46,6 @@ type policyT struct { } type monitorT struct { - log zerolog.Logger - bulker bulk.Bulk monitor monitor.Monitor factory Factory @@ -77,7 +74,6 @@ type monitorT struct { // NewMonitor creates a new coordinator policy monitor. func NewMonitor(fleet config.Fleet, version string, bulker bulk.Bulk, monitor monitor.Monitor, factory Factory) Monitor { return &monitorT{ - log: log.With().Str("ctx", "policy leader manager").Logger(), version: version, fleet: fleet, bulker: bulker, @@ -98,12 +94,13 @@ func NewMonitor(fleet config.Fleet, version string, bulker bulk.Bulk, monitor mo // Run runs the monitor. func (m *monitorT) Run(ctx context.Context) (err error) { + log := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Logger() // When ID of the Agent is not provided to Fleet Server then the Agent // has not enrolled. The Fleet Server cannot become a leader until the // Agent it is running under has been enrolled. 
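The coordinator monitor above drops the zerolog.Logger field from monitorT; Run and the other methods now derive a named logger from the incoming context instead. A minimal sketch of that shape (the monitor type and component name here are illustrative):

package example

import (
	"context"

	"github.com/rs/zerolog"
)

type monitor struct {
	// Previously a zerolog.Logger was stored here at construction time;
	// after the change, each method builds one from its context.
}

func (m *monitor) Run(ctx context.Context) error {
	// Tag every event from this component, as monitorT.Run does with
	// Str("ctx", "policy leader manager").
	log := zerolog.Ctx(ctx).With().Str("ctx", "example component").Logger()
	log.Debug().Msg("starting")
	<-ctx.Done()
	return ctx.Err()
}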
- m.calcMetadata() + m.calcMetadata(ctx) if m.agentMetadata.ID == "" { - m.log.Warn().Msg("missing config fleet.agent.id; acceptable until Elastic Agent has enrolled") + log.Warn().Msg("missing config fleet.agent.id; acceptable until Elastic Agent has enrolled") <-ctx.Done() return ctx.Err() } @@ -116,7 +113,7 @@ func (m *monitorT) Run(ctx context.Context) (err error) { for { err = m.ensureLeadership(ctx) if err != nil { - m.log.Warn().Err(err).Msg("error ensuring leadership, will retry") + log.Warn().Err(err).Msg("error ensuring leadership, will retry") select { case <-lT.C: lT.Reset(m.checkInterval) @@ -147,17 +144,17 @@ func (m *monitorT) Run(ctx context.Context) (err error) { if err != nil { erroredOnLastRequest = true numFailedRequests++ - m.log.Warn().Err(err).Msgf("Encountered an error while policy leadership changes; continuing to retry.") + log.Warn().Err(err).Msgf("Encountered an error while policy leadership changes; continuing to retry.") } case <-mT.C: - m.calcMetadata() + m.calcMetadata(ctx) mT.Reset(m.metadataInterval) case <-lT.C: err = m.ensureLeadership(ctx) if err != nil { erroredOnLastRequest = true numFailedRequests++ - m.log.Warn().Err(err).Msgf("Encountered an error while checking/assigning policy leaders; continuing to retry.") + log.Warn().Err(err).Msgf("Encountered an error while checking/assigning policy leaders; continuing to retry.") } lT.Reset(m.checkInterval) case <-ctx.Done(): @@ -166,7 +163,7 @@ func (m *monitorT) Run(ctx context.Context) (err error) { } if err == nil && erroredOnLastRequest { erroredOnLastRequest = false - m.log.Info().Msgf("Policy leader monitor successfully recovered after %d attempts", numFailedRequests) + log.Info().Msgf("Policy leader monitor successfully recovered after %d attempts", numFailedRequests) numFailedRequests = 0 } } @@ -174,12 +171,13 @@ func (m *monitorT) Run(ctx context.Context) (err error) { // handlePolicies handles new policies or policy changes. 
func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { + log := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Logger() new := false for _, hit := range hits { var policy model.Policy err := hit.Unmarshal(&policy) if err != nil { - m.log.Debug().Err(err).Msg("Failed to deserialize policy json") + log.Debug().Err(err).Msg("Failed to deserialize policy json") return err } if policy.CoordinatorIdx != 0 { @@ -193,7 +191,7 @@ func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { // current leader send to its coordinator err = p.cord.Update(ctx, policy) if err != nil { - m.log.Info().Err(err).Msg("Failed to update policy leader") + log.Info().Err(err).Msg("Failed to update policy leader") return err } } @@ -218,7 +216,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { ctx = apm.ContextWithTransaction(ctx, trans) defer trans.End() } - m.log.Debug().Msg("ensuring leadership of policies") + zerolog.Ctx(ctx).Debug().Str("ctx", "policy leader manager").Msg("ensuring leadership of policies") err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex)) if err != nil { @@ -230,7 +228,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { policies, err := dl.QueryLatestPolicies(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { - m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) + zerolog.Ctx(ctx).Debug().Str("ctx", "policy leader manager").Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) return nil } return fmt.Errorf("encountered error while querying policies: %w", err) @@ -278,7 +276,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { res <- pt }() - l := m.log.With().Str(dl.FieldPolicyID, pt.id).Logger() + l := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Str(dl.FieldPolicyID, pt.id).Logger() err := dl.TakePolicyLeadership(ctx, m.bulker, pt.id, m.agentMetadata.ID, m.version, dl.WithIndexName(m.leadersIndex)) if err != nil { l.Warn().Err(err).Msg("monitor.ensureLeadership: failed to take ownership") @@ -346,7 +344,7 @@ func (m *monitorT) releaseLeadership() { defer cancel() err := dl.ReleasePolicyLeadership(ctx, m.bulker, pt.id, m.agentMetadata.ID, m.leaderInterval, dl.WithIndexName(m.leadersIndex)) if err != nil { - l := m.log.With().Str(dl.FieldPolicyID, pt.id).Logger() + l := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Str(dl.FieldPolicyID, pt.id).Logger() l.Warn().Err(err).Msg("monitor.releaseLeadership: failed to release leadership") } wg.Done() @@ -355,7 +353,7 @@ func (m *monitorT) releaseLeadership() { wg.Wait() } -func (m *monitorT) calcMetadata() { +func (m *monitorT) calcMetadata(ctx context.Context) { m.agentMetadata = model.AgentMetadata{ ID: m.fleet.Agent.ID, Version: m.fleet.Agent.Version, @@ -364,13 +362,13 @@ func (m *monitorT) calcMetadata() { if hostname == "" { h, err := os.Hostname() if err != nil { - m.log.Err(err).Msg("failed to get hostname") + zerolog.Ctx(ctx).Error().Str("ctx", "policy leader manager").Err(err).Msg("failed to get hostname") } hostname = h } ips, err := m.getIPs() if err != nil { - m.log.Err(err).Msg("failed to get ip addresses") + zerolog.Ctx(ctx).Error().Str("ctx", "policy leader manager").Err(err).Msg("failed to get ip addresses") } m.hostMetadata = model.HostMetadata{ ID: m.fleet.Host.ID, diff --git a/internal/pkg/coordinator/monitor_integration_test.go 
b/internal/pkg/coordinator/monitor_integration_test.go index 902d1d708..8fa29b8ee 100644 --- a/internal/pkg/coordinator/monitor_integration_test.go +++ b/internal/pkg/coordinator/monitor_integration_test.go @@ -23,10 +23,12 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func TestMonitorLeadership(t *testing.T) { parentCtx := context.Background() + parentCtx = testlog.SetLogger(t).WithContext(parentCtx) bulkCtx, bulkCn := context.WithCancel(parentCtx) defer bulkCn() ctx, cn := context.WithCancel(parentCtx) diff --git a/internal/pkg/coordinator/v0.go b/internal/pkg/coordinator/v0.go index 28ec56918..dcf343168 100644 --- a/internal/pkg/coordinator/v0.go +++ b/internal/pkg/coordinator/v0.go @@ -7,17 +7,13 @@ package coordinator import ( "context" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - - "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" + + "github.com/rs/zerolog" ) // coordinatorZeroT is V0 coordinator that just takes a subscribed policy and outputs the same policy. type coordinatorZeroT struct { - log zerolog.Logger - policy model.Policy in chan model.Policy out chan model.Policy @@ -26,7 +22,6 @@ type coordinatorZeroT struct { // NewCoordinatorZero creates a V0 coordinator. func NewCoordinatorZero(policy model.Policy) (Coordinator, error) { return &coordinatorZeroT{ - log: log.With().Str("ctx", "coordinator v0").Str(logger.PolicyID, policy.PolicyID).Logger(), policy: policy, in: make(chan model.Policy), out: make(chan model.Policy), @@ -42,7 +37,7 @@ func (c *coordinatorZeroT) Name() string { func (c *coordinatorZeroT) Run(ctx context.Context) error { err := c.updatePolicy(c.policy) if err != nil { - c.log.Err(err).Msg("failed to handle policy") + zerolog.Ctx(ctx).Error().Err(err).Msg("failed to handle policy") } for { @@ -50,7 +45,7 @@ func (c *coordinatorZeroT) Run(ctx context.Context) error { case p := <-c.in: err = c.updatePolicy(p) if err != nil { - c.log.Err(err).Msg("failed to handle policy") + zerolog.Ctx(ctx).Error().Err(err).Msg("failed to handle policy") continue } case <-ctx.Done(): diff --git a/internal/pkg/dl/action_results.go b/internal/pkg/dl/action_results.go index 933a389a9..d4389af29 100644 --- a/internal/pkg/dl/action_results.go +++ b/internal/pkg/dl/action_results.go @@ -13,7 +13,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) func CreateActionResult(ctx context.Context, bulker bulk.Bulk, acr model.ActionResult) error { @@ -33,7 +33,7 @@ func createActionResult(ctx context.Context, bulker bulk.Bulk, index string, acr _, err = bulker.Create(ctx, index, id, body, bulk.WithRefresh()) // ignoring version conflict in case the same action result is tried to be created multiple times (unique id with actionID and agentID) if errors.Is(err, es.ErrElasticVersionConflict) { - log.Debug().Err(err).Str("id", id).Msg("action result already exists, ignoring") + zerolog.Ctx(ctx).Debug().Err(err).Str("id", id).Msg("action result already exists, ignoring") return nil } return err diff --git a/internal/pkg/dl/action_results_integration_test.go b/internal/pkg/dl/action_results_integration_test.go index 
a0dde5f59..6ad197c4c 100644 --- a/internal/pkg/dl/action_results_integration_test.go +++ b/internal/pkg/dl/action_results_integration_test.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" "github.com/elastic/fleet-server/v7/internal/pkg/testing/rnd" ) @@ -102,6 +103,7 @@ func (acrs ActionsResults) find(ar model.ActionResult) *model.ActionResult { func TestActionsResultsStored(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker, acrs := setupActionResults(ctx, t) diff --git a/internal/pkg/dl/actions.go b/internal/pkg/dl/actions.go index 4f5b0b445..717e4a2d2 100644 --- a/internal/pkg/dl/actions.go +++ b/internal/pkg/dl/actions.go @@ -16,7 +16,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) const ( @@ -155,7 +155,7 @@ func DeleteExpiredForIndex(ctx context.Context, index string, bulker bulk.Bulk, err = es.TranslateError(res.StatusCode, esres.Error) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { - log.Debug().Str("index", index).Msg(es.ErrIndexNotFound.Error()) + zerolog.Ctx(ctx).Debug().Str("index", index).Msg(es.ErrIndexNotFound.Error()) err = nil } return @@ -189,7 +189,7 @@ func findActionsHits(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, inde res, err := Search(ctx, bulker, tmpl, index, params, ops...) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { - log.Debug().Str("index", index).Msg(es.ErrIndexNotFound.Error()) + zerolog.Ctx(ctx).Debug().Str("index", index).Msg(es.ErrIndexNotFound.Error()) err = nil } return nil, err diff --git a/internal/pkg/dl/actions_integration_test.go b/internal/pkg/dl/actions_integration_test.go index 6d7bee11f..11bb1587d 100644 --- a/internal/pkg/dl/actions_integration_test.go +++ b/internal/pkg/dl/actions_integration_test.go @@ -15,11 +15,13 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/gcheckpt" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func TestSearchActionsQuery(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) now := time.Now().UTC() diff --git a/internal/pkg/dl/agent_integration_test.go b/internal/pkg/dl/agent_integration_test.go index 0684fe80d..8266cb7b6 100644 --- a/internal/pkg/dl/agent_integration_test.go +++ b/internal/pkg/dl/agent_integration_test.go @@ -19,10 +19,15 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func TestFindAgent_NewModel(t *testing.T) { - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + ctx, cn := context.WithCancel(context.Background()) + defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) + + index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetAgents) now := time.Now().UTC() nowStr := now.Format(time.RFC3339) @@ -55,11 +60,11 @@ func TestFindAgent_NewModel(t 
*testing.T) { require.NoError(t, err) _, err = bulker.Create( - context.Background(), index, agentID, body, bulk.WithRefresh()) + ctx, index, agentID, body, bulk.WithRefresh()) require.NoError(t, err) agent, err := FindAgent( - context.Background(), bulker, QueryAgentByID, FieldID, agentID, WithIndexName(index)) + ctx, bulker, QueryAgentByID, FieldID, agentID, WithIndexName(index)) require.NoError(t, err) assert.Equal(t, agentID, agent.Id) diff --git a/internal/pkg/dl/artifact.go b/internal/pkg/dl/artifact.go index 2f96e0380..abb1729a3 100644 --- a/internal/pkg/dl/artifact.go +++ b/internal/pkg/dl/artifact.go @@ -8,11 +8,10 @@ import ( "context" "encoding/json" - "github.com/rs/zerolog/log" - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dsl" "github.com/elastic/fleet-server/v7/internal/pkg/model" + "github.com/rs/zerolog" ) var ( @@ -54,7 +53,7 @@ func FindArtifact(ctx context.Context, bulker bulk.Bulk, ident, sha2 string) (*m } if len(res.Hits) > 1 { - log.Warn(). + zerolog.Ctx(ctx).Warn(). Str("ident", ident). Str("sha2", sha2). Int("cnt", len(res.Hits)). diff --git a/internal/pkg/dl/enrollment_api_key_integration_test.go b/internal/pkg/dl/enrollment_api_key_integration_test.go index 65444fdf7..39e064df9 100644 --- a/internal/pkg/dl/enrollment_api_key_integration_test.go +++ b/internal/pkg/dl/enrollment_api_key_integration_test.go @@ -19,6 +19,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func createRandomEnrollmentAPIKey(policyID string, active bool) model.EnrollmentAPIKey { @@ -54,6 +55,7 @@ func storeRandomEnrollmentAPIKey(ctx context.Context, bulker bulk.Bulk, index st func TestSearchEnrollmentAPIKeyByID(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetEnrollmentAPIKeys) @@ -86,6 +88,7 @@ func TestSearchEnrollmentAPIKeyByPolicyID(t *testing.T) { t.Skip("Flaky test see https://github.com/elastic/fleet-server/issues/1289") ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetEnrollmentAPIKeys) @@ -117,6 +120,7 @@ func TestSearchEnrollmentAPIKeyByPolicyID(t *testing.T) { func TestSearchEnrollmentAPIKeyByPolicyIDWithInactiveIDs(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetEnrollmentAPIKeys) diff --git a/internal/pkg/dl/migration.go b/internal/pkg/dl/migration.go index 4beebda79..82e20a67b 100644 --- a/internal/pkg/dl/migration.go +++ b/internal/pkg/dl/migration.go @@ -13,7 +13,7 @@ import ( "time" "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dsl" @@ -70,7 +70,7 @@ func migrate(ctx context.Context, bulker bulk.Bulk, fn migrationBodyFn) (int, er resp, err := applyMigration(ctx, name, index, bulker, body) if err != nil { - log.Err(err). + zerolog.Ctx(ctx).Err(err). Bytes("http.request.body.content", body). 
Msgf("migration %s failed", name) return updatedDocs, fmt.Errorf("failed to apply migration %q: %w", @@ -121,7 +121,7 @@ func applyMigration(ctx context.Context, name string, index string, bulker bulk. return migrationResponse{}, fmt.Errorf("decode UpdateByQuery response: %w", err) } - log.Info(). + zerolog.Ctx(ctx).Info(). Str("fleet.migration.name", name). Int("fleet.migration.es.took", resp.Took). Bool("fleet.migration.es.timed_out", resp.TimedOut). @@ -137,7 +137,7 @@ func applyMigration(ctx context.Context, name string, index string, bulker bulk. Msgf("migration %s done", name) for _, fail := range resp.Failures { - log.Error().RawJSON("failure", fail).Msgf("failed applying %s migration", name) + zerolog.Ctx(ctx).Error().RawJSON("failure", fail).Msgf("failed applying %s migration", name) } return resp, err @@ -145,7 +145,7 @@ func applyMigration(ctx context.Context, name string, index string, bulker bulk. // ============================== V7.15 migration ============================== func migrateTov7_15(ctx context.Context, bulker bulk.Bulk) error { - log.Debug().Msg("applying migration to v7.15") + zerolog.Ctx(ctx).Debug().Msg("applying migration to v7.15") _, err := migrate(ctx, bulker, migrateAgentMetadata) if err != nil { return fmt.Errorf("v7.15.0 data migration failed: %w", err) @@ -185,7 +185,7 @@ func migrateAgentMetadata() (string, string, []byte, error) { // https://github.com/elastic/fleet-server/issues/1672 func migrateToV8_5(ctx context.Context, bulker bulk.Bulk) error { - log.Debug().Msg("applying migration to v8.5.0") + zerolog.Ctx(ctx).Debug().Msg("applying migration to v8.5.0") migrated, err := migrate(ctx, bulker, migrateAgentOutputs) if err != nil { return fmt.Errorf("v8.5.0 data migration failed: %w", err) @@ -288,7 +288,7 @@ func migratePolicyCoordinatorIdx() (string, string, []byte, error) { body, err := query.MarshalJSON() if err != nil { - log.Debug().Str("painlessScript", painless). + zerolog.Ctx(context.TODO()).Debug().Str("painlessScript", painless). 
Msgf("%s: failed painless script", migrationName) return migrationName, FleetPolicies, nil, fmt.Errorf("could not marshal ES query: %w", err) } diff --git a/internal/pkg/dl/migration_integration_test.go b/internal/pkg/dl/migration_integration_test.go index fdbfd8a7e..cd1bc4506 100644 --- a/internal/pkg/dl/migration_integration_test.go +++ b/internal/pkg/dl/migration_integration_test.go @@ -21,11 +21,12 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) const nowStr = "2022-08-12T16:50:05Z" -func createSomeAgents(t *testing.T, n int, apiKey bulk.APIKey, index string, bulker bulk.Bulk) []string { +func createSomeAgents(ctx context.Context, t *testing.T, n int, apiKey bulk.APIKey, index string, bulker bulk.Bulk) []string { t.Helper() var createdAgents []string @@ -61,7 +62,7 @@ func createSomeAgents(t *testing.T, n int, apiKey bulk.APIKey, index string, bul require.NoError(t, err) _, err = bulker.Create( - context.Background(), index, agentID, body, bulk.WithRefresh()) + ctx, index, agentID, body, bulk.WithRefresh()) require.NoError(t, err) createdAgents = append(createdAgents, agentID) @@ -70,7 +71,7 @@ func createSomeAgents(t *testing.T, n int, apiKey bulk.APIKey, index string, bul return createdAgents } -func createSomePolicies(t *testing.T, n int, index string, bulker bulk.Bulk) []string { +func createSomePolicies(ctx context.Context, t *testing.T, n int, index string, bulker bulk.Bulk) []string { t.Helper() var created []string @@ -94,7 +95,7 @@ func createSomePolicies(t *testing.T, n int, index string, bulker bulk.Bulk) []s require.NoError(t, err) policyDocID, err := bulker.Create( - context.Background(), index, "", body, bulk.WithRefresh()) + ctx, index, "", body, bulk.WithRefresh()) require.NoError(t, err) created = append(created, policyDocID) @@ -104,18 +105,22 @@ func createSomePolicies(t *testing.T, n int, index string, bulker bulk.Bulk) []s } func TestPolicyCoordinatorIdx(t *testing.T) { - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetPolicies) + ctx, cn := context.WithCancel(context.Background()) + defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) - docIDs := createSomePolicies(t, 25, index, bulker) + index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPolicies) - migrated, err := migrate(context.Background(), bulker, migratePolicyCoordinatorIdx) + docIDs := createSomePolicies(ctx, t, 25, index, bulker) + + migrated, err := migrate(ctx, bulker, migratePolicyCoordinatorIdx) require.NoError(t, err) require.Equal(t, len(docIDs), migrated) for i := range docIDs { policies, err := QueryLatestPolicies( - context.Background(), bulker, WithIndexName(index)) + ctx, bulker, WithIndexName(index)) if err != nil { assert.NoError(t, err, "failed to query latest policies") // we want to continue even if a single agent fails continue @@ -133,21 +138,25 @@ func TestPolicyCoordinatorIdx(t *testing.T) { } func TestMigrateOutputs_withDefaultAPIKeyHistory(t *testing.T) { + ctx, cn := context.WithCancel(context.Background()) + defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) + now, err := time.Parse(time.RFC3339, nowStr) require.NoError(t, err, "could not parse time "+nowStr) timeNow = func() time.Time { return now } - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + index, bulker := ftesting.SetupCleanIndex(ctx, t, 
FleetAgents) apiKey := bulk.APIKey{ ID: "testAgent_", Key: "testAgent_key_", } - agentIDs := createSomeAgents(t, 25, apiKey, index, bulker) + agentIDs := createSomeAgents(ctx, t, 25, apiKey, index, bulker) - migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs) + migratedAgents, err := migrate(ctx, bulker, migrateAgentOutputs) require.NoError(t, err) assert.Equal(t, len(agentIDs), migratedAgents) @@ -155,7 +164,7 @@ func TestMigrateOutputs_withDefaultAPIKeyHistory(t *testing.T) { for i, id := range agentIDs { wantOutputType := "elasticsearch" //nolint:goconst // test cases have some duplication - res, err := SearchWithOneParam(context.Background(), bulker, QueryAgentByID, index, FieldID, id) + res, err := SearchWithOneParam(ctx, bulker, QueryAgentByID, index, FieldID, id) require.NoError(t, err) require.Len(t, res.Hits, 1) @@ -221,31 +230,39 @@ func TestMigrateOutputs_withDefaultAPIKeyHistory(t *testing.T) { } func TestMigrateOutputs_dontMigrateTwice(t *testing.T) { + ctx, cn := context.WithCancel(context.Background()) + defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) + now, err := time.Parse(time.RFC3339, nowStr) require.NoError(t, err, "could not parse time "+nowStr) timeNow = func() time.Time { return now } - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetAgents) apiKey := bulk.APIKey{ ID: "testAgent_", Key: "testAgent_key_", } - agentIDs := createSomeAgents(t, 25, apiKey, index, bulker) + agentIDs := createSomeAgents(ctx, t, 25, apiKey, index, bulker) - migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs) + migratedAgents, err := migrate(ctx, bulker, migrateAgentOutputs) require.NoError(t, err) assert.Equal(t, len(agentIDs), migratedAgents) - migratedAgents2, err := migrate(context.Background(), bulker, migrateAgentOutputs) + migratedAgents2, err := migrate(ctx, bulker, migrateAgentOutputs) require.NoError(t, err) assert.Equal(t, 0, migratedAgents2) } func TestMigrateOutputs_nil_DefaultAPIKeyHistory(t *testing.T) { + ctx, cn := context.WithCancel(context.Background()) + defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) + wantOutputType := "elasticsearch" now, err := time.Parse(time.RFC3339, nowStr) @@ -254,7 +271,7 @@ func TestMigrateOutputs_nil_DefaultAPIKeyHistory(t *testing.T) { return now } - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetAgents) apiKey := bulk.APIKey{ ID: "testAgent_", Key: "testAgent_key_", @@ -285,14 +302,14 @@ func TestMigrateOutputs_nil_DefaultAPIKeyHistory(t *testing.T) { require.NoError(t, err) _, err = bulker.Create( - context.Background(), index, agentID, body, bulk.WithRefresh()) + ctx, index, agentID, body, bulk.WithRefresh()) require.NoError(t, err) - migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs) + migratedAgents, err := migrate(ctx, bulker, migrateAgentOutputs) require.NoError(t, err) res, err := SearchWithOneParam( - context.Background(), bulker, QueryAgentByID, index, FieldID, agentID) + ctx, bulker, QueryAgentByID, index, FieldID, agentID) require.NoError(t, err, "failed to find agent ID %q", agentID) require.Len(t, res.Hits, 1) @@ -343,15 +360,19 @@ func TestMigrateOutputs_nil_DefaultAPIKeyHistory(t *testing.T) { } func TestMigrateOutputs_no_agent_document(t *testing.T) { + ctx, cn := context.WithCancel(context.Background()) + defer cn() + ctx = 
testlog.SetLogger(t).WithContext(ctx) + now, err := time.Parse(time.RFC3339, nowStr) require.NoError(t, err, "could not parse time "+nowStr) timeNow = func() time.Time { return now } - _, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + _, bulker := ftesting.SetupCleanIndex(ctx, t, FleetAgents) - migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs) + migratedAgents, err := migrate(ctx, bulker, migrateAgentOutputs) require.NoError(t, err) assert.Equal(t, 0, migratedAgents) diff --git a/internal/pkg/dl/policies_integration_test.go b/internal/pkg/dl/policies_integration_test.go index 18cdf4011..e39edb479 100644 --- a/internal/pkg/dl/policies_integration_test.go +++ b/internal/pkg/dl/policies_integration_test.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func createRandomPolicy(id string, revisionIdx int) model.Policy { @@ -47,6 +48,7 @@ func storeRandomPolicy(ctx context.Context, bulker bulk.Bulk, index string) (mod func TestQueryLatestPolicies(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPolicies) @@ -77,6 +79,7 @@ func TestQueryLatestPolicies(t *testing.T) { func TestCreatePolicy(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPolicies) diff --git a/internal/pkg/dl/policies_leader.go b/internal/pkg/dl/policies_leader.go index 157681ae7..3fe83602b 100644 --- a/internal/pkg/dl/policies_leader.go +++ b/internal/pkg/dl/policies_leader.go @@ -15,7 +15,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/dsl" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) var ( @@ -52,7 +52,7 @@ func SearchPolicyLeaders(ctx context.Context, bulker bulk.Bulk, ids []string, op res, err := bulker.Search(ctx, o.indexName, data) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { - log.Debug().Str("index", o.indexName).Msg(es.ErrIndexNotFound.Error()) + zerolog.Ctx(ctx).Debug().Str("index", o.indexName).Msg(es.ErrIndexNotFound.Error()) err = nil } return diff --git a/internal/pkg/dl/policies_leader_integration_test.go b/internal/pkg/dl/policies_leader_integration_test.go index c147d1e61..73194b6c3 100644 --- a/internal/pkg/dl/policies_leader_integration_test.go +++ b/internal/pkg/dl/policies_leader_integration_test.go @@ -15,6 +15,8 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" + "github.com/gofrs/uuid" ) @@ -23,6 +25,7 @@ const testVer = "1.0.0" func TestSearchPolicyLeaders(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) @@ -56,6 +59,7 @@ func TestSearchPolicyLeaders(t *testing.T) { func TestTakePolicyLeadership(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) 
index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) @@ -91,6 +95,7 @@ func TestTakePolicyLeadership(t *testing.T) { func TestReleasePolicyLeadership(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) @@ -131,6 +136,7 @@ func TestReleasePolicyLeadership(t *testing.T) { func TestReleasePolicyLeadership_NothingIfNotLeader(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) diff --git a/internal/pkg/dl/servers_integration_test.go b/internal/pkg/dl/servers_integration_test.go index 85066ac62..da2523480 100644 --- a/internal/pkg/dl/servers_integration_test.go +++ b/internal/pkg/dl/servers_integration_test.go @@ -16,11 +16,13 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func TestEnsureServer(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetServers) diff --git a/internal/pkg/es/client.go b/internal/pkg/es/client.go index aaf9b40f5..96b53cf62 100644 --- a/internal/pkg/es/client.go +++ b/internal/pkg/es/client.go @@ -14,9 +14,9 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/build" "github.com/elastic/fleet-server/v7/internal/pkg/config" + "github.com/rs/zerolog" "github.com/elastic/go-elasticsearch/v8" - "github.com/rs/zerolog/log" ) type ConfigOption func(config *elasticsearch.Config) @@ -34,7 +34,7 @@ func NewClient(ctx context.Context, cfg *config.Config, longPoll bool, opts ...C opt(&escfg) } - zlog := log.With(). + zlog := zerolog.Ctx(ctx).With(). Strs("cluster.addr", addr). Int("cluster.maxConnsPersHost", mcph). 
Logger() diff --git a/internal/pkg/file/uploader/es.go b/internal/pkg/file/uploader/es.go index 18ec012c3..7da621d66 100644 --- a/internal/pkg/file/uploader/es.go +++ b/internal/pkg/file/uploader/es.go @@ -18,7 +18,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/file/cbor" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" "go.elastic.co/apm/v2" ) @@ -106,7 +106,7 @@ func UpdateFileDoc(ctx context.Context, bulker bulk.Bulk, source string, fileID if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { return err } - log.Trace().Int("statuscode", resp.StatusCode).Interface("response", response).Msg("updated file metadata document") + zerolog.Ctx(ctx).Trace().Int("status_code", resp.StatusCode).Interface("response", response).Msg("updated file metadata document") if response.Error.Type != "" { return fmt.Errorf("%s: %s caused by %s: %s", response.Error.Type, response.Error.Reason, response.Error.Cause.Type, response.Error.Cause.Reason) @@ -140,7 +140,7 @@ func IndexChunk(ctx context.Context, client *elasticsearch.Client, body *cbor.Ch if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { return err } - log.Trace().Int("statuscode", resp.StatusCode).Interface("chunk-response", response).Msg("uploaded chunk") + zerolog.Ctx(ctx).Trace().Int("status_code", resp.StatusCode).Interface("chunk-response", response).Msg("uploaded chunk") if response.Error.Type != "" { return fmt.Errorf("%s: %s caused by %s: %s", response.Error.Type, response.Error.Reason, response.Error.Cause.Type, response.Error.Cause.Reason) diff --git a/internal/pkg/file/uploader/finalize.go b/internal/pkg/file/uploader/finalize.go index b7a283dd9..63818edd8 100644 --- a/internal/pkg/file/uploader/finalize.go +++ b/internal/pkg/file/uploader/finalize.go @@ -13,7 +13,7 @@ import ( "strings" "github.com/elastic/fleet-server/v7/internal/pkg/file" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" "go.elastic.co/apm/v2" ) @@ -52,10 +52,10 @@ func (u *Uploader) Complete(ctx context.Context, id string, transitHash string) } if !u.verifyChunkInfo(info, chunks, transitHash) { if err := SetStatus(ctx, u.bulker, info, file.StatusFail); err != nil { - log.Warn().Err(err).Str("fileID", info.DocID).Str("uploadID", info.ID).Msg("file upload failed chunk validation, but encountered an error setting the upload status to failure") + zerolog.Ctx(ctx).Warn().Err(err).Str("fileID", info.DocID).Str("uploadID", info.ID).Msg("file upload failed chunk validation, but encountered an error setting the upload status to failure") } if err := DeleteAllChunksForFile(ctx, u.bulker, info.Source, info.DocID); err != nil { - log.Warn().Err(err).Str("fileID", info.DocID).Str("uploadID", info.ID).Msg("file upload failed chunk validation, but encountered an error deleting left-behind chunk data") + zerolog.Ctx(ctx).Warn().Err(err).Str("fileID", info.DocID).Str("uploadID", info.ID).Msg("file upload failed chunk validation, but encountered an error deleting left-behind chunk data") } vSpan.End() return info, ErrFailValidation @@ -74,6 +74,7 @@ func (u *Uploader) Complete(ctx context.Context, id string, transitHash string) } func (u *Uploader) allChunksPresent(info file.Info, chunks []file.ChunkInfo) bool { + log := zerolog.Ctx(context.TODO()) // check overall count if len(chunks) != info.Count { log.Warn().Int("expectedCount", info.Count).Int("received", len(chunks)).Interface("chunks", chunks).Msg("mismatch number of chunks") @@ 
-96,6 +97,7 @@ func (u *Uploader) allChunksPresent(info file.Info, chunks []file.ChunkInfo) boo } func (u *Uploader) verifyChunkInfo(info file.Info, chunks []file.ChunkInfo, transitHash string) bool { + log := zerolog.Ctx(context.TODO()) // verify all chunks except last are info.ChunkSize size // verify last: false (or field excluded) for all except final chunk // verify final chunk is last: true diff --git a/internal/pkg/gc/actions.go b/internal/pkg/gc/actions.go index 49feb3055..01b8fac94 100644 --- a/internal/pkg/gc/actions.go +++ b/internal/pkg/gc/actions.go @@ -9,11 +9,11 @@ import ( "strconv" "strings" - "github.com/rs/zerolog/log" - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/scheduler" + + "github.com/rs/zerolog" ) type ActionsCleanupConfig struct { @@ -70,7 +70,7 @@ func cleanupActions(ctx context.Context, index string, bulker bulk.Bulk, opts .. opt(&c) } - log := log.With().Str("ctx", "fleet actions cleanup").Str("interval", "now-"+c.cleanupIntervalAfterExpired).Logger() + log := zerolog.Ctx(ctx).With().Str("ctx", "fleet actions cleanup").Str("interval", "now-"+c.cleanupIntervalAfterExpired).Logger() log.Debug().Msg("delete expired actions") diff --git a/internal/pkg/gc/actions_integration_test.go b/internal/pkg/gc/actions_integration_test.go index f1e5e5a33..fe6fe39bd 100644 --- a/internal/pkg/gc/actions_integration_test.go +++ b/internal/pkg/gc/actions_integration_test.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func TestCleanupActions(t *testing.T) { @@ -52,6 +53,7 @@ func testCleanupActionsWithSelectSize(t *testing.T, _ int) { ) ctx := context.Background() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, dl.FleetActions) diff --git a/internal/pkg/limit/error.go b/internal/pkg/limit/error.go index 65bea753b..399e04706 100644 --- a/internal/pkg/limit/error.go +++ b/internal/pkg/limit/error.go @@ -9,7 +9,7 @@ import ( "errors" "net/http" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) var ( @@ -19,7 +19,7 @@ var ( // writeError recreates the behaviour of api/error.go. 
// It is defined separately here to stop a circular import -func writeError(w http.ResponseWriter, err error) error { +func writeError(log *zerolog.Logger, w http.ResponseWriter, err error) error { resp := struct { Status int `json:"statusCode"` Error string `json:"error"` diff --git a/internal/pkg/limit/error_test.go b/internal/pkg/limit/error_test.go index 829e99e79..4faccbc6d 100644 --- a/internal/pkg/limit/error_test.go +++ b/internal/pkg/limit/error_test.go @@ -11,6 +11,8 @@ import ( "net/http/httptest" "testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" + "github.com/stretchr/testify/require" ) @@ -36,7 +38,8 @@ func TestWriteError(t *testing.T) { t.Run(tt.name, func(t *testing.T) { w := httptest.NewRecorder() - err := writeError(w, tt.err) + log := testlog.SetLogger(t) + err := writeError(&log, w, tt.err) require.NoError(t, err) resp := w.Result() defer resp.Body.Close() diff --git a/internal/pkg/limit/limiter.go b/internal/pkg/limit/limiter.go index 89ab5fa0d..5270f7e2e 100644 --- a/internal/pkg/limit/limiter.go +++ b/internal/pkg/limit/limiter.go @@ -82,7 +82,7 @@ func (l *Limiter) Wrap(name string, si StatIncer, ll zerolog.Level) func(http.Ha lf, err := l.acquire() if err != nil { hlog.FromRequest(r).WithLevel(ll).Str("route", name).Err(err).Msg("limit reached") - if wErr := writeError(w, err); wErr != nil { + if wErr := writeError(hlog.FromRequest(r), w, err); wErr != nil { hlog.FromRequest(r).Error().Err(wErr).Msg("fail writing error response") } if si != nil { diff --git a/internal/pkg/limit/listener.go b/internal/pkg/limit/listener.go index fcefa65ed..55d9e75be 100644 --- a/internal/pkg/limit/listener.go +++ b/internal/pkg/limit/listener.go @@ -5,12 +5,12 @@ package limit import ( + "context" "net" "sync" "github.com/elastic/fleet-server/v7/internal/pkg/logger" - - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) // Derived from netutil.LimitListener but works slightly differently. 
@@ -59,7 +59,7 @@ func (l *limitListener) Accept() (net.Conn, error) { // If we cannot acquire the semaphore, close the connection if acquired := l.acquire(); !acquired { - zlog := log.Warn() + zlog := zerolog.Ctx(context.TODO()).Warn() var err error if c != nil { diff --git a/internal/pkg/logger/http.go b/internal/pkg/logger/http.go index 907c5da5b..c2921c6b3 100644 --- a/internal/pkg/logger/http.go +++ b/internal/pkg/logger/http.go @@ -18,7 +18,6 @@ import ( "github.com/gofrs/uuid" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" "go.elastic.co/apm/module/apmzerolog/v2" @@ -220,7 +219,7 @@ func Middleware(next http.Handler) http.Handler { // Add trace correlation fields ctx := r.Context() - zlog := log.Hook(apmzerolog.TraceContextHook(ctx)) + zlog := zerolog.Ctx(ctx).Hook(apmzerolog.TraceContextHook(ctx)) // Update request context // NOTE this injects the request id and addr into all logs that use the request logger zlog = zlog.With().Str(ECSHTTPRequestID, reqID).Str(ECSServerAddress, addr).Logger() @@ -240,7 +239,7 @@ func Middleware(next http.Handler) http.Handler { wrCounter := NewResponseCounter(w) - if log.Debug().Enabled() { + if zlog.Debug().Enabled() { d := zlog.Debug() httpMeta(r, d) httpDebug(r, d) @@ -250,7 +249,7 @@ func Middleware(next http.Handler) http.Handler { next.ServeHTTP(wrCounter, r) httpMeta(r, e) - if log.Debug().Enabled() || (wrCounter.statusCode < 200 && wrCounter.statusCode >= 300) { + if zlog.Debug().Enabled() || (wrCounter.statusCode < 200 && wrCounter.statusCode >= 300) { e.Uint64(ECSHTTPRequestBodyBytes, rdCounter.Count()) e.Uint64(ECSHTTPResponseBodyBytes, wrCounter.Count()) e.Int(ECSHTTPResponseCode, wrCounter.statusCode) diff --git a/internal/pkg/logger/http_test.go b/internal/pkg/logger/http_test.go index 174c770af..640c2725c 100644 --- a/internal/pkg/logger/http_test.go +++ b/internal/pkg/logger/http_test.go @@ -5,15 +5,19 @@ package logger import ( + "context" "net/http" "net/http/httptest" "testing" "time" "github.com/stretchr/testify/require" + + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func TestMiddleware(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(context.Background()) h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts, ok := CtxStartTime(r.Context()) require.True(t, ok, "expected context to have start time") @@ -24,7 +28,7 @@ func TestMiddleware(t *testing.T) { }) w := httptest.NewRecorder() - req := httptest.NewRequest("GET", "/", nil) + req := httptest.NewRequest("GET", "/", nil).WithContext(ctx) Middleware(h).ServeHTTP(w, req) res := w.Result() diff --git a/internal/pkg/logger/logger.go b/internal/pkg/logger/logger.go index bae6110d2..0a96c1f1c 100644 --- a/internal/pkg/logger/logger.go +++ b/internal/pkg/logger/logger.go @@ -39,6 +39,7 @@ type Logger struct { cfg *config.Config sync WriterSync name string + log zerolog.Logger } // Reload reloads the logger configuration. @@ -48,16 +49,18 @@ func (l *Logger) Reload(_ context.Context, cfg *config.Config) error { zerolog.SetGlobalLevel(level(cfg)) } if !l.cfg.Logging.EqualExcludeLevel(cfg.Logging) { - // sync before reload + // sync before set l.Sync() - // reload the logger - logger, w, err := configure(cfg, l.name) + out, wr, err := getOutput(cfg) if err != nil { return err } - log.Logger = logger - l.sync = w + l.log = l.log.Output(out) + l.sync = wr + + log.Logger = l.log + zerolog.DefaultContextLogger = &l.log // introduces race conditions in integration test? 
} l.cfg = cfg return nil @@ -76,18 +79,22 @@ func Init(cfg *config.Config, svcName string) (*Logger, error) { once.Do(func() { zerolog.SetGlobalLevel(level(cfg)) - var l zerolog.Logger - var w WriterSync - l, w, err = configure(cfg, svcName) + out, wr, err := getOutput(cfg) if err != nil { return } + l := ecszerolog.New(out) + if svcName != "" { + l = l.With().Str(ECSServiceName, svcName).Str(ECSServiceType, svcName).Logger() + } log.Logger = l + zerolog.DefaultContextLogger = &l gLogger = &Logger{ cfg: cfg, - sync: w, + sync: wr, name: svcName, + log: l, } }) return gLogger, err @@ -107,16 +114,16 @@ func level(cfg *config.Config) zerolog.Level { return cfg.Logging.LogLevel() } -func configureStderrLogger(cfg *config.Config) (zerolog.Logger, WriterSync) { +func stderrOut(cfg *config.Config) (io.Writer, WriterSync) { out := io.Writer(os.Stderr) if cfg.Logging.Pretty { out = zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "15:04:05.000"} } - return ecszerolog.New(out), os.Stderr + return out, os.Stderr } -func configureFileRotatorLogger(cfg *config.Config) (zerolog.Logger, WriterSync, error) { +func fileRotatorOut(cfg *config.Config) (io.Writer, WriterSync, error) { files := cfg.Logging.Files if files == nil { files = &config.LoggingFiles{} @@ -132,26 +139,21 @@ func configureFileRotatorLogger(cfg *config.Config) (zerolog.Logger, WriterSync, file.RedirectStderr(files.RedirectStderr), ) if err != nil { - return zerolog.Logger{}, nil, err + return nil, nil, err } - return ecszerolog.New(rotator), rotator, nil + return rotator, rotator, nil } -func configure(cfg *config.Config, svcName string) (lg zerolog.Logger, wr WriterSync, err error) { +func getOutput(cfg *config.Config) (out io.Writer, wr WriterSync, err error) { switch { case cfg.Logging.ToStderr: - lg, wr = configureStderrLogger(cfg) + out, wr = stderrOut(cfg) case cfg.Logging.ToFiles: - lg, wr, err = configureFileRotatorLogger(cfg) + out, wr, err = fileRotatorOut(cfg) default: - lg = ecszerolog.New(io.Discard) + out = io.Discard wr = &nopSync{} } - lg = lg.Level(zerolog.TraceLevel) - - if svcName != "" { - lg = lg.With().Str(ECSServiceName, svcName).Str(ECSServiceType, svcName).Logger() - } return //nolint:nakedret // short function } diff --git a/internal/pkg/logger/logger_test.go b/internal/pkg/logger/logger_test.go index 5bc92d0de..fcb0c2abc 100644 --- a/internal/pkg/logger/logger_test.go +++ b/internal/pkg/logger/logger_test.go @@ -91,12 +91,32 @@ func Test_Logger_Reload(t *testing.T) { cfg := stderrcfg() cfg.Logging.ToStderr = false - cfg.Logging.Level = "debug" + cfg.Logging.Level = "warn" err := logger.Reload(context.Background(), cfg) require.NoError(t, err) log.Info().Msg("Hello, World!") - assert.Equal(t, zerolog.DebugLevel, zerolog.GlobalLevel()) + assert.Equal(t, zerolog.WarnLevel, zerolog.GlobalLevel()) assert.Empty(t, b, "write went to original logger") }) + t.Run("Check context logger", func(t *testing.T) { + var b bytes.Buffer + l := zerolog.New(&b) + zerolog.DefaultContextLogger = &l + logger.cfg = stderrcfg() + logger.sync = &nopSync{} + + zerolog.Ctx(context.Background()).Error().Msg("Hello, World!") + assert.NotEmpty(t, b, "expected something to be written") + b.Reset() + + cfg := stderrcfg() + cfg.Logging.Level = "debug" + err := logger.Reload(context.Background(), cfg) + require.NoError(t, err) + zerolog.Ctx(context.Background()).Error().Msg("Hello, World!") + + assert.Equal(t, zerolog.DebugLevel, zerolog.GlobalLevel()) + assert.NotEmpty(t, b, "expected something to be written") + }) } diff --git 
a/internal/pkg/logger/zapStub.go b/internal/pkg/logger/zapStub.go index 082cb0b09..5874ca9b8 100644 --- a/internal/pkg/logger/zapStub.go +++ b/internal/pkg/logger/zapStub.go @@ -35,7 +35,6 @@ type zapStub struct { } func (z zapStub) Enabled(zapLevel zapcore.Level) bool { - zeroLevel := log.Logger.GetLevel() switch zapLevel { @@ -61,23 +60,22 @@ func (z zapStub) Sync() error { } func (z zapStub) Write(p []byte) (n int, err error) { - // Unwrap the zap object for logging m := make(map[string]interface{}) if err := json.Unmarshal(p, &m); err != nil { return 0, err } - ctx := log.Log() + e := log.Log() for key, val := range m { // Don't dupe the timestamp, use the fleet formatted timestamp. if key != ECSTimestamp { - ctx.Interface(key, val) + e.Interface(key, val) } } - ctx.Send() + e.Send() return 0, nil } diff --git a/internal/pkg/monitor/monitor.go b/internal/pkg/monitor/monitor.go index fa612d476..3d815428e 100644 --- a/internal/pkg/monitor/monitor.go +++ b/internal/pkg/monitor/monitor.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "go.elastic.co/apm/v2" ) @@ -120,8 +119,6 @@ func NewSimple(index string, esCli, monCli *elasticsearch.Client, opts ...Option opt(m) } - m.log = log.With().Str("index", m.index).Str("ctx", "index monitor").Logger() - tmplCheck, err := m.prepareCheckQuery() if err != nil { return nil, err @@ -198,6 +195,7 @@ func (m *simpleMonitorT) loadCheckpoint() sqn.SeqNo { // Run runs monitor. func (m *simpleMonitorT) Run(ctx context.Context) (err error) { + m.log = zerolog.Ctx(ctx).With().Str("index", m.index).Str("ctx", "index monitor").Logger() m.log.Info().Msg("starting index monitor") defer func() { if errors.Is(err, context.Canceled) { diff --git a/internal/pkg/monitor/monitor_integration_test.go b/internal/pkg/monitor/monitor_integration_test.go index 9869ab389..684a21fc4 100644 --- a/internal/pkg/monitor/monitor_integration_test.go +++ b/internal/pkg/monitor/monitor_integration_test.go @@ -30,6 +30,7 @@ import ( func TestSimpleMonitorEmptyIndex(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, dl.FleetActions) @@ -39,6 +40,7 @@ func TestSimpleMonitorEmptyIndex(t *testing.T) { func TestSimpleMonitorNonEmptyIndex(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker, _ := ftesting.SetupActions(ctx, t, 1, 12) @@ -46,9 +48,9 @@ func TestSimpleMonitorNonEmptyIndex(t *testing.T) { } func TestSimpleMonitorCheckpointOutOfSync(t *testing.T) { - log := testlog.SetLogger(t) ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker, _ := ftesting.SetupActions(ctx, t, 1, 12) @@ -90,8 +92,7 @@ func TestSimpleMonitorCheckpointOutOfSync(t *testing.T) { checkpoint, err = gcheckpt.Query(ctx, bulker.Client(), index) require.NoError(t, err) - log.Debug().Int64("checkpoint", checkpoint.Value()).Msg("checkpoint before test action delete") - + t.Logf("Checkpoint before test action delete checkpoint=%d", checkpoint.Value()) // Delete an action to emulate the gap between the fleet server tracking checkpoint and the index checkpoint // The delete causes the checkpoint increment and the fleet-server was not updating it's checkpoint tracked value correctly // in these cases. 
@@ -101,7 +102,7 @@ func TestSimpleMonitorCheckpointOutOfSync(t *testing.T) { checkpoint, err = gcheckpt.Query(ctx, bulker.Client(), index) require.NoError(t, err) - log.Debug().Int64("checkpoint", checkpoint.Value()).Msg("checkpoint after test action delete") + t.Logf("Checkpoint after test action delete checkpoint=%d", checkpoint.Value()) // Wait for fleet server monitor checkpoint to be incremented after delete m, _ := mon.(*simpleMonitorT) @@ -109,7 +110,7 @@ func TestSimpleMonitorCheckpointOutOfSync(t *testing.T) { start := time.Now() for { monCheckpoint = m.loadCheckpoint() - log.Debug().Int64("wait checkpoint", checkpoint.Value()).Int64("got checkpoint", monCheckpoint.Value()).Msg("monitor checkpoint") + t.Logf("Monitor checkpoint wait_checkpoint=%d got_checkpoint=%d", checkpoint.Value(), monCheckpoint.Value()) if checkpoint.Value() == monCheckpoint.Value() { break } diff --git a/internal/pkg/monitor/subscription_monitor.go b/internal/pkg/monitor/subscription_monitor.go index 2cf5664b8..59f41bf49 100644 --- a/internal/pkg/monitor/subscription_monitor.go +++ b/internal/pkg/monitor/subscription_monitor.go @@ -12,9 +12,9 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" + "github.com/rs/zerolog" "github.com/elastic/go-elasticsearch/v8" - "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" ) @@ -148,7 +148,7 @@ func (m *monitorT) notify(ctx context.Context, hits []es.HitT) { select { case s.c <- hits: case <-lc.Done(): - log.Error(). + zerolog.Ctx(ctx).Error(). Err(lc.Err()). Str("ctx", "subscription monitor"). Dur("timeout", m.subTimeout). diff --git a/internal/pkg/monitor/subscription_monitor_integration_test.go b/internal/pkg/monitor/subscription_monitor_integration_test.go index 3db7cd222..220e62b2f 100644 --- a/internal/pkg/monitor/subscription_monitor_integration_test.go +++ b/internal/pkg/monitor/subscription_monitor_integration_test.go @@ -19,11 +19,13 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func TestMonitorEmptyIndex(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, dl.FleetActions) runMonitorTest(t, ctx, index, bulker) @@ -32,6 +34,7 @@ func TestMonitorEmptyIndex(t *testing.T) { func TestMonitorNonEmptyIndex(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker, _ := ftesting.SetupActions(ctx, t, 1, 12) runMonitorTest(t, ctx, index, bulker) diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index ef6d8e0f7..187b8ab67 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -11,7 +11,6 @@ import ( "time" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "go.elastic.co/apm/v2" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" @@ -93,7 +92,6 @@ type monitorT struct { // NewMonitor creates the policy monitor for subscribing agents. 
func NewMonitor(bulker bulk.Bulk, monitor monitor.Monitor, throttle time.Duration) Monitor { return &monitorT{ - log: log.With().Str("ctx", "policy agent monitor").Logger(), bulker: bulker, monitor: monitor, kickCh: make(chan struct{}, 1), @@ -109,6 +107,7 @@ func NewMonitor(bulker bulk.Bulk, monitor monitor.Monitor, throttle time.Duratio // Run runs the monitor. func (m *monitorT) Run(ctx context.Context) error { + m.log = zerolog.Ctx(ctx).With().Str("ctx", "policy agent monitor").Logger() m.log.Info(). Dur("throttle", m.throttle). Msg("run policy monitor") @@ -187,14 +186,16 @@ func unmarshalHits(hits []es.HitT) ([]model.Policy, error) { func (m *monitorT) processHits(ctx context.Context, hits []es.HitT) error { policies, err := unmarshalHits(hits) if err != nil { - m.log.Error().Err(err).Msg("fail unmarshal hits") + zerolog.Ctx(ctx).Error().Err(err).Msg("fail unmarshal hits") return err } return m.processPolicies(ctx, policies) } -func (m *monitorT) waitStart(ctx context.Context) error { //nolint:unused // not sure if this is used in tests +// waitStart returns once Run has started +// It's used in tests. +func (m *monitorT) waitStart(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() diff --git a/internal/pkg/policy/monitor_integration_test.go b/internal/pkg/policy/monitor_integration_test.go index fd904fecb..380dc5b1d 100644 --- a/internal/pkg/policy/monitor_integration_test.go +++ b/internal/pkg/policy/monitor_integration_test.go @@ -14,11 +14,13 @@ import ( "time" "github.com/gofrs/uuid" + "github.com/stretchr/testify/require" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) var intPolData = model.PolicyData{ @@ -32,6 +34,7 @@ var intPolData = model.PolicyData{ func TestMonitor_Integration(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = testlog.SetLogger(t).WithContext(ctx) index, bulker := ftesting.SetupCleanIndex(ctx, t, dl.FleetPolicies) @@ -69,6 +72,8 @@ func TestMonitor_Integration(t *testing.T) { merr = nil } }() + err = m.(*monitorT).waitStart(ctx) + require.NoError(t, err) agentID := uuid.Must(uuid.NewV4()).String() policyID := uuid.Must(uuid.NewV4()).String() diff --git a/internal/pkg/policy/monitor_test.go b/internal/pkg/policy/monitor_test.go index 8edcd3c60..db3b68a7e 100644 --- a/internal/pkg/policy/monitor_test.go +++ b/internal/pkg/policy/monitor_test.go @@ -37,9 +37,9 @@ var policyDataDefault = &model.PolicyData{ } func TestMonitor_NewPolicy(t *testing.T) { - _ = testlog.SetLogger(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = testlog.SetLogger(t).WithContext(ctx) chHitT := make(chan []es.HitT, 1) defer close(chHitT) @@ -117,9 +117,9 @@ func TestMonitor_NewPolicy(t *testing.T) { } func TestMonitor_SamePolicy(t *testing.T) { - _ = testlog.SetLogger(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = testlog.SetLogger(t).WithContext(ctx) chHitT := make(chan []es.HitT, 1) defer close(chHitT) @@ -144,6 +144,9 @@ func TestMonitor_SamePolicy(t *testing.T) { merr = monitor.Run(ctx) }() + err := monitor.(*monitorT).waitStart(ctx) + require.NoError(t, err) + agentId := uuid.Must(uuid.NewV4()).String() policyId := uuid.Must(uuid.NewV4()).String() s, err := monitor.Subscribe(agentId, 
policyId, 1, 1) @@ -192,9 +195,9 @@ func TestMonitor_SamePolicy(t *testing.T) { } func TestMonitor_NewPolicyUncoordinated(t *testing.T) { - _ = testlog.SetLogger(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = testlog.SetLogger(t).WithContext(ctx) chHitT := make(chan []es.HitT, 1) defer close(chHitT) @@ -219,6 +222,9 @@ func TestMonitor_NewPolicyUncoordinated(t *testing.T) { merr = monitor.Run(ctx) }() + err := monitor.(*monitorT).waitStart(ctx) + require.NoError(t, err) + agentId := uuid.Must(uuid.NewV4()).String() policyId := uuid.Must(uuid.NewV4()).String() s, err := monitor.Subscribe(agentId, policyId, 1, 1) @@ -281,7 +287,6 @@ func TestMonitor_NewPolicyExists(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - _ = testlog.SetLogger(t) runTestMonitor_NewPolicyExists(t, tc.delay) }) } @@ -290,6 +295,7 @@ func TestMonitor_NewPolicyExists(t *testing.T) { func runTestMonitor_NewPolicyExists(t *testing.T, delay time.Duration) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = testlog.SetLogger(t).WithContext(ctx) chHitT := make(chan []es.HitT, 1) defer close(chHitT) @@ -331,6 +337,9 @@ func runTestMonitor_NewPolicyExists(t *testing.T, delay time.Duration) { merr = monitor.Run(ctx) }() + err := monitor.(*monitorT).waitStart(ctx) + require.NoError(t, err) + s, err := monitor.Subscribe(agentId, policyId, 1, 1) defer monitor.Unsubscribe(s) require.NoError(t, err) diff --git a/internal/pkg/policy/policy_output_integration_test.go b/internal/pkg/policy/policy_output_integration_test.go index e1328cc8e..f0178df4a 100644 --- a/internal/pkg/policy/policy_output_integration_test.go +++ b/internal/pkg/policy/policy_output_integration_test.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) var TestPayload []byte @@ -47,7 +48,8 @@ func TestRenderUpdatePainlessScript(t *testing.T) { outputName := "output_" + tt.name outputAPIKey := bulk.APIKey{ID: "new_ID", Key: "new-key"} - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, dl.FleetAgents) + ctx := testlog.SetLogger(t).WithContext(context.Background()) + index, bulker := ftesting.SetupCleanIndex(ctx, t, dl.FleetAgents) now := time.Now().UTC() nowStr := now.Format(time.RFC3339) @@ -97,7 +99,7 @@ func TestRenderUpdatePainlessScript(t *testing.T) { require.NoError(t, err) _, err = bulker.Create( - context.Background(), index, agentID, body, bulk.WithRefresh()) + ctx, index, agentID, body, bulk.WithRefresh()) require.NoError(t, err) fields := map[string]interface{}{ @@ -111,7 +113,7 @@ func TestRenderUpdatePainlessScript(t *testing.T) { got, err := renderUpdatePainlessScript(outputName, fields) require.NoError(t, err, "renderUpdatePainlessScript returned an unexpected error") - err = bulker.Update(context.Background(), dl.FleetAgents, agentID, got) + err = bulker.Update(ctx, dl.FleetAgents, agentID, got) require.NoError(t, err, "bulker.Update failed") // there is some refresh thing that needs time, I didn't manage to find @@ -119,7 +121,7 @@ func TestRenderUpdatePainlessScript(t *testing.T) { time.Sleep(time.Second) gotAgent, err := dl.FindAgent( - context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) + ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) 
require.NoError(t, err) assert.Equal(t, agentID, gotAgent.Id) @@ -130,11 +132,12 @@ func TestRenderUpdatePainlessScript(t *testing.T) { } func TestPolicyOutputESPrepareRealES(t *testing.T) { - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, dl.FleetAgents) + ctx := testlog.SetLogger(t).WithContext(context.Background()) + index, bulker := ftesting.SetupCleanIndex(ctx, t, dl.FleetAgents) - agentID := createAgent(t, index, bulker) + agentID := createAgent(ctx, t, index, bulker) agent, err := dl.FindAgent( - context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) + ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) if err != nil { require.NoError(t, err, "failed to find agent ID %q", agentID) } @@ -152,7 +155,7 @@ func TestPolicyOutputESPrepareRealES(t *testing.T) { } err = output.prepareElasticsearch( - context.Background(), zerolog.Nop(), bulker, &agent, policyMap) + ctx, zerolog.Nop(), bulker, &agent, policyMap) require.NoError(t, err) // need to wait a bit before querying the agent again @@ -160,7 +163,7 @@ func TestPolicyOutputESPrepareRealES(t *testing.T) { time.Sleep(time.Second) got, err := dl.FindAgent( - context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) + ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) if err != nil { require.NoError(t, err, "failed to find agent ID %q", agentID) } @@ -175,7 +178,7 @@ func TestPolicyOutputESPrepareRealES(t *testing.T) { assert.NotEmpty(t, gotOutput.APIKeyID) } -func createAgent(t *testing.T, index string, bulker bulk.Bulk) string { +func createAgent(ctx context.Context, t *testing.T, index string, bulker bulk.Bulk) string { const nowStr = "2022-08-12T16:50:05Z" agentID := uuid.Must(uuid.NewV4()).String() @@ -194,7 +197,7 @@ func createAgent(t *testing.T, index string, bulker bulk.Bulk) string { require.NoError(t, err) _, err = bulker.Create( - context.Background(), index, agentID, body, bulk.WithRefresh()) + ctx, index, agentID, body, bulk.WithRefresh()) require.NoError(t, err) return agentID diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index 8fc9b28e7..b9c8d7bd0 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -15,7 +15,6 @@ import ( "go.elastic.co/apm/v2" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/config" @@ -69,7 +68,6 @@ type selfMonitorT struct { // has a Fleet Server input defined. func NewSelfMonitor(fleet config.Fleet, bulker bulk.Bulk, monitor monitor.Monitor, policyID string, reporter state.Reporter) SelfMonitor { return &selfMonitorT{ - log: log.With().Str("ctx", "policy self monitor").Logger(), fleet: fleet, bulker: bulker, monitor: monitor, @@ -86,6 +84,7 @@ func NewSelfMonitor(fleet config.Fleet, bulker bulk.Bulk, monitor monitor.Monito // Run runs the monitor. 
func (m *selfMonitorT) Run(ctx context.Context) error { + m.log = zerolog.Ctx(ctx).With().Str("ctx", "policy self monitor").Logger() s := m.monitor.Subscribe() defer m.monitor.Unsubscribe(s) diff --git a/internal/pkg/policy/standalone.go b/internal/pkg/policy/standalone.go index 07c376c6e..597d267bd 100644 --- a/internal/pkg/policy/standalone.go +++ b/internal/pkg/policy/standalone.go @@ -17,7 +17,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/state" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "go.elastic.co/apm/v2" ) @@ -41,7 +40,6 @@ type standAloneSelfMonitorT struct { // Checks that this Fleet Server has access to the policies index. func NewStandAloneSelfMonitor(bulker bulk.Bulk, reporter state.Reporter) *standAloneSelfMonitorT { return &standAloneSelfMonitorT{ - log: log.With().Str("ctx", "policy self monitor").Logger(), bulker: bulker, state: client.UnitStateStarting, reporter: reporter, @@ -54,6 +52,7 @@ func NewStandAloneSelfMonitor(bulker bulk.Bulk, reporter state.Reporter) *standA // Run runs the monitor. func (m *standAloneSelfMonitorT) Run(ctx context.Context) error { + m.log = zerolog.Ctx(ctx).With().Str("ctx", "policy self monitor").Logger() ticker := time.NewTicker(m.checkTime) defer ticker.Stop() diff --git a/internal/pkg/profile/profile.go b/internal/pkg/profile/profile.go index 024239784..931859480 100644 --- a/internal/pkg/profile/profile.go +++ b/internal/pkg/profile/profile.go @@ -12,14 +12,14 @@ import ( "net/http/pprof" "github.com/elastic/fleet-server/v7/internal/pkg/config" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) // RunProfiler exposes /debug/pprof on the passed address by staring a server. func RunProfiler(ctx context.Context, addr string) error { if addr == "" { - log.Info().Msg("Profiler disabled") + zerolog.Ctx(ctx).Info().Msg("Profiler disabled") return nil } @@ -46,9 +46,9 @@ func RunProfiler(ctx context.Context, addr string) error { IdleTimeout: cfg.Idle, } - log.Info().Str("bind", addr).Msg("Installing profiler") + zerolog.Ctx(ctx).Info().Str("bind", addr).Msg("Installing profiler") if err := server.ListenAndServe(); err != nil { - log.Error().Err(err).Str("bind", addr).Msg("Fail install profiler") + zerolog.Ctx(ctx).Error().Err(err).Str("bind", addr).Msg("Fail install profiler") return err } diff --git a/internal/pkg/scheduler/scheduler.go b/internal/pkg/scheduler/scheduler.go index 945b05499..a9d8fc6c3 100644 --- a/internal/pkg/scheduler/scheduler.go +++ b/internal/pkg/scheduler/scheduler.go @@ -13,7 +13,6 @@ import ( "math/rand" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" ) @@ -34,8 +33,6 @@ type Schedule struct { // Scheduler tracks scheduled functions. type Scheduler struct { - log zerolog.Logger - splayPercent int firstRunDelay time.Duration // Interval to run the scheduled function for the first time since the scheduler started, splayed as well. @@ -70,7 +67,6 @@ func WithFirstRunDelay(delay time.Duration) OptFunc { // Schedules may not be added to a scheduler after creation. 
func New(schedules []Schedule, opts ...OptFunc) (*Scheduler, error) { s := &Scheduler{ - log: log.With().Str("ctx", "elasticsearch CG scheduler").Logger(), splayPercent: defaultSplayPercent, firstRunDelay: defaultFirstRunDelay, rand: rand.New(rand.NewSource(time.Now().UnixNano())), //nolint:gosec // used for timing offsets @@ -91,6 +87,8 @@ func New(schedules []Schedule, opts ...OptFunc) (*Scheduler, error) { // Schedule Interval times are guaranteed minium values (if the execution takes a very long time, the scheduler will wait Interval before running the function again). func (s *Scheduler) Run(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) + log := zerolog.Ctx(ctx).With().Str("ctx", "elasticsearch CG scheduler").Logger() + ctx = log.WithContext(ctx) for _, schedule := range s.schedules { g.Go(s.getRunScheduleFunc(ctx, schedule)) @@ -100,7 +98,7 @@ func (s *Scheduler) Run(ctx context.Context) error { func (s *Scheduler) getRunScheduleFunc(ctx context.Context, schedule Schedule) func() error { return func() error { - log := log.With().Str("schedule", schedule.Name).Logger() + log := zerolog.Ctx(ctx).With().Str("schedule", schedule.Name).Logger() t := time.NewTimer(s.intervalWithSplay(s.firstRunDelay)) // Initial schedule to run right away with splayed randomly delay defer t.Stop() diff --git a/internal/pkg/server/agent.go b/internal/pkg/server/agent.go index 49a287224..65591aeee 100644 --- a/internal/pkg/server/agent.go +++ b/internal/pkg/server/agent.go @@ -17,10 +17,10 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/reload" "github.com/elastic/fleet-server/v7/internal/pkg/sleep" "github.com/elastic/fleet-server/v7/internal/pkg/state" + "github.com/rs/zerolog" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/go-ucfg" - "github.com/rs/zerolog/log" "gopkg.in/yaml.v3" ) @@ -85,6 +85,7 @@ func NewAgent(cliCfg *ucfg.Config, reader io.Reader, bi build.Info, reloadables // Run starts a Server instance using config from the configured client. 
func (a *Agent) Run(ctx context.Context) error { + log := zerolog.Ctx(ctx) a.agent.RegisterDiagnosticHook("fleet-server config", "fleet-server's current configuration", "fleet-server.yml", "application/yml", func() []byte { a.l.RLock() if a.cfg == nil { diff --git a/internal/pkg/server/agent_integration_test.go b/internal/pkg/server/agent_integration_test.go index 29cc0cc4b..a59c8d722 100644 --- a/internal/pkg/server/agent_integration_test.go +++ b/internal/pkg/server/agent_integration_test.go @@ -37,6 +37,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/reload" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) var biInfo = build.Info{ @@ -57,15 +58,22 @@ var policyData = model.PolicyData{ } func TestAgent(t *testing.T) { + l, err := logger.Init(&config.Config{}, "test") + require.NoError(t, err) + + lg := testlog.SetLogger(t) + zerolog.DefaultContextLogger = &lg + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = lg.WithContext(ctx) t.Log("Setup agent integration test") bulker := ftesting.SetupBulk(ctx, t) // add a real default fleet server policy policyID := uuid.Must(uuid.NewV4()).String() - _, err := dl.CreatePolicy(ctx, bulker, model.Policy{ + _, err = dl.CreatePolicy(ctx, bulker, model.Policy{ PolicyID: policyID, RevisionIdx: 1, DefaultFleetServer: true, @@ -108,9 +116,6 @@ func TestAgent(t *testing.T) { go func() { defer wg.Done() - l, err := logger.Init(&config.Config{}, "test") - require.NoError(t, err) - a := &Agent{ cliCfg: ucfg.New(), reloadables: []reload.Reloadable{l}, diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go index fa489d8ee..be3c9410e 100644 --- a/internal/pkg/server/fleet.go +++ b/internal/pkg/server/fleet.go @@ -38,7 +38,6 @@ import ( "github.com/hashicorp/go-version" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" ) @@ -77,6 +76,7 @@ type runFuncCfg func(context.Context, *config.Config) error // Run runs the fleet server func (f *Fleet) Run(ctx context.Context, initCfg *config.Config) error { + log := zerolog.Ctx(ctx) err := initCfg.LoadServerLimits() if err != nil { return fmt.Errorf("encountered error while loading server limits: %w", err) @@ -173,7 +173,7 @@ LOOP: } // Start or restart server - if configChangedServer(curCfg, newCfg) { + if configChangedServer(*log, curCfg, newCfg) { if srvCancel != nil { log.Info().Msg("stopping server on configuration change") stop(srvCancel, srvEg) @@ -239,7 +239,7 @@ func configCacheChanged(curCfg, newCfg *config.Config) bool { return curCfg.Inputs[0].Cache != newCfg.Inputs[0].Cache } -func configChangedServer(curCfg, newCfg *config.Config) bool { +func configChangedServer(log zerolog.Logger, curCfg, newCfg *config.Config) bool { zlog := log.With().Interface("new", newCfg.Redact()).Logger() changed := true @@ -275,7 +275,7 @@ func safeWait(g *errgroup.Group, to time.Duration) error { select { case err = <-waitCh: case <-time.After(to): - log.Warn().Msg("deadlock: goroutine locked up on errgroup.Wait()") + zerolog.Ctx(context.TODO()).Warn().Msg("deadlock: goroutine locked up on errgroup.Wait()") err = errors.New("group wait timeout") } @@ -283,8 +283,8 @@ func safeWait(g *errgroup.Group, to time.Duration) error { } func loggedRunFunc(ctx context.Context, tag string, runfn runFunc) func() error { + log := zerolog.Ctx(ctx) return func() error { - log.Debug().Msg(tag + " started") err 
:= runfn(ctx) @@ -308,7 +308,7 @@ func initRuntime(cfg *config.Config) { if gcPercent != 0 { old := debug.SetGCPercent(gcPercent) - log.Info(). + zerolog.Ctx(context.TODO()).Info(). Int("old", old). Int("new", gcPercent). Msg("SetGCPercent") @@ -317,7 +317,7 @@ func initRuntime(cfg *config.Config) { if memoryLimit != 0 { old := debug.SetMemoryLimit(memoryLimit) - log.Info(). + zerolog.Ctx(context.TODO()).Info(). Int64("old", old). Int64("new", memoryLimit). Msg("SetMemoryLimit") @@ -341,7 +341,7 @@ func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) { initRuntime(cfg) // Create the APM tracer. - tracer, err := f.initTracer(cfg.Inputs[0].Server.Instrumentation) + tracer, err := f.initTracer(ctx, cfg.Inputs[0].Server.Instrumentation) if err != nil { return err } @@ -404,7 +404,7 @@ func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) { if tracer != nil { go func() { <-ctx.Done() - log.Info().Msg("flushing instrumentation tracer...") + zerolog.Ctx(ctx).Info().Msg("flushing instrumentation tracer...") tracer.Flush(nil) tracer.Close() }() @@ -549,12 +549,12 @@ func (f *Fleet) Reload(ctx context.Context, cfg *config.Config) error { const envAPMActive = "ELASTIC_APM_ACTIVE" -func (f *Fleet) initTracer(cfg config.Instrumentation) (*apm.Tracer, error) { +func (f *Fleet) initTracer(ctx context.Context, cfg config.Instrumentation) (*apm.Tracer, error) { if !cfg.Enabled && os.Getenv(envAPMActive) != "true" { return nil, nil } - log.Info().Msg("fleet-server instrumentation is enabled") + zerolog.Ctx(ctx).Info().Msg("fleet-server instrumentation is enabled") // Use env vars to configure additional APM settings. const ( diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index b154b7e03..8fab5c56b 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -27,7 +27,6 @@ import ( "github.com/gofrs/uuid" "github.com/google/go-cmp/cmp" "github.com/hashicorp/go-cleanhttp" - "github.com/rs/zerolog/log" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -42,6 +41,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/state" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) const ( @@ -186,7 +186,7 @@ func startTestServer(t *testing.T, ctx context.Context, opts ...Option) (*tserve srvcfg.Host = localhost srvcfg.Port = port cfg.Inputs[0].Server = *srvcfg - log.Info().Uint16("port", port).Msg("Test fleet server") + t.Logf("Test fleet server port=%d", port) for _, opt := range opts { if err := opt(cfg); err != nil { @@ -293,6 +293,7 @@ func TestServerConfigErrorReload(t *testing.T) { require.NoError(t, err) logger.Init(cfg, "fleet-server") //nolint:errcheck // test logging setup + ctx = testlog.SetLogger(t).WithContext(ctx) bulker := ftesting.SetupBulk(ctx, t) policyID := uuid.Must(uuid.NewV4()).String() @@ -355,7 +356,7 @@ func TestServerConfigErrorReload(t *testing.T) { newCfg.Inputs[0].Server = *srvcfg cfg.HTTP.Enabled = false newCfg.HTTP.Enabled = false - log.Info().Uint16("port", port).Msg("Test fleet server") + t.Logf("Test fleet server port=%d", port) mReporter := &MockReporter{} srv, err := NewFleet(build.Info{Version: serverVersion}, mReporter, false) @@ -394,6 +395,7 @@ func TestServerUnauthorized(t *testing.T) { // Start test server srv, 
err := startTestServer(t, ctx) require.NoError(t, err) + ctx = testlog.SetLogger(t).WithContext(ctx) agentID := uuid.Must(uuid.NewV4()).String() cli := cleanhttp.DefaultClient() @@ -412,6 +414,7 @@ func TestServerUnauthorized(t *testing.T) { // Not sure if this is right response, just capturing what we have so far // TODO: revisit error response format t.Run("no auth header", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) for _, u := range allurls { req, err := http.NewRequestWithContext(ctx, "POST", u, bytes.NewBuffer([]byte("{}"))) if err != nil { @@ -447,6 +450,7 @@ func TestServerUnauthorized(t *testing.T) { // Unauthorized, expecting error from /_security/_authenticate t.Run("unauthorized", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) for _, u := range agenturls { req, err := http.NewRequestWithContext(ctx, "POST", u, bytes.NewBuffer([]byte("{}"))) require.NoError(t, err) @@ -504,6 +508,7 @@ func TestServerInstrumentation(t *testing.T) { // Start test server with instrumentation disabled srv, err := startTestServer(t, ctx, WithAPM(server.URL, false)) require.NoError(t, err) + ctx = testlog.SetLogger(t).WithContext(ctx) agentID := "1e4954ce-af37-4731-9f4a-407b08e69e42" checkinURL := srv.buildURL(agentID, "checkin") @@ -588,6 +593,7 @@ func Test_SmokeTest_Agent_Calls(t *testing.T) { // Start test server srv, err := startTestServer(t, ctx) require.NoError(t, err) + ctx = testlog.SetLogger(t).WithContext(ctx) cli := cleanhttp.DefaultClient() @@ -741,6 +747,7 @@ func Test_Agent_Enrollment_Id(t *testing.T) { // Start test server srv, err := startTestServer(t, ctx) require.NoError(t, err) + ctx = testlog.SetLogger(t).WithContext(ctx) t.Log("Enroll the first agent with enrollment_id") firstAgentID := EnrollAgent(enrollBodyWEnrollmentID, t, ctx, srv) @@ -787,6 +794,7 @@ func Test_Agent_Enrollment_Id_Invalidated_API_key(t *testing.T) { // Start test server srv, err := startTestServer(t, ctx) require.NoError(t, err) + ctx = testlog.SetLogger(t).WithContext(ctx) t.Log("Enroll the first agent with enrollment_id") firstAgentID := EnrollAgent(enrollBodyWEnrollmentID, t, ctx, srv) @@ -835,6 +843,7 @@ func Test_Agent_Auth_errors(t *testing.T) { // Start test server srv, err := startTestServer(t, ctx) require.NoError(t, err) + ctx = testlog.SetLogger(t).WithContext(ctx) cli := cleanhttp.DefaultClient() @@ -873,6 +882,7 @@ func Test_Agent_Auth_errors(t *testing.T) { require.NotEmpty(t, id) t.Run("use enroll key for checkin", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+id+"/checkin", strings.NewReader(checkinBody)) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+srv.enrollKey) @@ -885,6 +895,7 @@ func Test_Agent_Auth_errors(t *testing.T) { require.Equal(t, http.StatusNotFound, res.StatusCode) // NOTE this is a 404 and not a 400 }) t.Run("wrong agent ID", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/bad-agent-id/checkin", strings.NewReader(checkinBody)) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+key) @@ -897,6 +908,7 @@ func Test_Agent_Auth_errors(t *testing.T) { require.Equal(t, http.StatusBadRequest, res.StatusCode) }) t.Run("use another agent's api key", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) req, err := http.NewRequestWithContext(ctx, "POST", 
srv.baseURL()+"/api/fleet/agents/enroll", strings.NewReader(enrollBody)) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+srv.enrollKey) @@ -936,6 +948,7 @@ func Test_Agent_Auth_errors(t *testing.T) { require.Equal(t, http.StatusBadRequest, res.StatusCode) }) t.Run("use api key for enrollment", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/enroll", strings.NewReader(enrollBody)) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+key) @@ -955,9 +968,11 @@ func Test_Agent_request_errors(t *testing.T) { // Start test server srv, err := startTestServer(t, ctx) require.NoError(t, err) + ctx = testlog.SetLogger(t).WithContext(ctx) cli := cleanhttp.DefaultClient() t.Run("no auth", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/enroll", strings.NewReader(enrollBody)) require.NoError(t, err) req.Header.Set("User-Agent", "elastic agent "+serverVersion) @@ -968,6 +983,7 @@ func Test_Agent_request_errors(t *testing.T) { require.Equal(t, http.StatusBadRequest, res.StatusCode) }) t.Run("bad path", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/temporary", strings.NewReader(enrollBody)) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+srv.enrollKey) @@ -979,6 +995,7 @@ func Test_Agent_request_errors(t *testing.T) { require.Equal(t, http.StatusNotFound, res.StatusCode) }) t.Run("wrong method", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) req, err := http.NewRequestWithContext(ctx, "PUT", srv.baseURL()+"/api/fleet/agents/enroll", strings.NewReader(enrollBody)) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+srv.enrollKey) @@ -990,6 +1007,7 @@ func Test_Agent_request_errors(t *testing.T) { require.Equal(t, http.StatusMethodNotAllowed, res.StatusCode) }) t.Run("no body", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/enroll", nil) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+srv.enrollKey) @@ -1001,6 +1019,7 @@ func Test_Agent_request_errors(t *testing.T) { require.Equal(t, http.StatusInternalServerError, res.StatusCode) }) t.Run("no user agent", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/enroll", strings.NewReader(enrollBody)) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+srv.enrollKey) @@ -1011,6 +1030,7 @@ func Test_Agent_request_errors(t *testing.T) { require.Equal(t, http.StatusBadRequest, res.StatusCode) }) t.Run("bad user agent", func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(ctx) req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/enroll", strings.NewReader(enrollBody)) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+srv.enrollKey) @@ -1030,6 +1050,7 @@ func Test_SmokeTest_CheckinPollTimeout(t *testing.T) { // Start test server srv, err := startTestServer(t, ctx) require.NoError(t, err) + ctx = testlog.SetLogger(t).WithContext(ctx) cli := cleanhttp.DefaultClient() diff --git a/internal/pkg/server/fleet_secrets_integration_test.go 
b/internal/pkg/server/fleet_secrets_integration_test.go index 1fe27960c..3adedea6e 100644 --- a/internal/pkg/server/fleet_secrets_integration_test.go +++ b/internal/pkg/server/fleet_secrets_integration_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) type SecretResponse struct { @@ -158,6 +159,7 @@ func Test_Agent_Policy_Secrets(t *testing.T) { // Start test server srv, err := startTestServer(t, ctx) require.NoError(t, err) + ctx = testlog.SetLogger(t).WithContext(ctx) // create secret with kibana_system user secretID := createSecret(t, ctx, srv.bulker) diff --git a/internal/pkg/server/fleet_test.go b/internal/pkg/server/fleet_test.go index 1c304b1fe..3342eba58 100644 --- a/internal/pkg/server/fleet_test.go +++ b/internal/pkg/server/fleet_test.go @@ -5,9 +5,11 @@ package server import ( + "context" "testing" "github.com/elastic/fleet-server/v7/internal/pkg/config" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" "github.com/stretchr/testify/assert" ) @@ -108,7 +110,8 @@ func Test_configChangedServer(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - changed := configChangedServer(cfg, tc.cfg) + log := testlog.SetLogger(t) + changed := configChangedServer(log, cfg, tc.cfg) assert.Equal(t, changed, tc.changed) }) } @@ -141,9 +144,10 @@ func Test_initTracer(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(context.Background()) f := Fleet{} t.Setenv("ELASTIC_APM_ACTIVE", tc.apmActiveEnvVariable) - tarcer, err := f.initTracer(tc.cfg) + tarcer, err := f.initTracer(ctx, tc.cfg) assert.Nil(t, err) if tc.expectTracer { diff --git a/internal/pkg/signal/signal.go b/internal/pkg/signal/signal.go index 05eda759e..007bd854f 100644 --- a/internal/pkg/signal/signal.go +++ b/internal/pkg/signal/signal.go @@ -11,12 +11,13 @@ import ( "os/signal" "syscall" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) // HandleInterrupt will wrap and return a context that is cancelled when the process receives a SIGINT or SIGTERM. func HandleInterrupt(ctx context.Context) context.Context { ctx, cfunc := context.WithCancel(ctx) + log := zerolog.Ctx(ctx) log.Debug().Msg("Install signal handlers for SIGINT and SIGTERM") sigs := make(chan os.Signal, 1) diff --git a/internal/pkg/state/reporter.go b/internal/pkg/state/reporter.go index e2224c80f..d0aeaf877 100644 --- a/internal/pkg/state/reporter.go +++ b/internal/pkg/state/reporter.go @@ -6,8 +6,10 @@ package state import ( + "context" + "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) // Reporter is interface that reports updated state on. @@ -26,7 +28,7 @@ func NewLog() *Log { // UpdateState triggers updating the state. 
func (l *Log) UpdateState(state client.UnitState, message string, _ map[string]interface{}) error { - log.Info().Str("state", state.String()).Msg(message) + zerolog.Ctx(context.TODO()).Info().Str("state", state.String()).Msg(message) return nil } diff --git a/internal/pkg/testing/esutil/datastream.go b/internal/pkg/testing/esutil/datastream.go index 91b5c2ab2..5cbe74a29 100644 --- a/internal/pkg/testing/esutil/datastream.go +++ b/internal/pkg/testing/esutil/datastream.go @@ -11,7 +11,7 @@ import ( "fmt" "github.com/elastic/go-elasticsearch/v8" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) func CreateDatastream(ctx context.Context, cli *elasticsearch.Client, name string) error { @@ -28,7 +28,7 @@ func CreateDatastream(ctx context.Context, cli *elasticsearch.Client, name strin err = checkResponseError(res) if err != nil { if errors.Is(err, ErrResourceAlreadyExists) { - log.Info().Str("name", name).Msg("Datastream already exists") + zerolog.Ctx(ctx).Info().Str("name", name).Msg("Datastream already exists") return nil } return err diff --git a/internal/pkg/testing/esutil/esutil.go b/internal/pkg/testing/esutil/esutil.go index a1c7dc4fb..60a3eac69 100644 --- a/internal/pkg/testing/esutil/esutil.go +++ b/internal/pkg/testing/esutil/esutil.go @@ -5,6 +5,7 @@ package esutil import ( + "context" "encoding/json" "errors" "fmt" @@ -13,7 +14,7 @@ import ( "strings" "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) const ( @@ -104,7 +105,7 @@ func parseResponseError(res *esapi.Response) (*errorResponse, error) { // Unexpected error, probably from the cloud deployment, not elasticsearch API response if eres.Status == 0 { - log.Warn(). + zerolog.Ctx(context.TODO()).Warn(). Int("status", eres.Status). Str("type", eres.Error.Type). 
Str("reason", eres.Error.Reason).Msg("ES client response error") diff --git a/internal/pkg/testing/esutil/ilm.go b/internal/pkg/testing/esutil/ilm.go index ec4b3ceb6..76754d5c2 100644 --- a/internal/pkg/testing/esutil/ilm.go +++ b/internal/pkg/testing/esutil/ilm.go @@ -14,7 +14,6 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" ) // Can be cleaner but it's temporary bootstrap until it's moved to the elasticseach system index plugin @@ -37,7 +36,7 @@ const ( func EnsureILMPolicy(ctx context.Context, cli *elasticsearch.Client, name string) error { policy := GetILMPolicyName(name) - lg := log.With().Str("policy", policy).Logger() + lg := zerolog.Ctx(ctx).With().Str("policy", policy).Logger() res, err := cli.ILM.GetLifecycle( cli.ILM.GetLifecycle.WithPolicy(policy), @@ -148,7 +147,7 @@ func createILMPolicy(ctx context.Context, cli *elasticsearch.Client, name string // The elastic will respond with an error if the ILM policy doesn't exists // in that case let's just create the ILM policy - log.Debug().Str("policy", policy).Str("body", body).Msg("Creating ILM policy") + zerolog.Ctx(ctx).Debug().Str("policy", policy).Str("body", body).Msg("Creating ILM policy") res, err := cli.ILM.PutLifecycle(policy, cli.ILM.PutLifecycle.WithBody(strings.NewReader(body)), diff --git a/internal/pkg/testing/esutil/index.go b/internal/pkg/testing/esutil/index.go index ea217bb18..aafcb8096 100644 --- a/internal/pkg/testing/esutil/index.go +++ b/internal/pkg/testing/esutil/index.go @@ -11,7 +11,7 @@ import ( "fmt" "github.com/elastic/go-elasticsearch/v8" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) func CreateIndex(ctx context.Context, cli *elasticsearch.Client, name string) error { @@ -28,7 +28,7 @@ func CreateIndex(ctx context.Context, cli *elasticsearch.Client, name string) er err = checkResponseError(res) if err != nil { if errors.Is(err, ErrResourceAlreadyExists) { - log.Info().Str("name", name).Msg("Index already exists") + zerolog.Ctx(ctx).Info().Str("name", name).Msg("Index already exists") return nil } return err diff --git a/internal/pkg/testing/esutil/template.go b/internal/pkg/testing/esutil/template.go index 3952703c2..5ca5dfeab 100644 --- a/internal/pkg/testing/esutil/template.go +++ b/internal/pkg/testing/esutil/template.go @@ -13,7 +13,7 @@ import ( "strings" "github.com/elastic/go-elasticsearch/v8" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) const ( @@ -88,17 +88,17 @@ func EnsureTemplate(ctx context.Context, cli *elasticsearch.Client, name, mappin } // Check settings - log.Debug().Interface("settings", template.Settings).Msg("Found existing settings") + zerolog.Ctx(ctx).Debug().Interface("settings", template.Settings).Msg("Found existing settings") if template.Version >= templateVersion { - log.Info(). + zerolog.Ctx(ctx).Info(). Int("current templated version", template.Version). Int("new template version", templateVersion). Msg("Skipping template creation because upstream version") return nil } - log.Info(). + zerolog.Ctx(ctx).Info(). Int("current templated version", template.Version). Int("new template version", templateVersion). 
Msg("Creating template") @@ -107,8 +107,7 @@ func EnsureTemplate(ctx context.Context, cli *elasticsearch.Client, name, mappin } func createTemplate(ctx context.Context, cli *elasticsearch.Client, name string, templateVersion int, settings, mapping string, ilm bool) error { - - log.Info().Str("name", name).Msg("Create template") + zerolog.Ctx(ctx).Info().Str("name", name).Msg("Create template") datastream := "" if ilm { @@ -128,7 +127,7 @@ func createTemplate(ctx context.Context, cli *elasticsearch.Client, name string, err = checkResponseError(res) if err != nil { if errors.Is(err, ErrResourceAlreadyExists) { - log.Info().Str("name", name).Msg("Index template already exists") + zerolog.Ctx(ctx).Info().Str("name", name).Msg("Index template already exists") return nil } return err diff --git a/internal/pkg/testing/log/log.go b/internal/pkg/testing/log/log.go index 2ac422ec5..386bdc6bf 100644 --- a/internal/pkg/testing/log/log.go +++ b/internal/pkg/testing/log/log.go @@ -15,7 +15,6 @@ import ( func SetLogger(tb testing.TB) zerolog.Logger { tb.Helper() tw := zerolog.TestWriter{T: tb, Frame: 4} - logger := zerolog.New(tw).Level(zerolog.DebugLevel) - //zl.Logger = logger // causing data races - return logger + log := zerolog.New(tw).Level(zerolog.DebugLevel) + return log } diff --git a/internal/pkg/throttle/throttle.go b/internal/pkg/throttle/throttle.go index 87af57ab0..9faec11e1 100644 --- a/internal/pkg/throttle/throttle.go +++ b/internal/pkg/throttle/throttle.go @@ -8,10 +8,11 @@ package throttle import ( + "context" "sync" "time" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) // Token indicates successful access. @@ -51,6 +52,7 @@ func (tt *Throttle) Acquire(key string, ttl time.Duration) *Token { tt.mut.Lock() defer tt.mut.Unlock() + log := zerolog.Ctx(context.TODO()) if tt.checkAtMaxPending(key) { log.Trace(). Str("key", key). 
@@ -107,6 +109,7 @@ func (tt *Throttle) checkAtMaxPending(key string) bool { now := time.Now() + log := zerolog.Ctx(context.TODO()) // Try to eject the target key first if state, ok := tt.tokenMap[key]; ok && state.expire.Before(now) { delete(tt.tokenMap, key) @@ -138,6 +141,7 @@ func (tt *Throttle) release(id uint64, key string) bool { tt.mut.Lock() defer tt.mut.Unlock() + log := zerolog.Ctx(context.TODO()) state, ok := tt.tokenMap[key] if !ok { log.Trace().Uint64("id", id).Str("key", key).Msg("Token not found to release") diff --git a/internal/pkg/throttle/throttle_test.go b/internal/pkg/throttle/throttle_test.go index 4969dc5f4..630e9ed0d 100644 --- a/internal/pkg/throttle/throttle_test.go +++ b/internal/pkg/throttle/throttle_test.go @@ -11,11 +11,12 @@ import ( "time" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) func TestThrottleZero(t *testing.T) { - log.Logger = testlog.SetLogger(t) + l := testlog.SetLogger(t) + zerolog.DefaultContextLogger = &l // Zero max parallel means we can acquire as many as we want, // but still cannot acquire existing that has not timed out @@ -84,7 +85,8 @@ func TestThrottleZero(t *testing.T) { } func TestThrottleN(t *testing.T) { - log.Logger = testlog.SetLogger(t) + l := testlog.SetLogger(t) + zerolog.DefaultContextLogger = &l for N := 1; N < 11; N++ { @@ -153,7 +155,8 @@ func TestThrottleN(t *testing.T) { } func TestThrottleExpireIdentity(t *testing.T) { - log.Logger = testlog.SetLogger(t) + l := testlog.SetLogger(t) + zerolog.DefaultContextLogger = &l throttle := NewThrottle(1) @@ -189,7 +192,8 @@ func TestThrottleExpireIdentity(t *testing.T) { // Test that a token from a different key is expired when at max func TestThrottleExpireAtMax(t *testing.T) { - log.Logger = testlog.SetLogger(t) + l := testlog.SetLogger(t) + zerolog.DefaultContextLogger = &l throttle := NewThrottle(1) diff --git a/internal/pkg/ver/check.go b/internal/pkg/ver/check.go index 60e8e4941..313c212af 100644 --- a/internal/pkg/ver/check.go +++ b/internal/pkg/ver/check.go @@ -14,9 +14,9 @@ import ( "strings" esh "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/rs/zerolog" "github.com/hashicorp/go-version" - "github.com/rs/zerolog/log" "github.com/elastic/go-elasticsearch/v8" ) @@ -30,23 +30,23 @@ var ( // CheckCompatiblility will check the remote Elasticsearch version retrieved by the Elasticsearch client with the passed fleet version. 
// Versions are compatible when Elasticsearch's version is greater then or equal to fleet-server's version func CheckCompatibility(ctx context.Context, esCli *elasticsearch.Client, fleetVersion string) (string, error) { - log.Debug().Str("fleet_version", fleetVersion).Msg("check version compatibility with elasticsearch") + zerolog.Ctx(ctx).Debug().Str("fleet_version", fleetVersion).Msg("check version compatibility with elasticsearch") esVersion, err := esh.FetchESVersion(ctx, esCli) if err != nil { - log.Error().Err(err).Msg("failed to fetch elasticsearch version") + zerolog.Ctx(ctx).Error().Err(err).Msg("failed to fetch elasticsearch version") return "", err } - log.Debug().Str("elasticsearch_version", esVersion).Msg("fetched elasticsearch version") + zerolog.Ctx(ctx).Debug().Str("elasticsearch_version", esVersion).Msg("fetched elasticsearch version") - return esVersion, checkCompatibility(fleetVersion, esVersion) + return esVersion, checkCompatibility(ctx, fleetVersion, esVersion) } -func checkCompatibility(fleetVersion, esVersion string) error { +func checkCompatibility(ctx context.Context, fleetVersion, esVersion string) error { verConst, err := buildVersionConstraint(fleetVersion) if err != nil { - log.Error().Err(err).Str("fleet_version", fleetVersion).Msg("failed to build constraint") + zerolog.Ctx(ctx).Error().Err(err).Str("fleet_version", fleetVersion).Msg("failed to build constraint") return err } @@ -56,14 +56,14 @@ func checkCompatibility(fleetVersion, esVersion string) error { } if !verConst.Check(ver) { - log.Error(). + zerolog.Ctx(ctx).Error(). Err(ErrUnsupportedVersion). Str("constraint", verConst.String()). Str("reported", ver.String()). Msg("failed elasticsearch version check") return ErrUnsupportedVersion } - log.Info().Str("fleet_version", fleetVersion).Str("elasticsearch_version", esVersion).Msg("Elasticsearch compatibility check successful") + zerolog.Ctx(ctx).Info().Str("fleet_version", fleetVersion).Str("elasticsearch_version", esVersion).Msg("Elasticsearch compatibility check successful") return nil } @@ -91,7 +91,7 @@ func minimizePatch(ver *version.Version) string { func parseVersion(sver string) (*version.Version, error) { ver, err := version.NewVersion(strings.Split(sver, "-")[0]) if err != nil { - return nil, fmt.Errorf("%v: %w", err, ErrMalformedVersion) + return nil, fmt.Errorf("%w: %w", err, ErrMalformedVersion) } return ver, nil } diff --git a/internal/pkg/ver/check_test.go b/internal/pkg/ver/check_test.go index 7422722a6..b58e7a665 100644 --- a/internal/pkg/ver/check_test.go +++ b/internal/pkg/ver/check_test.go @@ -5,11 +5,11 @@ package ver import ( + "context" "errors" "testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" - "github.com/rs/zerolog/log" ) func TestCheckCompatibilityInternal(t *testing.T) { @@ -88,8 +88,8 @@ func TestCheckCompatibilityInternal(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - log.Logger = testlog.SetLogger(t) - err := checkCompatibility(tc.fleetVersion, tc.esVersion) + ctx := testlog.SetLogger(t).WithContext(context.Background()) + err := checkCompatibility(ctx, tc.fleetVersion, tc.esVersion) if tc.err != nil { if err == nil { t.Error("expected error")
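The hunks above all apply one pattern: drop the package-global "github.com/rs/zerolog/log" logger, retrieve a logger from the request context with zerolog.Ctx (falling back to context.TODO() where no context flows in yet), and in tests attach a per-test logger via testlog.SetLogger(t).WithContext(ctx) so output is routed through testing.TB. A minimal sketch of that pattern follows, assuming only the public zerolog v1 API (zerolog.Ctx, Logger.WithContext, zerolog.DefaultContextLogger); the doWork function and its log fields are hypothetical and not part of the fleet-server code.

package main

import (
	"context"
	"os"

	"github.com/rs/zerolog"
)

// doWork is a hypothetical worker that logs through the context instead of a
// package-level global, mirroring the zerolog.Ctx(ctx) call sites in the diff.
func doWork(ctx context.Context, name string) {
	// zerolog.Ctx returns the logger stored in ctx by Logger.WithContext; if none
	// was stored it falls back to zerolog.DefaultContextLogger (or a disabled
	// logger when that is nil), so callers may also pass context.TODO().
	zerolog.Ctx(ctx).Info().Str("name", name).Msg("doing work")
}

func main() {
	// Build the logger once and attach it to the context that flows through the app.
	logger := zerolog.New(os.Stderr).Level(zerolog.DebugLevel)
	ctx := logger.WithContext(context.Background())

	// Optional fallback for call sites that only have context.TODO()/Background(),
	// as several non-request paths in the diff still do.
	zerolog.DefaultContextLogger = &logger

	doWork(ctx, "example")
}

In the tests above the same wiring is achieved with ctx = testlog.SetLogger(t).WithContext(ctx): SetLogger builds a zerolog.Logger over zerolog.TestWriter{T: tb}, so anything logged through zerolog.Ctx(ctx) ends up in the test's output rather than a shared global logger.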