Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: compress kept trace decision message #1430

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 76 additions & 24 deletions collect/trace_decision.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package collect

import (
"encoding/json"
"bytes"
"encoding/gob"
"fmt"
"strings"
"sync"

"github.com/golang/snappy"
"github.com/honeycombio/refinery/collect/cache"
)

Expand Down Expand Up @@ -46,19 +49,6 @@ func newDroppedDecisionMessage(tds []TraceDecision) (string, error) {

return strings.Join(traceIDs, ","), nil
}
func newKeptDecisionMessage(tds []TraceDecision) (string, error) {
if len(tds) == 0 {
return "", fmt.Errorf("no kept trace decisions provided")
}

data, err := json.Marshal(tds)
if err != nil {
return "", err
}

return string(data), nil
}

func newDroppedTraceDecision(msg string) ([]TraceDecision, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not also compress the dropped messages? Even though they're unlikely to do much better than 50% compression (they're essentially hex-coded noise most of the time), that's not nothing given how many we send.

if msg == "" {
return nil, fmt.Errorf("empty drop message")
Expand All @@ -73,14 +63,23 @@ func newDroppedTraceDecision(msg string) ([]TraceDecision, error) {
return decisions, nil
}

func newKeptDecisionMessage(tds []TraceDecision) (string, error) {
if len(tds) == 0 {
return "", fmt.Errorf("no kept trace decisions provided")
}
compressed, err := compress(tds)
if err != nil {
return "", err
}
return string(compressed), nil
}

func newKeptTraceDecision(msg string) ([]TraceDecision, error) {
keptDecisions := make([]TraceDecision, 0)
err := json.Unmarshal([]byte(msg), &keptDecisions)
compressed, err := decompress([]byte(msg))
if err != nil {
return nil, err
}

return keptDecisions, nil
return compressed, nil
}

var _ cache.KeptTrace = &TraceDecision{}
Expand All @@ -92,16 +91,16 @@ type TraceDecision struct {
// keptDecision
Kept bool
Rate uint
SamplerKey string `json:",omitempty"`
SamplerSelector string `json:",omitempty"`
SamplerKey string
SamplerSelector string
SendReason string
HasRoot bool
Reason string
Count uint32 `json:",omitempty"` // number of spans in the trace
EventCount uint32 `json:",omitempty"` // number of span events in the trace
LinkCount uint32 `json:",omitempty"` // number of span links in the trace
Count uint32
EventCount uint32
LinkCount uint32

keptReasonIdx uint `json:",omitempty"`
keptReasonIdx uint
}

func (td *TraceDecision) DescendantCount() uint32 {
Expand Down Expand Up @@ -135,3 +134,56 @@ func (td *TraceDecision) KeptReason() uint {
func (td *TraceDecision) SetKeptReason(reasonIdx uint) {
td.keptReasonIdx = reasonIdx
}

var bufferPool = sync.Pool{
New: func() any { return new(bytes.Buffer) },
}

var snappyWriterPool = sync.Pool{
New: func() any { return snappy.NewBufferedWriter(nil) },
}

func compress(tds []TraceDecision) ([]byte, error) {
// Get a buffer from the pool and reset it
buf := bufferPool.Get().(*bytes.Buffer)
buf.Reset()
defer bufferPool.Put(buf)

// Get a snappy writer from the pool, set it to write to the buffer, and reset it
compr := snappyWriterPool.Get().(*snappy.Writer)
compr.Reset(buf)
defer snappyWriterPool.Put(compr)

enc := gob.NewEncoder(compr)
if err := enc.Encode(tds); err != nil {
return nil, err
}

// Flush snappy writer
if err := compr.Close(); err != nil {
return nil, err
}

// Copy the buffer’s bytes to avoid reuse issues when returning
out := make([]byte, buf.Len())
copy(out, buf.Bytes())
return out, nil
Comment on lines +168 to +170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the most efficient way to do that:

Suggested change
out := make([]byte, buf.Len())
copy(out, buf.Bytes())
return out, nil
return bytes.Clone(buf.Bytes()), nil

}

func decompress(data []byte) ([]TraceDecision, error) {
// Get a buffer from the pool and set it up with data
buf := bufferPool.Get().(*bytes.Buffer)
buf.Reset()
buf.Write(data)
defer bufferPool.Put(buf)

// Snappy reader to decompress data in buffer
compr := snappy.NewReader(buf)
dec := gob.NewDecoder(compr)

var tds []TraceDecision
if err := dec.Decode(&tds); err != nil {
return nil, err
}
return tds, nil
}
Loading