diff --git a/reader.go b/reader.go index 1a355b6e3..29da95604 100644 --- a/reader.go +++ b/reader.go @@ -11,7 +11,9 @@ import ( // // This example showcases a typical use of parquet readers: // -// reader := parquet.NewReader(file) +// reader, err := parquet.NewReaderWithError(file) +// if err != nil { ... } +// // rows := []RowType{} // for { // row := RowType{} @@ -56,19 +58,39 @@ type Reader struct { // // handle the configuration error // ... // } else { -// // this call to create a reader is guaranteed not to panic +// // this call to create a reader is guaranteed not to panic because +// // of an invalid config, but may panic on other errors // reader := parquet.NewReader(input, config) // ... // } +// +// NewReader also panics if trying to read an invalid parquet file, or on any +// error. To inspect the error rather than panic, use NewReaderWithError. func NewReader(input io.ReaderAt, options ...ReaderOption) *Reader { - c, err := NewReaderConfig(options...) + r, err := NewReaderWithError(input, options...) if err != nil { panic(err) } + return r +} + +// NewReaderWithError constructs a parquet reader reading rows from the given +// io.ReaderAt, returning an error if anything goes wrong. +// +// In order to read parquet rows, the io.ReaderAt must be converted to a +// parquet.File. If r is already a parquet.File it is used directly; otherwise, +// the io.ReaderAt value is expected to either have a `Size() int64` method or +// implement io.Seeker in order to determine its size. +func NewReaderWithError(input io.ReaderAt, options ...ReaderOption) (*Reader, error) { + c, err := NewReaderConfig(options...) 
+ if err != nil { + return nil, err + } + f, err := openFile(input) if err != nil { - panic(err) + return nil, err } r := &Reader{ @@ -80,11 +102,17 @@ func NewReader(input io.ReaderAt, options ...ReaderOption) *Reader { if c.Schema != nil { r.file.schema = c.Schema - r.file.rowGroup = convertRowGroupTo(r.file.rowGroup, c.Schema) + + r.file.rowGroup, err = convertRowGroupTo(r.file.rowGroup, c.Schema) + if err != nil { + _ = r.Close() + return nil, err + } } r.read.init(r.file.schema, r.file.rowGroup) - return r + + return r, nil } func openFile(input io.ReaderAt) (*File, error) { @@ -113,15 +141,30 @@ func fileRowGroupOf(f *File) RowGroup { } // NewRowGroupReader constructs a new Reader which reads rows from the RowGroup -// passed as argument. +// passed as an argument. This function panics if it encounters any error; use +// NewRowGroupReaderWithError to be able to handle failures more gracefully func NewRowGroupReader(rowGroup RowGroup, options ...ReaderOption) *Reader { - c, err := NewReaderConfig(options...) + r, err := NewRowGroupReaderWithError(rowGroup, options...) if err != nil { panic(err) } + return r +} + +// NewRowGroupReaderWithError constructs a new Reader which reads rows from the RowGroup +// passed as an argument. +func NewRowGroupReaderWithError(rowGroup RowGroup, options ...ReaderOption) (*Reader, error) { + c, err := NewReaderConfig(options...) 
+ if err != nil { + return nil, err + } + if c.Schema != nil { - rowGroup = convertRowGroupTo(rowGroup, c.Schema) + rowGroup, err = convertRowGroupTo(rowGroup, c.Schema) + if err != nil { + return nil, err + } } r := &Reader{ @@ -132,21 +175,20 @@ func NewRowGroupReader(rowGroup RowGroup, options ...ReaderOption) *Reader { } r.read.init(r.file.schema, r.file.rowGroup) - return r + return r, nil } -func convertRowGroupTo(rowGroup RowGroup, schema *Schema) RowGroup { +func convertRowGroupTo(rowGroup RowGroup, schema *Schema) (RowGroup, error) { if rowGroupSchema := rowGroup.Schema(); !nodesAreEqual(schema, rowGroupSchema) { conv, err := Convert(schema, rowGroupSchema) if err != nil { - // TODO: this looks like something we should not be panicking on, - // but the current NewReader API does not offer a mechanism to - // report errors. - panic(err) + return nil, err } + rowGroup = ConvertRowGroup(rowGroup, conv) } - return rowGroup + + return rowGroup, nil } func sizeOf(r io.ReaderAt) (int64, error) { diff --git a/reader_go18.go b/reader_go18.go index bbbc225ce..530cf3701 100644 --- a/reader_go18.go +++ b/reader_go18.go @@ -3,6 +3,7 @@ package parquet import ( + "fmt" "io" "reflect" ) @@ -26,15 +27,37 @@ type GenericReader[T any] struct { // // If the option list may explicitly declare a schema, it must be compatible // with the schema generated from T. +// +// On an error such as an invalid parquet input, NewGenericReader will panic. If +// it would be preferable to inspect the error instead, use NewGenericReaderWithError. func NewGenericReader[T any](input io.ReaderAt, options ...ReaderOption) *GenericReader[T] { - c, err := NewReaderConfig(options...) + r, err := NewGenericReaderWithError[T](input, options...) if err != nil { panic(err) } + return r +} + +// NewGenericReaderWithError is like NewReaderWithError but returns a GenericReader[T] suited to read +// rows of Go type T. +// +// The type parameter T should be a map, struct, or any. 
Any other types will +// cause a panic at runtime. Type checking is a lot more effective when the +// generic parameter is a struct type, using map and interface types is somewhat +// similar to using a Writer. +// +// If the option list may explicitly declare a schema, it must be compatible +// with the schema generated from T. +func NewGenericReaderWithError[T any](input io.ReaderAt, options ...ReaderOption) (*GenericReader[T], error) { + c, err := NewReaderConfig(options...) + if err != nil { + return nil, err + } + f, err := openFile(input) if err != nil { - panic(err) + return nil, err } rowGroup := fileRowGroupOf(f) @@ -58,20 +81,44 @@ func NewGenericReader[T any](input io.ReaderAt, options ...ReaderOption) *Generi } if !nodesAreEqual(c.Schema, f.schema) { - r.base.file.rowGroup = convertRowGroupTo(r.base.file.rowGroup, c.Schema) + r.base.file.rowGroup, err = convertRowGroupTo(r.base.file.rowGroup, c.Schema) + if err != nil { + _ = r.Close() + return nil, err + } } r.base.read.init(r.base.file.schema, r.base.file.rowGroup) - r.read = readFuncOf[T](t, r.base.file.schema) - return r + + r.read, err = readFuncOf[T](t, r.base.file.schema) + if err != nil { + _ = r.Close() + return nil, err + } + + return r, nil } +// NewGenericRowGroupReader constructs a new GenericReader which reads rows from +// the RowGroup passed as an argument. This function panics if it encounters any error; +// use NewGenericRowGroupReaderWithError to be able to handle failures more gracefully. func NewGenericRowGroupReader[T any](rowGroup RowGroup, options ...ReaderOption) *GenericReader[T] { - c, err := NewReaderConfig(options...) + r, err := NewGenericRowGroupReaderWithError[T](rowGroup, options...) if err != nil { panic(err) } + return r +} + +// NewGenericRowGroupReaderWithError constructs a new GenericReader which reads rows +// from the RowGroup passed as an argument. 
+func NewGenericRowGroupReaderWithError[T any](rowGroup RowGroup, options ...ReaderOption) (*GenericReader[T], error) { + c, err := NewReaderConfig(options...) + if err != nil { + return nil, err + } + t := typeOf[T]() if c.Schema == nil { if t == nil { @@ -91,12 +138,20 @@ func NewGenericRowGroupReader[T any](rowGroup RowGroup, options ...ReaderOption) } if !nodesAreEqual(c.Schema, rowGroup.Schema()) { - r.base.file.rowGroup = convertRowGroupTo(r.base.file.rowGroup, c.Schema) + r.base.file.rowGroup, err = convertRowGroupTo(r.base.file.rowGroup, c.Schema) + if err != nil { + return nil, err + } } r.base.read.init(r.base.file.schema, r.base.file.rowGroup) - r.read = readFuncOf[T](t, r.base.file.schema) - return r + + r.read, err = readFuncOf[T](t, r.base.file.schema) + if err != nil { + return nil, err + } + + return r, nil } func (r *GenericReader[T]) Reset() { @@ -160,21 +215,23 @@ var ( type readFunc[T any] func(*GenericReader[T], []T) (int, error) -func readFuncOf[T any](t reflect.Type, schema *Schema) readFunc[T] { +func readFuncOf[T any](t reflect.Type, schema *Schema) (readFunc[T], error) { if t == nil { - return (*GenericReader[T]).readRows + return (*GenericReader[T]).readRows, nil } + switch t.Kind() { case reflect.Interface, reflect.Map: - return (*GenericReader[T]).readRows + return (*GenericReader[T]).readRows, nil case reflect.Struct: - return (*GenericReader[T]).readRows + return (*GenericReader[T]).readRows, nil case reflect.Pointer: if e := t.Elem(); e.Kind() == reflect.Struct { - return (*GenericReader[T]).readRows + return (*GenericReader[T]).readRows, nil } } - panic("cannot create reader for values of type " + t.String()) + + return nil, fmt.Errorf("cannot create reader for values of type %s", t.String()) } diff --git a/reader_go18_test.go b/reader_go18_test.go index 64c3bc3be..fed61b7c9 100644 --- a/reader_go18_test.go +++ b/reader_go18_test.go @@ -71,7 +71,10 @@ func testGenericReaderRows[Row any](rows []Row) error { if err := writer.Close(); 
err != nil { return err } - reader := parquet.NewGenericReader[Row](bytes.NewReader(buffer.Bytes())) + reader, err := parquet.NewGenericReaderWithError[Row](bytes.NewReader(buffer.Bytes())) + if err != nil { + return err + } result := make([]Row, len(rows)) n, err := reader.Read(result) if err != nil && !errors.Is(err, io.EOF) { @@ -119,7 +122,11 @@ func benchmarkGenericReader[Row generator[Row]](b *testing.B) { buffer.Write(rows) b.Run("go1.17", func(b *testing.B) { - reader := parquet.NewRowGroupReader(buffer) + reader, err := parquet.NewRowGroupReaderWithError(buffer) + if err != nil { + b.Fatal(err) + } + benchmarkRowsPerSecond(b, func() int { for i := range rowbuf { if err := reader.Read(&rowbuf[i]); err != nil { @@ -135,7 +142,11 @@ func benchmarkGenericReader[Row generator[Row]](b *testing.B) { }) b.Run("go1.18", func(b *testing.B) { - reader := parquet.NewGenericRowGroupReader[Row](buffer) + reader, err := parquet.NewGenericRowGroupReaderWithError[Row](buffer) + if err != nil { + b.Fatal(err) + } + benchmarkRowsPerSecond(b, func() int { n, err := reader.Read(rowbuf) if err != nil { diff --git a/reader_test.go b/reader_test.go index ddd5f4819..71d9e2a8e 100644 --- a/reader_test.go +++ b/reader_test.go @@ -178,7 +178,10 @@ func TestReader(t *testing.T) { } file.Reset(buf.Bytes()) - r := parquet.NewReader(file, parquet.SchemaOf(test.model)) + r, err := parquet.NewReaderWithError(file, parquet.SchemaOf(test.model)) + if err != nil { + t.Fatal(err) + } for i, v := range rows { if err := r.Read(rowPtr.Interface()); err != nil { @@ -222,7 +225,11 @@ func BenchmarkReaderReadType(b *testing.B) { rowZero := reflect.Zero(rowType) rowValue := rowPtr.Elem() - r := parquet.NewReader(f) + r, err := parquet.NewReaderWithError(f) + if err != nil { + b.Fatal(err) + } + p := rowPtr.Interface() benchmarkRowsPerSecond(b, func() (n int) { @@ -262,7 +269,11 @@ func BenchmarkReaderReadRow(b *testing.B) { b.Fatal(err) } - r := parquet.NewReader(f) + r, err := 
parquet.NewReaderWithError(f) + if err != nil { + b.Fatal(err) + } + rowbuf := make([]parquet.Row, benchmarkRowsPerStep) benchmarkRowsPerSecond(b, func() int { @@ -299,7 +310,13 @@ func TestReaderReadSubset(t *testing.T) { t.Error(err) return false } - reader := parquet.NewReader(bytes.NewReader(buf.Bytes())) + + reader, err := parquet.NewReaderWithError(bytes.NewReader(buf.Bytes())) + if err != nil { + t.Error(err) + return false + } + for i := 0; ; i++ { row := Point2D{} err := reader.Read(&row) @@ -334,7 +351,11 @@ func TestReaderSeekToRow(t *testing.T) { t.Fatal(err) } - reader := parquet.NewReader(bytes.NewReader(buf.Bytes())) + reader, err := parquet.NewReaderWithError(bytes.NewReader(buf.Bytes())) + if err != nil { + t.Fatal(err) + } + for i := 0; i < 10; i++ { if err := reader.SeekToRow(int64(i)); err != nil { t.Fatalf("seek to row %d: %v", i, err) @@ -371,15 +392,19 @@ func TestSeekToRowNoDict(t *testing.T) { w.Close() // create reader - r := parquet.NewReader(bytes.NewReader(buf.Bytes())) + r, err := parquet.NewReaderWithError(bytes.NewReader(buf.Bytes())) + if err != nil { + t.Fatal(err) + } // read second row r.SeekToRow(1) row := new(rowType) - err := r.Read(row) + err = r.Read(row) if err != nil { t.Fatalf("reading row: %v", err) } + // fmt.Println(&sample, row) if *row != sample { t.Fatalf("read != write") @@ -405,12 +430,16 @@ func TestSeekToRowReadAll(t *testing.T) { w.Close() // create reader - r := parquet.NewReader(bytes.NewReader(buf.Bytes())) + r, err := parquet.NewReaderWithError(bytes.NewReader(buf.Bytes())) + if err != nil { + t.Fatal(err) + } // read first row r.SeekToRow(0) row := new(rowType) - err := r.Read(row) + + err = r.Read(row) if err != nil { t.Fatalf("reading row: %v", err) } @@ -446,12 +475,16 @@ func TestSeekToRowDictReadSecond(t *testing.T) { w.Close() // create reader - r := parquet.NewReader(bytes.NewReader(buf.Bytes())) + r, err := parquet.NewReaderWithError(bytes.NewReader(buf.Bytes())) + if err != nil { + t.Fatal(err) + 
} // read second row r.SeekToRow(1) row := new(rowType) - err := r.Read(row) + + err = r.Read(row) if err != nil { t.Fatalf("reading row: %v", err) } @@ -485,12 +518,16 @@ func TestSeekToRowDictReadMultiplePages(t *testing.T) { w.Close() // create reader - r := parquet.NewReader(bytes.NewReader(buf.Bytes())) + r, err := parquet.NewReaderWithError(bytes.NewReader(buf.Bytes())) + if err != nil { + t.Fatal(err) + } // read 11th row r.SeekToRow(10) row := new(rowType) - err := r.Read(row) + + err = r.Read(row) if err != nil { t.Fatalf("reading row: %v", err) }