Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aguzun/short ooo for single archive #39

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,6 @@ func (whisper *Whisper) readHeaderCompressed() (err error) {
if !arc.hasBuffer() {
continue
}
arc.buffer = make([]byte, arc.bufferSize)

readed, err = whisper.file.Read(arc.buffer)
if err != nil {
return fmt.Errorf("unable to read archive %d buffer: %s", i, err)
Expand Down Expand Up @@ -474,7 +472,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points
}

baseIntervalsPerUnit, currentUnit, minInterval := archive.getBufferInfo()
bufferUnitPointsCount := archive.next.secondsPerPoint / archive.secondsPerPoint
bufferUnitPointsCount := whisper.bufferUnitPointsCount(archive)
for aindex := 0; aindex < len(alignedPoints); {
dp := alignedPoints[aindex]
dpBaseInterval := archive.AggregateInterval(dp.interval)
Expand Down Expand Up @@ -537,7 +535,9 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points
// TODO: record and continue?
return err
}

if archive.next == nil {
continue
}
// propagate
lower := archive.next
lowerIntervalStart := archive.AggregateInterval(dps[0].interval)
Expand Down Expand Up @@ -584,11 +584,11 @@ func (archive *archiveInfo) getBufferInfo() (units []int, index, min int) {
}

func (archive *archiveInfo) bufferUnitCount() int {
return len(archive.buffer) / PointSize / (archive.next.secondsPerPoint / archive.secondsPerPoint)
return len(archive.buffer) / PointSize / archive.whisper.bufferUnitPointsCount(archive)
}

func (archive *archiveInfo) getBufferByUnit(unit int) []byte {
count := archive.next.secondsPerPoint / archive.secondsPerPoint
count := archive.whisper.bufferUnitPointsCount(archive)
lb := unit * PointSize * count
ub := (unit + 1) * PointSize * count
return archive.buffer[lb:ub]
Expand Down
34 changes: 33 additions & 1 deletion compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) {
{Value: 1, Time: now + 1},
{Value: 1, Time: now + 2},
})
// we can accept OOO for short period
cwhisper.UpdateMany([]*TimeSeriesPoint{
{Value: 0, Time: now + 1},
})
Expand All @@ -822,7 +823,38 @@ func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) {
}
if got, want := data.Points(), []TimeSeriesPoint{
{Time: now + 0, Value: 1},
{Time: now + 1, Value: 1},
{Time: now + 1, Value: 0},
{Time: now + 2, Value: 1},
}; !reflect.DeepEqual(got, want) {
t.Errorf("data.Points() = %v; want %v", got, want)
}
i := 3
var points []*TimeSeriesPoint
for {
points = append(
points,
&TimeSeriesPoint{Value: float64(i), Time: now + i},
)
if i > 120 {
break
}
i += 1
}
cwhisper.UpdateMany(points)

// buffer us flushed, can't accept OOO data not within the buffer

cwhisper.UpdateMany([]*TimeSeriesPoint{
{Value: 1000, Time: now + 1},
})

data, err = cwhisper.Fetch(now-1, now+2)
if err != nil {
t.Error(err)
}
if got, want := data.Points(), []TimeSeriesPoint{
{Time: now + 0, Value: 1},
{Time: now + 1, Value: 0},
{Time: now + 2, Value: 1},
}; !reflect.DeepEqual(got, want) {
t.Errorf("data.Points() = %v; want %v", got, want)
Expand Down
37 changes: 27 additions & 10 deletions whisper.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ const (
classicHeaderAggregationOffset = 0
classicHeaderXFFOffset = IntSize * 2
)
const (
BufferUnitPointsCountSingleArchive = 60
CompVersion = 2 // with added archive buffer for single archive
)

// Note: 4 bytes long in Whisper Header, 1 byte long in Archive Header
type AggregationMethod int
Expand Down Expand Up @@ -415,7 +419,7 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg
whisper.opts = options

whisper.compressed = options.Compressed
whisper.compVersion = 1
whisper.compVersion = CompVersion
whisper.pointsPerBlock = options.PointsPerBlock
whisper.avgCompressedPointSize = options.PointSize
for _, retention := range retentions {
Expand Down Expand Up @@ -481,11 +485,6 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg
archive.offset = offset
offset += archive.blockSize * archive.blockCount

if i > 0 {
size := archive.secondsPerPoint / whisper.archives[i-1].secondsPerPoint * PointSize * 2
whisper.archives[i-1].buffer = make([]byte, size)
}

continue
}

Expand Down Expand Up @@ -699,10 +698,16 @@ func (whisper *Whisper) initMetaInfo() {
prevArc := whisper.archives[i-1]
prevArc.next = arc

if whisper.aggregationMethod != Mix && whisper.compVersion == 1 {
if whisper.aggregationMethod != Mix && (whisper.compVersion == 1 || whisper.compVersion == 2) {
prevArc.bufferSize = arc.secondsPerPoint / prevArc.secondsPerPoint * PointSize * bufferCount
prevArc.buffer = make([]byte, prevArc.bufferSize)
}
}
// for OOO write for short time
if len(whisper.archives) == 1 && whisper.compVersion == 2 {
whisper.archives[0].bufferSize = 60 * PointSize * bufferCount
whisper.archives[0].buffer = make([]byte, whisper.archives[0].bufferSize)
}
}

func (whisper *Whisper) writeHeader() (err error) {
Expand Down Expand Up @@ -778,8 +783,8 @@ func (whisper *Whisper) bufferSize() int {
return 0
}
var bufSize int
for i, arc := range whisper.archives[1:] {
bufSize += arc.secondsPerPoint / whisper.archives[i].secondsPerPoint * PointSize * bufferCount
for _, arc := range whisper.archives {
bufSize += whisper.bufferUnitPointsCount(arc) * PointSize * bufferCount
}
return bufSize
}
Expand Down Expand Up @@ -965,6 +970,17 @@ func (whisper *Whisper) archiveUpdateMany(archive *archiveInfo, points []*TimeSe
alignedPoints := alignPoints(archive, points)
return whisper.archiveUpdateManyDataPoints(archive, alignedPoints, true)
}
func (whisper *Whisper) bufferUnitPointsCount(archive *archiveInfo) int {
for i, arc := range whisper.archives {
if i != 0 && whisper.archives[i-1].Retention == archive.Retention {
return arc.secondsPerPoint / whisper.archives[i-1].secondsPerPoint
}
}
if len(whisper.archives) == 1 {
return BufferUnitPointsCountSingleArchive
}
return 0
}

// skipcq: RVV-A0005
func (whisper *Whisper) archiveUpdateManyDataPoints(archive *archiveInfo, alignedPoints []dataPoint, propagate bool) error {
Expand Down Expand Up @@ -1518,7 +1534,8 @@ func (archive *archiveInfo) Interval(time int) int {
}

func (archive *archiveInfo) AggregateInterval(time int) int {
return time - mod(time, archive.next.secondsPerPoint)
count := archive.whisper.bufferUnitPointsCount(archive)
return time - mod(time, count*archive.secondsPerPoint) // supposed archive.next.secondsPerPoint
}

type TimeSeries struct {
Expand Down