From fe0b2f55fb4ef581f6cfc87bd7e1ba6a8e9ecb4a Mon Sep 17 00:00:00 2001 From: Achille Date: Tue, 13 Dec 2022 20:23:26 -0800 Subject: [PATCH] add parquet.RowBuilder (#457) * add parquet.RowBuilder * fixes for parquet row generation * add parquet.(*RowBuilder).Next * move all merge code to merge.go (#458) * move all merge code to merge.go * add example * add benchmark for parquet.(*RowBuilder).Add * initialize FIXED_LEN_BYTE_ARRAY columns to non-null zero-value * make it OK to call Next if the repetition level is zero --- merge.go | 67 +++++++++ merge_test.go | 336 ++++++++++++++++++++++++++++++++++++++++++++ row_builder.go | 202 ++++++++++++++++++++++++++ row_builder_test.go | 286 +++++++++++++++++++++++++++++++++++++ row_group.go | 65 --------- row_group_test.go | 336 -------------------------------------------- value.go | 3 + 7 files changed, 894 insertions(+), 401 deletions(-) create mode 100644 row_builder.go create mode 100644 row_builder_test.go diff --git a/merge.go b/merge.go index 114fc1c8..451ae1d3 100644 --- a/merge.go +++ b/merge.go @@ -6,6 +6,71 @@ import ( "io" ) +// MergeRowGroups constructs a row group which is a merged view of rowGroups. If +// rowGroups are sorted and the passed options include sorting, the merged row +// group will also be sorted. +// +// The function validates the input to ensure that the merge operation is +// possible, ensuring that the schemas match or can be converted to an +// optionally configured target schema passed as argument in the option list. +// +// The sorting columns of each row group are also consulted to determine whether +// the output can be represented. If sorting columns are configured on the merge +// they must be a prefix of sorting columns of all row groups being merged. +func MergeRowGroups(rowGroups []RowGroup, options ...RowGroupOption) (RowGroup, error) { + config, err := NewRowGroupConfig(options...) 
+ if err != nil { + return nil, err + } + + schema := config.Schema + if len(rowGroups) == 0 { + return newEmptyRowGroup(schema), nil + } + if schema == nil { + schema = rowGroups[0].Schema() + + for _, rowGroup := range rowGroups[1:] { + if !nodesAreEqual(schema, rowGroup.Schema()) { + return nil, ErrRowGroupSchemaMismatch + } + } + } + + mergedRowGroups := make([]RowGroup, len(rowGroups)) + copy(mergedRowGroups, rowGroups) + + for i, rowGroup := range mergedRowGroups { + if rowGroupSchema := rowGroup.Schema(); !nodesAreEqual(schema, rowGroupSchema) { + conv, err := Convert(schema, rowGroupSchema) + if err != nil { + return nil, fmt.Errorf("cannot merge row groups: %w", err) + } + mergedRowGroups[i] = ConvertRowGroup(rowGroup, conv) + } + } + + m := &mergedRowGroup{sorting: config.Sorting.SortingColumns} + m.init(schema, mergedRowGroups) + + if len(m.sorting) == 0 { + // When the row group has no ordering, use a simpler version of the + // merger which simply concatenates rows from each of the row groups. + // This is preferable because it makes the output deterministic, the + // heap merge may otherwise reorder rows across groups. + return &m.multiRowGroup, nil + } + + for _, rowGroup := range m.rowGroups { + if !sortingColumnsHavePrefix(rowGroup.SortingColumns(), m.sorting) { + return nil, ErrRowGroupSortingColumnsMismatch + } + } + + m.compare = compareRowsFuncOf(schema, m.sorting) + return m, nil +} + type mergedRowGroup struct { multiRowGroup sorting []SortingColumn @@ -83,6 +148,8 @@ func (r *mergedRowGroupRows) Schema() *Schema { return r.schema } +// MergeRowReader constructs a RowReader which creates an ordered sequence of +// all the readers using the given compare function as the ordering predicate. 
func MergeRowReaders(readers []RowReader, compare func(Row, Row) int) RowReader { return &mergedRowReader{ compare: compare, diff --git a/merge_test.go b/merge_test.go index ab8cbe5a..16c21ea3 100644 --- a/merge_test.go +++ b/merge_test.go @@ -36,6 +36,342 @@ func (r *wrappedRows) Close() error { return r.Rows.Close() } +func TestMergeRowGroups(t *testing.T) { + tests := []struct { + scenario string + options []parquet.RowGroupOption + input []parquet.RowGroup + output parquet.RowGroup + }{ + { + scenario: "no row groups", + options: []parquet.RowGroupOption{ + parquet.SchemaOf(Person{}), + }, + output: sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SchemaOf(Person{}), + }, + ), + }, + + { + scenario: "a single row group", + input: []parquet.RowGroup{ + sortedRowGroup(nil, + Person{FirstName: "some", LastName: "one", Age: 30}, + Person{FirstName: "some", LastName: "one else", Age: 31}, + Person{FirstName: "and", LastName: "you", Age: 32}, + ), + }, + output: sortedRowGroup(nil, + Person{FirstName: "some", LastName: "one", Age: 30}, + Person{FirstName: "some", LastName: "one else", Age: 31}, + Person{FirstName: "and", LastName: "you", Age: 32}, + ), + }, + + { + scenario: "two row groups without ordering", + input: []parquet.RowGroup{ + sortedRowGroup(nil, Person{FirstName: "some", LastName: "one", Age: 30}), + sortedRowGroup(nil, Person{FirstName: "some", LastName: "one else", Age: 31}), + }, + output: sortedRowGroup(nil, + Person{FirstName: "some", LastName: "one", Age: 30}, + Person{FirstName: "some", LastName: "one else", Age: 31}, + ), + }, + + { + scenario: "three row groups without ordering", + input: []parquet.RowGroup{ + sortedRowGroup(nil, Person{FirstName: "some", LastName: "one", Age: 30}), + sortedRowGroup(nil, Person{FirstName: "some", LastName: "one else", Age: 31}), + sortedRowGroup(nil, Person{FirstName: "question", LastName: "answer", Age: 42}), + }, + output: sortedRowGroup(nil, + Person{FirstName: "some", LastName: "one", Age: 30}, + 
Person{FirstName: "some", LastName: "one else", Age: 31}, + Person{FirstName: "question", LastName: "answer", Age: 42}, + ), + }, + + { + scenario: "row groups sorted by ascending last name", + options: []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + input: []parquet.RowGroup{ + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + Person{FirstName: "Han", LastName: "Solo"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + ), + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + ), + }, + output: sortedRowGroup(nil, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + Person{FirstName: "Han", LastName: "Solo"}, + ), + }, + + { + scenario: "row groups sorted by descending last name", + options: []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Descending("LastName"), + ), + ), + }, + input: []parquet.RowGroup{ + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Descending("LastName"), + ), + ), + }, + Person{FirstName: "Han", LastName: "Solo"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + ), + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Descending("LastName"), + ), + ), + }, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + ), + }, + output: sortedRowGroup(nil, + Person{FirstName: "Han", LastName: "Solo"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + ), + }, + + { + scenario: "row groups sorted by ascending last and first name", + 
options: []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + parquet.Ascending("FirstName"), + ), + ), + }, + input: []parquet.RowGroup{ + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + parquet.Ascending("FirstName"), + ), + ), + }, + Person{FirstName: "Luke", LastName: "Skywalker"}, + Person{FirstName: "Han", LastName: "Solo"}, + ), + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + parquet.Ascending("FirstName"), + ), + ), + }, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + Person{FirstName: "Anakin", LastName: "Skywalker"}, + ), + }, + output: sortedRowGroup(nil, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + Person{FirstName: "Anakin", LastName: "Skywalker"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + Person{FirstName: "Han", LastName: "Solo"}, + ), + }, + + { + scenario: "row groups with conversion to a different schema", + options: []parquet.RowGroupOption{ + parquet.SchemaOf(LastNameOnly{}), + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + input: []parquet.RowGroup{ + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + Person{FirstName: "Han", LastName: "Solo"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + ), + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + Person{FirstName: "Anakin", LastName: "Skywalker"}, + ), + }, + output: sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), 
+ }, + LastNameOnly{LastName: "Solo"}, + LastNameOnly{LastName: "Skywalker"}, + LastNameOnly{LastName: "Skywalker"}, + LastNameOnly{LastName: "Kenobi"}, + ), + }, + } + + for _, adapter := range []struct { + scenario string + function func(parquet.RowGroup) parquet.RowGroup + }{ + {scenario: "buffer", function: selfRowGroup}, + {scenario: "file", function: fileRowGroup}, + } { + t.Run(adapter.scenario, func(t *testing.T) { + for _, test := range tests { + t.Run(test.scenario, func(t *testing.T) { + input := make([]parquet.RowGroup, len(test.input)) + for i := range test.input { + input[i] = adapter.function(test.input[i]) + } + + merged, err := parquet.MergeRowGroups(test.input, test.options...) + if err != nil { + t.Fatal(err) + } + if merged.NumRows() != test.output.NumRows() { + t.Fatalf("the number of rows mismatch: want=%d got=%d", merged.NumRows(), test.output.NumRows()) + } + if merged.Schema() != test.output.Schema() { + t.Fatalf("the row group schemas mismatch:\n%v\n%v", test.output.Schema(), merged.Schema()) + } + + options := []parquet.RowGroupOption{parquet.SchemaOf(Person{})} + options = append(options, test.options...) + // We test two views of the resulting row group: the one originally + // returned by MergeRowGroups, and one where the merged row group + // has been copied into a new buffer. The intent is to exercise both + // the row-by-row read as well as optimized code paths when CopyRows + // bypasses the ReadRow/WriteRow calls and the row group is written + // directly to the buffer by calling WriteRowsTo/WriteRowGroup. + mergedCopy := parquet.NewBuffer(options...) 
+ + totalRows := test.output.NumRows() + numRows, err := copyRowsAndClose(mergedCopy, merged.Rows()) + if err != nil { + t.Fatal(err) + } + if numRows != totalRows { + t.Fatalf("wrong number of rows copied: want=%d got=%d", totalRows, numRows) + } + + for _, merge := range []struct { + scenario string + rowGroup parquet.RowGroup + }{ + {scenario: "self", rowGroup: merged}, + {scenario: "copy", rowGroup: mergedCopy}, + } { + t.Run(merge.scenario, func(t *testing.T) { + var expectedRows = test.output.Rows() + var mergedRows = merge.rowGroup.Rows() + var row1 = make([]parquet.Row, 1) + var row2 = make([]parquet.Row, 1) + var numRows int64 + + defer expectedRows.Close() + defer mergedRows.Close() + + for { + _, err1 := expectedRows.ReadRows(row1) + n, err2 := mergedRows.ReadRows(row2) + + if err1 != err2 { + // ReadRows may or may not return io.EOF + // when it reads the last row, so we test + // that the reference RowReader has also + // reached the end. + if err1 == nil && err2 == io.EOF { + _, err1 = expectedRows.ReadRows(row1[:0]) + } + if err1 != io.EOF { + t.Fatalf("errors mismatched while comparing row %d/%d: want=%v got=%v", numRows, totalRows, err1, err2) + } + } + + if n != 0 { + if !row1[0].Equal(row2[0]) { + t.Errorf("row at index %d/%d mismatch: want=%+v got=%+v", numRows, totalRows, row1[0], row2[0]) + } + numRows++ + } + + if err1 != nil { + break + } + } + + if numRows != totalRows { + t.Errorf("expected to read %d rows but %d were found", totalRows, numRows) + } + }) + } + + }) + } + }) + } +} + func TestMergeRowGroupsCursorsAreClosed(t *testing.T) { type model struct { A int diff --git a/row_builder.go b/row_builder.go new file mode 100644 index 00000000..e7eb96a3 --- /dev/null +++ b/row_builder.go @@ -0,0 +1,202 @@ +package parquet + +// RowBuilder is a type which helps build parquet rows incrementally by adding +// values to columns. 
+type RowBuilder struct { + columns [][]Value + models []Value + levels []columnLevel + groups []*columnGroup +} + +type columnLevel struct { + repetitionDepth byte + repetitionLevel byte + definitionLevel byte +} + +type columnGroup struct { + baseColumn []Value + members []int16 + startIndex int16 + endIndex int16 + repetitionLevel byte + definitionLevel byte +} + +// NewRowBuilder constructs a RowBuilder which builds rows for the parquet +// schema passed as argument. +func NewRowBuilder(schema Node) *RowBuilder { + if schema.Leaf() { + panic("schema of row builder must be a group") + } + n := numLeafColumnsOf(schema) + b := &RowBuilder{ + columns: make([][]Value, n), + models: make([]Value, n), + levels: make([]columnLevel, n), + } + buffers := make([]Value, len(b.columns)) + for i := range b.columns { + b.columns[i] = buffers[i : i : i+1] + } + topGroup := &columnGroup{baseColumn: []Value{{}}} + endIndex := b.configure(schema, 0, columnLevel{}, topGroup) + topGroup.endIndex = endIndex + b.groups = append(b.groups, topGroup) + return b +} + +func (b *RowBuilder) configure(node Node, columnIndex int16, level columnLevel, group *columnGroup) (endIndex int16) { + switch { + case node.Optional(): + level.definitionLevel++ + endIndex = b.configure(Required(node), columnIndex, level, group) + + for i := columnIndex; i < endIndex; i++ { + b.models[i].kind = 0 // null if not set + b.models[i].ptr = nil + b.models[i].u64 = 0 + } + + case node.Repeated(): + level.definitionLevel++ + + group = &columnGroup{ + startIndex: columnIndex, + repetitionLevel: level.repetitionDepth, + definitionLevel: level.definitionLevel, + } + + level.repetitionDepth++ + endIndex = b.configure(Required(node), columnIndex, level, group) + + for i := columnIndex; i < endIndex; i++ { + b.models[i].kind = 0 // null if not set + b.models[i].ptr = nil + b.models[i].u64 = 0 + } + + group.endIndex = endIndex + b.groups = append(b.groups, group) + + case node.Leaf(): + typ := node.Type() + kind := 
typ.Kind() + model := makeValueKind(kind) + model.repetitionLevel = level.repetitionLevel + model.definitionLevel = level.definitionLevel + // FIXED_LEN_BYTE_ARRAY is the only type which needs to be given a + // non-nil zero-value if the field is required. + if kind == FixedLenByteArray { + zero := make([]byte, typ.Length()) + model.ptr = &zero[0] + model.u64 = uint64(len(zero)) + } + group.members = append(group.members, columnIndex) + b.models[columnIndex] = model + b.levels[columnIndex] = level + endIndex = columnIndex + 1 + + default: + endIndex = columnIndex + + for _, field := range node.Fields() { + endIndex = b.configure(field, endIndex, level, group) + } + } + return endIndex +} + +// Add adds columnValue to the column at columnIndex. +func (b *RowBuilder) Add(columnIndex int, columnValue Value) { + level := &b.levels[columnIndex] + columnValue.repetitionLevel = level.repetitionLevel + columnValue.definitionLevel = level.definitionLevel + columnValue.columnIndex = ^int16(columnIndex) + level.repetitionLevel = level.repetitionDepth + b.columns[columnIndex] = append(b.columns[columnIndex], columnValue) +} + +// Next must be called to indicate the start of a new repeated record for the +// column at the given index. +// +// If the column index is part of a repeated group, the builder automatically +// starts a new record for all adjacent columns, the application does not need +// to call this method for each column of the repeated group. +// +// Next must be called after adding a sequence of records. +func (b *RowBuilder) Next(columnIndex int) { + for _, group := range b.groups { + if group.startIndex <= int16(columnIndex) && int16(columnIndex) < group.endIndex { + for i := group.startIndex; i < group.endIndex; i++ { + if level := &b.levels[i]; level.repetitionLevel != 0 { + level.repetitionLevel = group.repetitionLevel + } + } + break + } + } +} + +// Reset clears the internal state of b, making it possible to reuse while +// retaining the internal buffers. 
+func (b *RowBuilder) Reset() { + for i, column := range b.columns { + clearValues(column) + b.columns[i] = column[:0] + } + for i := range b.levels { + b.levels[i].repetitionLevel = 0 + } +} + +// Row materializes the current state of b into a parquet row. +func (b *RowBuilder) Row() Row { + numValues := 0 + for _, column := range b.columns { + numValues += len(column) + } + return b.AppendRow(make(Row, 0, numValues)) +} + +// AppendRow appends the current state of b to row and returns it. +func (b *RowBuilder) AppendRow(row Row) Row { + for _, group := range b.groups { + maxColumn := group.baseColumn + + for _, columnIndex := range group.members { + if column := b.columns[columnIndex]; len(column) > len(maxColumn) { + maxColumn = column + } + } + + if len(maxColumn) != 0 { + columns := b.columns[group.startIndex:group.endIndex] + + for i, column := range columns { + if len(column) < len(maxColumn) { + n := len(column) + column = append(column, maxColumn[n:]...) + + columnIndex := group.startIndex + int16(i) + model := b.models[columnIndex] + + for n < len(column) { + v := &column[n] + v.kind = model.kind + v.ptr = model.ptr + v.u64 = model.u64 + v.definitionLevel = group.definitionLevel + v.columnIndex = ^columnIndex + n++ + } + + columns[i] = column + } + } + } + } + + return appendRow(row, b.columns) +} diff --git a/row_builder_test.go b/row_builder_test.go new file mode 100644 index 00000000..526c5f94 --- /dev/null +++ b/row_builder_test.go @@ -0,0 +1,286 @@ +package parquet_test + +import ( + "fmt" + "testing" + + "github.com/segmentio/parquet-go" +) + +func ExampleRowBuilder() { + builder := parquet.NewRowBuilder(parquet.Group{ + "birth_date": parquet.Optional(parquet.Date()), + "first_name": parquet.String(), + "last_name": parquet.String(), + }) + + builder.Add(1, parquet.ByteArrayValue([]byte("Luke"))) + builder.Add(2, parquet.ByteArrayValue([]byte("Skywalker"))) + + row := builder.Row() + row.Range(func(columnIndex int, columnValues []parquet.Value) bool 
{ + fmt.Printf("%+v\n", columnValues[0]) + return true + }) + + // Output: + // C:0 D:0 R:0 V: + // C:1 D:0 R:0 V:Luke + // C:2 D:0 R:0 V:Skywalker +} + +func TestRowBuilder(t *testing.T) { + type ( + operation = func(*parquet.RowBuilder) + operations = []operation + ) + + add := func(columnIndex int, columnValue parquet.Value) operation { + return func(b *parquet.RowBuilder) { b.Add(columnIndex, columnValue) } + } + + next := func(columnIndex int) operation { + return func(b *parquet.RowBuilder) { b.Next(columnIndex) } + } + + tests := []struct { + scenario string + operations operations + want parquet.Row + schema parquet.Node + }{ + { + scenario: "add missing required column value", + want: parquet.Row{ + parquet.Int64Value(0).Level(0, 0, 0), + }, + schema: parquet.Group{ + "id": parquet.Int(64), + }, + }, + + { + scenario: "set required column value", + operations: operations{ + add(0, parquet.Int64Value(1)), + }, + want: parquet.Row{ + parquet.Int64Value(1).Level(0, 0, 0), + }, + schema: parquet.Group{ + "id": parquet.Int(64), + }, + }, + + { + scenario: "set repeated column values", + operations: operations{ + add(0, parquet.Int64Value(1)), + add(1, parquet.ByteArrayValue([]byte(`1`))), + add(1, parquet.ByteArrayValue([]byte(`2`))), + add(1, parquet.ByteArrayValue([]byte(`3`))), + }, + want: parquet.Row{ + parquet.Int64Value(1).Level(0, 0, 0), + parquet.ByteArrayValue([]byte(`1`)).Level(0, 1, 1), + parquet.ByteArrayValue([]byte(`2`)).Level(1, 1, 1), + parquet.ByteArrayValue([]byte(`3`)).Level(1, 1, 1), + }, + schema: parquet.Group{ + "id": parquet.Int(64), + "names": parquet.Repeated(parquet.String()), + }, + }, + + { + scenario: "add missing repeated column value", + operations: operations{ + add(0, parquet.Int64Value(1)), + }, + want: parquet.Row{ + parquet.Int64Value(1).Level(0, 0, 0), + parquet.NullValue().Level(0, 0, 1), + }, + schema: parquet.Group{ + "id": parquet.Int(64), + "names": parquet.Repeated(parquet.String()), + }, + }, + + { + scenario: "add 
missing optional column value", + operations: operations{ + add(0, parquet.Int64Value(1)), + }, + want: parquet.Row{ + parquet.Int64Value(1).Level(0, 0, 0), + parquet.NullValue().Level(0, 0, 1), + }, + schema: parquet.Group{ + "id": parquet.Int(64), + "name": parquet.Optional(parquet.String()), + }, + }, + + { + scenario: "add missing nested column values", + operations: operations{ + add(0, parquet.Int64Value(1)), + }, + want: parquet.Row{ + parquet.Int64Value(1).Level(0, 0, 0), + parquet.NullValue().Level(0, 0, 1), + parquet.ByteArrayValue(nil).Level(0, 0, 2), + parquet.ByteArrayValue(nil).Level(0, 0, 3), + }, + schema: parquet.Group{ + "id": parquet.Int(64), + "profile": parquet.Group{ + "first_name": parquet.String(), + "last_name": parquet.String(), + "birth_date": parquet.Optional(parquet.Date()), + }, + }, + }, + + { + scenario: "add missing repeated column group", + operations: operations{ + add(0, parquet.Int64Value(1)), + add(2, parquet.ByteArrayValue([]byte(`me`))), + add(1, parquet.Int32Value(0)), + add(1, parquet.Int32Value(123456)), + add(2, parquet.ByteArrayValue([]byte(`you`))), + }, + want: parquet.Row{ + parquet.Int64Value(1).Level(0, 0, 0), + + parquet.Int32Value(0).Level(0, 2, 1), + parquet.Int32Value(123456).Level(1, 2, 1), + + parquet.ByteArrayValue([]byte(`me`)).Level(0, 1, 2), + parquet.ByteArrayValue([]byte(`you`)).Level(1, 1, 2), + + parquet.NullValue().Level(0, 1, 3), + parquet.NullValue().Level(1, 1, 3), + }, + schema: parquet.Group{ + "id": parquet.Int(64), + "profiles": parquet.Repeated(parquet.Group{ + "first_name": parquet.String(), + "last_name": parquet.String(), + "birth_date": parquet.Optional(parquet.Date()), + }), + }, + }, + + { + scenario: "empty map", + want: parquet.Row{ + parquet.Value{}.Level(0, 0, 0), + parquet.Value{}.Level(0, 0, 1), + }, + schema: parquet.Group{ + "map": parquet.Repeated(parquet.Group{ + "key_value": parquet.Group{ + "key": parquet.String(), + "value": parquet.Optional(parquet.String()), + }, + }), + 
}, + }, + + { + scenario: "one nested maps", + operations: operations{ + add(0, parquet.ByteArrayValue([]byte(`A`))), + add(1, parquet.ByteArrayValue([]byte(`1`))), + add(0, parquet.ByteArrayValue([]byte(`B`))), + add(1, parquet.ByteArrayValue([]byte(`2`))), + }, + want: parquet.Row{ + // objects.attributes.key_value.key + parquet.ByteArrayValue([]byte(`A`)).Level(0, 2, 0), + parquet.ByteArrayValue([]byte(`B`)).Level(2, 2, 0), + // objects.attributes.key_value.value + parquet.ByteArrayValue([]byte(`1`)).Level(0, 3, 1), + parquet.ByteArrayValue([]byte(`2`)).Level(2, 3, 1), + }, + schema: parquet.Group{ + "objects": parquet.Repeated(parquet.Group{ + "attributes": parquet.Repeated(parquet.Group{ + "key_value": parquet.Group{ + "key": parquet.String(), + "value": parquet.Optional(parquet.String()), + }, + }), + }), + }, + }, + + { + scenario: "multiple nested maps", + operations: operations{ + add(0, parquet.ByteArrayValue([]byte(`A`))), + add(1, parquet.ByteArrayValue([]byte(`1`))), + add(0, parquet.ByteArrayValue([]byte(`B`))), + add(1, parquet.ByteArrayValue([]byte(`2`))), + next(1), // same as next(0) because the columns are in the same group + add(0, parquet.ByteArrayValue([]byte(`C`))), + add(1, parquet.ByteArrayValue([]byte(`3`))), + }, + want: parquet.Row{ + // objects.attributes.key_value.key + parquet.ByteArrayValue([]byte(`A`)).Level(0, 2, 0), + parquet.ByteArrayValue([]byte(`B`)).Level(2, 2, 0), + parquet.ByteArrayValue([]byte(`C`)).Level(1, 2, 0), + // objects.attributes.key_value.value + parquet.ByteArrayValue([]byte(`1`)).Level(0, 3, 1), + parquet.ByteArrayValue([]byte(`2`)).Level(2, 3, 1), + parquet.ByteArrayValue([]byte(`3`)).Level(1, 3, 1), + }, + schema: parquet.Group{ + "objects": parquet.Repeated(parquet.Group{ + "attributes": parquet.Repeated(parquet.Group{ + "key_value": parquet.Group{ + "key": parquet.String(), + "value": parquet.Optional(parquet.String()), + }, + }), + }), + }, + }, + } + + for _, test := range tests { + t.Run(test.scenario, 
func(t *testing.T) { + b := parquet.NewRowBuilder(test.schema) + + for i := 0; i < 2; i++ { + for _, op := range test.operations { + op(b) + } + + if got := b.Row(); !got.Equal(test.want) { + t.Fatalf("test %d: rows are not equal\nwant = %+v\ngot = %+v", i+1, test.want, got) + } + + b.Reset() + } + }) + } +} + +func BenchmarkRowBuilderAdd(b *testing.B) { + builder := parquet.NewRowBuilder(parquet.Group{ + "ids": parquet.Repeated(parquet.Int(64)), + }) + + for i := 0; i < b.N; i++ { + builder.Add(0, parquet.Int64Value(int64(i))) + + if (i % 128) == 0 { + builder.Reset() // so don't run out of memory ;) + } + } +} diff --git a/row_group.go b/row_group.go index c95afb69..3cda08cb 100644 --- a/row_group.go +++ b/row_group.go @@ -152,71 +152,6 @@ func sortingColumnsAreEqual(s1, s2 SortingColumn) bool { return path1.equal(path2) && s1.Descending() == s2.Descending() && s1.NullsFirst() == s2.NullsFirst() } -// MergeRowGroups constructs a row group which is a merged view of rowGroups. If -// rowGroups are sorted and the passed options include sorting, the merged row -// group will also be sorted. -// -// The function validates the input to ensure that the merge operation is -// possible, ensuring that the schemas match or can be converted to an -// optionally configured target schema passed as argument in the option list. -// -// The sorting columns of each row group are also consulted to determine whether -// the output can be represented. If sorting columns are configured on the merge -// they must be a prefix of sorting columns of all row groups being merged. -func MergeRowGroups(rowGroups []RowGroup, options ...RowGroupOption) (RowGroup, error) { - config, err := NewRowGroupConfig(options...) 
- if err != nil { - return nil, err - } - - schema := config.Schema - if len(rowGroups) == 0 { - return newEmptyRowGroup(schema), nil - } - if schema == nil { - schema = rowGroups[0].Schema() - - for _, rowGroup := range rowGroups[1:] { - if !nodesAreEqual(schema, rowGroup.Schema()) { - return nil, ErrRowGroupSchemaMismatch - } - } - } - - mergedRowGroups := make([]RowGroup, len(rowGroups)) - copy(mergedRowGroups, rowGroups) - - for i, rowGroup := range mergedRowGroups { - if rowGroupSchema := rowGroup.Schema(); !nodesAreEqual(schema, rowGroupSchema) { - conv, err := Convert(schema, rowGroupSchema) - if err != nil { - return nil, fmt.Errorf("cannot merge row groups: %w", err) - } - mergedRowGroups[i] = ConvertRowGroup(rowGroup, conv) - } - } - - m := &mergedRowGroup{sorting: config.Sorting.SortingColumns} - m.init(schema, mergedRowGroups) - - if len(m.sorting) == 0 { - // When the row group has no ordering, use a simpler version of the - // merger which simply concatenates rows from each of the row groups. - // This is preferable because it makes the output deterministic, the - // heap merge may otherwise reorder rows across groups. 
- return &m.multiRowGroup, nil - } - - for _, rowGroup := range m.rowGroups { - if !sortingColumnsHavePrefix(rowGroup.SortingColumns(), m.sorting) { - return nil, ErrRowGroupSortingColumnsMismatch - } - } - - m.compare = compareRowsFuncOf(schema, m.sorting) - return m, nil -} - type rowGroup struct { schema *Schema numRows int64 diff --git a/row_group_test.go b/row_group_test.go index cf3940e2..c52b3f5f 100644 --- a/row_group_test.go +++ b/row_group_test.go @@ -123,342 +123,6 @@ func fileRowGroup(rowGroup parquet.RowGroup) parquet.RowGroup { return f.RowGroups()[0] } -func TestMergeRowGroups(t *testing.T) { - tests := []struct { - scenario string - options []parquet.RowGroupOption - input []parquet.RowGroup - output parquet.RowGroup - }{ - { - scenario: "no row groups", - options: []parquet.RowGroupOption{ - parquet.SchemaOf(Person{}), - }, - output: sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SchemaOf(Person{}), - }, - ), - }, - - { - scenario: "a single row group", - input: []parquet.RowGroup{ - sortedRowGroup(nil, - Person{FirstName: "some", LastName: "one", Age: 30}, - Person{FirstName: "some", LastName: "one else", Age: 31}, - Person{FirstName: "and", LastName: "you", Age: 32}, - ), - }, - output: sortedRowGroup(nil, - Person{FirstName: "some", LastName: "one", Age: 30}, - Person{FirstName: "some", LastName: "one else", Age: 31}, - Person{FirstName: "and", LastName: "you", Age: 32}, - ), - }, - - { - scenario: "two row groups without ordering", - input: []parquet.RowGroup{ - sortedRowGroup(nil, Person{FirstName: "some", LastName: "one", Age: 30}), - sortedRowGroup(nil, Person{FirstName: "some", LastName: "one else", Age: 31}), - }, - output: sortedRowGroup(nil, - Person{FirstName: "some", LastName: "one", Age: 30}, - Person{FirstName: "some", LastName: "one else", Age: 31}, - ), - }, - - { - scenario: "three row groups without ordering", - input: []parquet.RowGroup{ - sortedRowGroup(nil, Person{FirstName: "some", LastName: "one", Age: 30}), - 
sortedRowGroup(nil, Person{FirstName: "some", LastName: "one else", Age: 31}), - sortedRowGroup(nil, Person{FirstName: "question", LastName: "answer", Age: 42}), - }, - output: sortedRowGroup(nil, - Person{FirstName: "some", LastName: "one", Age: 30}, - Person{FirstName: "some", LastName: "one else", Age: 31}, - Person{FirstName: "question", LastName: "answer", Age: 42}, - ), - }, - - { - scenario: "row groups sorted by ascending last name", - options: []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - input: []parquet.RowGroup{ - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - Person{FirstName: "Han", LastName: "Solo"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - ), - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - ), - }, - output: sortedRowGroup(nil, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - Person{FirstName: "Han", LastName: "Solo"}, - ), - }, - - { - scenario: "row groups sorted by descending last name", - options: []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Descending("LastName"), - ), - ), - }, - input: []parquet.RowGroup{ - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Descending("LastName"), - ), - ), - }, - Person{FirstName: "Han", LastName: "Solo"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - ), - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Descending("LastName"), - ), - ), - }, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - ), - }, - output: 
sortedRowGroup(nil, - Person{FirstName: "Han", LastName: "Solo"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - ), - }, - - { - scenario: "row groups sorted by ascending last and first name", - options: []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - parquet.Ascending("FirstName"), - ), - ), - }, - input: []parquet.RowGroup{ - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - parquet.Ascending("FirstName"), - ), - ), - }, - Person{FirstName: "Luke", LastName: "Skywalker"}, - Person{FirstName: "Han", LastName: "Solo"}, - ), - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - parquet.Ascending("FirstName"), - ), - ), - }, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - Person{FirstName: "Anakin", LastName: "Skywalker"}, - ), - }, - output: sortedRowGroup(nil, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - Person{FirstName: "Anakin", LastName: "Skywalker"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - Person{FirstName: "Han", LastName: "Solo"}, - ), - }, - - { - scenario: "row groups with conversion to a different schema", - options: []parquet.RowGroupOption{ - parquet.SchemaOf(LastNameOnly{}), - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - input: []parquet.RowGroup{ - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - Person{FirstName: "Han", LastName: "Solo"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - ), - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - 
Person{FirstName: "Obiwan", LastName: "Kenobi"}, - Person{FirstName: "Anakin", LastName: "Skywalker"}, - ), - }, - output: sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - LastNameOnly{LastName: "Solo"}, - LastNameOnly{LastName: "Skywalker"}, - LastNameOnly{LastName: "Skywalker"}, - LastNameOnly{LastName: "Kenobi"}, - ), - }, - } - - for _, adapter := range []struct { - scenario string - function func(parquet.RowGroup) parquet.RowGroup - }{ - {scenario: "buffer", function: selfRowGroup}, - {scenario: "file", function: fileRowGroup}, - } { - t.Run(adapter.scenario, func(t *testing.T) { - for _, test := range tests { - t.Run(test.scenario, func(t *testing.T) { - input := make([]parquet.RowGroup, len(test.input)) - for i := range test.input { - input[i] = adapter.function(test.input[i]) - } - - merged, err := parquet.MergeRowGroups(test.input, test.options...) - if err != nil { - t.Fatal(err) - } - if merged.NumRows() != test.output.NumRows() { - t.Fatalf("the number of rows mismatch: want=%d got=%d", merged.NumRows(), test.output.NumRows()) - } - if merged.Schema() != test.output.Schema() { - t.Fatalf("the row group schemas mismatch:\n%v\n%v", test.output.Schema(), merged.Schema()) - } - - options := []parquet.RowGroupOption{parquet.SchemaOf(Person{})} - options = append(options, test.options...) - // We test two views of the resulting row group: the one originally - // returned by MergeRowGroups, and one where the merged row group - // has been copied into a new buffer. The intent is to exercise both - // the row-by-row read as well as optimized code paths when CopyRows - // bypasses the ReadRow/WriteRow calls and the row group is written - // directly to the buffer by calling WriteRowsTo/WriteRowGroup. - mergedCopy := parquet.NewBuffer(options...) 
- - totalRows := test.output.NumRows() - numRows, err := copyRowsAndClose(mergedCopy, merged.Rows()) - if err != nil { - t.Fatal(err) - } - if numRows != totalRows { - t.Fatalf("wrong number of rows copied: want=%d got=%d", totalRows, numRows) - } - - for _, merge := range []struct { - scenario string - rowGroup parquet.RowGroup - }{ - {scenario: "self", rowGroup: merged}, - {scenario: "copy", rowGroup: mergedCopy}, - } { - t.Run(merge.scenario, func(t *testing.T) { - var expectedRows = test.output.Rows() - var mergedRows = merge.rowGroup.Rows() - var row1 = make([]parquet.Row, 1) - var row2 = make([]parquet.Row, 1) - var numRows int64 - - defer expectedRows.Close() - defer mergedRows.Close() - - for { - _, err1 := expectedRows.ReadRows(row1) - n, err2 := mergedRows.ReadRows(row2) - - if err1 != err2 { - // ReadRows may or may not return io.EOF - // when it reads the last row, so we test - // that the reference RowReader has also - // reached the end. - if err1 == nil && err2 == io.EOF { - _, err1 = expectedRows.ReadRows(row1[:0]) - } - if err1 != io.EOF { - t.Fatalf("errors mismatched while comparing row %d/%d: want=%v got=%v", numRows, totalRows, err1, err2) - } - } - - if n != 0 { - if !row1[0].Equal(row2[0]) { - t.Errorf("row at index %d/%d mismatch: want=%+v got=%+v", numRows, totalRows, row1[0], row2[0]) - } - numRows++ - } - - if err1 != nil { - break - } - } - - if numRows != totalRows { - t.Errorf("expected to read %d rows but %d were found", totalRows, numRows) - } - }) - } - - }) - } - }) - } -} - func TestWriteRowGroupClosesRows(t *testing.T) { var rows []*wrappedRows rg := wrappedRowGroup{ diff --git a/value.go b/value.go index e223d98c..3c102e5e 100644 --- a/value.go +++ b/value.go @@ -214,6 +214,9 @@ func ValueOf(v interface{}) Value { return makeValue(k, nil, reflect.ValueOf(v)) } +// NullValue constructs a null value, which is the zero-value of the Value type. 
+func NullValue() Value { return Value{} } + // ZeroValue constructs a zero value of the given kind. func ZeroValue(kind Kind) Value { return makeValueKind(kind) }