diff --git a/pkg/agent/v2/agent.go b/pkg/agent/v2/agent.go index b809c76c1c..4e3b0acb4f 100644 --- a/pkg/agent/v2/agent.go +++ b/pkg/agent/v2/agent.go @@ -137,7 +137,8 @@ func New(ctx context.Context, conf *v1beta1.AgentConfig, opts ...AgentOption) (* if conf.Spec.LogLevel != "" { level = logger.ParseLevel(conf.Spec.LogLevel) } - lg := logger.New(logger.WithLogLevel(level), logger.WithFileWriter(logger.WriteOnlyFile(id))).WithGroup("agent") + fileWriter := logger.InitPluginWriter(id) + lg := logger.New(logger.WithLogLevel(level), logger.WithFileWriter(fileWriter)).WithGroup("agent") lg.Debug(fmt.Sprintf("using log level: %s", level.String())) var pl *plugins.PluginLoader diff --git a/pkg/logger/color_handler.go b/pkg/logger/color_handler.go index 4b03cad0b5..3fea78e21b 100644 --- a/pkg/logger/color_handler.go +++ b/pkg/logger/color_handler.go @@ -189,22 +189,22 @@ func (h *colorHandler) appendLevel(buf *buffer, level slog.Level) { switch { case level < slog.LevelInfo: buf.WriteStringIf(h.colorEnabled, ansiBrightMagenta) - buf.WriteString("DEBUG") + buf.WriteString(levelString[0]) appendLevelDelta(buf, level-slog.LevelDebug) buf.WriteStringIf(h.colorEnabled, ansiReset) case level < slog.LevelWarn: buf.WriteStringIf(h.colorEnabled, ansiBrightBlue) - buf.WriteString("INFO") + buf.WriteString(levelString[1]) appendLevelDelta(buf, level-slog.LevelInfo) buf.WriteStringIf(h.colorEnabled, ansiReset) case level < slog.LevelError: buf.WriteStringIf(h.colorEnabled, ansiBrightYellow) - buf.WriteString("WARN") + buf.WriteString(levelString[2]) appendLevelDelta(buf, level-slog.LevelWarn) buf.WriteStringIf(h.colorEnabled, ansiReset) default: buf.WriteStringIf(h.colorEnabled, ansiBrightRed) - buf.WriteString("ERROR") + buf.WriteString(levelString[3]) appendLevelDelta(buf, level-slog.LevelError) buf.WriteStringIf(h.colorEnabled, ansiReset) } diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index fcae5e0464..195917756a 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -6,16 +6,23 @@ import ( "io" "log/slog" "os" + "sync" "time" "github.com/go-logr/logr" "github.com/go-logr/logr/slogr" - "github.com/kralicky/gpkg/sync" + gpkgsync "github.com/kralicky/gpkg/sync" slogmulti "github.com/samber/slog-multi" slogsampling "github.com/samber/slog-sampling" "github.com/spf13/afero" ) +const ( + pluginGroupPrefix = "plugin" + NoRepeatInterval = 3600 * time.Hour // arbitrarily long time to denote one-time sampling + errKey = "err" +) + var ( asciiLogo = ` _ ____ ____ ____ (_) @@ -28,15 +35,15 @@ var ( DefaultLogLevel = slog.LevelDebug DefaultWriter io.Writer DefaultAddSource = true - pluginGroupPrefix = "plugin" - NoRepeatInterval = 3600 * time.Hour // arbitrarily long time to denote one-time sampling logFs afero.Fs DefaultTimeFormat = "2006 Jan 02 15:04:05" - errKey = "err" + logSampler = &sampler{} + PluginFileWriter = &remoteFileWriter{ + mu: &sync.Mutex{}, + } + levelString = []string{"DEBUG", "INFO", "WARN", "ERROR"} ) -var logSampler = &sampler{} - func init() { logFs = afero.NewMemMapFs() } @@ -190,7 +197,6 @@ func colorHandlerWithOptions(opts ...LoggerOption) slog.Handler { if options.FileWriter != nil { logFileHandler := newProtoHandler(options.FileWriter, ConfigureProtoOptions(options)) - // distribute logs to handlers in parallel return slogmulti.Fanout(handler, logFileHandler) } @@ -218,13 +224,11 @@ func NewNop() *slog.Logger { } func NewPluginLogger(opts ...LoggerOption) *slog.Logger { - opts = append(opts, WithFileWriter(sharedPluginWriter)) - return New(opts...).WithGroup(pluginGroupPrefix) } type sampler struct { - dropped sync.Map[string, uint64] + dropped gpkgsync.Map[string, uint64] } func (s *sampler) onDroppedHook(_ context.Context, r slog.Record) { @@ -248,11 +252,3 @@ func WriteOnlyFile(clusterID string) afero.File { } return f } - -func GetFileIfExists(clusterID string) afero.File { - f, err := logFs.Open(clusterID) - if err != nil { - return nil - } - return f -} diff --git a/pkg/logger/plugin_logger.go b/pkg/logger/plugin_logger.go new file mode 100644 index 0000000000..20f27ac110 --- /dev/null +++ b/pkg/logger/plugin_logger.go @@ -0,0 +1,96 @@ +package logger + +import ( + "io" + "log/slog" + "regexp" + "strings" + "sync" +) + +const ( + numGoPluginSegments = 4 // hclog timestamp + hclog level + go-plugin name + log message + logDelimiter = " " +) + +var ( + logLevelPattern = `\(` + strings.Join(levelString, "|") + `\)` + pluginNamePattern = pluginGroupPrefix + `\.\w+` +) + +// forwards plugin logs to their hosts. Retrieve these logs with the "debug" cli +type remoteFileWriter struct { + logForwarder *slog.Logger + w io.Writer + mu *sync.Mutex +} + +func InitPluginWriter(agentId string) io.Writer { + PluginFileWriter.mu.Lock() + defer PluginFileWriter.mu.Unlock() + + if PluginFileWriter.w != nil { + return PluginFileWriter.w + } + + f := WriteOnlyFile(agentId) + writer := f.(io.Writer) + PluginFileWriter.w = writer + PluginFileWriter.logForwarder = newPluginLogForwarder(writer).WithGroup("forwarded") + return writer +} + +func (f remoteFileWriter) Write(b []byte) (int, error) { + if f.w == nil || f.logForwarder == nil { + return 0, nil + } + + // workaround to remove hclog's default DEBUG prefix on all non-hclog logs + prefixes := strings.SplitN(string(b), logDelimiter, numGoPluginSegments) + if len(prefixes) != numGoPluginSegments { + f.logForwarder.Info(string(b)) + return len(b), nil + } + + forwardedLog := prefixes[numGoPluginSegments-1] + + levelPattern := regexp.MustCompile(logLevelPattern) + level := levelPattern.FindString(forwardedLog) + + namePattern := regexp.MustCompile(pluginNamePattern) + loggerName := namePattern.FindString(forwardedLog) + + namedLogger := f.logForwarder.WithGroup(loggerName) + + switch level { + case levelString[0]: + namedLogger.Debug(forwardedLog) + case levelString[1]: + namedLogger.Info(forwardedLog) + case levelString[2]: + namedLogger.Warn(forwardedLog) + case levelString[3]: + namedLogger.Error(forwardedLog) + default: + namedLogger.Debug(forwardedLog) + } + + return len(b), nil +} + +func newPluginLogForwarder(w io.Writer) *slog.Logger { + dropForwardedPrefixes := func(groups []string, a slog.Attr) slog.Attr { + if a.Key == slog.SourceKey { + return slog.Attr{} + } + return a + } + + options := &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + ReplaceAttr: dropForwardedPrefixes, + } + + return slog.New(newProtoHandler(w, options)) +} diff --git a/pkg/logger/plugin_writer.go b/pkg/logger/plugin_writer.go deleted file mode 100644 index 597df3bf85..0000000000 --- a/pkg/logger/plugin_writer.go +++ /dev/null @@ -1,44 +0,0 @@ -package logger - -import ( - "io" - "sync" -) - -var sharedPluginWriter = &pluginWriter{ - mu: &sync.Mutex{}, -} - -type pluginWriter struct { - w *io.Writer - mu *sync.Mutex -} - -func InitPluginWriter(agentId string) { - sharedPluginWriter.mu.Lock() - defer sharedPluginWriter.mu.Unlock() - if sharedPluginWriter.w != nil { - return - } - f := GetFileIfExists(agentId) - if f == nil { - f = WriteOnlyFile(agentId) - } - fileWriter := f.(io.Writer) - sharedPluginWriter.w = &fileWriter -} - -func (pw pluginWriter) Write(b []byte) (int, error) { - pw.mu.Lock() - defer pw.mu.Unlock() - if pw.w == nil { - return 0, nil - } - - n, err := (*pw.w).Write(b) - if err != nil { - return n, err - } - - return n, nil -} diff --git a/pkg/logger/remotelogs/server.go b/pkg/logger/remotelogs/server.go index bcbe24b918..3f815433a6 100644 --- a/pkg/logger/remotelogs/server.go +++ b/pkg/logger/remotelogs/server.go @@ -77,7 +77,8 @@ func (ls *LogServer) StreamLogs(req *controlv1.LogStreamRequest, server controlv return nil } if err != nil { - return err + ls.logger.Warn("malformed log record, skipping", logger.Err(err)) + continue } if minLevel != nil && logger.ParseLevel(msg.Level) < slog.Level(*minLevel) { @@ -123,7 +124,6 @@ func (ls *LogServer) getLogMessage(f afero.File) (*controlv1.StructuredLogRecord } if err := proto.Unmarshal(recordBytes, record); err != nil { - ls.logger.Error("failed to unmarshal record bytes") return nil, err } diff --git a/pkg/opni/commands/admin.go b/pkg/opni/commands/admin.go index eb45cbc076..a8766fa5fd 100644 --- a/pkg/opni/commands/admin.go +++ b/pkg/opni/commands/admin.go @@ -313,7 +313,7 @@ func parseTimeOrDie(timeStr string) time.Time { } t, err := when.EN.Parse(timeStr, time.Now()) if err != nil || t == nil { - lg.Error("could not interpret start time") + lg.Error("could not interpret start/end time") os.Exit(1) } return t.Time diff --git a/pkg/plugins/client.go b/pkg/plugins/client.go index 8e4efc3c10..18a66c0346 100644 --- a/pkg/plugins/client.go +++ b/pkg/plugins/client.go @@ -10,6 +10,7 @@ import ( "github.com/rancher/opni/pkg/auth/cluster" "github.com/rancher/opni/pkg/auth/session" "github.com/rancher/opni/pkg/caching" + "github.com/rancher/opni/pkg/logger" "github.com/rancher/opni/pkg/plugins/meta" "github.com/rancher/opni/pkg/util/streams" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -51,7 +52,8 @@ func ClientConfig(md meta.PluginMeta, scheme meta.Scheme, opts ...ClientOption) AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, Managed: true, Logger: hclog.New(&hclog.LoggerOptions{ - Level: hclog.Error, + Level: hclog.Debug, + Output: logger.PluginFileWriter, }), GRPCDialOptions: []grpc.DialOption{ grpc.WithChainUnaryInterceptor( @@ -61,8 +63,7 @@ func ClientConfig(md meta.PluginMeta, scheme meta.Scheme, opts ...ClientOption) grpc.WithPerRPCCredentials(cluster.ClusterIDKey), grpc.WithPerRPCCredentials(session.AttributesKey), }, - SyncStderr: os.Stderr, - Stderr: os.Stderr, + Stderr: os.Stderr, } if options.reattach != nil { diff --git a/plugins/alerting/pkg/agent/stream.go b/plugins/alerting/pkg/agent/stream.go index e9f91017d1..10e835b77a 100644 --- a/plugins/alerting/pkg/agent/stream.go +++ b/plugins/alerting/pkg/agent/stream.go @@ -3,12 +3,10 @@ package agent import ( capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" controlv1 "github.com/rancher/opni/pkg/apis/control/v1" - "github.com/rancher/opni/pkg/logger" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/plugins/alerting/pkg/apis/node" "github.com/rancher/opni/plugins/alerting/pkg/apis/rules" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -26,8 +24,6 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { identityClient := controlv1.NewIdentityClient(cc) ruleSyncClient := rules.NewRuleSyncClient(cc) - p.configureLoggers(identityClient) - p.node.SetClients( healthListenerClient, nodeClient, @@ -36,11 +32,3 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { p.ruleStreamer.SetClients(ruleSyncClient) } - -func (p *Plugin) configureLoggers(identityClient controlv1.IdentityClient) { - id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{}) - if err != nil { - p.lg.Error("error fetching node id", logger.Err(err)) - } - logger.InitPluginWriter(id.GetId()) -} diff --git a/plugins/logging/pkg/agent/stream.go b/plugins/logging/pkg/agent/stream.go index c7c73daaba..b18478bb18 100644 --- a/plugins/logging/pkg/agent/stream.go +++ b/plugins/logging/pkg/agent/stream.go @@ -2,13 +2,10 @@ package agent import ( capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" - controlv1 "github.com/rancher/opni/pkg/apis/control/v1" - "github.com/rancher/opni/pkg/logger" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/plugins/logging/apis/node" collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -25,17 +22,7 @@ func (p *Plugin) StreamServers() []streamext.Server { } func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { - p.configureLoggers(cc) nodeClient := node.NewNodeLoggingCapabilityClient(cc) p.node.SetClient(nodeClient) p.otelForwarder.SetClient(cc) } - -func (p *Plugin) configureLoggers(cc grpc.ClientConnInterface) { - identityClient := controlv1.NewIdentityClient(cc) - id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{}) - if err != nil { - p.logger.Error("error fetching node id", logger.Err(err)) - } - logger.InitPluginWriter(id.GetId()) -} diff --git a/plugins/metrics/pkg/agent/stream.go b/plugins/metrics/pkg/agent/stream.go index 7598038b15..73393ddc26 100644 --- a/plugins/metrics/pkg/agent/stream.go +++ b/plugins/metrics/pkg/agent/stream.go @@ -4,13 +4,11 @@ import ( capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" controlv1 "github.com/rancher/opni/pkg/apis/control/v1" "github.com/rancher/opni/pkg/clients" - "github.com/rancher/opni/pkg/logger" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/plugins/metrics/apis/node" "github.com/rancher/opni/plugins/metrics/apis/remoteread" "github.com/rancher/opni/plugins/metrics/apis/remotewrite" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -31,8 +29,6 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { healthListenerClient := controlv1.NewHealthListenerClient(cc) identityClient := controlv1.NewIdentityClient(cc) - p.configureLoggers(identityClient) - p.httpServer.SetRemoteWriteClient(clients.NewLocker(cc, remotewrite.NewRemoteWriteClient)) p.ruleStreamer.SetRemoteWriteClient(remotewrite.NewRemoteWriteClient(cc)) p.node.SetRemoteWriter(clients.NewLocker(cc, remotewrite.NewRemoteWriteClient)) @@ -44,11 +40,3 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { ) } - -func (p *Plugin) configureLoggers(identityClient controlv1.IdentityClient) { - id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{}) - if err != nil { - p.logger.Error("error fetching node id", logger.Err(err)) - } - logger.InitPluginWriter(id.GetId()) -} diff --git a/plugins/topology/pkg/topology/agent/stream.go b/plugins/topology/pkg/topology/agent/stream.go index d2731fb843..d6ae865a5f 100644 --- a/plugins/topology/pkg/topology/agent/stream.go +++ b/plugins/topology/pkg/topology/agent/stream.go @@ -8,10 +8,8 @@ import ( // "github.com/rancher/opni/pkg/clients" controlv1 "github.com/rancher/opni/pkg/apis/control/v1" - "github.com/rancher/opni/pkg/logger" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -25,16 +23,7 @@ func (p *Plugin) StreamServers() []streamext.Server { func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { p.topologyStreamer.SetIdentityClient(controlv1.NewIdentityClient(cc)) - p.configureLoggers() p.topologyStreamer.SetTopologyStreamClient(stream.NewRemoteTopologyClient(cc)) p.node.SetClient(node.NewNodeTopologyCapabilityClient(cc)) } - -func (p *Plugin) configureLoggers() { - id, err := p.topologyStreamer.identityClient.Whoami(p.ctx, &emptypb.Empty{}) - if err != nil { - p.logger.Error("error fetching node id", logger.Err(err)) - } - logger.InitPluginWriter(id.GetId()) -}