diff --git a/compress.go b/compress.go index 874910f..de9290c 100644 --- a/compress.go +++ b/compress.go @@ -283,8 +283,6 @@ func (whisper *Whisper) readHeaderCompressed() (err error) { if !arc.hasBuffer() { continue } - arc.buffer = make([]byte, arc.bufferSize) - readed, err = whisper.file.Read(arc.buffer) if err != nil { return fmt.Errorf("unable to read archive %d buffer: %s", i, err) @@ -474,7 +472,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points } baseIntervalsPerUnit, currentUnit, minInterval := archive.getBufferInfo() - bufferUnitPointsCount := archive.next.secondsPerPoint / archive.secondsPerPoint + bufferUnitPointsCount := whisper.bufferUnitPointsCount(archive) for aindex := 0; aindex < len(alignedPoints); { dp := alignedPoints[aindex] dpBaseInterval := archive.AggregateInterval(dp.interval) @@ -537,7 +535,9 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points // TODO: record and continue? return err } - + if archive.next == nil { + continue + } // propagate lower := archive.next lowerIntervalStart := archive.AggregateInterval(dps[0].interval) @@ -584,11 +584,11 @@ func (archive *archiveInfo) getBufferInfo() (units []int, index, min int) { } func (archive *archiveInfo) bufferUnitCount() int { - return len(archive.buffer) / PointSize / (archive.next.secondsPerPoint / archive.secondsPerPoint) + return len(archive.buffer) / PointSize / archive.whisper.bufferUnitPointsCount(archive) } func (archive *archiveInfo) getBufferByUnit(unit int) []byte { - count := archive.next.secondsPerPoint / archive.secondsPerPoint + count := archive.whisper.bufferUnitPointsCount(archive) lb := unit * PointSize * count ub := (unit + 1) * PointSize * count return archive.buffer[lb:ub] diff --git a/compress_test.go b/compress_test.go index 9ab920a..d913bbe 100644 --- a/compress_test.go +++ b/compress_test.go @@ -812,6 +812,7 @@ func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) { {Value: 1, Time: now + 1}, {Value: 1, Time: now + 2}, }) + // we can accept OOO for short period cwhisper.UpdateMany([]*TimeSeriesPoint{ {Value: 0, Time: now + 1}, }) @@ -822,7 +823,38 @@ func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) { } if got, want := data.Points(), []TimeSeriesPoint{ {Time: now + 0, Value: 1}, - {Time: now + 1, Value: 1}, + {Time: now + 1, Value: 0}, + {Time: now + 2, Value: 1}, + }; !reflect.DeepEqual(got, want) { + t.Errorf("data.Points() = %v; want %v", got, want) + } + i := 3 + var points []*TimeSeriesPoint + for { + points = append( + points, + &TimeSeriesPoint{Value: float64(i), Time: now + i}, + ) + if i > 120 { + break + } + i += 1 + } + cwhisper.UpdateMany(points) + + // buffer us flushed, can't accept OOO data not within the buffer + + cwhisper.UpdateMany([]*TimeSeriesPoint{ + {Value: 1000, Time: now + 1}, + }) + + data, err = cwhisper.Fetch(now-1, now+2) + if err != nil { + t.Error(err) + } + if got, want := data.Points(), []TimeSeriesPoint{ + {Time: now + 0, Value: 1}, + {Time: now + 1, Value: 0}, {Time: now + 2, Value: 1}, }; !reflect.DeepEqual(got, want) { t.Errorf("data.Points() = %v; want %v", got, want) diff --git a/whisper.go b/whisper.go index 7dca0b2..c758ec2 100644 --- a/whisper.go +++ b/whisper.go @@ -43,6 +43,10 @@ const ( classicHeaderAggregationOffset = 0 classicHeaderXFFOffset = IntSize * 2 ) +const ( + BufferUnitPointsCountSingleArchive = 60 + CompVersion = 2 // with added archive buffer for single archive +) // Note: 4 bytes long in Whisper Header, 1 byte long in Archive Header type AggregationMethod int @@ -415,7 +419,7 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg whisper.opts = options whisper.compressed = options.Compressed - whisper.compVersion = 1 + whisper.compVersion = CompVersion whisper.pointsPerBlock = options.PointsPerBlock whisper.avgCompressedPointSize = options.PointSize for _, retention := range retentions { @@ -481,11 +485,6 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg archive.offset = offset offset += archive.blockSize * archive.blockCount - if i > 0 { - size := archive.secondsPerPoint / whisper.archives[i-1].secondsPerPoint * PointSize * 2 - whisper.archives[i-1].buffer = make([]byte, size) - } - continue } @@ -699,10 +698,16 @@ func (whisper *Whisper) initMetaInfo() { prevArc := whisper.archives[i-1] prevArc.next = arc - if whisper.aggregationMethod != Mix && whisper.compVersion == 1 { + if whisper.aggregationMethod != Mix && (whisper.compVersion == 1 || whisper.compVersion == 2) { prevArc.bufferSize = arc.secondsPerPoint / prevArc.secondsPerPoint * PointSize * bufferCount + prevArc.buffer = make([]byte, prevArc.bufferSize) } } + // for OOO write for short time + if len(whisper.archives) == 1 && whisper.compVersion == 2 { + whisper.archives[0].bufferSize = 60 * PointSize * bufferCount + whisper.archives[0].buffer = make([]byte, whisper.archives[0].bufferSize) + } } func (whisper *Whisper) writeHeader() (err error) { @@ -778,8 +783,8 @@ func (whisper *Whisper) bufferSize() int { return 0 } var bufSize int - for i, arc := range whisper.archives[1:] { - bufSize += arc.secondsPerPoint / whisper.archives[i].secondsPerPoint * PointSize * bufferCount + for _, arc := range whisper.archives { + bufSize += whisper.bufferUnitPointsCount(arc) * PointSize * bufferCount } return bufSize } @@ -965,6 +970,17 @@ func (whisper *Whisper) archiveUpdateMany(archive *archiveInfo, points []*TimeSe alignedPoints := alignPoints(archive, points) return whisper.archiveUpdateManyDataPoints(archive, alignedPoints, true) } +func (whisper *Whisper) bufferUnitPointsCount(archive *archiveInfo) int { + for i, arc := range whisper.archives { + if i != 0 && whisper.archives[i-1].Retention == archive.Retention { + return arc.secondsPerPoint / whisper.archives[i-1].secondsPerPoint + } + } + if len(whisper.archives) == 1 { + return BufferUnitPointsCountSingleArchive + } + return 0 +} // skipcq: RVV-A0005 func (whisper *Whisper) archiveUpdateManyDataPoints(archive *archiveInfo, alignedPoints []dataPoint, propagate bool) error { @@ -1518,7 +1534,8 @@ func (archive *archiveInfo) Interval(time int) int { } func (archive *archiveInfo) AggregateInterval(time int) int { - return time - mod(time, archive.next.secondsPerPoint) + count := archive.whisper.bufferUnitPointsCount(archive) + return time - mod(time, count*archive.secondsPerPoint) // supposed archive.next.secondsPerPoint } type TimeSeries struct {