From a308c69db38fa195b01dcb23b03e969404c53c39 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Thu, 14 Nov 2024 17:57:58 -0500 Subject: [PATCH] feat: compress kept trace decision message --- collect/trace_decision.go | 100 +++++++++++++++++++++------- collect/trace_decision_test.go | 118 ++++++++++++++------------------- go.mod | 1 + go.sum | 2 + 4 files changed, 128 insertions(+), 93 deletions(-) diff --git a/collect/trace_decision.go b/collect/trace_decision.go index 8465d3e11e..e1d0ea48b8 100644 --- a/collect/trace_decision.go +++ b/collect/trace_decision.go @@ -1,10 +1,13 @@ package collect import ( - "encoding/json" + "bytes" + "encoding/gob" "fmt" "strings" + "sync" + "github.com/golang/snappy" "github.com/honeycombio/refinery/collect/cache" ) @@ -46,19 +49,6 @@ func newDroppedDecisionMessage(tds []TraceDecision) (string, error) { return strings.Join(traceIDs, ","), nil } -func newKeptDecisionMessage(tds []TraceDecision) (string, error) { - if len(tds) == 0 { - return "", fmt.Errorf("no kept trace decisions provided") - } - - data, err := json.Marshal(tds) - if err != nil { - return "", err - } - - return string(data), nil -} - func newDroppedTraceDecision(msg string) ([]TraceDecision, error) { if msg == "" { return nil, fmt.Errorf("empty drop message") @@ -73,14 +63,23 @@ func newDroppedTraceDecision(msg string) ([]TraceDecision, error) { return decisions, nil } +func newKeptDecisionMessage(tds []TraceDecision) (string, error) { + if len(tds) == 0 { + return "", fmt.Errorf("no kept trace decisions provided") + } + compressed, err := compress(tds) + if err != nil { + return "", err + } + return string(compressed), nil +} + func newKeptTraceDecision(msg string) ([]TraceDecision, error) { - keptDecisions := make([]TraceDecision, 0) - err := json.Unmarshal([]byte(msg), &keptDecisions) + compressed, err := decompress([]byte(msg)) if err != nil { return nil, err } - - return keptDecisions, nil + return compressed, nil } var _ cache.KeptTrace = &TraceDecision{} @@ -92,16 +91,16 @@ type TraceDecision struct { // keptDecision Kept bool Rate uint - SamplerKey string `json:",omitempty"` - SamplerSelector string `json:",omitempty"` + SamplerKey string + SamplerSelector string SendReason string HasRoot bool Reason string - Count uint32 `json:",omitempty"` // number of spans in the trace - EventCount uint32 `json:",omitempty"` // number of span events in the trace - LinkCount uint32 `json:",omitempty"` // number of span links in the trace + Count uint32 + EventCount uint32 + LinkCount uint32 - keptReasonIdx uint `json:",omitempty"` + keptReasonIdx uint } func (td *TraceDecision) DescendantCount() uint32 { @@ -135,3 +134,56 @@ func (td *TraceDecision) KeptReason() uint { func (td *TraceDecision) SetKeptReason(reasonIdx uint) { td.keptReasonIdx = reasonIdx } + +var bufferPool = sync.Pool{ + New: func() any { return new(bytes.Buffer) }, +} + +var snappyWriterPool = sync.Pool{ + New: func() any { return snappy.NewBufferedWriter(nil) }, +} + +func compress(tds []TraceDecision) ([]byte, error) { + // Get a buffer from the pool and reset it + buf := bufferPool.Get().(*bytes.Buffer) + buf.Reset() + defer bufferPool.Put(buf) + + // Get a snappy writer from the pool, set it to write to the buffer, and reset it + compr := snappyWriterPool.Get().(*snappy.Writer) + compr.Reset(buf) + defer snappyWriterPool.Put(compr) + + enc := gob.NewEncoder(compr) + if err := enc.Encode(tds); err != nil { + return nil, err + } + + // Flush snappy writer + if err := compr.Close(); err != nil { + return nil, err + } + + // Copy the buffer’s bytes to avoid reuse issues when returning + out := make([]byte, buf.Len()) + copy(out, buf.Bytes()) + return out, nil +} + +func decompress(data []byte) ([]TraceDecision, error) { + // Get a buffer from the pool and set it up with data + buf := bufferPool.Get().(*bytes.Buffer) + buf.Reset() + buf.Write(data) + defer bufferPool.Put(buf) + + // Snappy reader to decompress data in buffer + compr := snappy.NewReader(buf) + dec := gob.NewDecoder(compr) + + var tds []TraceDecision + if err := dec.Decode(&tds); err != nil { + return nil, err + } + return tds, nil +} diff --git a/collect/trace_decision_test.go b/collect/trace_decision_test.go index 107eb03258..ad08d8a98c 100644 --- a/collect/trace_decision_test.go +++ b/collect/trace_decision_test.go @@ -34,42 +34,6 @@ func TestNewDroppedTraceDecision(t *testing.T) { } } -func TestNewKeptTraceDecision(t *testing.T) { - tests := []struct { - name string - msg string - want []TraceDecision - wantErr bool - }{ - { - name: "kept decision", - msg: `[{"TraceID":"1", "Kept": true, "Rate": 100, "SendReason":"` + TraceSendGotRoot + `"}]`, - want: []TraceDecision{ - {TraceID: "1", Kept: true, Rate: 100, SendReason: TraceSendGotRoot}}, - wantErr: false, - }, - { - name: "invalid message format", - msg: "invalid", - want: nil, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := newKeptTraceDecision(tt.msg) - - if tt.wantErr { - require.Error(t, err) - return - } - - require.NoError(t, err) - assert.EqualValues(t, tt.want, got) - }) - } -} func TestNewDroppedDecisionMessage(t *testing.T) { tests := []struct { name string @@ -108,43 +72,59 @@ func TestNewDroppedDecisionMessage(t *testing.T) { }) } } - -func TestNewKeptDecisionMessage(t *testing.T) { - tests := []struct { - name string - td []TraceDecision - want string - wantErr bool - }{ +func TestKeptDecisionRoundTrip(t *testing.T) { + // Test data for kept decisions covering all fields + tds := []TraceDecision{ { - name: "kept decision", - td: []TraceDecision{ - { - TraceID: "1", - Kept: true, - Rate: 100, - SendReason: TraceSendGotRoot, - Reason: "deterministic", - }, - }, - want: `[{"TraceID":"1","Kept":true,"Rate":100,"SendReason":"trace_send_got_root","HasRoot":false,"Reason":"deterministic"}]`, - wantErr: false, + TraceID: "trace1", + Kept: true, + Rate: 1, + SamplerKey: "sampler1", + SamplerSelector: "selector1", + SendReason: "reason1", + HasRoot: true, + Reason: "valid reason 1", + Count: 5, + EventCount: 10, + LinkCount: 15, }, { - name: "invalid", - td: nil, - want: "", - wantErr: true, + TraceID: "trace2", + Kept: true, + Rate: 2, + SamplerKey: "sampler2", + SamplerSelector: "selector2", + SendReason: "reason2", + HasRoot: false, + Reason: "valid reason 2", + Count: 3, + EventCount: 6, + LinkCount: 9, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := newKeptDecisionMessage(tt.td) - if tt.wantErr { - require.Error(t, err) - return - } - assert.Equal(t, tt.want, got) - }) + + // Step 1: Create a kept decision message + msg, err := newKeptDecisionMessage(tds) + assert.NoError(t, err, "expected no error for valid kept decision message") + assert.NotEmpty(t, msg, "expected non-empty message") + + // Step 3: Decompress the message back to TraceDecision using newKeptTraceDecision + decompressedTds, err := newKeptTraceDecision(msg) + assert.NoError(t, err, "expected no error during decompression of the kept decision message") + assert.Len(t, decompressedTds, len(tds), "expected decompressed TraceDecision length to match original") + + // Step 4: Verify that the decompressed data matches the original TraceDecision data + for i, td := range decompressedTds { + assert.Equal(t, td.TraceID, tds[i].TraceID, "expected TraceID to match") + assert.Equal(t, td.Kept, tds[i].Kept, "expected Kept status to match") + assert.Equal(t, td.Rate, tds[i].Rate, "expected Rate to match") + assert.Equal(t, td.SamplerKey, tds[i].SamplerKey, "expected SamplerKey to match") + assert.Equal(t, td.SamplerSelector, tds[i].SamplerSelector, "expected SamplerSelector to match") + assert.Equal(t, td.SendReason, tds[i].SendReason, "expected SendReason to match") + assert.Equal(t, td.HasRoot, tds[i].HasRoot, "expected HasRoot to match") + assert.Equal(t, td.Reason, tds[i].Reason, "expected Reason to match") + assert.Equal(t, td.Count, tds[i].Count, "expected Count to match") + assert.Equal(t, td.EventCount, tds[i].EventCount, "expected EventCount to match") + assert.Equal(t, td.LinkCount, tds[i].LinkCount, "expected LinkCount to match") } } diff --git a/go.mod b/go.mod index 3c24561b74..599ee9c80a 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/facebookgo/structtag v0.0.0-20150214074306-217e25fb9691 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/snappy v0.0.4 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index 3788454ad7..e85970d724 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s= github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=