feat(parquet/pqarrow): Add ForceLarge option #197

Merged (2 commits) on Dec 9, 2024.

Changes from 1 commit
45 changes: 45 additions & 0 deletions parquet/pqarrow/encode_arrow_test.go
@@ -1945,6 +1945,51 @@ func TestParquetArrowIO(t *testing.T) {
suite.Run(t, new(ParquetIOTestSuite))
}

func TestForceLargeTypes(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	sc := arrow.NewSchema([]arrow.Field{
		{Name: "str", Type: arrow.BinaryTypes.LargeString},
		{Name: "bin", Type: arrow.BinaryTypes.LargeBinary},
	}, nil)

	bldr := array.NewRecordBuilder(mem, sc)
	defer bldr.Release()

	bldr.Field(0).(*array.LargeStringBuilder).AppendValues([]string{"hello", "foo", "bar"}, nil)
	bldr.Field(1).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("hello"), []byte("foo"), []byte("bar")}, nil)

	rec := bldr.NewRecord()
	defer rec.Release()

	var buf bytes.Buffer
	wr, err := pqarrow.NewFileWriter(sc, &buf,
		parquet.NewWriterProperties(),
		pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
	require.NoError(t, err)

	require.NoError(t, wr.Write(rec))
	require.NoError(t, wr.Close())

	rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)
	defer rdr.Close()

	pqrdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
		ForceLarge: true}, mem)
	require.NoError(t, err)

	recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
	require.NoError(t, err)
	defer recrdr.Release()

	got, err := recrdr.Read()
	require.NoError(t, err)

	assert.Truef(t, array.RecordEqual(rec, got), "expected: %s\ngot: %s", rec, got)
}

func TestBufferedRecWrite(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
6 changes: 5 additions & 1 deletion parquet/pqarrow/file_reader.go
@@ -471,9 +471,13 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
nrows += fr.rdr.MetaData().RowGroup(rg).NumRows()
}

batchSize := fr.Props.BatchSize
if fr.Props.BatchSize <= 0 {
batchSize = nrows
}
return &recordReader{
numRows: nrows,
batchSize: fr.Props.BatchSize,
batchSize: batchSize,
parallel: fr.Props.Parallel,
sc: sc,
fieldReaders: readers,
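For context, here is a minimal usage sketch (not part of this PR's diff; the arrow-go v18 import path, the `readWholeFile` helper name, and the `data` parameter are assumptions) showing what the new default means in practice: leaving BatchSize at its zero value now makes GetRecordReader batch by the total selected row count rather than a zero-sized batch, so a single Read call can cover the whole selection.

```go
package example

import (
	"bytes"
	"context"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

// readWholeFile reads every selected row of an in-memory Parquet payload.
// BatchSize is left at zero, which GetRecordReader now treats as "use the
// total row count" instead of a zero-sized batch.
func readWholeFile(data []byte) error {
	rdr, err := file.NewParquetReader(bytes.NewReader(data))
	if err != nil {
		return err
	}
	defer rdr.Close()

	fr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
	if err != nil {
		return err
	}

	rr, err := fr.GetRecordReader(context.Background(), nil, nil) // all columns, all row groups
	if err != nil {
		return err
	}
	defer rr.Release()

	rec, err := rr.Read() // with the defaulted batch size this can return all rows at once
	if err != nil {
		return err
	}
	fmt.Println("rows:", rec.NumRows())
	return nil
}
```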
5 changes: 5 additions & 0 deletions parquet/pqarrow/properties.go
@@ -165,6 +165,11 @@ type ArrowReadProperties struct {
Parallel bool
// BatchSize is the size used for calls to NextBatch when reading whole columns
BatchSize int64
// Setting ForceLarge to true will force the reader to use LargeString/LargeBinary
// for string and binary columns respectively, instead of the default variants. This
// can be necessary if you know that there are columns which contain more than 2GB of
// data, which would prevent use of int32 offsets.
ForceLarge bool
Review thread on the ForceLarge field:

Member: FYI, there was an attempt at this in parquet-cpp, but the solution might not be what was expected: apache/arrow#35825

Member: I agree with the original poster that it's rather weird that we can generate Parquet files that we can't otherwise read. However, I also agree with Antoine that it might be good to make this option per-column if possible.

Member (author): Well, even if you don't use this option you can still read the Parquet file; it would just require manually shrinking the batch size. I can definitely change this to a per-column option. That's fine, albeit a larger change, since we don't currently expose which column we're determining the type for to the function that produces the Arrow type. (See the hypothetical sketch of a per-column option after this file's diff.)

Alternatively, we could use the row-group column metadata to decide ahead of time to switch to the Large variant for any column whose data is large enough to warrant it, but that would get really complex with row groups that may or may not be large enough to require it, etc.

The other alternative would be to forcibly reduce the batch size when reading to accommodate it?

Thoughts?

Member: Doing it automatically would be surprising to users IMO. It would also potentially produce inconsistent schemas when reading multiple files.

Reducing the batch size may make sense; alternatively an option to use StringView?

Member: I also agree that automatically changing the type could be confusing. Regardless of the approach used to convert types, automatically reducing the batch size rather than exceeding the maximum offset of a variable-width type would be very nice IMO.


readDictIndices map[int]struct{}
}
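To ground the per-column idea floated in the thread above, here is a purely hypothetical sketch. Nothing below exists in the library or in this PR; the struct and the `ForceLargeColumns` field are invented for illustration only.

```go
package example

// arrowReadPropertiesSketch is one hypothetical shape for a per-column variant
// of the option, as discussed in the review thread; it is not part of this PR.
type arrowReadPropertiesSketch struct {
	Parallel  bool
	BatchSize int64

	// ForceLargeColumns (invented name) would list the dotted paths of
	// BYTE_ARRAY leaf columns to read as LargeString/LargeBinary, e.g.
	// {"payload": true, "doc.body": true}, while every other column keeps
	// the default int32-offset String/Binary types.
	ForceLargeColumns map[string]bool
}
```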
16 changes: 11 additions & 5 deletions parquet/pqarrow/schema.go
@@ -494,16 +494,22 @@ func arrowFromInt64(logical schema.LogicalType) (arrow.DataType, error) {
}
}

func arrowFromByteArray(logical schema.LogicalType) (arrow.DataType, error) {
func arrowFromByteArray(ctx *schemaTree, logical schema.LogicalType) (arrow.DataType, error) {
switch logtype := logical.(type) {
case schema.StringLogicalType:
if ctx.props.ForceLarge {
return arrow.BinaryTypes.LargeString, nil
}
return arrow.BinaryTypes.String, nil
case schema.DecimalLogicalType:
return arrowDecimal(logtype), nil
case schema.NoLogicalType,
schema.EnumLogicalType,
schema.JSONLogicalType,
schema.BSONLogicalType:
if ctx.props.ForceLarge {
return arrow.BinaryTypes.LargeBinary, nil
}
return arrow.BinaryTypes.Binary, nil
default:
return nil, xerrors.New("unhandled logicaltype " + logical.String() + " for byte_array")
@@ -607,7 +613,7 @@ func getParquetType(typ arrow.DataType, props *parquet.WriterProperties, arrprop
}
}

func getArrowType(physical parquet.Type, logical schema.LogicalType, typeLen int) (arrow.DataType, error) {
func getArrowType(ctx *schemaTree, physical parquet.Type, logical schema.LogicalType, typeLen int) (arrow.DataType, error) {
if !logical.IsValid() || logical.Equals(schema.NullLogicalType{}) {
return arrow.Null, nil
}
@@ -626,7 +632,7 @@ func getArrowType(physical parquet.Type, logical schema.LogicalType, typeLen int
case parquet.Types.Double:
return arrow.PrimitiveTypes.Float64, nil
case parquet.Types.ByteArray:
return arrowFromByteArray(logical)
return arrowFromByteArray(ctx, logical)
case parquet.Types.FixedLenByteArray:
return arrowFromFLBA(logical, typeLen)
default:
@@ -708,7 +714,7 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
// }
primitiveNode := listNode.(*schema.PrimitiveNode)
colIndex := ctx.schema.ColumnIndexByNode(primitiveNode)
arrowType, err := getArrowType(primitiveNode.PhysicalType(), primitiveNode.LogicalType(), primitiveNode.TypeLength())
arrowType, err := getArrowType(ctx, primitiveNode.PhysicalType(), primitiveNode.LogicalType(), primitiveNode.TypeLength())
if err != nil {
return err
}
@@ -882,7 +888,7 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT

primitive := n.(*schema.PrimitiveNode)
colIndex := ctx.schema.ColumnIndexByNode(primitive)
arrowType, err := getArrowType(primitive.PhysicalType(), primitive.LogicalType(), primitive.TypeLength())
arrowType, err := getArrowType(ctx, primitive.PhysicalType(), primitive.LogicalType(), primitive.TypeLength())
if err != nil {
return err
}
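As a quick illustration of the schema conversion change above, here is a sketch (not part of the diff; it assumes the arrow-go v18 import path, the `convertedSchema` name, and a valid Parquet payload in `data`): the same BYTE_ARRAY column converts to String/Binary by default and to LargeString/LargeBinary once ForceLarge is set.

```go
package example

import (
	"bytes"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

// convertedSchema returns the Arrow schema the reader derives for a Parquet
// payload. With forceLarge=false a UTF8 BYTE_ARRAY column maps to
// arrow.BinaryTypes.String (int32 offsets); with forceLarge=true it maps to
// arrow.BinaryTypes.LargeString (int64 offsets), and plain BYTE_ARRAY maps to
// LargeBinary instead of Binary.
func convertedSchema(data []byte, forceLarge bool) (*arrow.Schema, error) {
	rdr, err := file.NewParquetReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer rdr.Close()

	fr, err := pqarrow.NewFileReader(rdr,
		pqarrow.ArrowReadProperties{ForceLarge: forceLarge},
		memory.DefaultAllocator)
	if err != nil {
		return nil, err
	}
	return fr.Schema()
}
```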