Skip to content

Use bytestring instead of string for performance and switch proto libraries. #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion implementations/prometheus/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (q *queue) deserializeAndSend(ctx context.Context, meta map[string]string,
sg := &types.SeriesGroup{
Series: make([]*types.TimeSeriesBinary, seriesCount),
Metadata: make([]*types.TimeSeriesBinary, metaCount),
Strings: make([]string, stringsCount),
Strings: make([]types.ByteString, stringsCount),
}
// Prefill our series with items from the pool to limit allocs.
for i := 0; i < seriesCount; i++ {
Expand Down
31 changes: 9 additions & 22 deletions network/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/walqueue/types"
"github.com/prometheus/prometheus/prompb"
Expand All @@ -41,7 +41,6 @@ type loop struct {
series []*types.TimeSeriesBinary
self actor.Actor
ticker *time.Ticker
req *prompb.WriteRequest
buf *proto.Buffer
sendBuffer []byte
}
Expand Down Expand Up @@ -88,10 +87,6 @@ func newLoop(cc types.ConnectionConfig, isMetaData bool, l log.Logger, stats fun
ticker: time.NewTicker(1 * time.Second),
buf: proto.NewBuffer(nil),
sendBuffer: make([]byte, 0),
req: &prompb.WriteRequest{
// We know BatchCount is the most we will ever send.
Timeseries: make([]prompb.TimeSeries, 0, cc.BatchCount),
},
}, nil
}

