Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(influx_tools): Add export to parquet files #25297

Open
wants to merge 14 commits into
base: master-1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions cmd/influx_tools/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
geninit "github.com/influxdata/influxdb/cmd/influx_tools/generate/init"
"github.com/influxdata/influxdb/cmd/influx_tools/help"
"github.com/influxdata/influxdb/cmd/influx_tools/importer"
"github.com/influxdata/influxdb/cmd/influx_tools/parquet"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
"github.com/influxdata/influxdb/cmd/influxd/run"
"github.com/influxdata/influxdb/services/meta"
Expand Down Expand Up @@ -55,36 +56,41 @@ func (m *Main) Run(args ...string) error {
switch name {
case "", "help":
if err := help.NewCommand().Run(args...); err != nil {
return fmt.Errorf("help failed: %s", err)
return fmt.Errorf("help failed: %w", err)
}
case "compact-shard":
c := compact.NewCommand()
if err := c.Run(args); err != nil {
return fmt.Errorf("compact-shard failed: %s", err)
return fmt.Errorf("compact-shard failed: %w", err)
}
case "export":
c := export.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
return fmt.Errorf("export failed: %s", err)
return fmt.Errorf("export failed: %w", err)
}
case "export-parquet":
c := parquet.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
return fmt.Errorf("export failed: %w", err)
}
case "import":
c := importer.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
return fmt.Errorf("import failed: %s", err)
return fmt.Errorf("import failed: %w", err)
}
case "gen-init":
c := geninit.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
return fmt.Errorf("gen-init failed: %s", err)
return fmt.Errorf("gen-init failed: %w", err)
}
case "gen-exec":
deps := genexec.Dependencies{Server: &ossServer{logger: zap.NewNop()}}
c := genexec.NewCommand(deps)
if err := c.Run(args); err != nil {
return fmt.Errorf("gen-exec failed: %s", err)
return fmt.Errorf("gen-exec failed: %w", err)
}
default:
return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influx-tools help' for usage`+"\n\n", name)
return fmt.Errorf("unknown command %q\nRun 'influx-tools help' for usage", name)
}

return nil
Expand All @@ -106,7 +112,7 @@ func (s *ossServer) Open(path string) (err error) {

// Validate the configuration.
if err = s.config.Validate(); err != nil {
return fmt.Errorf("validate config: %s", err)
return fmt.Errorf("validate config: %w", err)
}

if s.noClient {
Expand Down
236 changes: 236 additions & 0 deletions cmd/influx_tools/parquet/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package parquet

import (
"context"
"fmt"
"sort"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
"go.uber.org/zap"
)

// row is a single accumulated data point: its timestamp, the tag set of the
// series it belongs to, and the field values (after any name/type resolution)
// recorded at that timestamp.
type row struct {
	timestamp int64
	tags      map[string]string
	fields    map[string]interface{}
}

// batcher reads the data of one measurement from a shard and accumulates it
// into row batches, advancing a start timestamp between calls to next so the
// measurement can be exported incrementally.
type batcher struct {
	measurement []byte      // measurement to read
	shard       *tsdb.Shard // shard to read the data from

	// typeResolutions maps field names with conflicting types to the
	// data type their values should be converted to.
	typeResolutions map[string]influxql.DataType
	// converter holds the per-field conversion function derived from
	// typeResolutions by init().
	converter map[string]func(interface{}) (interface{}, error)
	// nameResolutions maps original field names to the output name used
	// in the produced rows.
	nameResolutions map[string]string

	// series lists the series (key, tags, fields) of the measurement.
	series []seriesEntry
	// start is the timestamp the next batch begins at; reset() rewinds it
	// to models.MinNanoTime.
	start int64

	logger *zap.SugaredLogger
}

// init prepares the batcher for reading: it builds the per-field type
// converters from the configured type resolutions and rewinds the start
// timestamp. It returns an error for an unsupported target data type.
func (b *batcher) init() error {
	// Lookup table from target data type to the matching conversion function.
	conversions := map[influxql.DataType]func(interface{}) (interface{}, error){
		influxql.Float:    toFloat,
		influxql.Unsigned: toUint,
		influxql.Integer:  toInt,
		influxql.Boolean:  toBool,
		influxql.String:   toString,
	}

	// Setup the type converters for the conflicting fields
	b.converter = make(map[string]func(interface{}) (interface{}, error), len(b.typeResolutions))
	for field, ftype := range b.typeResolutions {
		conv, found := conversions[ftype]
		if !found {
			return fmt.Errorf("unknown converter %v for field %q", ftype, field)
		}
		b.converter[field] = conv
	}

	b.start = models.MinNanoTime

	return nil
}

// reset rewinds the batcher to the earliest representable timestamp so the
// next call to next() starts reading the measurement from the beginning again.
func (b *batcher) reset() {
	b.start = models.MinNanoTime
}

func (b *batcher) next(ctx context.Context) ([]row, error) {
// Iterate over the series and fields and accumulate the data row-wise
iter, err := b.shard.CreateCursorIterator(ctx)
if err != nil {
return nil, fmt.Errorf("getting cursor iterator for %q failed: %w", string(b.measurement), err)
}

data := make(map[string]map[int64]row)
end := models.MaxNanoTime
for _, s := range b.series {
data[s.key] = make(map[int64]row)
tags := make(map[string]string, len(s.tags))
for _, t := range s.tags {
tags[string(t.Key)] = string(t.Value)
}
for field := range s.fields {
cursor, err := iter.Next(ctx,
&tsdb.CursorRequest{
Name: b.measurement,
Tags: s.tags,
Field: field,
Ascending: true,
StartTime: b.start,
EndTime: models.MaxNanoTime,
},
)
if err != nil {
return nil, fmt.Errorf("getting cursor for %s-%s failed: %w", s.key, field, err)
}
if cursor == nil {
continue
}

// Prepare mappings
fname := field
if n, found := b.nameResolutions[field]; found {
fname = n
}
converter := identity
if c, found := b.converter[field]; found {
converter = c
}
fieldEnd := models.MaxNanoTime
switch c := cursor.(type) {
case tsdb.IntegerArrayCursor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the cases in this type switch call a single generic function to reduce duplicate code? I'm not sure, but that would simplify maintenance and readability.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried my very best, but due to the fact that each implementation returns a different type and function signature for Next() I could not find a way to make this generic. Do you have some idea?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured out how to have implement a generic Next function, but the access to the Values and Timestamps fields are the problem. Even with type constraints that all have two fields named Values and Timestamps the compiler inference fails.
https://goplay.tools/snippet/_BsjhhcYGGB

values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
}
case tsdb.FloatArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
}
case tsdb.UnsignedArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
}
case tsdb.BooleanArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
}
case tsdb.StringArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
}
default:
cursor.Close()
return nil, fmt.Errorf("unexpected type %T", cursor)
}
cursor.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does Close return an error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it does not.

end = min(end, fieldEnd)
}
}
if len(data) == 0 {
return nil, nil
}

// Extract the rows ordered by timestamp
var rows []row
srebhan marked this conversation as resolved.
Show resolved Hide resolved
for _, tmap := range data {
for _, r := range tmap {
rows = append(rows, r)
}
}
sort.Slice(rows, func(i, j int) bool { return rows[i].timestamp < rows[j].timestamp })

// Only include rows that are before the end-timestamp to avoid duplicate
// or incomplete entries due to not iterating through all data
n := sort.Search(len(rows), func(i int) bool { return rows[i].timestamp > end })

// Remember the earliest datum to use this for the next batch excluding the entry itself
b.start = end + 1

return rows[:n], nil
}
Loading