Skip to content

Commit

Permalink
feat: compress kept trace decision message
Browse files Browse the repository at this point in the history
  • Loading branch information
VinozzZ committed Nov 14, 2024
1 parent 8570fe0 commit a308c69
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 93 deletions.
100 changes: 76 additions & 24 deletions collect/trace_decision.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package collect

import (
"encoding/json"
"bytes"
"encoding/gob"
"fmt"
"strings"
"sync"

"github.com/golang/snappy"
"github.com/honeycombio/refinery/collect/cache"
)

Expand Down Expand Up @@ -46,19 +49,6 @@ func newDroppedDecisionMessage(tds []TraceDecision) (string, error) {

return strings.Join(traceIDs, ","), nil
}
func newKeptDecisionMessage(tds []TraceDecision) (string, error) {
if len(tds) == 0 {
return "", fmt.Errorf("no kept trace decisions provided")
}

data, err := json.Marshal(tds)
if err != nil {
return "", err
}

return string(data), nil
}

func newDroppedTraceDecision(msg string) ([]TraceDecision, error) {
if msg == "" {
return nil, fmt.Errorf("empty drop message")
Expand All @@ -73,14 +63,23 @@ func newDroppedTraceDecision(msg string) ([]TraceDecision, error) {
return decisions, nil
}

func newKeptDecisionMessage(tds []TraceDecision) (string, error) {
if len(tds) == 0 {
return "", fmt.Errorf("no kept trace decisions provided")
}
compressed, err := compress(tds)
if err != nil {
return "", err
}
return string(compressed), nil
}

func newKeptTraceDecision(msg string) ([]TraceDecision, error) {
keptDecisions := make([]TraceDecision, 0)
err := json.Unmarshal([]byte(msg), &keptDecisions)
compressed, err := decompress([]byte(msg))
if err != nil {
return nil, err
}

return keptDecisions, nil
return compressed, nil
}

var _ cache.KeptTrace = &TraceDecision{}
Expand All @@ -92,16 +91,16 @@ type TraceDecision struct {
// keptDecision
Kept bool
Rate uint
SamplerKey string `json:",omitempty"`
SamplerSelector string `json:",omitempty"`
SamplerKey string
SamplerSelector string
SendReason string
HasRoot bool
Reason string
Count uint32 `json:",omitempty"` // number of spans in the trace
EventCount uint32 `json:",omitempty"` // number of span events in the trace
LinkCount uint32 `json:",omitempty"` // number of span links in the trace
Count uint32
EventCount uint32
LinkCount uint32

keptReasonIdx uint `json:",omitempty"`
keptReasonIdx uint
}

func (td *TraceDecision) DescendantCount() uint32 {
Expand Down Expand Up @@ -135,3 +134,56 @@ func (td *TraceDecision) KeptReason() uint {
func (td *TraceDecision) SetKeptReason(reasonIdx uint) {
td.keptReasonIdx = reasonIdx
}

var bufferPool = sync.Pool{
New: func() any { return new(bytes.Buffer) },
}

var snappyWriterPool = sync.Pool{
New: func() any { return snappy.NewBufferedWriter(nil) },
}

func compress(tds []TraceDecision) ([]byte, error) {
// Get a buffer from the pool and reset it
buf := bufferPool.Get().(*bytes.Buffer)
buf.Reset()
defer bufferPool.Put(buf)

// Get a snappy writer from the pool, set it to write to the buffer, and reset it
compr := snappyWriterPool.Get().(*snappy.Writer)
compr.Reset(buf)
defer snappyWriterPool.Put(compr)

enc := gob.NewEncoder(compr)
if err := enc.Encode(tds); err != nil {
return nil, err
}

// Flush snappy writer
if err := compr.Close(); err != nil {
return nil, err
}

// Copy the buffer’s bytes to avoid reuse issues when returning
out := make([]byte, buf.Len())
copy(out, buf.Bytes())
return out, nil
}

func decompress(data []byte) ([]TraceDecision, error) {
// Get a buffer from the pool and set it up with data
buf := bufferPool.Get().(*bytes.Buffer)
buf.Reset()
buf.Write(data)
defer bufferPool.Put(buf)

// Snappy reader to decompress data in buffer
compr := snappy.NewReader(buf)
dec := gob.NewDecoder(compr)

var tds []TraceDecision
if err := dec.Decode(&tds); err != nil {
return nil, err
}
return tds, nil
}
118 changes: 49 additions & 69 deletions collect/trace_decision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,42 +34,6 @@ func TestNewDroppedTraceDecision(t *testing.T) {
}
}

func TestNewKeptTraceDecision(t *testing.T) {
tests := []struct {
name string
msg string
want []TraceDecision
wantErr bool
}{
{
name: "kept decision",
msg: `[{"TraceID":"1", "Kept": true, "Rate": 100, "SendReason":"` + TraceSendGotRoot + `"}]`,
want: []TraceDecision{
{TraceID: "1", Kept: true, Rate: 100, SendReason: TraceSendGotRoot}},
wantErr: false,
},
{
name: "invalid message format",
msg: "invalid",
want: nil,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newKeptTraceDecision(tt.msg)

if tt.wantErr {
require.Error(t, err)
return
}

require.NoError(t, err)
assert.EqualValues(t, tt.want, got)
})
}
}
func TestNewDroppedDecisionMessage(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -108,43 +72,59 @@ func TestNewDroppedDecisionMessage(t *testing.T) {
})
}
}

