Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(influx_tools): Add export to parquet files #25297

Open
wants to merge 14 commits into
base: master-1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 30 additions & 100 deletions cmd/influx_tools/parquet/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"fmt"
"sort"

"go.uber.org/zap"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
"go.uber.org/zap"
)

type row struct {
Expand Down Expand Up @@ -67,10 +68,11 @@ func (b *batcher) next(ctx context.Context) ([]row, error) {
return nil, fmt.Errorf("getting cursor iterator for %q failed: %w", string(b.measurement), err)
}

data := make(map[string]map[int64]row)
data := make(map[string]map[int64]row, len(b.series))
end := models.MaxNanoTime
var rowCount int
for _, s := range b.series {
data[s.key] = make(map[int64]row)
data[s.key] = make(map[int64]row, tsdb.DefaultMaxPointsPerBlock)
tags := make(map[string]string, len(s.tags))
for _, t := range s.tags {
tags[string(t.Key)] = string(t.Value)
Expand Down Expand Up @@ -103,112 +105,40 @@ func (b *batcher) next(ctx context.Context) ([]row, error) {
converter = c
}
fieldEnd := models.MaxNanoTime
switch c := cursor.(type) {
case tsdb.IntegerArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
}
case tsdb.FloatArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}
c, err := newValueCursor(cursor)
if err != nil {
return nil, fmt.Errorf("creating value cursor failed: %w", err)
}

data[s.key][t].fields[fname] = v
fieldEnd = t
for {
// Check if we do still have data
timestamp, ok := c.peek()
if !ok {
break
}
case tsdb.UnsignedArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
timestamp, value := c.next()
v, err := converter(value)
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", value, field, err)
continue
}
case tsdb.BooleanArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
if _, found := data[s.key][timestamp]; !found {
data[s.key][timestamp] = row{
timestamp: timestamp,
tags: tags,
fields: make(map[string]interface{}),
}

data[s.key][t].fields[fname] = v
fieldEnd = t
rowCount++
}
case tsdb.StringArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
}
default:
cursor.Close()
return nil, fmt.Errorf("unexpected type %T", cursor)
data[s.key][timestamp].fields[fname] = v
fieldEnd = timestamp
}
cursor.Close()

c.close()
end = min(end, fieldEnd)
}
}
Expand All @@ -217,7 +147,7 @@ func (b *batcher) next(ctx context.Context) ([]row, error) {
}

