From fe6c64b21ed7e0757375e57b8eca21e9c05f3c89 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Mon, 25 Mar 2024 17:22:33 -0700 Subject: [PATCH] fix: return and respect cursor errors (#24791) ArrayCursors were ignoring errors, which led to panics when nil cursors were operated on. This fix passes errors back up the stack and uses them to enforce healthy cursor creation. Closes https://github.com/influxdata/influxdb/issues/24789 --------- Co-authored-by: Stuart Carnie --- .../internal/storage/resultset.go | 7 +- storage/reads/array_cursor.gen.go | 40 +++++---- storage/reads/array_cursor.gen.go.tmpl | 8 +- storage/reads/array_cursor.go | 5 +- tsdb/engine/tsm1/array_cursor.gen.go | 90 ++++++++++++++----- tsdb/engine/tsm1/array_cursor.gen.go.tmpl | 20 +++-- tsdb/engine/tsm1/array_cursor_iterator.gen.go | 85 ++++++++++++------ .../tsm1/array_cursor_iterator.gen.go.tmpl | 17 ++-- tsdb/engine/tsm1/array_cursor_iterator.go | 10 +-- tsdb/engine/tsm1/array_cursor_test.go | 53 ++++++++--- 10 files changed, 240 insertions(+), 95 deletions(-) diff --git a/cmd/influx_tools/internal/storage/resultset.go b/cmd/influx_tools/internal/storage/resultset.go index 3590a3415e5..ad0ff0f9fdc 100644 --- a/cmd/influx_tools/internal/storage/resultset.go +++ b/cmd/influx_tools/internal/storage/resultset.go @@ -72,13 +72,14 @@ func (ci *CursorIterator) Next() bool { } var shard tsdb.CursorIterator + var err error ci.cur = nil - for ci.cur == nil && len(ci.itrs) > 0 { + for ci.cur == nil && len(ci.itrs) > 0 && err == nil { shard, ci.itrs = ci.itrs[0], ci.itrs[1:] - ci.cur, _ = shard.Next(ci.ctx, &ci.req) + ci.cur, err = shard.Next(ci.ctx, &ci.req) } - return ci.cur != nil + return ci.cur != nil && err == nil } func (ci *CursorIterator) Cursor() tsdb.Cursor { diff --git a/storage/reads/array_cursor.gen.go b/storage/reads/array_cursor.gen.go index a7e477c1768..32dfb99785b 100644 --- a/storage/reads/array_cursor.gen.go +++ b/storage/reads/array_cursor.gen.go @@ -335,13 +335,15 @@ func (c *floatMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.FloatArrayCursor next, ok = cur.(cursors.FloatArrayCursor) if !ok { @@ -1340,13 +1342,15 @@ func (c *integerMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.IntegerArrayCursor next, ok = cur.(cursors.IntegerArrayCursor) if !ok { @@ -2345,13 +2349,15 @@ func (c *unsignedMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.UnsignedArrayCursor next, ok = cur.(cursors.UnsignedArrayCursor) if !ok { @@ -3350,13 +3356,15 @@ func (c *stringMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.StringArrayCursor next, ok = cur.(cursors.StringArrayCursor) if !ok { @@ -3777,13 +3785,15 @@ func (c *booleanMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.BooleanArrayCursor next, ok = cur.(cursors.BooleanArrayCursor) if !ok { diff --git a/storage/reads/array_cursor.gen.go.tmpl b/storage/reads/array_cursor.gen.go.tmpl index adc3801e9b6..736ecce64ed 100644 --- a/storage/reads/array_cursor.gen.go.tmpl +++ b/storage/reads/array_cursor.gen.go.tmpl @@ -282,13 +282,15 @@ func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.{{.Name}}ArrayCursor next, ok = cur.(cursors.{{.Name}}ArrayCursor) if !ok { diff --git a/storage/reads/array_cursor.go b/storage/reads/array_cursor.go index b568464a51f..a710ce2482d 100644 --- a/storage/reads/array_cursor.go +++ b/storage/reads/array_cursor.go @@ -117,12 +117,13 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { var shard cursors.CursorIterator var cur cursors.Cursor + var err error for cur == nil && len(row.Query) > 0 { shard, row.Query = row.Query[0], row.Query[1:] - cur, _ = shard.Next(m.ctx, &m.req) + cur, err = shard.Next(m.ctx, &m.req) } - if cur == nil { + if cur == nil || err != nil { return nil } diff --git a/tsdb/engine/tsm1/array_cursor.gen.go b/tsdb/engine/tsm1/array_cursor.gen.go index e043b29a482..d56c4e058ef 100644 --- a/tsdb/engine/tsm1/array_cursor.gen.go +++ b/tsdb/engine/tsm1/array_cursor.gen.go @@ -39,7 +39,8 @@ func newFloatArrayAscendingCursor() *floatArrayAscendingCursor { return c } -func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -47,10 +48,14 @@ func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, t }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *floatArrayAscendingCursor) Err() error { return nil } @@ -182,7 +187,8 @@ func newFloatArrayDescendingCursor() *floatArrayDescendingCursor { return c } -func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -194,11 +200,15 @@ func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *floatArrayDescendingCursor) Err() error { return nil } @@ -321,7 +331,8 @@ func newIntegerArrayAscendingCursor() *integerArrayAscendingCursor { return c } -func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -329,10 +340,14 @@ func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *integerArrayAscendingCursor) Err() error { return nil } @@ -464,7 +479,8 @@ func newIntegerArrayDescendingCursor() *integerArrayDescendingCursor { return c } -func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -476,11 +492,15 @@ func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *integerArrayDescendingCursor) Err() error { return nil } @@ -603,7 +623,8 @@ func newUnsignedArrayAscendingCursor() *unsignedArrayAscendingCursor { return c } -func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -611,10 +632,14 @@ func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *unsignedArrayAscendingCursor) Err() error { return nil } @@ -746,7 +771,8 @@ func newUnsignedArrayDescendingCursor() *unsignedArrayDescendingCursor { return c } -func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -758,11 +784,15 @@ func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Value c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *unsignedArrayDescendingCursor) Err() error { return nil } @@ -885,7 +915,8 @@ func newStringArrayAscendingCursor() *stringArrayAscendingCursor { return c } -func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -893,10 +924,14 @@ func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *stringArrayAscendingCursor) Err() error { return nil } @@ -1028,7 +1063,8 @@ func newStringArrayDescendingCursor() *stringArrayDescendingCursor { return c } -func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -1040,11 +1076,15 @@ func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *stringArrayDescendingCursor) Err() error { return nil } @@ -1167,7 +1207,8 @@ func newBooleanArrayAscendingCursor() *booleanArrayAscendingCursor { return c } -func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -1175,10 +1216,14 @@ func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *booleanArrayAscendingCursor) Err() error { return nil } @@ -1310,7 +1355,8 @@ func newBooleanArrayDescendingCursor() *booleanArrayDescendingCursor { return c } -func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -1322,11 +1368,15 @@ func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *booleanArrayDescendingCursor) Err() error { return nil } diff --git a/tsdb/engine/tsm1/array_cursor.gen.go.tmpl b/tsdb/engine/tsm1/array_cursor.gen.go.tmpl index 6639fed9b8d..2fc104784e0 100644 --- a/tsdb/engine/tsm1/array_cursor.gen.go.tmpl +++ b/tsdb/engine/tsm1/array_cursor.gen.go.tmpl @@ -38,18 +38,23 @@ func new{{$Type}}() *{{$type}} { return c } -func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { -c.end = end +func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error + c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *{{$type}}) Err() error { return nil } @@ -184,7 +189,8 @@ func new{{$Type}}() *{{$type}} { return c } -func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -196,11 +202,15 @@ func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *Key c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *{{$type}}) Err() error { return nil } diff --git a/tsdb/engine/tsm1/array_cursor_iterator.gen.go b/tsdb/engine/tsm1/array_cursor_iterator.gen.go index e462e8dc563..6464603cf1d 100644 --- a/tsdb/engine/tsm1/array_cursor_iterator.gen.go +++ b/tsdb/engine/tsm1/array_cursor_iterator.gen.go @@ -15,7 +15,8 @@ import ( ) // buildFloatArrayCursor creates an array cursor for a float field. -func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.FloatArrayCursor { +func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.FloatArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -23,19 +24,26 @@ func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name [] if q.asc.Float == nil { q.asc.Float = newFloatArrayAscendingCursor() } - q.asc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Float + err = q.asc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Float, nil } else { if q.desc.Float == nil { q.desc.Float = newFloatArrayDescendingCursor() } - q.desc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Float + err = q.desc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Float, nil } } // buildIntegerArrayCursor creates an array cursor for a integer field. -func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.IntegerArrayCursor { +func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.IntegerArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -43,19 +51,26 @@ func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name if q.asc.Integer == nil { q.asc.Integer = newIntegerArrayAscendingCursor() } - q.asc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Integer + err = q.asc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Integer, nil } else { if q.desc.Integer == nil { q.desc.Integer = newIntegerArrayDescendingCursor() } - q.desc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Integer + err = q.desc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Integer, nil } } // buildUnsignedArrayCursor creates an array cursor for a unsigned field. -func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.UnsignedArrayCursor { +func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.UnsignedArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -63,19 +78,26 @@ func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name if q.asc.Unsigned == nil { q.asc.Unsigned = newUnsignedArrayAscendingCursor() } - q.asc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Unsigned + err = q.asc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Unsigned, nil } else { if q.desc.Unsigned == nil { q.desc.Unsigned = newUnsignedArrayDescendingCursor() } - q.desc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Unsigned + err = q.desc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Unsigned, nil } } // buildStringArrayCursor creates an array cursor for a string field. -func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.StringArrayCursor { +func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.StringArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -83,19 +105,26 @@ func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name [ if q.asc.String == nil { q.asc.String = newStringArrayAscendingCursor() } - q.asc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.String + err = q.asc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.String, nil } else { if q.desc.String == nil { q.desc.String = newStringArrayDescendingCursor() } - q.desc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.String + err = q.desc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.String, nil } } // buildBooleanArrayCursor creates an array cursor for a boolean field. -func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.BooleanArrayCursor { +func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.BooleanArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -103,13 +132,19 @@ func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name if q.asc.Boolean == nil { q.asc.Boolean = newBooleanArrayAscendingCursor() } - q.asc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Boolean + err = q.asc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Boolean, nil } else { if q.desc.Boolean == nil { q.desc.Boolean = newBooleanArrayDescendingCursor() } - q.desc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Boolean + err = q.desc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Boolean, nil } } diff --git a/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl b/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl index 48f1b1400c1..1b78b22a437 100644 --- a/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl +++ b/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl @@ -11,7 +11,8 @@ import ( {{range .}} // build{{.Name}}ArrayCursor creates an array cursor for a {{.name}} field. -func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.{{.Name}}ArrayCursor { +func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.{{.Name}}ArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -19,14 +20,20 @@ func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, nam if q.asc.{{.Name}} == nil { q.asc.{{.Name}} = new{{.Name}}ArrayAscendingCursor() } - q.asc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.{{.Name}} + err = q.asc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.{{.Name}}, nil } else { if q.desc.{{.Name}} == nil { q.desc.{{.Name}} = new{{.Name}}ArrayDescendingCursor() } - q.desc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.{{.Name}} + err = q.desc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.{{.Name}}, nil } } diff --git a/tsdb/engine/tsm1/array_cursor_iterator.go b/tsdb/engine/tsm1/array_cursor_iterator.go index 0ba62273e8f..b6b6b3787d4 100644 --- a/tsdb/engine/tsm1/array_cursor_iterator.go +++ b/tsdb/engine/tsm1/array_cursor_iterator.go @@ -62,15 +62,15 @@ func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) ( // Return appropriate cursor based on type. switch f.Type { case influxql.Float: - return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.Integer: - return q.buildIntegerArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildIntegerArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.Unsigned: - return q.buildUnsignedArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildUnsignedArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.String: - return q.buildStringArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildStringArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.Boolean: - return q.buildBooleanArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildBooleanArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) default: panic(fmt.Sprintf("unreachable: %T", f.Type)) } diff --git a/tsdb/engine/tsm1/array_cursor_test.go b/tsdb/engine/tsm1/array_cursor_test.go index 11566dfde8e..43c45e6de11 100644 --- a/tsdb/engine/tsm1/array_cursor_test.go +++ b/tsdb/engine/tsm1/array_cursor_test.go @@ -69,6 +69,35 @@ func newFiles(dir string, values ...keyValues) ([]string, error) { return files, nil } +func TestCursor_ResetFail(t *testing.T) { + t.Run("bad block", func(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + fs := NewFileStore(dir) + + const START, END = 10, 1 + + data := []keyValues{ + // Write a single data point with timestamp equal to END + {"m,_field=v#!~#v", []Value{NewIntegerValue(1, 1)}}, + } + + files, err := newFiles(dir, data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + _ = fs.Replace(nil, files) + + kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) + defer kc.Close() + // Open a float cursor for an integer block + cur := newFloatArrayDescendingCursor() + err = cur.reset(START, END, nil, kc) + assert.ErrorContains(t, err, "invalid block", "expected invalid block") + }) +} + func TestDescendingCursor_SinglePointStartTime(t *testing.T) { t.Run("cache", func(t *testing.T) { dir := MustTempDir() @@ -80,7 +109,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) { defer kc.Close() cur := newIntegerArrayDescendingCursor() // Include a cached value with timestamp equal to END - cur.reset(START, END, Values{NewIntegerValue(1, 1)}, kc) + assert.NoError(t, cur.reset(START, END, Values{NewIntegerValue(1, 1)}, kc), "unexpected error resetting cursor") var got []int64 ar := cur.Next() @@ -115,7 +144,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newIntegerArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") var got []int64 ar := cur.Next() @@ -163,7 +192,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) defer kc.Close() cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") var got []int64 ar := cur.Next() @@ -182,7 +211,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") var got []int64 ar := cur.Next() @@ -257,7 +286,7 @@ func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing. kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) defer kc.Close() cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeTs(1000, 800, 10) exp = append(exp, makeTs(1005, 400, 10)...) @@ -280,7 +309,7 @@ func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing. kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeTs(1000, 800, 10) exp = append(exp, makeTs(1005, 400, 10)...) @@ -360,7 +389,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) defer kc.Close() cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 3500, 10, 1.01) a2 := makeArray(4005, 3500, 5, 2.01) @@ -387,7 +416,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 3500, 10, 1.01) a2 := makeArray(4005, 3500, 5, 2.01) @@ -454,7 +483,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) defer kc.Close() cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 100, 1, 1.01) @@ -479,7 +508,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) defer kc.Close() cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1050, 50, 1, 1.01) a2 := makeArray(1100, 50, 1, 2.01) @@ -506,7 +535,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 100, 1, 1.01) sort.Sort(sort.Reverse(&FloatArray{exp})) @@ -532,7 +561,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1050, 50, 1, 1.01) a2 := makeArray(1100, 50, 1, 2.01)