func TestNewKeptDecisionMessage(t *testing.T) {
tests := []struct {
name string
td []TraceDecision
want string
wantErr bool
}{
func TestKeptDecisionRoundTrip(t *testing.T) {
// Test data for kept decisions covering all fields
tds := []TraceDecision{
{
name: "kept decision",
td: []TraceDecision{
{
TraceID: "1",
Kept: true,
Rate: 100,
SendReason: TraceSendGotRoot,
Reason: "deterministic",
},
},
want: `[{"TraceID":"1","Kept":true,"Rate":100,"SendReason":"trace_send_got_root","HasRoot":false,"Reason":"deterministic"}]`,
wantErr: false,
TraceID: "trace1",
Kept: true,
Rate: 1,
SamplerKey: "sampler1",
SamplerSelector: "selector1",
SendReason: "reason1",
HasRoot: true,
Reason: "valid reason 1",
Count: 5,
EventCount: 10,
LinkCount: 15,
},
{
name: "invalid",
td: nil,
want: "",
wantErr: true,
TraceID: "trace2",
Kept: true,
Rate: 2,
SamplerKey: "sampler2",
SamplerSelector: "selector2",
SendReason: "reason2",
HasRoot: false,
Reason: "valid reason 2",
Count: 3,
EventCount: 6,
LinkCount: 9,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newKeptDecisionMessage(tt.td)
if tt.wantErr {
require.Error(t, err)
return
}
assert.Equal(t, tt.want, got)
})

// Step 1: Create a kept decision message
msg, err := newKeptDecisionMessage(tds)
assert.NoError(t, err, "expected no error for valid kept decision message")
assert.NotEmpty(t, msg, "expected non-empty message")

// Step 3: Decompress the message back to TraceDecision using newKeptTraceDecision
decompressedTds, err := newKeptTraceDecision(msg)
assert.NoError(t, err, "expected no error during decompression of the kept decision message")
assert.Len(t, decompressedTds, len(tds), "expected decompressed TraceDecision length to match original")

// Step 4: Verify that the decompressed data matches the original TraceDecision data
for i, td := range decompressedTds {
assert.Equal(t, td.TraceID, tds[i].TraceID, "expected TraceID to match")
assert.Equal(t, td.Kept, tds[i].Kept, "expected Kept status to match")
assert.Equal(t, td.Rate, tds[i].Rate, "expected Rate to match")
assert.Equal(t, td.SamplerKey, tds[i].SamplerKey, "expected SamplerKey to match")
assert.Equal(t, td.SamplerSelector, tds[i].SamplerSelector, "expected SamplerSelector to match")
assert.Equal(t, td.SendReason, tds[i].SendReason, "expected SendReason to match")
assert.Equal(t, td.HasRoot, tds[i].HasRoot, "expected HasRoot to match")
assert.Equal(t, td.Reason, tds[i].Reason, "expected Reason to match")
assert.Equal(t, td.Count, tds[i].Count, "expected Count to match")
assert.Equal(t, td.EventCount, tds[i].EventCount, "expected EventCount to match")
assert.Equal(t, td.LinkCount, tds[i].LinkCount, "expected LinkCount to match")
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/facebookgo/structtag v0.0.0-20150214074306-217e25fb9691 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s=
github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down

0 comments on commit a308c69

Please sign in to comment.