// Extract the rows ordered by timestamp
var rows []row
rows := make([]row, 0, rowCount)
for _, tmap := range data {
for _, r := range tmap {
rows = append(rows, r)
Expand Down
2 changes: 1 addition & 1 deletion cmd/influx_tools/parquet/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (cmd *Command) Run(args []string) (err error) {
if err := exp.open(ctx); err != nil {
return fmt.Errorf("opening exporter failed: %w", err)
}
defer internal_errors.Capture(&err, exp.close)
defer internal_errors.Capture(&err, exp.close)()

exp.printPlan(cmd.Stderr)

Expand Down
213 changes: 213 additions & 0 deletions cmd/influx_tools/parquet/cursors.go
srebhan marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package parquet

import (
"fmt"

"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/cursors"
)

// valueCursor is a type-independent, element-wise view on one of the typed
// array cursors. It lets callers walk timestamp/value pairs without
// switching on the concrete cursor type.
type valueCursor interface {
// next returns the timestamp and value at the current position and
// advances the position; implementations return (0, nil) when no element
// is left.
next() (int64, interface{})
// peek returns the timestamp at the current position without advancing;
// the boolean is false when no element is left.
peek() (int64, bool)
// close closes the wrapped cursor.
close()
}

// newValueCursor wraps the given typed array cursor into the matching
// valueCursor implementation.
//
// On success the returned valueCursor owns the cursor and closes it through
// its close method. If the cursor is of an unsupported type an error is
// returned; in that case the cursor is closed here so that callers which only
// propagate the error do not leak it.
func newValueCursor(cursor cursors.Cursor) (valueCursor, error) {
	switch c := cursor.(type) {
	case tsdb.FloatArrayCursor:
		return &floatValueCursor{cur: c}, nil
	case tsdb.UnsignedArrayCursor:
		return &uintValueCursor{cur: c}, nil
	case tsdb.IntegerArrayCursor:
		return &intValueCursor{cur: c}, nil
	case tsdb.BooleanArrayCursor:
		return &boolValueCursor{cur: c}, nil
	case tsdb.StringArrayCursor:
		return &stringValueCursor{cur: c}, nil
	}

	// Nobody takes ownership of an unsupported cursor, so release it before
	// reporting the problem. A nil interface has nothing to close.
	if cursor != nil {
		cursor.Close()
	}
	return nil, fmt.Errorf("unexpected type %T", cursor)
}

// floatValueCursor implements valueCursor on top of a float array cursor.
type floatValueCursor struct {
// cur is the wrapped cursor the array is fetched from.
cur tsdb.FloatArrayCursor
// arr is the array obtained from cur.Next(); nil until first use.
arr *cursors.FloatArray
// idx is the position in arr of the element returned by the next call.
idx int
}

// next returns the timestamp and value at the current position and moves the
// position forward by one. The array is fetched from the wrapped cursor only
// while none has been stored yet (normally the first call). When the array
// has no element at the current position, (0, nil) is returned.
func (c *floatValueCursor) next() (int64, interface{}) {
	if c.arr == nil {
		// Lazily load the array the first time the cursor is used.
		c.idx = 0
		c.arr = c.cur.Next()
	}

	pos := c.idx
	if pos >= c.arr.Len() {
		// Covers both an empty and an exhausted array.
		return 0, nil
	}

	c.idx = pos + 1
	return c.arr.Timestamps[pos], c.arr.Values[pos]
}

// peek returns the timestamp at the current position without advancing. The
// boolean result is false when the array has no element at that position.
// Like next, it fetches the array from the wrapped cursor if none is stored.
func (c *floatValueCursor) peek() (int64, bool) {
	if c.arr == nil {
		// Lazily load the array the first time the cursor is used.
		c.idx = 0
		c.arr = c.cur.Next()
	}

	if c.idx < c.arr.Len() {
		return c.arr.Timestamps[c.idx], true
	}
	// Empty or exhausted array.
	return 0, false
}

// close closes the wrapped float array cursor.
func (c *floatValueCursor) close() {
c.cur.Close()
}

// uintValueCursor implements valueCursor on top of an unsigned array cursor.
type uintValueCursor struct {
// cur is the wrapped cursor the array is fetched from.
cur tsdb.UnsignedArrayCursor
// arr is the array obtained from cur.Next(); nil until first use.
arr *cursors.UnsignedArray
// idx is the position in arr of the element returned by the next call.
idx int
}

// next returns the timestamp and value at the current position and moves the
// position forward by one. The array is fetched from the wrapped cursor only
// while none has been stored yet (normally the first call). When the array
// has no element at the current position, (0, nil) is returned.
func (c *uintValueCursor) next() (int64, interface{}) {
	if c.arr == nil {
		// Lazily load the array the first time the cursor is used.
		c.arr = c.cur.Next()
	}

	pos := c.idx
	if pos >= c.arr.Len() {
		// Covers both an empty and an exhausted array.
		return 0, nil
	}

	c.idx = pos + 1
	return c.arr.Timestamps[pos], c.arr.Values[pos]
}

// peek returns the timestamp at the current position without advancing. The
// boolean result is false when the array has no element at that position.
// Like next, it fetches the array from the wrapped cursor if none is stored.
func (c *uintValueCursor) peek() (int64, bool) {
	if c.arr == nil {
		// Lazily load the array the first time the cursor is used.
		c.idx = 0
		c.arr = c.cur.Next()
	}

	if c.idx < c.arr.Len() {
		return c.arr.Timestamps[c.idx], true
	}
	// Empty or exhausted array.
	return 0, false
}

// close closes the wrapped unsigned array cursor.
func (c *uintValueCursor) close() {
c.cur.Close()
}

// intValueCursor implements valueCursor on top of an integer array cursor.
type intValueCursor struct {
// cur is the wrapped cursor the array is fetched from.
cur tsdb.IntegerArrayCursor
// arr is the array obtained from cur.Next(); nil until first use.
arr *cursors.IntegerArray
// idx is the position in arr of the element returned by the next call.
idx int
}

// next returns the timestamp and value at the current position and moves the
// position forward by one. The array is fetched from the wrapped cursor only
// while none has been stored yet (normally the first call). When the array
// has no element at the current position, (0, nil) is returned.
func (c *intValueCursor) next() (int64, interface{}) {
	if c.arr == nil {
		// Lazily load the array the first time the cursor is used.
		c.arr = c.cur.Next()
	}

	pos := c.idx
	if pos >= c.arr.Len() {
		// Covers both an empty and an exhausted array.
		return 0, nil
	}

	c.idx = pos + 1
	return c.arr.Timestamps[pos], c.arr.Values[pos]
}

// peek returns the timestamp at the current position without advancing. The
// boolean result is false when the array has no element at that position.
// Like next, it fetches the array from the wrapped cursor if none is stored.
func (c *intValueCursor) peek() (int64, bool) {
	if c.arr == nil {
		// Lazily load the array the first time the cursor is used.
		c.idx = 0
		c.arr = c.cur.Next()
	}

	if c.idx < c.arr.Len() {
		return c.arr.Timestamps[c.idx], true
	}
	// Empty or exhausted array.
	return 0, false
}

// close closes the wrapped integer array cursor.
func (c *intValueCursor) close() {
c.cur.Close()
}

// boolValueCursor implements valueCursor on top of a boolean array cursor.
type boolValueCursor struct {
// cur is the wrapped cursor the array is fetched from.
cur tsdb.BooleanArrayCursor
// arr is the array obtained from cur.Next(); nil until first use.
arr *cursors.BooleanArray
// idx is the position in arr of the element returned by the next call.
idx int
}

// next returns the timestamp and value at the current position and moves the
// position forward by one. The array is fetched from the wrapped cursor only
// while none has been stored yet (normally the first call). When the array
// has no element at the current position, (0, nil) is returned.
func (c *boolValueCursor) next() (int64, interface{}) {
	if c.arr == nil {
		// Lazily load the array the first time the cursor is used.
		c.arr = c.cur.Next()
	}

	pos := c.idx
	if pos >= c.arr.Len() {
		// Covers both an empty and an exhausted array.
		return 0, nil
	}

	c.idx = pos + 1
	return c.arr.Timestamps[pos], c.arr.Values[pos]
}

// peek returns the timestamp at the current position without advancing. The
// boolean result is false when the array has no element at that position.
// Like next, it fetches the array from the wrapped cursor if none is stored.
func (c *boolValueCursor) peek() (int64, bool) {
	if c.arr == nil {
		// Lazily load the array the first time the cursor is used.
		c.idx = 0
		c.arr = c.cur.Next()
	}

	if c.idx < c.arr.Len() {
		return c.arr.Timestamps[c.idx], true
	}
	// Empty or exhausted array.
	return 0, false
}

// close closes the wrapped boolean array cursor.
func (c *boolValueCursor) close() {
c.cur.Close()
}

// stringValueCursor implements valueCursor on top of a string array cursor.
type stringValueCursor struct {
// cur is the wrapped cursor the array is fetched from.
cur tsdb.StringArrayCursor
// arr is the array obtained from cur.Next(); nil until first use.
arr *cursors.StringArray
// idx is the position in arr of the element returned by the next call.
idx int
}

// next returns the timestamp and value at the current position and moves the
// position forward by one. The array is fetched from the wrapped cursor only
// while none has been stored yet (normally the first call). When the array
// has no element at the current position, (0, nil) is returned.
func (c *stringValueCursor) next() (int64, interface{}) {
	if c.arr == nil {
		// Lazily load the array the first time the cursor is used.
		c.arr = c.cur.Next()
	}

	pos := c.idx
	if pos >= c.arr.Len() {
		// Covers both an empty and an exhausted array.
		return 0, nil
	}

	c.idx = pos + 1
	return c.arr.Timestamps[pos], c.arr.Values[pos]
}

// peek returns the timestamp at the current position without advancing. The
// boolean result is false when the array has no element at that position.
// Like next, it fetches the array from the wrapped cursor if none is stored.
func (c *stringValueCursor) peek() (int64, bool) {
	if c.arr == nil {
		// Lazily load the array the first time the cursor is used.
		c.idx = 0
		c.arr = c.cur.Next()
	}

	if c.idx < c.arr.Len() {
		return c.arr.Timestamps[c.idx], true
	}
	// Empty or exhausted array.
	return 0, false
}

// close closes the wrapped string array cursor.
func (c *stringValueCursor) close() {
c.cur.Close()
}
Loading