Skip to content

Commit

Permalink
feat: log support
Browse files Browse the repository at this point in the history
  • Loading branch information
ymtdzzz committed Apr 7, 2024
1 parent fe25b09 commit a46caa7
Show file tree
Hide file tree
Showing 15 changed files with 1,056 additions and 66 deletions.
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ service:
receivers: [otlp]
processors: []
exporters: [tui]
logs:
receivers: [otlp]
processors: []
exporters: [tui]
`

provider := yamlprovider.New()
Expand Down
7 changes: 7 additions & 0 deletions tuiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ymtdzzz/otel-tui/tuiexporter/internal/telemetry"
"github.com/ymtdzzz/otel-tui/tuiexporter/internal/tui"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand All @@ -26,6 +27,12 @@ func (e *tuiExporter) pushTraces(_ context.Context, traces ptrace.Traces) error
return nil
}

func (e *tuiExporter) pushLogs(_ context.Context, logs plog.Logs) error {
e.app.Store().AddLog(&logs)

return nil
}

// Start runs the TUI exporter
func (e *tuiExporter) Start(_ context.Context, _ component.Host) error {
go func() {
Expand Down
23 changes: 22 additions & 1 deletion tuiexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewFactory() exporter.Factory {
createDefaultConfig,
exporter.WithTraces(createTraces, stability),
//exporter.WithMetrics(createMetrics, stability),
//exporter.WithLogs(createLog, stability),
exporter.WithLogs(createLogs, stability),
)
}

Expand Down Expand Up @@ -49,6 +49,27 @@ func createTraces(ctx context.Context, set exporter.CreateSettings, cfg componen
)
}

func createLogs(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (exporter.Logs, error) {
oCfg := cfg.(*Config)

e, err := exporters.LoadOrStore(
oCfg,
func() (*tuiExporter, error) {
return newTuiExporter(oCfg), nil
},
&set.TelemetrySettings,
)
if err != nil {
return nil, err
}

return exporterhelper.NewLogsExporter(ctx, set, oCfg,
e.Unwrap().pushLogs,
exporterhelper.WithStart(e.Start),
exporterhelper.WithShutdown(e.Shutdown),
)
}

// This is the map of already created OTLP receivers for particular configurations.
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not
Expand Down
51 changes: 51 additions & 0 deletions tuiexporter/internal/telemetry/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ type TraceSpanDataMap map[string][]*SpanData
// This is used to quickly look up all spans in a trace for a service
type TraceServiceSpanDataMap map[string]map[string][]*SpanData

// TraceLogDataMap is a map of trace id to a slice of logs
// This is used to quickly look up all logs in a trace
type TraceLogDataMap map[string][]*LogData

// TraceCache is a cache of trace spans
type TraceCache struct {
spanid2span SpanDataMap
Expand Down Expand Up @@ -98,3 +102,50 @@ func (c *TraceCache) flush() {
c.traceid2spans = TraceSpanDataMap{}
c.tracesvc2spans = TraceServiceSpanDataMap{}
}

// LogCache is a cache of logs
type LogCache struct {
traceid2logs TraceLogDataMap
}

// NewLogCache returns a new log cache
func NewLogCache() *LogCache {
return &LogCache{
traceid2logs: TraceLogDataMap{},
}
}

// UpdateCache updates the cache with a new log
func (c *LogCache) UpdateCache(data *LogData) {
traceID := data.Log.TraceID().String()
if ts, ok := c.traceid2logs[traceID]; ok {
c.traceid2logs[traceID] = append(ts, data)
} else {
c.traceid2logs[traceID] = []*LogData{data}
}
}

// DeleteCache deletes a list of logs from the cache
func (c *LogCache) DeleteCache(logs []*LogData) {
for _, l := range logs {
traceID := l.Log.TraceID().String()
if _, ok := c.traceid2logs[traceID]; ok {
for i, log := range c.traceid2logs[traceID] {
if log == l {
c.traceid2logs[traceID] = append(c.traceid2logs[traceID][:i], c.traceid2logs[traceID][i+1:]...)
break
}
}
}
}
}

// GetLogsByTraceID returns all logs for a given trace id
func (c *LogCache) GetLogsByTraceID(traceID string) ([]*LogData, bool) {
logs, ok := c.traceid2logs[traceID]
return logs, ok
}

func (c *LogCache) flush() {
c.traceid2logs = TraceLogDataMap{}
}
38 changes: 36 additions & 2 deletions tuiexporter/internal/telemetry/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func TestGetSpansByTraceID(t *testing.T) {
c := NewTraceCache()
spans := []*SpanData{{}}
spans := []*SpanData{}
c.traceid2spans["traceid"] = spans

tests := []struct {
Expand Down Expand Up @@ -42,7 +42,7 @@ func TestGetSpansByTraceID(t *testing.T) {

func TestGetSpansByTraceIDAndSvc(t *testing.T) {
c := NewTraceCache()
spans := []*SpanData{{}}
spans := []*SpanData{}
c.tracesvc2spans["traceid"] = map[string][]*SpanData{"svc-name": spans}

tests := []struct {
Expand Down Expand Up @@ -117,3 +117,37 @@ func TestGetSpanByID(t *testing.T) {
})
}
}

func TestGetLogsByTraceID(t *testing.T) {
c := NewLogCache()
logs := []*LogData{}
c.traceid2logs["traceid"] = logs

tests := []struct {
name string
traceID string
wantdata []*LogData
wantok bool
}{
{
name: "traceid exists",
traceID: "traceid",
wantdata: logs,
wantok: true,
},
{
name: "traceid does not exist",
traceID: "traceid2",
wantdata: nil,
wantok: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotdata, gotok := c.GetLogsByTraceID(tt.traceID)
assert.Equal(t, tt.wantdata, gotdata)
assert.Equal(t, tt.wantok, gotok)
})
}
}
120 changes: 110 additions & 10 deletions tuiexporter/internal/telemetry/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"sync"
"time"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
)

const MAX_SERVICE_SPAN_COUNT = 1000
const (
MAX_SERVICE_SPAN_COUNT = 1000
MAX_LOG_COUNT = 1000
)

// SpanData is a struct to represent a span
type SpanData struct {
Span *ptrace.Span
ResourceSpan *ptrace.ResourceSpans
Expand All @@ -26,31 +31,48 @@ func (sd *SpanData) IsRoot() bool {
// This is a slice of one span of a single service
type SvcSpans []*SpanData

// LogData is a struct to represent a log
type LogData struct {
Log *plog.LogRecord
ResourceLog *plog.ResourceLogs
ScopeLog *plog.ScopeLogs
ReceivedAt time.Time
}

// Store is a store of trace spans
type Store struct {
mut sync.Mutex
filterSvc string
filterLog string
svcspans SvcSpans
svcspansFiltered SvcSpans
cache *TraceCache
tracecache *TraceCache
logs []*LogData
logsFiltered []*LogData
logcache *LogCache
updatedAt time.Time
maxServiceSpanCount int
maxLogCount int
}

// NewStore creates a new store
func NewStore() *Store {
return &Store{
mut: sync.Mutex{},
svcspans: []*SpanData{},
svcspansFiltered: []*SpanData{},
cache: NewTraceCache(),
svcspans: SvcSpans{},
svcspansFiltered: SvcSpans{},
tracecache: NewTraceCache(),
logs: []*LogData{},
logsFiltered: []*LogData{},
logcache: NewLogCache(),
maxServiceSpanCount: MAX_SERVICE_SPAN_COUNT, // TODO: make this configurable
maxLogCount: MAX_LOG_COUNT, // TODO: make this configurable
}
}

// GetCache returns the cache
func (s *Store) GetCache() *TraceCache {
return s.cache
return s.tracecache
}

// GetSvcSpans returns the service spans in the store
Expand All @@ -63,6 +85,11 @@ func (s *Store) GetFilteredSvcSpans() *SvcSpans {
return &s.svcspansFiltered
}

// GetFilteredLogs returns the filtered logs in the store
func (s *Store) GetFilteredLogs() *[]*LogData {
return &s.logsFiltered
}

// UpdatedAt returns the last updated time
func (s *Store) UpdatedAt() time.Time {
return s.updatedAt
Expand Down Expand Up @@ -90,6 +117,29 @@ func (s *Store) updateFilterService() {
s.ApplyFilterService(s.filterSvc)
}

// ApplyFilterLogs applies a filter to the logs
func (s *Store) ApplyFilterLogs(filter string) {
s.filterLog = filter
s.logsFiltered = []*LogData{}

if filter == "" {
s.logsFiltered = s.logs
return
}

for _, log := range s.logs {
sname, _ := log.ResourceLog.Resource().Attributes().Get("service.name")
target := sname.AsString() + " " + log.Log.Body().AsString()
if strings.Contains(target, filter) {
s.logsFiltered = append(s.logsFiltered, log)
}
}
}

func (s *Store) updateFilterLogs() {
s.ApplyFilterLogs(s.filterLog)
}

// GetTraceIDByFilteredIdx returns the trace at the given index
func (s *Store) GetTraceIDByFilteredIdx(idx int) string {
if idx >= 0 && idx < len(s.svcspansFiltered) {
Expand All @@ -106,11 +156,19 @@ func (s *Store) GetFilteredServiceSpansByIdx(idx int) []*SpanData {
span := s.svcspansFiltered[idx]
traceID := span.Span.TraceID().String()
sname, _ := span.ResourceSpan.Resource().Attributes().Get("service.name")
spans, _ := s.cache.GetSpansByTraceIDAndSvc(traceID, sname.AsString())
spans, _ := s.tracecache.GetSpansByTraceIDAndSvc(traceID, sname.AsString())

return spans
}

// GetFilteredLogByIdx returns the log at the given index
func (s *Store) GetFilteredLogByIdx(idx int) *LogData {
if idx < 0 || idx >= len(s.logsFiltered) {
return nil
}
return s.logsFiltered[idx]
}

// AddSpan adds a span to the store
func (s *Store) AddSpan(traces *ptrace.Traces) {
s.mut.Lock()
Expand All @@ -136,7 +194,7 @@ func (s *Store) AddSpan(traces *ptrace.Traces) {
ScopeSpans: &ss,
ReceivedAt: time.Now(),
}
newtracesvc := s.cache.UpdateCache(sname.AsString(), sd)
newtracesvc := s.tracecache.UpdateCache(sname.AsString(), sd)
if newtracesvc {
s.svcspans = append(s.svcspans, sd)
}
Expand All @@ -148,14 +206,53 @@ func (s *Store) AddSpan(traces *ptrace.Traces) {
if len(s.svcspans) > s.maxServiceSpanCount {
deleteSpans := s.svcspans[:len(s.svcspans)-s.maxServiceSpanCount]

s.cache.DeleteCache(deleteSpans)
s.tracecache.DeleteCache(deleteSpans)

s.svcspans = s.svcspans[len(s.svcspans)-s.maxServiceSpanCount:]
}

s.updateFilterService()
}

// AddLog adds a log to the store
func (s *Store) AddLog(logs *plog.Logs) {
s.mut.Lock()
defer func() {
s.updatedAt = time.Now()
s.mut.Unlock()
}()

for rli := 0; rli < logs.ResourceLogs().Len(); rli++ {
rl := logs.ResourceLogs().At(rli)

for sli := 0; sli < rl.ScopeLogs().Len(); sli++ {
sl := rl.ScopeLogs().At(sli)

for li := 0; li < sl.LogRecords().Len(); li++ {
lr := sl.LogRecords().At(li)
ld := &LogData{
Log: &lr,
ResourceLog: &rl,
ScopeLog: &sl,
ReceivedAt: time.Now(),
}
s.logs = append(s.logs, ld)
s.logcache.UpdateCache(ld)
}
}
}

// data rotation
if len(s.logs) > s.maxLogCount {
deleteLogs := s.logs[:len(s.logs)-s.maxLogCount]
s.logs = s.logs[len(s.logs)-s.maxLogCount:]

s.logcache.DeleteCache(deleteLogs)
}

s.updateFilterLogs()
}

// Flush clears the store including the cache
func (s *Store) Flush() {
s.mut.Lock()
Expand All @@ -166,6 +263,9 @@ func (s *Store) Flush() {

s.svcspans = SvcSpans{}
s.svcspansFiltered = SvcSpans{}
s.cache.flush()
s.tracecache.flush()
s.logs = []*LogData{}
s.logsFiltered = []*LogData{}
s.logcache.flush()
s.updatedAt = time.Now()
}
Loading

0 comments on commit a46caa7

Please sign in to comment.