From 66e928efa546d2a24d81e1a8d071f78c9617dee1 Mon Sep 17 00:00:00 2001 From: Andrei Guzun Date: Fri, 3 Mar 2023 10:29:43 +0100 Subject: [PATCH] Previously when graphite schema had only one archive, there was not OOO for short period because there was no buffer. In this commit buffer for single archive is added allowing OOO within short time --- compress.go | 12 ++++++------ compress_test.go | 34 +++++++++++++++++++++++++++++++++- whisper.go | 37 +++++++++++++++++++++++++++---------- 3 files changed, 66 insertions(+), 17 deletions(-) diff --git a/compress.go b/compress.go index 874910f..de9290c 100644 --- a/compress.go +++ b/compress.go @@ -283,8 +283,6 @@ func (whisper *Whisper) readHeaderCompressed() (err error) { if !arc.hasBuffer() { continue } - arc.buffer = make([]byte, arc.bufferSize) - readed, err = whisper.file.Read(arc.buffer) if err != nil { return fmt.Errorf("unable to read archive %d buffer: %s", i, err) @@ -474,7 +472,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points } baseIntervalsPerUnit, currentUnit, minInterval := archive.getBufferInfo() - bufferUnitPointsCount := archive.next.secondsPerPoint / archive.secondsPerPoint + bufferUnitPointsCount := whisper.bufferUnitPointsCount(archive) for aindex := 0; aindex < len(alignedPoints); { dp := alignedPoints[aindex] dpBaseInterval := archive.AggregateInterval(dp.interval) @@ -537,7 +535,9 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points // TODO: record and continue? return err } - + if archive.next == nil { + continue + } // propagate lower := archive.next lowerIntervalStart := archive.AggregateInterval(dps[0].interval) @@ -584,11 +584,11 @@ func (archive *archiveInfo) getBufferInfo() (units []int, index, min int) { } func (archive *archiveInfo) bufferUnitCount() int { - return len(archive.buffer) / PointSize / (archive.next.secondsPerPoint / archive.secondsPerPoint) + return len(archive.buffer) / PointSize / archive.whisper.bufferUnitPointsCount(archive) } func (archive *archiveInfo) getBufferByUnit(unit int) []byte { - count := archive.next.secondsPerPoint / archive.secondsPerPoint + count := archive.whisper.bufferUnitPointsCount(archive) lb := unit * PointSize * count ub := (unit + 1) * PointSize * count return archive.buffer[lb:ub] diff --git a/compress_test.go b/compress_test.go index 9ab920a..d913bbe 100644 --- a/compress_test.go +++ b/compress_test.go @@ -812,6 +812,7 @@ func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) { {Value: 1, Time: now + 1}, {Value: 1, Time: now + 2}, }) + // we can accept OOO for short period cwhisper.UpdateMany([]*TimeSeriesPoint{ {Value: 0, Time: now + 1}, }) @@ -822,7 +823,38 @@ func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) { } if got, want := data.Points(), []TimeSeriesPoint{ {Time: now + 0, Value: 1}, - {Time: now + 1, Value: 1}, + {Time: now + 1, Value: 0}, + {Time: now + 2, Value: 1}, + }; !reflect.DeepEqual(got, want) { + t.Errorf("data.Points() = %v; want %v", got, want) + } + i := 3 + var points []*TimeSeriesPoint + for { + points = append( + points, + &TimeSeriesPoint{Value: float64(i), Time: now + i}, + ) + if i > 120 { + break + } + i += 1 + } + cwhisper.UpdateMany(points) + + // buffer us flushed, can't accept OOO data not within the buffer + + cwhisper.UpdateMany([]*TimeSeriesPoint{ + {Value: 1000, Time: now + 1}, + }) + + data, err = cwhisper.Fetch(now-1, now+2) + if err != nil { + t.Error(err) + } + if got, want := data.Points(), []TimeSeriesPoint{ + {Time: now + 0, Value: 1}, + {Time: now + 1, Value: 0}, {Time: now + 2, Value: 1}, }; !reflect.DeepEqual(got, want) { t.Errorf("data.Points() = %v; want %v", got, want) diff --git a/whisper.go b/whisper.go index 7dca0b2..c758ec2 100644 --- a/whisper.go +++ b/whisper.go @@ -43,6 +43,10 @@ const ( classicHeaderAggregationOffset = 0 classicHeaderXFFOffset = IntSize * 2 ) +const ( + BufferUnitPointsCountSingleArchive = 60 + CompVersion = 2 // with added archive buffer for single archive +) // Note: 4 bytes long in Whisper Header, 1 byte long in Archive Header type AggregationMethod int @@ -415,7 +419,7 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg whisper.opts = options whisper.compressed = options.Compressed - whisper.compVersion = 1 + whisper.compVersion = CompVersion whisper.pointsPerBlock = options.PointsPerBlock whisper.avgCompressedPointSize = options.PointSize for _, retention := range retentions { @@ -481,11 +485,6 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg archive.offset = offset offset += archive.blockSize * archive.blockCount - if i > 0 { - size := archive.secondsPerPoint / whisper.archives[i-1].secondsPerPoint * PointSize * 2 - whisper.archives[i-1].buffer = make([]byte, size) - } - continue } @@ -699,10 +698,16 @@ func (whisper *Whisper) initMetaInfo() { prevArc := whisper.archives[i-1] prevArc.next = arc - if whisper.aggregationMethod != Mix && whisper.compVersion == 1 { + if whisper.aggregationMethod != Mix && (whisper.compVersion == 1 || whisper.compVersion == 2) { prevArc.bufferSize = arc.secondsPerPoint / prevArc.secondsPerPoint * PointSize * bufferCount + prevArc.buffer = make([]byte, prevArc.bufferSize) } } + // for OOO write for short time + if len(whisper.archives) == 1 && whisper.compVersion == 2 { + whisper.archives[0].bufferSize = 60 * PointSize * bufferCount + whisper.archives[0].buffer = make([]byte, whisper.archives[0].bufferSize) + } } func (whisper *Whisper) writeHeader() (err error) { @@ -778,8 +783,8 @@ func (whisper *Whisper) bufferSize() int { return 0 } var bufSize int - for i, arc := range whisper.archives[1:] { - bufSize += arc.secondsPerPoint / whisper.archives[i].secondsPerPoint * PointSize * bufferCount + for _, arc := range whisper.archives { + bufSize += whisper.bufferUnitPointsCount(arc) * PointSize * bufferCount } return bufSize } @@ -965,6 +970,17 @@ func (whisper *Whisper) archiveUpdateMany(archive *archiveInfo, points []*TimeSe alignedPoints := alignPoints(archive, points) return whisper.archiveUpdateManyDataPoints(archive, alignedPoints, true) } +func (whisper *Whisper) bufferUnitPointsCount(archive *archiveInfo) int { + for i, arc := range whisper.archives { + if i != 0 && whisper.archives[i-1].Retention == archive.Retention { + return arc.secondsPerPoint / whisper.archives[i-1].secondsPerPoint + } + } + if len(whisper.archives) == 1 { + return BufferUnitPointsCountSingleArchive + } + return 0 +} // skipcq: RVV-A0005 func (whisper *Whisper) archiveUpdateManyDataPoints(archive *archiveInfo, alignedPoints []dataPoint, propagate bool) error { @@ -1518,7 +1534,8 @@ func (archive *archiveInfo) Interval(time int) int { } func (archive *archiveInfo) AggregateInterval(time int) int { - return time - mod(time, archive.next.secondsPerPoint) + count := archive.whisper.bufferUnitPointsCount(archive) + return time - mod(time, count*archive.secondsPerPoint) // supposed archive.next.secondsPerPoint } type TimeSeries struct {