[Parquet] Recover from panic in file reader (#124)
see apache/arrow#43607

original author: @don4get
zeroshade authored Sep 13, 2024
1 parent f17963e commit 5ce5806
Showing 5 changed files with 47 additions and 6 deletions.
1 change: 1 addition & 0 deletions ci/scripts/test.sh
@@ -65,6 +65,7 @@ go test "${test_args[@]}" -tags ${tags},noasm ./...
 popd
 
 export PARQUET_TEST_DATA=${1}/parquet-testing/data
+export PARQUET_TEST_BAD_DATA=${1}/parquet-testing/bad_data
 export ARROW_TEST_DATA=${1}/arrow-testing/data
 pushd "${source_dir}/parquet"
 
5 changes: 2 additions & 3 deletions parquet/internal/utils/bit_packing_avx2_amd64.go
@@ -33,12 +33,11 @@ func _unpack32_avx2(in, out unsafe.Pointer, batchSize, nbits int) (num int)
 
 func unpack32Avx2(in io.Reader, out []uint32, nbits int) int {
     batch := len(out) / 32 * 32
-    if batch <= 0 {
+    n := batch * nbits / 8
+    if n <= 0 {
         return 0
     }
 
-    n := batch * nbits / 8
-
     buffer := bufferPool.Get().(*bytes.Buffer)
     defer bufferPool.Put(buffer)
     buffer.Reset()
5 changes: 2 additions & 3 deletions parquet/internal/utils/bit_packing_neon_arm64.go
@@ -33,12 +33,11 @@ func _unpack32_neon(in, out unsafe.Pointer, batchSize, nbits int) (num int)
 
 func unpack32NEON(in io.Reader, out []uint32, nbits int) int {
     batch := len(out) / 32 * 32
-    if batch <= 0 {
+    n := batch * nbits / 8
+    if n <= 0 {
         return 0
     }
 
-    n := batch * nbits / 8
-
     buffer := bufferPool.Get().(*bytes.Buffer)
     defer bufferPool.Put(buffer)
     buffer.Reset()
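
Both unpack32 helpers above receive the same fix: the early-return guard now checks the number of bytes that will actually be read (n) rather than only the batch size, so a bit width of zero no longer reaches the buffered read and the SIMD kernel. A minimal, self-contained sketch of that arithmetic (the values are illustrative, not taken from the commit):

package main

import "fmt"

func main() {
    // Illustrative values: 64 output slots but a bit width of 0,
    // as a malformed file might produce.
    out := make([]uint32, 64)
    nbits := 0

    batch := len(out) / 32 * 32 // 64: the old guard (batch <= 0) does not return early
    n := batch * nbits / 8      // 0: the new guard (n <= 0) returns immediately
    fmt.Println(batch, n)       // prints: 64 0
}
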
8 changes: 8 additions & 0 deletions parquet/pqarrow/file_reader.go
@@ -28,6 +28,7 @@ import (
     "github.com/apache/arrow-go/v18/arrow/array"
     "github.com/apache/arrow-go/v18/arrow/arrio"
     "github.com/apache/arrow-go/v18/arrow/memory"
+    "github.com/apache/arrow-go/v18/internal/utils"
     "github.com/apache/arrow-go/v18/parquet"
     "github.com/apache/arrow-go/v18/parquet/file"
     "github.com/apache/arrow-go/v18/parquet/schema"
@@ -332,6 +333,13 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []int
     for i := 0; i < np; i++ {
         go func() {
             defer wg.Done()
+            defer func() {
+                if pErr := recover(); pErr != nil {
+                    err := utils.FormatRecoveredError("panic while reading", pErr)
+                    results <- resultPair{err: err}
+                }
+            }()
+
             for {
                 select {
                 case r, ok := <-ch:
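
The ReadRowGroups hunk above wraps each worker goroutine in a deferred recover, so a panic raised while decoding a corrupt file (see apache/arrow#43607) becomes an error delivered over the results channel instead of aborting the process. A minimal sketch of that pattern, assuming illustrative stand-ins for resultPair and the per-column work, and with fmt.Errorf standing in for the internal utils.FormatRecoveredError helper:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// resultPair mirrors the shape of the value sent on the results channel;
// readColumn stands in for the per-column decode work. Both are
// illustrative, not the pqarrow definitions.
type resultPair struct {
    idx int
    err error
}

func readColumn(idx int) {
    if idx == 2 {
        panic("corrupt column chunk") // stand-in for a decoder panic
    }
}

func main() {
    const np = 4
    results := make(chan resultPair, np)

    var wg sync.WaitGroup
    for i := 0; i < np; i++ {
        wg.Add(1)
        go func(idx int) {
            defer wg.Done()
            // As in the diff above: convert a panic in this worker into
            // an error on the results channel rather than letting it
            // take down the whole process.
            defer func() {
                if p := recover(); p != nil {
                    results <- resultPair{idx: idx, err: fmt.Errorf("panic while reading: %v", p)}
                }
            }()
            readColumn(idx)
            results <- resultPair{idx: idx}
        }(i)
    }

    wg.Wait()
    close(results)

    var err error
    for r := range results {
        err = errors.Join(err, r.err)
    }
    fmt.Println(err) // panic while reading: corrupt column chunk
}
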
34 changes: 34 additions & 0 deletions parquet/pqarrow/file_reader_test.go
@@ -22,6 +22,7 @@ import (
     "fmt"
     "io"
     "os"
+    "path"
     "path/filepath"
     "strings"
     "testing"
@@ -373,3 +374,36 @@ func TestFileReaderColumnChunkBoundsErrors(t *testing.T) {
         assert.ErrorContains(t, tooHighErr, fmt.Sprintf("there are only %d columns", schema.NumFields()))
     }
 }
+
+func TestReadParquetFile(t *testing.T) {
+    dir := os.Getenv("PARQUET_TEST_BAD_DATA")
+    if dir == "" {
+        t.Skip("no path supplied with PARQUET_TEST_BAD_DATA")
+    }
+    assert.DirExists(t, dir)
+    filename := path.Join(dir, "ARROW-GH-43605.parquet")
+    ctx := context.TODO()
+
+    mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+
+    rdr, err := file.OpenParquetFile(
+        filename,
+        false,
+        file.WithReadProps(parquet.NewReaderProperties(mem)),
+    )
+    require.NoError(t, err)
+    defer func() {
+        if err2 := rdr.Close(); err2 != nil {
+            t.Errorf("unexpected error: %v", err2)
+        }
+    }()
+
+    arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
+        Parallel:  false,
+        BatchSize: 0,
+    }, mem)
+    require.NoError(t, err)
+
+    _, err = arrowRdr.ReadTable(ctx)
+    assert.NoError(t, err)
+}
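
The new test is skipped unless PARQUET_TEST_BAD_DATA is set, which is exactly what the ci/scripts/test.sh change above exports in CI. As a hypothetical companion (not part of this commit), the CheckedAllocator created in the test is often paired with a deferred AssertSize to catch leaked buffers; a minimal sketch:

package pqarrow_test

import (
    "testing"

    "github.com/apache/arrow-go/v18/arrow/memory"
)

// Sketch only: AssertSize fails the test if any allocation made through
// the checked allocator is still live when the test ends.
func TestCheckedAllocatorSketch(t *testing.T) {
    mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
    defer mem.AssertSize(t, 0)

    buf := memory.NewResizableBuffer(mem)
    buf.Resize(64)
    buf.Release()
}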
