Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Commit

Permalink
add WithError functions for creating (Generic)Reader instances
Browse files Browse the repository at this point in the history
New(Generic)Reader calls can fail at runtime if they encounter an
invalid parquet file, which means that this library is difficult to use
in a situation where an arbitrary or potentially untrusted parquet file
might need to be read.

This adds WithError versions of these instantiation functions, which
enables easier handling of errors
  • Loading branch information
SgtCoDFish committed Aug 11, 2022
1 parent f9e00f6 commit 3db15e7
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 47 deletions.
74 changes: 58 additions & 16 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
//
// This example showcases a typical use of parquet readers:
//
// reader := parquet.NewReader(file)
// reader, err := parquet.NewReaderWithError(file)
// if err != nil { ... }
//
// rows := []RowType{}
// for {
// row := RowType{}
Expand Down Expand Up @@ -56,19 +58,39 @@ type Reader struct {
// // handle the configuration error
// ...
// } else {
// // this call to create a reader is guaranteed not to panic
// // this call to create a reader is guaranteed not to panic because
// // of an invalid config, but may panic on other errors
// reader := parquet.NewReader(input, config)
// ...
// }
//
// NewReader also panics if trying to read an invalid parquet file, or on any
// error. To inspect the error rather than panic, use NewReaderWithError.
func NewReader(input io.ReaderAt, options ...ReaderOption) *Reader {
c, err := NewReaderConfig(options...)
r, err := NewReaderWithError(input, options...)
if err != nil {
panic(err)
}

return r
}

// NewReaderWithError constructs a parquet reader reading rows from the given
// io.ReaderAt, returning an error if anything goes wrong.
//
// In order to read parquet rows, the io.ReaderAt must be converted to a
// parquet.File. If r is already a parquet.File it is used directly; otherwise,
// the io.ReaderAt value is expected to either have a `Size() int64` method or
// implement io.Seeker in order to determine its size.
func NewReaderWithError(input io.ReaderAt, options ...ReaderOption) (*Reader, error) {
c, err := NewReaderConfig(options...)
if err != nil {
return nil, err
}

f, err := openFile(input)
if err != nil {
panic(err)
return nil, err
}

r := &Reader{
Expand All @@ -80,11 +102,17 @@ func NewReader(input io.ReaderAt, options ...ReaderOption) *Reader {

if c.Schema != nil {
r.file.schema = c.Schema
r.file.rowGroup = convertRowGroupTo(r.file.rowGroup, c.Schema)

r.file.rowGroup, err = convertRowGroupTo(r.file.rowGroup, c.Schema)
if err != nil {
_ = r.Close()
return nil, err
}
}

r.read.init(r.file.schema, r.file.rowGroup)
return r

return r, nil
}

func openFile(input io.ReaderAt) (*File, error) {
Expand Down Expand Up @@ -113,15 +141,30 @@ func fileRowGroupOf(f *File) RowGroup {
}

// NewRowGroupReader constructs a new Reader which reads rows from the RowGroup
// passed as argument.
// passed as an argument. This function panics if it encounters any error; use
// NewRowGroupReaderWithError to be able to handle failures more gracefully
func NewRowGroupReader(rowGroup RowGroup, options ...ReaderOption) *Reader {
c, err := NewReaderConfig(options...)
r, err := NewRowGroupReaderWithError(rowGroup, options...)
if err != nil {
panic(err)
}

return r
}

// NewRowGroupReaderWithError constructs a new Reader which reads rows from the RowGroup
// passed as an argument.
func NewRowGroupReaderWithError(rowGroup RowGroup, options ...ReaderOption) (*Reader, error) {
c, err := NewReaderConfig(options...)
if err != nil {
return nil, err
}

if c.Schema != nil {
rowGroup = convertRowGroupTo(rowGroup, c.Schema)
rowGroup, err = convertRowGroupTo(rowGroup, c.Schema)
if err != nil {
return nil, err
}
}

r := &Reader{
Expand All @@ -132,21 +175,20 @@ func NewRowGroupReader(rowGroup RowGroup, options ...ReaderOption) *Reader {
}

r.read.init(r.file.schema, r.file.rowGroup)
return r
return r, nil
}

func convertRowGroupTo(rowGroup RowGroup, schema *Schema) RowGroup {
func convertRowGroupTo(rowGroup RowGroup, schema *Schema) (RowGroup, error) {
if rowGroupSchema := rowGroup.Schema(); !nodesAreEqual(schema, rowGroupSchema) {
conv, err := Convert(schema, rowGroupSchema)
if err != nil {
// TODO: this looks like something we should not be panicking on,
// but the current NewReader API does not offer a mechanism to
// report errors.
panic(err)
return nil, err
}

rowGroup = ConvertRowGroup(rowGroup, conv)
}
return rowGroup

return rowGroup, nil
}

func sizeOf(r io.ReaderAt) (int64, error) {
Expand Down
87 changes: 72 additions & 15 deletions reader_go18.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package parquet

import (
"fmt"
"io"
"reflect"
)
Expand All @@ -26,15 +27,37 @@ type GenericReader[T any] struct {
//
// If the option list may explicitly declare a schema, it must be compatible
// with the schema generated from T.
//
// On an error such as an invalid parquet input, NewGenericReader will panic. If
// it would be preferable to inspect the error instead, use NewGenericReaderWithError
func NewGenericReader[T any](input io.ReaderAt, options ...ReaderOption) *GenericReader[T] {
c, err := NewReaderConfig(options...)
r, err := NewGenericReaderWithError[T](input, options...)
if err != nil {
panic(err)
}

return r
}

// NewGenericReaderWithError is like NewReader but returns GenericReader[T] suited to write
// rows of Go type T.
//
// The type parameter T should be a map, struct, or any. Any other types will
// cause a panic at runtime. Type checking is a lot more effective when the
// generic parameter is a struct type, using map and interface types is somewhat
// similar to using a Writer.
//
// If the option list may explicitly declare a schema, it must be compatible
// with the schema generated from T.
func NewGenericReaderWithError[T any](input io.ReaderAt, options ...ReaderOption) (*GenericReader[T], error) {
c, err := NewReaderConfig(options...)
if err != nil {
return nil, err
}

f, err := openFile(input)
if err != nil {
panic(err)
return nil, err
}

rowGroup := fileRowGroupOf(f)
Expand All @@ -58,20 +81,44 @@ func NewGenericReader[T any](input io.ReaderAt, options ...ReaderOption) *Generi
}

if !nodesAreEqual(c.Schema, f.schema) {
r.base.file.rowGroup = convertRowGroupTo(r.base.file.rowGroup, c.Schema)
r.base.file.rowGroup, err = convertRowGroupTo(r.base.file.rowGroup, c.Schema)
if err != nil {
_ = r.Close()
return nil, err
}
}

r.base.read.init(r.base.file.schema, r.base.file.rowGroup)
r.read = readFuncOf[T](t, r.base.file.schema)
return r

r.read, err = readFuncOf[T](t, r.base.file.schema)
if err != nil {
_ = r.Close()
return nil, err
}

return r, nil
}

// NewGenericRowGroupReader constructs a new GemericReader which reads rows from
// the RowGroup passed as an argument. This function panics if it encounters any error;
// use NewGenericRowGroupReaderWithError to be able to handle failures more gracefully
func NewGenericRowGroupReader[T any](rowGroup RowGroup, options ...ReaderOption) *GenericReader[T] {
c, err := NewReaderConfig(options...)
r, err := NewGenericRowGroupReaderWithError[T](rowGroup, options...)
if err != nil {
panic(err)
}

return r
}

// NewGenericRowGroupReaderWithError constructs a new GemericReader which reads rows
// from the RowGrup passed as an argument.
func NewGenericRowGroupReaderWithError[T any](rowGroup RowGroup, options ...ReaderOption) (*GenericReader[T], error) {
c, err := NewReaderConfig(options...)
if err != nil {
return nil, err
}

t := typeOf[T]()
if c.Schema == nil {
if t == nil {
Expand All @@ -91,12 +138,20 @@ func NewGenericRowGroupReader[T any](rowGroup RowGroup, options ...ReaderOption)
}

if !nodesAreEqual(c.Schema, rowGroup.Schema()) {
r.base.file.rowGroup = convertRowGroupTo(r.base.file.rowGroup, c.Schema)
r.base.file.rowGroup, err = convertRowGroupTo(r.base.file.rowGroup, c.Schema)
if err != nil {
return nil, err
}
}

r.base.read.init(r.base.file.schema, r.base.file.rowGroup)
r.read = readFuncOf[T](t, r.base.file.schema)
return r

r.read, err = readFuncOf[T](t, r.base.file.schema)
if err != nil {
return nil, err
}

return r, nil
}

func (r *GenericReader[T]) Reset() {
Expand Down Expand Up @@ -160,21 +215,23 @@ var (

type readFunc[T any] func(*GenericReader[T], []T) (int, error)

func readFuncOf[T any](t reflect.Type, schema *Schema) readFunc[T] {
func readFuncOf[T any](t reflect.Type, schema *Schema) (readFunc[T], error) {
if t == nil {
return (*GenericReader[T]).readRows
return (*GenericReader[T]).readRows, nil
}

switch t.Kind() {
case reflect.Interface, reflect.Map:
return (*GenericReader[T]).readRows
return (*GenericReader[T]).readRows, nil

case reflect.Struct:
return (*GenericReader[T]).readRows
return (*GenericReader[T]).readRows, nil

case reflect.Pointer:
if e := t.Elem(); e.Kind() == reflect.Struct {
return (*GenericReader[T]).readRows
return (*GenericReader[T]).readRows, nil
}
}
panic("cannot create reader for values of type " + t.String())

return nil, fmt.Errorf("cannot create reader for values of type %s", t.String())
}
17 changes: 14 additions & 3 deletions reader_go18_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func testGenericReaderRows[Row any](rows []Row) error {
if err := writer.Close(); err != nil {
return err
}
reader := parquet.NewGenericReader[Row](bytes.NewReader(buffer.Bytes()))
reader, err := parquet.NewGenericReaderWithError[Row](bytes.NewReader(buffer.Bytes()))
if err != nil {
return err
}
result := make([]Row, len(rows))
n, err := reader.Read(result)
if err != nil && !errors.Is(err, io.EOF) {
Expand Down Expand Up @@ -119,7 +122,11 @@ func benchmarkGenericReader[Row generator[Row]](b *testing.B) {
buffer.Write(rows)

b.Run("go1.17", func(b *testing.B) {
reader := parquet.NewRowGroupReader(buffer)
reader, err := parquet.NewRowGroupReaderWithError(buffer)
if err != nil {
b.Fatal(err)
}

benchmarkRowsPerSecond(b, func() int {
for i := range rowbuf {
if err := reader.Read(&rowbuf[i]); err != nil {
Expand All @@ -135,7 +142,11 @@ func benchmarkGenericReader[Row generator[Row]](b *testing.B) {
})

b.Run("go1.18", func(b *testing.B) {
reader := parquet.NewGenericRowGroupReader[Row](buffer)
reader, err := parquet.NewGenericRowGroupReaderWithError[Row](buffer)
if err != nil {
b.Fatal(err)
}

benchmarkRowsPerSecond(b, func() int {
n, err := reader.Read(rowbuf)
if err != nil {
Expand Down
Loading

0 comments on commit 3db15e7

Please sign in to comment.