diff --git a/cmd/influx_tools/internal/storage/resultset.go b/cmd/influx_tools/internal/storage/resultset.go index 3590a3415e5..ad0ff0f9fdc 100644 --- a/cmd/influx_tools/internal/storage/resultset.go +++ b/cmd/influx_tools/internal/storage/resultset.go @@ -72,13 +72,14 @@ func (ci *CursorIterator) Next() bool { } var shard tsdb.CursorIterator + var err error ci.cur = nil - for ci.cur == nil && len(ci.itrs) > 0 { + for ci.cur == nil && len(ci.itrs) > 0 && err == nil { shard, ci.itrs = ci.itrs[0], ci.itrs[1:] - ci.cur, _ = shard.Next(ci.ctx, &ci.req) + ci.cur, err = shard.Next(ci.ctx, &ci.req) } - return ci.cur != nil + return ci.cur != nil && err == nil } func (ci *CursorIterator) Cursor() tsdb.Cursor { diff --git a/storage/reads/array_cursor.gen.go b/storage/reads/array_cursor.gen.go index a7e477c1768..32dfb99785b 100644 --- a/storage/reads/array_cursor.gen.go +++ b/storage/reads/array_cursor.gen.go @@ -335,13 +335,15 @@ func (c *floatMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.FloatArrayCursor next, ok = cur.(cursors.FloatArrayCursor) if !ok { @@ -1340,13 +1342,15 @@ func (c *integerMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.IntegerArrayCursor next, ok = cur.(cursors.IntegerArrayCursor) if !ok { @@ -2345,13 +2349,15 @@ func (c *unsignedMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.UnsignedArrayCursor next, ok = cur.(cursors.UnsignedArrayCursor) if !ok { @@ -3350,13 +3356,15 @@ func (c *stringMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.StringArrayCursor next, ok = cur.(cursors.StringArrayCursor) if !ok { @@ -3777,13 +3785,15 @@ func (c *booleanMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.BooleanArrayCursor next, ok = cur.(cursors.BooleanArrayCursor) if !ok { diff --git a/storage/reads/array_cursor.gen.go.tmpl b/storage/reads/array_cursor.gen.go.tmpl index adc3801e9b6..736ecce64ed 100644 --- a/storage/reads/array_cursor.gen.go.tmpl +++ b/storage/reads/array_cursor.gen.go.tmpl @@ -282,13 +282,15 @@ func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.{{.Name}}ArrayCursor next, ok = cur.(cursors.{{.Name}}ArrayCursor) if !ok { diff --git a/storage/reads/array_cursor.go b/storage/reads/array_cursor.go index b568464a51f..a710ce2482d 100644 --- a/storage/reads/array_cursor.go +++ b/storage/reads/array_cursor.go @@ -117,12 +117,13 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { var shard cursors.CursorIterator var cur cursors.Cursor + var err error for cur == nil && len(row.Query) > 0 { shard, row.Query = row.Query[0], row.Query[1:] - cur, _ = shard.Next(m.ctx, &m.req) + cur, err = shard.Next(m.ctx, &m.req) } - if cur == nil { + if cur == nil || err != nil { return nil } diff --git a/tsdb/engine/tsm1/array_cursor.gen.go b/tsdb/engine/tsm1/array_cursor.gen.go index e043b29a482..d56c4e058ef 100644 --- a/tsdb/engine/tsm1/array_cursor.gen.go +++ b/tsdb/engine/tsm1/array_cursor.gen.go @@ -39,7 +39,8 @@ func newFloatArrayAscendingCursor() *floatArrayAscendingCursor { return c } -func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -47,10 +48,14 @@ func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, t }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *floatArrayAscendingCursor) Err() error { return nil } @@ -182,7 +187,8 @@ func newFloatArrayDescendingCursor() *floatArrayDescendingCursor { return c } -func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -194,11 +200,15 @@ func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *floatArrayDescendingCursor) Err() error { return nil } @@ -321,7 +331,8 @@ func newIntegerArrayAscendingCursor() *integerArrayAscendingCursor { return c } -func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -329,10 +340,14 @@ func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *integerArrayAscendingCursor) Err() error { return nil } @@ -464,7 +479,8 @@ func newIntegerArrayDescendingCursor() *integerArrayDescendingCursor { return c } -func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -476,11 +492,15 @@ func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *integerArrayDescendingCursor) Err() error { return nil } @@ -603,7 +623,8 @@ func newUnsignedArrayAscendingCursor() *unsignedArrayAscendingCursor { return c } -func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -611,10 +632,14 @@ func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *unsignedArrayAscendingCursor) Err() error { return nil } @@ -746,7 +771,8 @@ func newUnsignedArrayDescendingCursor() *unsignedArrayDescendingCursor { return c } -func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -758,11 +784,15 @@ func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Value c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *unsignedArrayDescendingCursor) Err() error { return nil } @@ -885,7 +915,8 @@ func newStringArrayAscendingCursor() *stringArrayAscendingCursor { return c } -func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -893,10 +924,14 @@ func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *stringArrayAscendingCursor) Err() error { return nil } @@ -1028,7 +1063,8 @@ func newStringArrayDescendingCursor() *stringArrayDescendingCursor { return c } -func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -1040,11 +1076,15 @@ func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *stringArrayDescendingCursor) Err() error { return nil } @@ -1167,7 +1207,8 @@ func newBooleanArrayAscendingCursor() *booleanArrayAscendingCursor { return c } -func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -1175,10 +1216,14 @@ func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *booleanArrayAscendingCursor) Err() error { return nil } @@ -1310,7 +1355,8 @@ func newBooleanArrayDescendingCursor() *booleanArrayDescendingCursor { return c } -func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -1322,11 +1368,15 @@ func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *booleanArrayDescendingCursor) Err() error { return nil } diff --git a/tsdb/engine/tsm1/array_cursor.gen.go.tmpl b/tsdb/engine/tsm1/array_cursor.gen.go.tmpl index 6639fed9b8d..2fc104784e0 100644 --- a/tsdb/engine/tsm1/array_cursor.gen.go.tmpl +++ b/tsdb/engine/tsm1/array_cursor.gen.go.tmpl @@ -38,18 +38,23 @@ func new{{$Type}}() *{{$type}} { return c } -func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { -c.end = end +func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error + c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *{{$type}}) Err() error { return nil } @@ -184,7 +189,8 @@ func new{{$Type}}() *{{$type}} { return c } -func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -196,11 +202,15 @@ func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *Key c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *{{$type}}) Err() error { return nil } diff --git a/tsdb/engine/tsm1/array_cursor_iterator.gen.go b/tsdb/engine/tsm1/array_cursor_iterator.gen.go index e462e8dc563..6464603cf1d 100644 --- a/tsdb/engine/tsm1/array_cursor_iterator.gen.go +++ b/tsdb/engine/tsm1/array_cursor_iterator.gen.go @@ -15,7 +15,8 @@ import ( ) // buildFloatArrayCursor creates an array cursor for a float field. -func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.FloatArrayCursor { +func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.FloatArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -23,19 +24,26 @@ func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name [] if q.asc.Float == nil { q.asc.Float = newFloatArrayAscendingCursor() } - q.asc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Float + err = q.asc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Float, nil } else { if q.desc.Float == nil { q.desc.Float = newFloatArrayDescendingCursor() } - q.desc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Float + err = q.desc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Float, nil } } // buildIntegerArrayCursor creates an array cursor for a integer field. -func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.IntegerArrayCursor { +func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.IntegerArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -43,19 +51,26 @@ func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name if q.asc.Integer == nil { q.asc.Integer = newIntegerArrayAscendingCursor() } - q.asc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Integer + err = q.asc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Integer, nil } else { if q.desc.Integer == nil { q.desc.Integer = newIntegerArrayDescendingCursor() } - q.desc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Integer + err = q.desc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Integer, nil } } // buildUnsignedArrayCursor creates an array cursor for a unsigned field. -func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.UnsignedArrayCursor { +func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.UnsignedArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -63,19 +78,26 @@ func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name if q.asc.Unsigned == nil { q.asc.Unsigned = newUnsignedArrayAscendingCursor() } - q.asc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Unsigned + err = q.asc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Unsigned, nil } else { if q.desc.Unsigned == nil { q.desc.Unsigned = newUnsignedArrayDescendingCursor() } - q.desc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Unsigned + err = q.desc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Unsigned, nil } } // buildStringArrayCursor creates an array cursor for a string field. -func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.StringArrayCursor { +func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.StringArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -83,19 +105,26 @@ func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name [ if q.asc.String == nil { q.asc.String = newStringArrayAscendingCursor() } - q.asc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.String + err = q.asc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.String, nil } else { if q.desc.String == nil { q.desc.String = newStringArrayDescendingCursor() } - q.desc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.String + err = q.desc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.String, nil } } // buildBooleanArrayCursor creates an array cursor for a boolean field. -func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.BooleanArrayCursor { +func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.BooleanArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -103,13 +132,19 @@ func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name if q.asc.Boolean == nil { q.asc.Boolean = newBooleanArrayAscendingCursor() } - q.asc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Boolean + err = q.asc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Boolean, nil } else { if q.desc.Boolean == nil { q.desc.Boolean = newBooleanArrayDescendingCursor() } - q.desc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Boolean + err = q.desc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Boolean, nil } } diff --git a/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl b/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl index 48f1b1400c1..1b78b22a437 100644 --- a/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl +++ b/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl @@ -11,7 +11,8 @@ import ( {{range .}} // build{{.Name}}ArrayCursor creates an array cursor for a {{.name}} field. -func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.{{.Name}}ArrayCursor { +func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.{{.Name}}ArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -19,14 +20,20 @@ func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, nam if q.asc.{{.Name}} == nil { q.asc.{{.Name}} = new{{.Name}}ArrayAscendingCursor() } - q.asc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.{{.Name}} + err = q.asc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.{{.Name}}, nil } else { if q.desc.{{.Name}} == nil { q.desc.{{.Name}} = new{{.Name}}ArrayDescendingCursor() } - q.desc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.{{.Name}} + err = q.desc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.{{.Name}}, nil } } diff --git a/tsdb/engine/tsm1/array_cursor_iterator.go b/tsdb/engine/tsm1/array_cursor_iterator.go index 0ba62273e8f..b6b6b3787d4 100644 --- a/tsdb/engine/tsm1/array_cursor_iterator.go +++ b/tsdb/engine/tsm1/array_cursor_iterator.go @@ -62,15 +62,15 @@ func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) ( // Return appropriate cursor based on type. switch f.Type { case influxql.Float: - return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.Integer: - return q.buildIntegerArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildIntegerArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.Unsigned: - return q.buildUnsignedArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildUnsignedArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.String: - return q.buildStringArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildStringArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.Boolean: - return q.buildBooleanArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildBooleanArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) default: panic(fmt.Sprintf("unreachable: %T", f.Type)) } diff --git a/tsdb/engine/tsm1/array_cursor_test.go b/tsdb/engine/tsm1/array_cursor_test.go index 11566dfde8e..43c45e6de11 100644 --- a/tsdb/engine/tsm1/array_cursor_test.go +++ b/tsdb/engine/tsm1/array_cursor_test.go @@ -69,6 +69,35 @@ func newFiles(dir string, values ...keyValues) ([]string, error) { return files, nil } +func TestCursor_ResetFail(t *testing.T) { + t.Run("bad block", func(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + fs := NewFileStore(dir) + + const START, END = 10, 1 + + data := []keyValues{ + // Write a single data point with timestamp equal to END + {"m,_field=v#!~#v", []Value{NewIntegerValue(1, 1)}}, + } + + files, err := newFiles(dir, data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + _ = fs.Replace(nil, files) + + kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) + defer kc.Close() + // Open a float cursor for an integer block + cur := newFloatArrayDescendingCursor() + err = cur.reset(START, END, nil, kc) + assert.ErrorContains(t, err, "invalid block", "expected invalid block") + }) +} + func TestDescendingCursor_SinglePointStartTime(t *testing.T) { t.Run("cache", func(t *testing.T) { dir := MustTempDir() @@ -80,7 +109,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) { defer kc.Close() cur := newIntegerArrayDescendingCursor() // Include a cached value with timestamp equal to END - cur.reset(START, END, Values{NewIntegerValue(1, 1)}, kc) + assert.NoError(t, cur.reset(START, END, Values{NewIntegerValue(1, 1)}, kc), "unexpected error resetting cursor") var got []int64 ar := cur.Next() @@ -115,7 +144,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newIntegerArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") var got []int64 ar := cur.Next() @@ -163,7 +192,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) defer kc.Close() cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") var got []int64 ar := cur.Next() @@ -182,7 +211,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") var got []int64 ar := cur.Next() @@ -257,7 +286,7 @@ func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing. kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) defer kc.Close() cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeTs(1000, 800, 10) exp = append(exp, makeTs(1005, 400, 10)...) @@ -280,7 +309,7 @@ func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing. kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeTs(1000, 800, 10) exp = append(exp, makeTs(1005, 400, 10)...) @@ -360,7 +389,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) defer kc.Close() cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 3500, 10, 1.01) a2 := makeArray(4005, 3500, 5, 2.01) @@ -387,7 +416,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 3500, 10, 1.01) a2 := makeArray(4005, 3500, 5, 2.01) @@ -454,7 +483,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) defer kc.Close() cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 100, 1, 1.01) @@ -479,7 +508,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) defer kc.Close() cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1050, 50, 1, 1.01) a2 := makeArray(1100, 50, 1, 2.01) @@ -506,7 +535,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 100, 1, 1.01) sort.Sort(sort.Reverse(&FloatArray{exp})) @@ -532,7 +561,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) defer kc.Close() cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1050, 50, 1, 1.01) a2 := makeArray(1100, 50, 1, 2.01)