diff --git a/parquet/pqarrow/encode_arrow_test.go b/parquet/pqarrow/encode_arrow_test.go
index b75a5c01..78e9c684 100644
--- a/parquet/pqarrow/encode_arrow_test.go
+++ b/parquet/pqarrow/encode_arrow_test.go
@@ -1945,6 +1945,53 @@ func TestParquetArrowIO(t *testing.T) {
     suite.Run(t, new(ParquetIOTestSuite))
 }
 
+func TestForceLargeTypes(t *testing.T) {
+    mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+    defer mem.AssertSize(t, 0)
+
+    sc := arrow.NewSchema([]arrow.Field{
+        {Name: "str", Type: arrow.BinaryTypes.LargeString},
+        {Name: "bin", Type: arrow.BinaryTypes.LargeBinary},
+    }, nil)
+
+    bldr := array.NewRecordBuilder(mem, sc)
+    defer bldr.Release()
+
+    bldr.Field(0).(*array.LargeStringBuilder).AppendValues([]string{"hello", "foo", "bar"}, nil)
+    bldr.Field(1).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("hello"), []byte("foo"), []byte("bar")}, nil)
+
+    rec := bldr.NewRecord()
+    defer rec.Release()
+
+    var buf bytes.Buffer
+    wr, err := pqarrow.NewFileWriter(sc, &buf,
+        parquet.NewWriterProperties(),
+        pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
+    require.NoError(t, err)
+
+    require.NoError(t, wr.Write(rec))
+    require.NoError(t, wr.Close())
+
+    rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+    require.NoError(t, err)
+    defer rdr.Close()
+
+    props := pqarrow.ArrowReadProperties{}
+    props.SetForceLarge(0, true)
+    props.SetForceLarge(1, true)
+    pqrdr, err := pqarrow.NewFileReader(rdr, props, mem)
+    require.NoError(t, err)
+
+    recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
+    require.NoError(t, err)
+    defer recrdr.Release()
+
+    got, err := recrdr.Read()
+    require.NoError(t, err)
+
+    assert.Truef(t, array.RecordEqual(rec, got), "expected: %s\ngot: %s", rec, got)
+}
+
 func TestBufferedRecWrite(t *testing.T) {
     mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
     defer mem.AssertSize(t, 0)
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index c9a83b8c..d6eae17a 100755
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -471,9 +471,13 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
         nrows += fr.rdr.MetaData().RowGroup(rg).NumRows()
     }
 
+    batchSize := fr.Props.BatchSize
+    if fr.Props.BatchSize <= 0 {
+        batchSize = nrows
+    }
     return &recordReader{
         numRows:      nrows,
-        batchSize:    fr.Props.BatchSize,
+        batchSize:    batchSize,
         parallel:     fr.Props.Parallel,
         sc:           sc,
         fieldReaders: readers,
diff --git a/parquet/pqarrow/properties.go b/parquet/pqarrow/properties.go
index dc7f4e09..d349a398 100755
--- a/parquet/pqarrow/properties.go
+++ b/parquet/pqarrow/properties.go
@@ -166,7 +166,37 @@ type ArrowReadProperties struct {
     // BatchSize is the size used for calls to NextBatch when reading whole columns
     BatchSize int64
 
-    readDictIndices map[int]struct{}
+    readDictIndices   map[int]struct{}
+    forceLargeIndices map[int]struct{}
+}
+
+// SetForceLarge determines whether a particular column, if it is String or Binary,
+// will use the LargeString/LargeBinary variants (with int64 offsets) instead of int32
+// offsets. This is specifically useful if you know that particular columns contain more
+// than 2GB worth of byte data which would prevent use of int32 offsets.
+//
+// Passing false will use the default variants while passing true will use the large
+// variant. If the passed column index is not a string or binary column, then this will
+// have no effect.
+func (props *ArrowReadProperties) SetForceLarge(colIdx int, forceLarge bool) {
+    if props.forceLargeIndices == nil {
+        props.forceLargeIndices = make(map[int]struct{})
+    }
+
+    if forceLarge {
+        props.forceLargeIndices[colIdx] = struct{}{}
+    } else {
+        delete(props.forceLargeIndices, colIdx)
+    }
+}
+
+func (props *ArrowReadProperties) ForceLarge(colIdx int) bool {
+    if props.forceLargeIndices == nil {
+        return false
+    }
+
+    _, ok := props.forceLargeIndices[colIdx]
+    return ok
 }
 
 // SetReadDict determines whether to read a particular column as dictionary
diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go
index 6d30359c..efb95519 100644
--- a/parquet/pqarrow/schema.go
+++ b/parquet/pqarrow/schema.go
@@ -717,6 +717,15 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
         arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
     }
 
+    if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+        switch arrowType.ID() {
+        case arrow.STRING:
+            arrowType = arrow.BinaryTypes.LargeString
+        case arrow.BINARY:
+            arrowType = arrow.BinaryTypes.LargeBinary
+        }
+    }
+
     itemField := arrow.Field{Name: listNode.Name(), Type: arrowType, Nullable: false, Metadata: createFieldMeta(int(listNode.FieldID()))}
     populateLeaf(colIndex, &itemField, currentLevels, ctx, out, &out.Children[0])
 }
@@ -891,6 +900,15 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT
         arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
     }
 
+    if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+        switch arrowType.ID() {
+        case arrow.STRING:
+            arrowType = arrow.BinaryTypes.LargeString
+        case arrow.BINARY:
+            arrowType = arrow.BinaryTypes.LargeBinary
+        }
+    }
+
     if primitive.RepetitionType() == parquet.Repetitions.Repeated {
         // one-level list encoding e.g. a: repeated int32;
         repeatedAncestorDefLevel := currentLevels.IncrementRepeated()
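
Usage note (not part of the patch): below is a minimal sketch of how a caller might combine the new SetForceLarge option with the batch-size fallback that GetRecordReader now applies. The file name, the column index, and the github.com/apache/arrow-go/v18 module path are illustrative assumptions; the pqarrow and file calls themselves mirror the ones exercised in TestForceLargeTypes above.

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/apache/arrow-go/v18/arrow/memory"
    "github.com/apache/arrow-go/v18/parquet/file"
    "github.com/apache/arrow-go/v18/parquet/pqarrow"
)

func main() {
    // "big_strings.parquet" and column index 0 are hypothetical placeholders.
    f, err := os.Open("big_strings.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    rdr, err := file.NewParquetReader(f)
    if err != nil {
        panic(err)
    }
    defer rdr.Close()

    // Opt column 0 into the int64-offset LargeString/LargeBinary variants;
    // on a non-string/binary column SetForceLarge has no effect.
    props := pqarrow.ArrowReadProperties{}
    props.SetForceLarge(0, true)

    pqrdr, err := pqarrow.NewFileReader(rdr, props, memory.DefaultAllocator)
    if err != nil {
        panic(err)
    }

    // BatchSize is left at its zero value, so with this patch GetRecordReader
    // sizes batches to the total row count instead of passing zero through.
    recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
    if err != nil {
        panic(err)
    }
    defer recrdr.Release()

    for recrdr.Next() {
        fmt.Println(recrdr.Record())
    }
}

Before the file_reader.go change, a zero Props.BatchSize was handed to the record reader unchanged; it now falls back to the total row count of the selected row groups, which is why the test above can fetch the whole table with a single Read call.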