Expand Down Expand Up @@ -202,9 +197,9 @@ func (l *loop) send(ctx context.Context, retryCount int) sendResult {
var data []byte
var wrErr error
if l.isMeta {
data, wrErr = createWriteRequestMetadata(l.log, l.req, l.series, l.buf)
data, wrErr = createWriteRequestMetadata(l.log, l.series, l.buf)
} else {
data, wrErr = createWriteRequest(l.req, l.series, l.externalLabels, l.buf)
data, wrErr = createWriteRequest(l.series, l.externalLabels, l.buf)
}
if wrErr != nil {
result.err = wrErr
Expand Down Expand Up @@ -269,11 +264,8 @@ func (l *loop) send(ctx context.Context, retryCount int) sendResult {
return result
}

func createWriteRequest(wr *prompb.WriteRequest, series []*types.TimeSeriesBinary, externalLabels map[string]string, data *proto.Buffer) ([]byte, error) {
if cap(wr.Timeseries) < len(series) {
wr.Timeseries = make([]prompb.TimeSeries, len(series))
}
wr.Timeseries = wr.Timeseries[:len(series)]
func createWriteRequest(series []*types.TimeSeriesBinary, externalLabels map[string]string, data *proto.Buffer) ([]byte, error) {
wr := &prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, len(series))}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this to inside the func had no impact on CPU but saved memory.


for i, tsBuf := range series {
ts := wr.Timeseries[i]
Expand Down Expand Up @@ -330,26 +322,21 @@ func createWriteRequest(wr *prompb.WriteRequest, series []*types.TimeSeriesBinar
ts.Samples[0].Timestamp = tsBuf.TS
wr.Timeseries[i] = ts
}
defer func() {
for i := 0; i < len(wr.Timeseries); i++ {
wr.Timeseries[i].Histograms = wr.Timeseries[i].Histograms[:0]
wr.Timeseries[i].Labels = wr.Timeseries[i].Labels[:0]
wr.Timeseries[i].Exemplars = wr.Timeseries[i].Exemplars[:0]
}
}()
// Reset the buffer for reuse.
data.Reset()
err := data.Marshal(wr)
return data.Bytes(), err
}

func createWriteRequestMetadata(l log.Logger, wr *prompb.WriteRequest, series []*types.TimeSeriesBinary, data *proto.Buffer) ([]byte, error) {
func createWriteRequestMetadata(l log.Logger, series []*types.TimeSeriesBinary, data *proto.Buffer) ([]byte, error) {
wr := &prompb.WriteRequest{}

// Metadata is rarely sent so having this being less than optimal is fine.
wr.Metadata = make([]prompb.MetricMetadata, 0)
for _, ts := range series {
mt, valid := toMetadata(ts)
if !valid {
level.Error(l).Log("msg", "invalid metadata was found", "labels", ts.Labels.String())
level.Error(l).Log("msg", "invalid metadata was found")
continue
}
wr.Metadata = append(wr.Metadata, mt)
Expand Down
2 changes: 1 addition & 1 deletion serialization/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewAppender(ctx context.Context, ttl time.Duration, s types.Serializer, log

// Append metric
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {

ts := types.GetTimeSeriesFromPool()
ts.Labels = l
ts.TS = t
Expand Down
11 changes: 7 additions & 4 deletions serialization/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package serialization
import (
"context"
"fmt"
"github.com/go-kit/log/level"
"strconv"
"time"

"github.com/go-kit/log/level"

snappy "github.com/eapache/go-xerial-snappy"
"github.com/go-kit/log"
"github.com/grafana/walqueue/types"
Expand Down Expand Up @@ -161,7 +162,9 @@ func (s *serializer) flushToDisk(ctx actor.Context) error {
}()

// This maps strings to index position in a slice. This is doing to reduce the file size of the data.
strMapToIndex := make(map[string]uint32)
// Assume roughly each series has 10 labels, we do this because at very large mappings growing the map took up to 5% of cpu time.
// By pre allocating it that disappeared.
strMapToIndex := make(map[string]uint32, len(s.series)*10)
Comment on lines +165 to +167
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Kinda surprised it had so big impact. Maybe we can track the average size of this and autotune it at runtime?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also surprised. This trades memory for cpu, ie this map is bigger than we actually need in most cases since we store only the unique strings.

for i, ts := range s.series {
ts.FillLabelMapping(strMapToIndex)
group.Series[i] = ts
Expand All @@ -171,9 +174,9 @@ func (s *serializer) flushToDisk(ctx actor.Context) error {
group.Metadata[i] = ts
}

stringsSlice := make([]string, len(strMapToIndex))
stringsSlice := make([]types.ByteString, len(strMapToIndex))
for stringValue, index := range strMapToIndex {
stringsSlice[index] = stringValue
stringsSlice[index] = types.ByteString(stringValue)
}
group.Strings = stringsSlice

Expand Down
216 changes: 4 additions & 212 deletions types/serialization.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
//go:generate msgp
package types

import (
"sync"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"go.uber.org/atomic"
)
import "github.com/prometheus/prometheus/model/labels"

const MetaType = "__alloy_metadata_type__"
const MetaUnit = "__alloy_metadata_unit__"
Expand All @@ -19,7 +12,7 @@ const MetaHelp = "__alloy_metadata_help__"
// LabelNames and LabelsValues that point to the index in Strings.
// This deduplicates the strings and decreases the size on disk.
type SeriesGroup struct {
Strings []string
Strings []ByteString
Series []*TimeSeriesBinary
Metadata []*TimeSeriesBinary
}
Expand All @@ -39,6 +32,8 @@ type TimeSeriesBinary struct {
Histograms Histograms
}

type ByteString []byte

type Histograms struct {
Histogram *Histogram
FloatHistogram *FloatHistogram
Expand Down Expand Up @@ -92,206 +87,3 @@ type BucketSpan struct {
Offset int32
Length uint32
}

// IsMetadata reports whether this series encodes metadata rather than sample
// data; metadata is stored as a set of labels keyed by the metadata-type label.
func (ts TimeSeriesBinary) IsMetadata() bool {
	const metadataTypeLabel = "__alloy_metadata_type__"
	return ts.Labels.Has(metadataTypeLabel)
}

// ToPromHistogram converts this integer histogram into its prompb wire form.
func (h *Histogram) ToPromHistogram() prompb.Histogram {
	var out prompb.Histogram
	out.Count = &prompb.Histogram_CountInt{CountInt: h.Count.IntValue}
	out.Sum = h.Sum
	out.Schema = h.Schema
	out.ZeroThreshold = h.ZeroThreshold
	out.ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount.IntValue}
	out.NegativeSpans = ToPromBucketSpans(h.NegativeSpans)
	out.NegativeDeltas = h.NegativeBuckets
	out.PositiveSpans = ToPromBucketSpans(h.PositiveSpans)
	out.PositiveDeltas = h.PositiveBuckets
	out.ResetHint = prompb.Histogram_ResetHint(h.ResetHint)
	out.Timestamp = h.TimestampMillisecond
	return out
}

// ToPromFloatHistogram converts this float histogram into its prompb wire form.
func (h *FloatHistogram) ToPromFloatHistogram() prompb.Histogram {
	var out prompb.Histogram
	out.Count = &prompb.Histogram_CountFloat{CountFloat: h.Count.FloatValue}
	out.Sum = h.Sum
	out.Schema = h.Schema
	out.ZeroThreshold = h.ZeroThreshold
	out.ZeroCount = &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: h.ZeroCount.FloatValue}
	out.NegativeSpans = ToPromBucketSpans(h.NegativeSpans)
	out.NegativeCounts = h.NegativeCounts
	out.PositiveSpans = ToPromBucketSpans(h.PositiveSpans)
	out.PositiveCounts = h.PositiveCounts
	out.ResetHint = prompb.Histogram_ResetHint(h.ResetHint)
	out.Timestamp = h.TimestampMillisecond
	return out
}
// ToPromBucketSpans converts a slice of BucketSpan into prompb bucket spans.
func ToPromBucketSpans(bss []BucketSpan) []prompb.BucketSpan {
	out := make([]prompb.BucketSpan, 0, len(bss))
	for _, span := range bss {
		out = append(out, span.ToPromBucketSpan())
	}
	return out
}

// ToPromBucketSpan converts a single BucketSpan into its prompb equivalent.
func (bs *BucketSpan) ToPromBucketSpan() prompb.BucketSpan {
	var span prompb.BucketSpan
	span.Offset = bs.Offset
	span.Length = bs.Length
	return span
}

// FromHistogram fills this series' integer histogram from a prometheus model
// histogram observed at the given timestamp (milliseconds).
func (ts *TimeSeriesBinary) FromHistogram(timestamp int64, h *histogram.Histogram) {
	converted := &Histogram{
		Count:                HistogramCount{IsInt: true, IntValue: h.Count},
		Sum:                  h.Sum,
		Schema:               h.Schema,
		ZeroThreshold:        h.ZeroThreshold,
		ZeroCount:            HistogramZeroCount{IsInt: true, IntValue: h.ZeroCount},
		NegativeSpans:        FromPromSpan(h.NegativeSpans),
		NegativeBuckets:      h.NegativeBuckets,
		PositiveSpans:        FromPromSpan(h.PositiveSpans),
		PositiveBuckets:      h.PositiveBuckets,
		ResetHint:            int32(h.CounterResetHint),
		TimestampMillisecond: timestamp,
	}
	ts.Histograms.Histogram = converted
}
// FromFloatHistogram fills this series' float histogram from a prometheus
// model float histogram observed at the given timestamp (milliseconds).
func (ts *TimeSeriesBinary) FromFloatHistogram(timestamp int64, h *histogram.FloatHistogram) {
	converted := &FloatHistogram{
		Count:                HistogramCount{IsInt: false, FloatValue: h.Count},
		Sum:                  h.Sum,
		Schema:               h.Schema,
		ZeroThreshold:        h.ZeroThreshold,
		ZeroCount:            HistogramZeroCount{IsInt: false, FloatValue: h.ZeroCount},
		NegativeSpans:        FromPromSpan(h.NegativeSpans),
		NegativeCounts:       h.NegativeBuckets,
		PositiveSpans:        FromPromSpan(h.PositiveSpans),
		PositiveCounts:       h.PositiveBuckets,
		ResetHint:            int32(h.CounterResetHint),
		TimestampMillisecond: timestamp,
	}
	ts.Histograms.FloatHistogram = converted
}
// FromPromSpan converts prometheus histogram spans into BucketSpan values.
func FromPromSpan(spans []histogram.Span) []BucketSpan {
	out := make([]BucketSpan, 0, len(spans))
	for _, s := range spans {
		out = append(out, BucketSpan{Offset: s.Offset, Length: s.Length})
	}
	return out
}

// FillLabelMapping converts ts.Labels into the LabelsNames/LabelsValues index
// slices, interning every label name and value into strMapToInt. Each distinct
// string is assigned the next sequential index (the current map size); the map
// is later flattened into the SeriesGroup string table, which is what
// deduplicates strings and shrinks the on-disk size.
func (ts *TimeSeriesBinary) FillLabelMapping(strMapToInt map[string]uint32) {
	ts.LabelsNames = setSliceLength(ts.LabelsNames, len(ts.Labels))
	ts.LabelsValues = setSliceLength(ts.LabelsValues, len(ts.Labels))

	// One label produces two entries: an index for the name and one for the value.
	for i, v := range ts.Labels {
		ts.LabelsNames[i] = internString(strMapToInt, v.Name)
		ts.LabelsValues[i] = internString(strMapToInt, v.Value)
	}
}

// internString returns the index already assigned to s in m, assigning the
// next free index (the current map size) when s is seen for the first time.
func internString(m map[string]uint32, s string) uint32 {
	idx, found := m[s]
	if !found {
		idx = uint32(len(m))
		m[s] = idx
	}
	return idx
}

// setSliceLength returns lbls resized to exactly length entries, reusing the
// existing backing array when it is large enough and allocating a fresh
// (zeroed) slice otherwise. Reused entries keep their previous values;
// callers are expected to overwrite every element.
//
// Fix: the original test was `cap(lbls) <= length`, which needlessly
// reallocated when the capacity exactly matched the requested length.
func setSliceLength(lbls []uint32, length int) []uint32 {
	if cap(lbls) < length {
		return make([]uint32, length)
	}
	return lbls[:length]
}

// tsBinaryPool recycles TimeSeriesBinary values to reduce allocation churn;
// use GetTimeSeriesFromPool and PutTimeSeriesIntoPool rather than the pool
// directly so the outstanding counter stays accurate.
var tsBinaryPool = sync.Pool{
	New: func() any {
		return &TimeSeriesBinary{}
	},
}

// GetTimeSeriesFromPool hands out a pooled *TimeSeriesBinary and increments
// the outstanding counter; callers must return it via PutTimeSeriesIntoPool.
func GetTimeSeriesFromPool() *TimeSeriesBinary {
	OutStandingTimeSeriesBinary.Inc()
	ts := tsBinaryPool.Get().(*TimeSeriesBinary)
	return ts
}

// OutStandingTimeSeriesBinary counts pooled series currently checked out:
// incremented by GetTimeSeriesFromPool and decremented by
// PutTimeSeriesIntoPool. A steadily growing value indicates a leak.
// Declared with the usable zero value rather than `= atomic.Int32{}`.
var OutStandingTimeSeriesBinary atomic.Int32

// PutTimeSeriesSliceIntoPool returns every series in tss to the pool.
func PutTimeSeriesSliceIntoPool(tss []*TimeSeriesBinary) {
	for _, ts := range tss {
		PutTimeSeriesIntoPool(ts)
	}
}

// PutTimeSeriesIntoPool zeroes a series and hands it back to the pool,
// decrementing the outstanding counter. The label index slices are truncated
// rather than dropped so their capacity is reused by the next caller.
func PutTimeSeriesIntoPool(ts *TimeSeriesBinary) {
	OutStandingTimeSeriesBinary.Dec()
	// Clear per-sample state.
	ts.TS = 0
	ts.Value = 0
	ts.Hash = 0
	// Clear label state; keep index-slice capacity for reuse.
	ts.Labels = nil
	ts.LabelsNames = ts.LabelsNames[:0]
	ts.LabelsValues = ts.LabelsValues[:0]
	// Drop any histogram payloads.
	ts.Histograms.Histogram = nil
	ts.Histograms.FloatHistogram = nil
	tsBinaryPool.Put(ts)
}

// DeserializeToSeriesGroup decodes buf into sg and rebuilds labels.Labels on
// every series and metadata entry from the deduplicated string table. It
// returns the (reused) group, the remaining unread bytes, and any unmarshal
// error. The previously duplicated series/metadata loops are factored into
// rebuildLabels.
func DeserializeToSeriesGroup(sg *SeriesGroup, buf []byte) (*SeriesGroup, []byte, error) {
	buffer, err := sg.UnmarshalMsg(buf)
	if err != nil {
		return sg, nil, err
	}
	rebuildLabels(sg.Strings, sg.Series)
	rebuildLabels(sg.Strings, sg.Metadata)
	// The string table has been expanded back into labels; truncate it so the
	// backing array can be reused on the next deserialize.
	sg.Strings = sg.Strings[:0]
	return sg, buffer, nil
}

// rebuildLabels converts each series' LabelsNames/LabelsValues index slices
// back into labels.Labels, using strs as the lookup table, then truncates the
// index slices so their capacity is reused.
func rebuildLabels(strs []string, series []*TimeSeriesBinary) {
	for _, s := range series {
		if cap(s.Labels) < len(s.LabelsNames) {
			s.Labels = make(labels.Labels, len(s.LabelsNames))
		} else {
			s.Labels = s.Labels[:len(s.LabelsNames)]
		}
		// One label corresponds to two entries: an index into strs for the
		// name and one for the value.
		for i := range s.LabelsNames {
			s.Labels[i] = labels.Label{
				Name:  strs[s.LabelsNames[i]],
				Value: strs[s.LabelsValues[i]],
			}
		}
		s.LabelsNames = s.LabelsNames[:0]
		s.LabelsValues = s.LabelsValues[:0]
	}
}
Loading