feat(parquet/pqarrow): Add ForceLarge option (#197)
### Rationale for this change
closes #195

For Parquet files that contain more than 2GB of data in a single column, a user should be
able to force use of the LargeString/LargeBinary variants (int64 offsets) without requiring
a stored Arrow schema in the file metadata.

### What changes are included in this PR?
Adds a `ForceLarge` option to `pqarrow.ArrowReadProperties`, settable per leaf column via
`SetForceLarge`, which forces use of the `LargeString` and `LargeBinary` data types when reading.

### Are these changes tested?
Yes, a unit test is added.

### Are there any user-facing changes?
No breaking changes, only the addition of a new option.
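
A minimal usage sketch (not part of this commit; the module import paths, the `data.parquet` path, and the assumption that leaf column 0 is a string or binary column are all illustrative):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

func main() {
	// Open an existing Parquet file ("data.parquet" is a placeholder path).
	rdr, err := file.OpenParquetFile("data.parquet", false)
	if err != nil {
		log.Fatal(err)
	}
	defer rdr.Close()

	// Force leaf column 0 to be read as LargeString/LargeBinary (int64 offsets),
	// even though the file carries no stored Arrow schema.
	props := pqarrow.ArrowReadProperties{} // BatchSize 0 now defaults to all rows in one batch
	props.SetForceLarge(0, true)

	pqrdr, err := pqarrow.NewFileReader(rdr, props, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}

	recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer recrdr.Release()

	rec, err := recrdr.Read()
	if err != nil {
		log.Fatal(err)
	}

	// Column 0 now reports large_utf8 (or large_binary) instead of utf8/binary.
	fmt.Println(rec.Schema())
}
```
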
zeroshade authored Dec 9, 2024
1 parent 9d44f34 commit 370dc98
Showing 4 changed files with 101 additions and 2 deletions.
47 changes: 47 additions & 0 deletions parquet/pqarrow/encode_arrow_test.go
@@ -1945,6 +1945,53 @@ func TestParquetArrowIO(t *testing.T) {
suite.Run(t, new(ParquetIOTestSuite))
}

func TestForceLargeTypes(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)

sc := arrow.NewSchema([]arrow.Field{
{Name: "str", Type: arrow.BinaryTypes.LargeString},
{Name: "bin", Type: arrow.BinaryTypes.LargeBinary},
}, nil)

bldr := array.NewRecordBuilder(mem, sc)
defer bldr.Release()

bldr.Field(0).(*array.LargeStringBuilder).AppendValues([]string{"hello", "foo", "bar"}, nil)
bldr.Field(1).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("hello"), []byte("foo"), []byte("bar")}, nil)

rec := bldr.NewRecord()
defer rec.Release()

var buf bytes.Buffer
wr, err := pqarrow.NewFileWriter(sc, &buf,
parquet.NewWriterProperties(),
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
require.NoError(t, err)

require.NoError(t, wr.Write(rec))
require.NoError(t, wr.Close())

rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
require.NoError(t, err)
defer rdr.Close()

props := pqarrow.ArrowReadProperties{}
props.SetForceLarge(0, true)
props.SetForceLarge(1, true)
pqrdr, err := pqarrow.NewFileReader(rdr, props, mem)
require.NoError(t, err)

recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
require.NoError(t, err)
defer recrdr.Release()

got, err := recrdr.Read()
require.NoError(t, err)

assert.Truef(t, array.RecordEqual(rec, got), "expected: %s\ngot: %s", rec, got)
}

func TestBufferedRecWrite(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
6 changes: 5 additions & 1 deletion parquet/pqarrow/file_reader.go
@@ -471,9 +471,13 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
nrows += fr.rdr.MetaData().RowGroup(rg).NumRows()
}

// If no batch size was configured, default to reading all selected rows as a single batch.
batchSize := fr.Props.BatchSize
if fr.Props.BatchSize <= 0 {
batchSize = nrows
}
return &recordReader{
numRows: nrows,
-batchSize: fr.Props.BatchSize,
+batchSize: batchSize,
parallel: fr.Props.Parallel,
sc: sc,
fieldReaders: readers,
32 changes: 31 additions & 1 deletion parquet/pqarrow/properties.go
@@ -166,7 +166,37 @@ type ArrowReadProperties struct {
// BatchSize is the size used for calls to NextBatch when reading whole columns
BatchSize int64

readDictIndices map[int]struct{}
forceLargeIndices map[int]struct{}
}

// SetForceLarge determines whether a particular column, if it is a String or Binary column,
// will use the LargeString/LargeBinary variants (which use int64 offsets) instead of the
// default int32 offsets. This is specifically useful if you know that a particular column
// contains more than 2GB of byte data, which would overflow int32 offsets.
//
// Passing true forces the large variant; passing false uses the default variant. If the
// column at the given index is not a string or binary column, this setting has no effect.
func (props *ArrowReadProperties) SetForceLarge(colIdx int, forceLarge bool) {
if props.forceLargeIndices == nil {
props.forceLargeIndices = make(map[int]struct{})
}

if forceLarge {
props.forceLargeIndices[colIdx] = struct{}{}
} else {
delete(props.forceLargeIndices, colIdx)
}
}

// ForceLarge reports whether the column at colIdx has been flagged via SetForceLarge
// to be read using the LargeString/LargeBinary (int64 offset) variants.
func (props *ArrowReadProperties) ForceLarge(colIdx int) bool {
if props.forceLargeIndices == nil {
return false
}

_, ok := props.forceLargeIndices[colIdx]
return ok
}

// SetReadDict determines whether to read a particular column as dictionary
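
For reference, the setter/getter pair behaves as a simple per-column flag; a standalone sketch (the module import path is assumed):

```go
package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

func main() {
	var props pqarrow.ArrowReadProperties
	fmt.Println(props.ForceLarge(0)) // false: nothing set yet
	props.SetForceLarge(0, true)     // flag leaf column 0
	fmt.Println(props.ForceLarge(0)) // true
	props.SetForceLarge(0, false)    // passing false clears the flag
	fmt.Println(props.ForceLarge(0)) // false again
}
```
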
18 changes: 18 additions & 0 deletions parquet/pqarrow/schema.go
@@ -717,6 +717,15 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
}

// Swap in the int64-offset variant when this column was flagged via ForceLarge.
if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
switch arrowType.ID() {
case arrow.STRING:
arrowType = arrow.BinaryTypes.LargeString
case arrow.BINARY:
arrowType = arrow.BinaryTypes.LargeBinary
}
}

itemField := arrow.Field{Name: listNode.Name(), Type: arrowType, Nullable: false, Metadata: createFieldMeta(int(listNode.FieldID()))}
populateLeaf(colIndex, &itemField, currentLevels, ctx, out, &out.Children[0])
}
@@ -891,6 +900,15 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT
arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
}

// Swap in the int64-offset variant when this column was flagged via ForceLarge.
if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
switch arrowType.ID() {
case arrow.STRING:
arrowType = arrow.BinaryTypes.LargeString
case arrow.BINARY:
arrowType = arrow.BinaryTypes.LargeBinary
}
}

if primitive.RepetitionType() == parquet.Repetitions.Repeated {
// one-level list encoding e.g. a: repeated int32;
repeatedAncestorDefLevel := currentLevels.IncrementRepeated()
