From 5ea2f7b52ab3cb9425cecf4f2530ab9cb10bc1a1 Mon Sep 17 00:00:00 2001
From: Matt Topol
Date: Fri, 22 Nov 2024 11:07:32 -0500
Subject: [PATCH 1/2] feat(parquet/pqarrow): Add ForceLarge option

---
 parquet/pqarrow/encode_arrow_test.go | 45 ++++++++++++++++++++++++++++
 parquet/pqarrow/file_reader.go       |  6 +++-
 parquet/pqarrow/properties.go        |  5 ++++
 parquet/pqarrow/schema.go            | 16 ++++----
 4 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/parquet/pqarrow/encode_arrow_test.go b/parquet/pqarrow/encode_arrow_test.go
index b75a5c01..266d8911 100644
--- a/parquet/pqarrow/encode_arrow_test.go
+++ b/parquet/pqarrow/encode_arrow_test.go
@@ -1945,6 +1945,51 @@ func TestParquetArrowIO(t *testing.T) {
 	suite.Run(t, new(ParquetIOTestSuite))
 }
 
+func TestForceLargeTypes(t *testing.T) {
+	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+	defer mem.AssertSize(t, 0)
+
+	sc := arrow.NewSchema([]arrow.Field{
+		{Name: "str", Type: arrow.BinaryTypes.LargeString},
+		{Name: "bin", Type: arrow.BinaryTypes.LargeBinary},
+	}, nil)
+
+	bldr := array.NewRecordBuilder(mem, sc)
+	defer bldr.Release()
+
+	bldr.Field(0).(*array.LargeStringBuilder).AppendValues([]string{"hello", "foo", "bar"}, nil)
+	bldr.Field(1).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("hello"), []byte("foo"), []byte("bar")}, nil)
+
+	rec := bldr.NewRecord()
+	defer rec.Release()
+
+	var buf bytes.Buffer
+	wr, err := pqarrow.NewFileWriter(sc, &buf,
+		parquet.NewWriterProperties(),
+		pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
+	require.NoError(t, err)
+
+	require.NoError(t, wr.Write(rec))
+	require.NoError(t, wr.Close())
+
+	rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+	require.NoError(t, err)
+	defer rdr.Close()
+
+	pqrdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
+		ForceLarge: true}, mem)
+	require.NoError(t, err)
+
+	recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
+	require.NoError(t, err)
+	defer recrdr.Release()
+
+	got, err := recrdr.Read()
+	require.NoError(t, err)
+
+	assert.Truef(t, array.RecordEqual(rec, got), "expected: %s\ngot: %s", rec, got)
+}
+
 func TestBufferedRecWrite(t *testing.T) {
 	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
 	defer mem.AssertSize(t, 0)
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index c9a83b8c..d6eae17a 100755
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -471,9 +471,13 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
 		nrows += fr.rdr.MetaData().RowGroup(rg).NumRows()
 	}
 
+	batchSize := fr.Props.BatchSize
+	if fr.Props.BatchSize <= 0 {
+		batchSize = nrows
+	}
 	return &recordReader{
 		numRows:      nrows,
-		batchSize:    fr.Props.BatchSize,
+		batchSize:    batchSize,
 		parallel:     fr.Props.Parallel,
 		sc:           sc,
 		fieldReaders: readers,
diff --git a/parquet/pqarrow/properties.go b/parquet/pqarrow/properties.go
index dc7f4e09..b721cc04 100755
--- a/parquet/pqarrow/properties.go
+++ b/parquet/pqarrow/properties.go
@@ -165,6 +165,11 @@ type ArrowReadProperties struct {
 	Parallel bool
 	// BatchSize is the size used for calls to NextBatch when reading whole columns
 	BatchSize int64
+	// Setting ForceLarge to true will force the reader to use LargeString/LargeBinary
+	// for string and binary columns respectively, instead of the default variants. This
+	// can be necessary if you know that there are columns which contain more than 2GB of
+	// data, which would prevent use of int32 offsets.
+	ForceLarge bool
 
 	readDictIndices map[int]struct{}
 }
diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go
index 6d30359c..8afa6c92 100644
--- a/parquet/pqarrow/schema.go
+++ b/parquet/pqarrow/schema.go
@@ -494,9 +494,12 @@ func arrowFromInt64(logical schema.LogicalType) (arrow.DataType, error) {
 	}
 }
 
-func arrowFromByteArray(logical schema.LogicalType) (arrow.DataType, error) {
+func arrowFromByteArray(ctx *schemaTree, logical schema.LogicalType) (arrow.DataType, error) {
 	switch logtype := logical.(type) {
 	case schema.StringLogicalType:
+		if ctx.props.ForceLarge {
+			return arrow.BinaryTypes.LargeString, nil
+		}
 		return arrow.BinaryTypes.String, nil
 	case schema.DecimalLogicalType:
 		return arrowDecimal(logtype), nil
@@ -504,6 +507,9 @@
 		schema.EnumLogicalType,
 		schema.JSONLogicalType,
 		schema.BSONLogicalType:
+		if ctx.props.ForceLarge {
+			return arrow.BinaryTypes.LargeBinary, nil
+		}
 		return arrow.BinaryTypes.Binary, nil
 	default:
 		return nil, xerrors.New("unhandled logicaltype " + logical.String() + " for byte_array")
@@ -607,7 +613,7 @@ func getParquetType(typ arrow.DataType, props *parquet.WriterProperties, arrprop
 	}
 }
 
-func getArrowType(physical parquet.Type, logical schema.LogicalType, typeLen int) (arrow.DataType, error) {
+func getArrowType(ctx *schemaTree, physical parquet.Type, logical schema.LogicalType, typeLen int) (arrow.DataType, error) {
 	if !logical.IsValid() || logical.Equals(schema.NullLogicalType{}) {
 		return arrow.Null, nil
 	}
@@ -626,7 +632,7 @@
 	case parquet.Types.Double:
 		return arrow.PrimitiveTypes.Float64, nil
 	case parquet.Types.ByteArray:
-		return arrowFromByteArray(logical)
+		return arrowFromByteArray(ctx, logical)
 	case parquet.Types.FixedLenByteArray:
 		return arrowFromFLBA(logical, typeLen)
 	default:
@@ -708,7 +714,7 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
 	// }
 	primitiveNode := listNode.(*schema.PrimitiveNode)
 	colIndex := ctx.schema.ColumnIndexByNode(primitiveNode)
-	arrowType, err := getArrowType(primitiveNode.PhysicalType(), primitiveNode.LogicalType(), primitiveNode.TypeLength())
+	arrowType, err := getArrowType(ctx, primitiveNode.PhysicalType(), primitiveNode.LogicalType(), primitiveNode.TypeLength())
 	if err != nil {
 		return err
 	}
@@ -882,7 +888,7 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT
 
 	primitive := n.(*schema.PrimitiveNode)
 	colIndex := ctx.schema.ColumnIndexByNode(primitive)
-	arrowType, err := getArrowType(primitive.PhysicalType(), primitive.LogicalType(), primitive.TypeLength())
+	arrowType, err := getArrowType(ctx, primitive.PhysicalType(), primitive.LogicalType(), primitive.TypeLength())
 	if err != nil {
 		return err
 	}

From 7e693de225d6c965e7c96bf9fa41f09188d879f9 Mon Sep 17 00:00:00 2001
From: Matt Topol
Date: Fri, 6 Dec 2024 12:38:12 -0500
Subject: [PATCH 2/2] switch to per-column option

---
 parquet/pqarrow/encode_arrow_test.go |  6 +++--
 parquet/pqarrow/properties.go        | 37 +++++++++++++++++++++++-----
 parquet/pqarrow/schema.go            | 34 ++++++++++++++++++++++----------
 3 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/parquet/pqarrow/encode_arrow_test.go b/parquet/pqarrow/encode_arrow_test.go
index 266d8911..78e9c684 100644
--- a/parquet/pqarrow/encode_arrow_test.go
+++ b/parquet/pqarrow/encode_arrow_test.go
@@ -1976,8 +1976,10 @@
 	require.NoError(t, err)
 	defer rdr.Close()
 
-	pqrdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
-		ForceLarge: true}, mem)
+	props := pqarrow.ArrowReadProperties{}
+	props.SetForceLarge(0, true)
+	props.SetForceLarge(1, true)
+	pqrdr, err := pqarrow.NewFileReader(rdr, props, mem)
 	require.NoError(t, err)
 
 	recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
diff --git a/parquet/pqarrow/properties.go b/parquet/pqarrow/properties.go
index b721cc04..d349a398 100755
--- a/parquet/pqarrow/properties.go
+++ b/parquet/pqarrow/properties.go
@@ -165,13 +165,38 @@ type ArrowReadProperties struct {
 	Parallel bool
 	// BatchSize is the size used for calls to NextBatch when reading whole columns
 	BatchSize int64
-	// Setting ForceLarge to true will force the reader to use LargeString/LargeBinary
-	// for string and binary columns respectively, instead of the default variants. This
-	// can be necessary if you know that there are columns which contain more than 2GB of
-	// data, which would prevent use of int32 offsets.
-	ForceLarge bool
 
-	readDictIndices map[int]struct{}
+	readDictIndices   map[int]struct{}
+	forceLargeIndices map[int]struct{}
+}
+
+// SetForceLarge determines whether a particular column, if it is String or Binary,
+// will use the LargeString/LargeBinary variants (with int64 offsets) instead of int32
+// offsets. This is specifically useful if you know that particular columns contain more
+// than 2GB worth of byte data which would prevent use of int32 offsets.
+//
+// Passing false will use the default variants while passing true will use the large
+// variant. If the passed column index is not a string or binary column, then this will
+// have no effect.
+func (props *ArrowReadProperties) SetForceLarge(colIdx int, forceLarge bool) {
+	if props.forceLargeIndices == nil {
+		props.forceLargeIndices = make(map[int]struct{})
+	}
+
+	if forceLarge {
+		props.forceLargeIndices[colIdx] = struct{}{}
+	} else {
+		delete(props.forceLargeIndices, colIdx)
+	}
+}
+
+func (props *ArrowReadProperties) ForceLarge(colIdx int) bool {
+	if props.forceLargeIndices == nil {
+		return false
+	}
+
+	_, ok := props.forceLargeIndices[colIdx]
+	return ok
 }
 
 // SetReadDict determines whether to read a particular column as dictionary
diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go
index 8afa6c92..efb95519 100644
--- a/parquet/pqarrow/schema.go
+++ b/parquet/pqarrow/schema.go
@@ -494,12 +494,9 @@ func arrowFromInt64(logical schema.LogicalType) (arrow.DataType, error) {
 	}
 }
 
-func arrowFromByteArray(ctx *schemaTree, logical schema.LogicalType) (arrow.DataType, error) {
+func arrowFromByteArray(logical schema.LogicalType) (arrow.DataType, error) {
 	switch logtype := logical.(type) {
 	case schema.StringLogicalType:
-		if ctx.props.ForceLarge {
-			return arrow.BinaryTypes.LargeString, nil
-		}
 		return arrow.BinaryTypes.String, nil
 	case schema.DecimalLogicalType:
 		return arrowDecimal(logtype), nil
@@ -507,9 +504,6 @@
 		schema.EnumLogicalType,
 		schema.JSONLogicalType,
 		schema.BSONLogicalType:
-		if ctx.props.ForceLarge {
-			return arrow.BinaryTypes.LargeBinary, nil
-		}
 		return arrow.BinaryTypes.Binary, nil
 	default:
 		return nil, xerrors.New("unhandled logicaltype " + logical.String() + " for byte_array")
@@ -613,7 +607,7 @@ func getParquetType(typ arrow.DataType, props *parquet.WriterProperties, arrprop
 	}
 }
 
-func getArrowType(ctx *schemaTree, physical parquet.Type, logical schema.LogicalType, typeLen int) (arrow.DataType, error) {
+func getArrowType(physical parquet.Type, logical schema.LogicalType, typeLen int) (arrow.DataType, error) {
 	if !logical.IsValid() || logical.Equals(schema.NullLogicalType{}) {
 		return arrow.Null, nil
 	}
@@ -632,7 +626,7 @@
 	case parquet.Types.Double:
 		return arrow.PrimitiveTypes.Float64, nil
 	case parquet.Types.ByteArray:
-		return arrowFromByteArray(ctx, logical)
+		return arrowFromByteArray(logical)
 	case parquet.Types.FixedLenByteArray:
 		return arrowFromFLBA(logical, typeLen)
 	default:
@@ -714,7 +708,7 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
 	// }
 	primitiveNode := listNode.(*schema.PrimitiveNode)
 	colIndex := ctx.schema.ColumnIndexByNode(primitiveNode)
-	arrowType, err := getArrowType(ctx, primitiveNode.PhysicalType(), primitiveNode.LogicalType(), primitiveNode.TypeLength())
+	arrowType, err := getArrowType(primitiveNode.PhysicalType(), primitiveNode.LogicalType(), primitiveNode.TypeLength())
 	if err != nil {
 		return err
 	}
@@ -723,6 +717,15 @@
 		arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
 	}
 
+	if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+		switch arrowType.ID() {
+		case arrow.STRING:
+			arrowType = arrow.BinaryTypes.LargeString
+		case arrow.BINARY:
+			arrowType = arrow.BinaryTypes.LargeBinary
+		}
+	}
+
 	itemField := arrow.Field{Name: listNode.Name(), Type: arrowType, Nullable: false, Metadata: createFieldMeta(int(listNode.FieldID()))}
 	populateLeaf(colIndex, &itemField, currentLevels, ctx, out, &out.Children[0])
 }
@@ -888,7 +891,7 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT
 
 	primitive := n.(*schema.PrimitiveNode)
 	colIndex := ctx.schema.ColumnIndexByNode(primitive)
-	arrowType, err := getArrowType(ctx, primitive.PhysicalType(), primitive.LogicalType(), primitive.TypeLength())
+	arrowType, err := getArrowType(primitive.PhysicalType(), primitive.LogicalType(), primitive.TypeLength())
 	if err != nil {
 		return err
 	}
@@ -897,6 +900,15 @@
 		arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
 	}
 
+	if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+		switch arrowType.ID() {
+		case arrow.STRING:
+			arrowType = arrow.BinaryTypes.LargeString
+		case arrow.BINARY:
+			arrowType = arrow.BinaryTypes.LargeBinary
+		}
+	}
+
 	if primitive.RepetitionType() == parquet.Repetitions.Repeated {
 		// one-level list encoding e.g. a: repeated int32;
 		repeatedAncestorDefLevel := currentLevels.IncrementRepeated()
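
Usage note (editor's addition, not part of the patch series): the sketch below shows how the per-column API from PATCH 2/2 is meant to be used when reading an existing file from disk. The module import path (github.com/apache/arrow-go/v18) and the file name "data.parquet" are assumptions for illustration; the pqarrow and file calls are the same ones exercised by TestForceLargeTypes above.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

func main() {
	// "data.parquet" is a placeholder; columns 0 and 1 are assumed to be
	// BYTE_ARRAY (string/binary) columns that may hold more than 2GB of data.
	f, err := os.Open("data.parquet")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	rdr, err := file.NewParquetReader(f)
	if err != nil {
		panic(err)
	}
	defer rdr.Close()

	// Request the int64-offset (Large) variants for the two columns instead
	// of the default int32-offset String/Binary types.
	props := pqarrow.ArrowReadProperties{}
	props.SetForceLarge(0, true)
	props.SetForceLarge(1, true)

	pqrdr, err := pqarrow.NewFileReader(rdr, props, memory.DefaultAllocator)
	if err != nil {
		panic(err)
	}

	recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
	if err != nil {
		panic(err)
	}
	defer recrdr.Release()

	for recrdr.Next() {
		rec := recrdr.Record()
		// The affected fields surface as large_utf8/large_binary here.
		fmt.Println(rec.Schema())
	}
}

Per the SetForceLarge doc comment, requesting a large variant for a column that is not String or Binary is a no-op, so setting it speculatively is safe.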