Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Add "WithError" versions of NewReader/NewGenericReader #305

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ var testdataFiles []string
// init fills testdataFiles with the path of every non-directory entry found
// directly inside the "testdata" directory.
func init() {
	const dir = "testdata"

	// Best effort: the error from ReadDir is intentionally discarded, so
	// whatever entries were returned (possibly none) are still processed.
	listing, _ := os.ReadDir(dir)
	for i := range listing {
		if entry := listing[i]; !entry.IsDir() {
			testdataFiles = append(testdataFiles, filepath.Join(dir, entry.Name()))
		}
	}
}
Expand Down
98 changes: 70 additions & 28 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,24 @@ import (
//
// This example showcases a typical use of parquet readers:
//
// reader := parquet.NewReader(file)
// rows := []RowType{}
// for {
// row := RowType{}
// err := reader.Read(&row)
// if err != nil {
// if err == io.EOF {
// break
// reader, err := parquet.NewReaderOrError(file)
// if err != nil { ... }
//
// rows := []RowType{}
// for {
// row := RowType{}
// err := reader.Read(&row)
// if err != nil {
// if err == io.EOF {
// break
// }
// ...
// }
// rows = append(rows, row)
// }
// if err := reader.Close(); err != nil {
// ...
// }
// rows = append(rows, row)
// }
// if err := reader.Close(); err != nil {
// ...
// }
//
// For programs building with Go 1.18 or later, the GenericReader[T] type
// supersedes this one.
Expand Down Expand Up @@ -56,19 +58,39 @@ type Reader struct {
// // handle the configuration error
// ...
// } else {
// // this call to create a reader is guaranteed not to panic
// // this call to create a reader is guaranteed not to panic because
// // of an invalid config, but may panic on other errors
// reader := parquet.NewReader(input, config)
// ...
// }
//
// NewReader also panics if trying to read an invalid parquet file, or on any
// error. To inspect the error rather than panic, use NewReaderOrError.
func NewReader(input io.ReaderAt, options ...ReaderOption) *Reader {
c, err := NewReaderConfig(options...)
r, err := NewReaderOrError(input, options...)
if err != nil {
panic(err)
}

return r
}

// NewReaderOrError constructs a parquet reader reading rows from the given
// io.ReaderAt, returning an error if anything goes wrong.
//
// In order to read parquet rows, the io.ReaderAt must be converted to a
// parquet.File. If input is already a parquet.File it is used directly; otherwise,
// the io.ReaderAt value is expected to either have a `Size() int64` method or
// implement io.Seeker in order to determine its size.
func NewReaderOrError(input io.ReaderAt, options ...ReaderOption) (*Reader, error) {
c, err := NewReaderConfig(options...)
if err != nil {
return nil, err
}

f, err := openFile(input)
if err != nil {
panic(err)
return nil, err
}

r := &Reader{
Expand All @@ -80,11 +102,17 @@ func NewReader(input io.ReaderAt, options ...ReaderOption) *Reader {

if c.Schema != nil {
r.file.schema = c.Schema
r.file.rowGroup = convertRowGroupTo(r.file.rowGroup, c.Schema)

r.file.rowGroup, err = convertRowGroupTo(r.file.rowGroup, c.Schema)
if err != nil {
_ = r.Close()
return nil, err
}
}

r.read.init(r.file.schema, r.file.rowGroup)
return r

return r, nil
}

func openFile(input io.ReaderAt) (*File, error) {
Expand Down Expand Up @@ -113,15 +141,30 @@ func fileRowGroupOf(f *File) RowGroup {
}

// NewRowGroupReader constructs a new Reader which reads rows from the RowGroup
// passed as argument.
// passed as an argument. This function panics if it encounters any error; use
// NewRowGroupReaderOrError to be able to handle failures more gracefully.
func NewRowGroupReader(rowGroup RowGroup, options ...ReaderOption) *Reader {
c, err := NewReaderConfig(options...)
r, err := NewRowGroupReaderOrError(rowGroup, options...)
if err != nil {
panic(err)
}

return r
}

// NewRowGroupReaderOrError constructs a new Reader which reads rows from the RowGroup
// passed as an argument.
func NewRowGroupReaderOrError(rowGroup RowGroup, options ...ReaderOption) (*Reader, error) {
c, err := NewReaderConfig(options...)
if err != nil {
return nil, err
}

if c.Schema != nil {
rowGroup = convertRowGroupTo(rowGroup, c.Schema)
rowGroup, err = convertRowGroupTo(rowGroup, c.Schema)
if err != nil {
return nil, err
}
}

r := &Reader{
Expand All @@ -132,21 +175,20 @@ func NewRowGroupReader(rowGroup RowGroup, options ...ReaderOption) *Reader {
}

r.read.init(r.file.schema, r.file.rowGroup)
return r
return r, nil
}

func convertRowGroupTo(rowGroup RowGroup, schema *Schema) RowGroup {
func convertRowGroupTo(rowGroup RowGroup, schema *Schema) (RowGroup, error) {
if rowGroupSchema := rowGroup.Schema(); !nodesAreEqual(schema, rowGroupSchema) {
conv, err := Convert(schema, rowGroupSchema)
if err != nil {
// TODO: this looks like something we should not be panicking on,
// but the current NewReader API does not offer a mechanism to
// report errors.
panic(err)
return nil, err
}

rowGroup = ConvertRowGroup(rowGroup, conv)
}
return rowGroup

return rowGroup, nil
}

func sizeOf(r io.ReaderAt) (int64, error) {
Expand Down
87 changes: 72 additions & 15 deletions reader_go18.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package parquet

import (
"fmt"
"io"
"reflect"
)
Expand All @@ -26,15 +27,37 @@ type GenericReader[T any] struct {
//
// If the option list may explicitly declare a schema, it must be compatible
// with the schema generated from T.
//
// On an error such as an invalid parquet input, NewGenericReader will panic. If
// it would be preferable to inspect the error instead, use NewGenericReaderOrError.
func NewGenericReader[T any](input io.ReaderAt, options ...ReaderOption) *GenericReader[T] {
c, err := NewReaderConfig(options...)
r, err := NewGenericReaderOrError[T](input, options...)
if err != nil {
panic(err)
}

return r
}

// NewGenericReaderOrError is like NewReaderOrError but returns a GenericReader[T]
// suited to read rows of Go type T.
//
// The type parameter T should be a map, struct, or any. Any other types will
// cause a panic at runtime. Type checking is a lot more effective when the
// generic parameter is a struct type; using map and interface types is somewhat
// similar to using a Reader.
//
// The option list may explicitly declare a schema; if it does, that schema must
// be compatible with the schema generated from T.
func NewGenericReaderOrError[T any](input io.ReaderAt, options ...ReaderOption) (*GenericReader[T], error) {
c, err := NewReaderConfig(options...)
if err != nil {
return nil, err
}

f, err := openFile(input)
if err != nil {
panic(err)
return nil, err
}

rowGroup := fileRowGroupOf(f)
Expand All @@ -58,20 +81,44 @@ func NewGenericReader[T any](input io.ReaderAt, options ...ReaderOption) *Generi
}

if !nodesAreEqual(c.Schema, f.schema) {
r.base.file.rowGroup = convertRowGroupTo(r.base.file.rowGroup, c.Schema)
r.base.file.rowGroup, err = convertRowGroupTo(r.base.file.rowGroup, c.Schema)
if err != nil {
_ = r.Close()
return nil, err
}
}

r.base.read.init(r.base.file.schema, r.base.file.rowGroup)
r.read = readFuncOf[T](t, r.base.file.schema)
return r

r.read, err = readFuncOf[T](t, r.base.file.schema)
if err != nil {
_ = r.Close()
return nil, err
}

return r, nil
}

// NewGenericRowGroupReader constructs a new GenericReader which reads rows from
// the RowGroup passed as an argument. This function panics if it encounters any error;
// use NewGenericRowGroupReaderOrError to be able to handle failures more gracefully.
func NewGenericRowGroupReader[T any](rowGroup RowGroup, options ...ReaderOption) *GenericReader[T] {
c, err := NewReaderConfig(options...)
r, err := NewGenericRowGroupReaderOrError[T](rowGroup, options...)
if err != nil {
panic(err)
}

return r
}

// NewGenericRowGroupReaderOrError constructs a new GenericReader which reads rows
// from the RowGroup passed as an argument.
func NewGenericRowGroupReaderOrError[T any](rowGroup RowGroup, options ...ReaderOption) (*GenericReader[T], error) {
c, err := NewReaderConfig(options...)
if err != nil {
return nil, err
}

t := typeOf[T]()
if c.Schema == nil {
if t == nil {
Expand All @@ -91,12 +138,20 @@ func NewGenericRowGroupReader[T any](rowGroup RowGroup, options ...ReaderOption)
}

if !nodesAreEqual(c.Schema, rowGroup.Schema()) {
r.base.file.rowGroup = convertRowGroupTo(r.base.file.rowGroup, c.Schema)
r.base.file.rowGroup, err = convertRowGroupTo(r.base.file.rowGroup, c.Schema)
if err != nil {
return nil, err
}
}

r.base.read.init(r.base.file.schema, r.base.file.rowGroup)
r.read = readFuncOf[T](t, r.base.file.schema)
return r

r.read, err = readFuncOf[T](t, r.base.file.schema)
if err != nil {
return nil, err
}

return r, nil
}

func (r *GenericReader[T]) Reset() {
Expand Down Expand Up @@ -160,21 +215,23 @@ var (

// readFunc is the signature of the read routine held by a GenericReader[T].
// It receives the reader and a destination slice of T and returns an int and
// an error (presumably the number of rows read into the slice — confirm
// against readRows).
type readFunc[T any] func(*GenericReader[T], []T) (int, error)

func readFuncOf[T any](t reflect.Type, schema *Schema) readFunc[T] {
func readFuncOf[T any](t reflect.Type, schema *Schema) (readFunc[T], error) {
if t == nil {
return (*GenericReader[T]).readRows
return (*GenericReader[T]).readRows, nil
}

switch t.Kind() {
case reflect.Interface, reflect.Map:
return (*GenericReader[T]).readRows
return (*GenericReader[T]).readRows, nil

case reflect.Struct:
return (*GenericReader[T]).readRows
return (*GenericReader[T]).readRows, nil

case reflect.Pointer:
if e := t.Elem(); e.Kind() == reflect.Struct {
return (*GenericReader[T]).readRows
return (*GenericReader[T]).readRows, nil
}
}
panic("cannot create reader for values of type " + t.String())

return nil, fmt.Errorf("cannot create reader for values of type %s", t.String())
}
35 changes: 32 additions & 3 deletions reader_go18_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"reflect"
"testing"

Expand Down Expand Up @@ -71,7 +73,10 @@ func testGenericReaderRows[Row any](rows []Row) error {
if err := writer.Close(); err != nil {
return err
}
reader := parquet.NewGenericReader[Row](bytes.NewReader(buffer.Bytes()))
reader, err := parquet.NewGenericReaderOrError[Row](bytes.NewReader(buffer.Bytes()))
if err != nil {
return err
}
result := make([]Row, len(rows))
n, err := reader.Read(result)
if err != nil && !errors.Is(err, io.EOF) {
Expand Down Expand Up @@ -119,7 +124,11 @@ func benchmarkGenericReader[Row generator[Row]](b *testing.B) {
buffer.Write(rows)

b.Run("go1.17", func(b *testing.B) {
reader := parquet.NewRowGroupReader(buffer)
reader, err := parquet.NewRowGroupReaderOrError(buffer)
if err != nil {
b.Fatal(err)
}

benchmarkRowsPerSecond(b, func() int {
for i := range rowbuf {
if err := reader.Read(&rowbuf[i]); err != nil {
Expand All @@ -135,7 +144,11 @@ func benchmarkGenericReader[Row generator[Row]](b *testing.B) {
})

b.Run("go1.18", func(b *testing.B) {
reader := parquet.NewGenericRowGroupReader[Row](buffer)
reader, err := parquet.NewGenericRowGroupReaderOrError[Row](buffer)
if err != nil {
b.Fatal(err)
}

benchmarkRowsPerSecond(b, func() int {
n, err := reader.Read(rowbuf)
if err != nil {
Expand All @@ -150,3 +163,19 @@ func benchmarkGenericReader[Row generator[Row]](b *testing.B) {
})
})
}

// TestGenericReaderFailure verifies that NewGenericReaderOrError returns a
// non-nil error when handed the invalid parquet fixture under
// testdata/invalid.
func TestGenericReaderFailure(t *testing.T) {
	path := filepath.Join("testdata", "invalid", "not_enough_columns.invalid.parquet")

	f, openErr := os.Open(path)
	if openErr != nil {
		t.Fatalf("failed to read %q: %s", path, openErr.Error())
	}
	defer f.Close()

	// Only the error matters here; the reader value itself is discarded.
	if _, readerErr := parquet.NewGenericReaderOrError[map[string]any](f); readerErr == nil {
		t.Fatalf("expected an error when trying to open invalid file %q, but didn't get one", path)
	}
}
Loading