Skip to content

Commit

Permalink
Stop using zerolog/log.Logger and use logger from context instead (el…
Browse files Browse the repository at this point in the history
…astic#3078)

Stop using the global zerolog logger and use one that is passed through the context instead.
Set the context default to the same logger as the global one (the one controlled through the fleet-server's logger package), so that when fleet-server is running normally there is no change in behavior. However, it is now much easier to swap the logger in test cases, so log output can be tied to the specific test, making it easier to track any errors in tests.
Change how the fleet-server logger package reloads config when adjusting for new output
  • Loading branch information
michel-laterman authored Nov 7, 2023
1 parent 464141d commit 0d439e6
Show file tree
Hide file tree
Showing 85 changed files with 490 additions and 338 deletions.
22 changes: 11 additions & 11 deletions internal/pkg/action/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
"golang.org/x/time/rate"

"github.com/rs/zerolog/log"
"github.com/rs/zerolog"
"golang.org/x/time/rate"
)

// Sub is an action subscription that will give a single agent all of its actions.
Expand Down Expand Up @@ -84,7 +84,7 @@ func (d *Dispatcher) Subscribe(agentID string, seqNo sqn.SeqNo) *Sub {
sz := len(d.subs)
d.mx.Unlock()

log.Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher")
zerolog.Ctx(context.TODO()).Trace().Str(logger.AgentID, agentID).Int("sz", sz).Msg("Subscribed to action dispatcher")

return &sub
}
Expand All @@ -101,7 +101,7 @@ func (d *Dispatcher) Unsubscribe(sub *Sub) {
sz := len(d.subs)
d.mx.Unlock()

log.Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher")
zerolog.Ctx(context.TODO()).Trace().Str(logger.AgentID, sub.agentID).Int("sz", sz).Msg("Unsubscribed from action dispatcher")
}

// process gathers actions from the monitor and dispatches them to the corresponding subscriptions.
Expand All @@ -114,14 +114,14 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) {
var action model.Action
err := hit.Unmarshal(&action)
if err != nil {
log.Error().Err(err).Msg("Failed to unmarshal action document")
zerolog.Ctx(ctx).Error().Err(err).Msg("Failed to unmarshal action document")
break
}
numAgents := len(action.Agents)
for i, agentID := range action.Agents {
arr := agentActions[agentID]
actionNoAgents := action
actionNoAgents.StartTime = offsetStartTime(action.StartTime, action.RolloutDurationSeconds, i, numAgents)
actionNoAgents.StartTime = offsetStartTime(ctx, action.StartTime, action.RolloutDurationSeconds, i, numAgents)
actionNoAgents.Agents = nil
arr = append(arr, actionNoAgents)
agentActions[agentID] = arr
Expand All @@ -130,7 +130,7 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) {

for agentID, actions := range agentActions {
if err := d.limit.Wait(ctx); err != nil {
log.Error().Err(err).Msg("action dispatcher rate limit error")
zerolog.Ctx(ctx).Error().Err(err).Msg("action dispatcher rate limit error")
return
}
d.dispatch(ctx, agentID, actions)
Expand All @@ -139,14 +139,14 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) {

// offsetStartTime will return a new start time between start:start+dur based on index i and the total number of agents
// As we expect i < total the latest return time will always be < start+dur
func offsetStartTime(start string, dur int64, i, total int) string {
func offsetStartTime(ctx context.Context, start string, dur int64, i, total int) string {

if start == "" {
return ""
}
startTS, err := time.Parse(time.RFC3339, start)
if err != nil {
log.Error().Err(err).Msg("unable to parse start_time string")
zerolog.Ctx(ctx).Error().Err(err).Msg("unable to parse start_time string")
return ""
}
d := time.Second * time.Duration(dur)
Expand All @@ -164,10 +164,10 @@ func (d *Dispatcher) getSub(agentID string) (Sub, bool) {

// dispatch passes the actions into the subscription channel as a non-blocking operation.
// It may drop actions that will be re-sent to the agent on its next check in.
func (d *Dispatcher) dispatch(_ context.Context, agentID string, acdocs []model.Action) {
func (d *Dispatcher) dispatch(ctx context.Context, agentID string, acdocs []model.Action) {
sub, ok := d.getSub(agentID)
if !ok {
log.Debug().Str(logger.AgentID, agentID).Msg("Agent is not currently connected. Not dispatching actions.")
zerolog.Ctx(ctx).Debug().Str(logger.AgentID, agentID).Msg("Agent is not currently connected. Not dispatching actions.")
return
}
select {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/action/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func Test_offsetStartTime(t *testing.T) {
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := offsetStartTime(tt.start, tt.dur, tt.i, tt.total)
r := offsetStartTime(context.Background(), tt.start, tt.dur, tt.i, tt.total)
assert.Equal(t, tt.result, r)
})
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/action/token_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/dl"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog"
)

const cacheSize = 5000
Expand Down Expand Up @@ -43,7 +43,7 @@ func (r *TokenResolver) Resolve(ctx context.Context, token string) (int64, error
return 0, dl.ErrNotFound
}
if v, ok := r.cache.Get(token); ok {
log.Debug().Str("token", token).Int64("seqno", v).Msg("Found token cached")
zerolog.Ctx(ctx).Debug().Str("token", token).Int64("seqno", v).Msg("Found token cached")
return v, nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleFileDelivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package api

import (
"context"
"errors"
"net/http"
"strconv"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/go-elasticsearch/v8"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

type FileDeliveryT struct {
Expand All @@ -28,7 +28,7 @@ type FileDeliveryT struct {
}

func NewFileDeliveryT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *FileDeliveryT {
log.Info().
zerolog.Ctx(context.TODO()).Info().
Interface("limits", cfg.Limits.ArtifactLimit).
Int64("maxFileSize", maxFileSize).
Msg("upload limits")
Expand Down
3 changes: 1 addition & 2 deletions internal/pkg/api/handleUpload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/go-elasticsearch/v8"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.elastic.co/apm/v2"
)

Expand Down Expand Up @@ -54,7 +53,7 @@ type UploadT struct {
}

func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *UploadT {
log.Info().
zerolog.Ctx(context.TODO()).Info().
Interface("limits", cfg.Limits.ArtifactLimit).
Int64("maxFileSize", maxFileSize).
Msg("upload limits")
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/elastic/elastic-agent-system-metrics/report"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog"
apmprometheus "go.elastic.co/apm/module/apmprometheus/v2"
"go.elastic.co/apm/v2"

Expand Down Expand Up @@ -55,7 +55,7 @@ var (
func init() {
err := report.SetupMetrics(logger.NewZapStub("instance-metrics"), build.ServiceName, version.DefaultVersion)
if err != nil {
log.Error().Err(err).Msg("unable to initialize metrics")
zerolog.Ctx(context.TODO()).Error().Err(err).Msg("unable to initialize metrics")
}

registry = newMetricsRegistry("http_server")
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/api/metrics_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/elastic/fleet-server/v7/internal/pkg/build"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"

"github.com/stretchr/testify/require"
)

Expand All @@ -29,6 +31,7 @@ func TestMetricsEndpoints(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = testlog.SetLogger(t).WithContext(ctx)

srv, err := InitMetrics(ctx, cfg, bi, nil)
require.NoError(t, err, "unable to start metrics server")
Expand Down
34 changes: 18 additions & 16 deletions internal/pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"go.elastic.co/apm/v2"

"github.com/rs/zerolog/log"
"github.com/rs/zerolog"
)

type server struct {
Expand Down Expand Up @@ -71,7 +71,7 @@ func (s *server) Run(ctx context.Context) error {
IdleTimeout: idle,
MaxHeaderBytes: mhbz,
BaseContext: func(net.Listener) context.Context { return ctx },
ErrorLog: errLogger(),
ErrorLog: errLogger(ctx),
ConnState: diagConn,
}

Expand All @@ -82,13 +82,13 @@ func (s *server) Run(ctx context.Context) error {
go func() {
select {
case <-ctx.Done():
log.Debug().Msg("force server close on ctx.Done()")
zerolog.Ctx(ctx).Debug().Msg("force server close on ctx.Done()")
err := srv.Close()
if err != nil {
log.Error().Err(err).Msg("error while closing server")
zerolog.Ctx(ctx).Error().Err(err).Msg("error while closing server")
}
case <-forceCh:
log.Debug().Msg("go routine forced closed on exit")
zerolog.Ctx(ctx).Debug().Msg("go routine forced closed on exit")
}
}()

Expand All @@ -102,7 +102,7 @@ func (s *server) Run(ctx context.Context) error {
defer func() {
err := ln.Close()
if err != nil {
log.Warn().Err(err).Msg("server.Run: error while closing listener.")
zerolog.Ctx(ctx).Warn().Err(err).Msg("server.Run: error while closing listener.")
}
}()

Expand All @@ -127,15 +127,15 @@ func (s *server) Run(ctx context.Context) error {
ln = tls.NewListener(ln, srv.TLSConfig)

} else {
log.Warn().Msg("Exposed over insecure HTTP; enablement of TLS is strongly recommended")
zerolog.Ctx(ctx).Warn().Msg("Exposed over insecure HTTP; enablement of TLS is strongly recommended")
}

errCh := make(chan error)
baseCtx, cancel := context.WithCancel(ctx)
defer cancel()

go func(_ context.Context, errCh chan error, ln net.Listener) {
log.Info().Msgf("Listening on %s", s.addr)
go func(ctx context.Context, errCh chan error, ln net.Listener) {
zerolog.Ctx(ctx).Info().Msgf("Listening on %s", s.addr)
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- err
}
Expand All @@ -157,7 +157,7 @@ func diagConn(c net.Conn, s http.ConnState) {
return
}

log.Trace().
zerolog.Ctx(context.TODO()).Trace().
Str("local", c.LocalAddr().String()).
Str("remote", c.RemoteAddr().String()).
Str("state", s.String()).
Expand All @@ -173,31 +173,33 @@ func diagConn(c net.Conn, s http.ConnState) {
}
}

func wrapConnLimitter(_ context.Context, ln net.Listener, cfg *config.Server) net.Listener {
func wrapConnLimitter(ctx context.Context, ln net.Listener, cfg *config.Server) net.Listener {
hardLimit := cfg.Limits.MaxConnections

if hardLimit != 0 {
log.Info().
zerolog.Ctx(ctx).Info().
Int("hardConnLimit", hardLimit).
Msg("server hard connection limiter installed")

ln = limit.Listener(ln, hardLimit)
} else {
log.Info().Msg("server hard connection limiter disabled")
zerolog.Ctx(ctx).Info().Msg("server hard connection limiter disabled")
}

return ln
}

// stubLogger adapts a zerolog.Logger to an io.Writer so it can back the
// standard-library *log.Logger used for http.Server error output.
type stubLogger struct {
	log zerolog.Logger
}

// Write emits p as an ECS-formatted error event on the wrapped zerolog
// logger and reports the entire slice as consumed, satisfying io.Writer.
func (s *stubLogger) Write(p []byte) (int, error) {
	evt := s.log.Error()
	evt.Bytes(logger.ECSMessage, p).Send()
	return len(p), nil
}

func errLogger() *slog.Logger {
stub := &stubLogger{}
// errLogger returns a standard-library logger that forwards http.Server
// error output to the zerolog logger carried in ctx.
func errLogger(ctx context.Context) *slog.Logger {
	sink := &stubLogger{log: *zerolog.Ctx(ctx)}
	return slog.New(sink, "", 0)
}
5 changes: 5 additions & 0 deletions internal/pkg/apikey/apikey_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"errors"
"testing"

testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"

"github.com/elastic/go-elasticsearch/v8"
"github.com/gofrs/uuid"
"github.com/google/go-cmp/cmp"
Expand All @@ -32,6 +34,7 @@ const testFleetRoles = `
func TestRead_existingKey(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
ctx = testlog.SetLogger(t).WithContext(ctx)

cfg := elasticsearch.Config{
Username: "elastic",
Expand Down Expand Up @@ -83,6 +86,7 @@ func TestRead_existingKey(t *testing.T) {
func TestRead_noKey(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
ctx = testlog.SetLogger(t).WithContext(ctx)

cfg := elasticsearch.Config{
Username: "elastic",
Expand Down Expand Up @@ -114,6 +118,7 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
ctx = testlog.SetLogger(t).WithContext(ctx)

cfg := elasticsearch.Config{
Username: "elastic",
Expand Down
Loading

0 comments on commit 0d439e6

Please sign in to comment.