diff --git a/ci/scripts/test.sh b/ci/scripts/test.sh
index 9edfab81..9607e303 100755
--- a/ci/scripts/test.sh
+++ b/ci/scripts/test.sh
@@ -65,6 +65,7 @@ go test "${test_args[@]}" -tags ${tags},noasm ./...
 popd
 
 export PARQUET_TEST_DATA=${1}/parquet-testing/data
+export PARQUET_TEST_BAD_DATA=${1}/parquet-testing/bad_data
 export ARROW_TEST_DATA=${1}/arrow-testing/data
 pushd "${source_dir}/parquet"
 
diff --git a/parquet/internal/utils/bit_packing_avx2_amd64.go b/parquet/internal/utils/bit_packing_avx2_amd64.go
index 0455ccc5..5f1923fa 100644
--- a/parquet/internal/utils/bit_packing_avx2_amd64.go
+++ b/parquet/internal/utils/bit_packing_avx2_amd64.go
@@ -33,12 +33,13 @@ func _unpack32_avx2(in, out unsafe.Pointer, batchSize, nbits int) (num int)
 
 func unpack32Avx2(in io.Reader, out []uint32, nbits int) int {
 	batch := len(out) / 32 * 32
-	if batch <= 0 {
+	// Guard on the byte count rather than the batch size: n is zero whenever
+	// nbits is zero, even when batch is positive.
+	n := batch * nbits / 8
+	if n <= 0 {
 		return 0
 	}
 
-	n := batch * nbits / 8
-
 	buffer := bufferPool.Get().(*bytes.Buffer)
 	defer bufferPool.Put(buffer)
 	buffer.Reset()
diff --git a/parquet/internal/utils/bit_packing_neon_arm64.go b/parquet/internal/utils/bit_packing_neon_arm64.go
index 09154e3e..580f9a1f 100755
--- a/parquet/internal/utils/bit_packing_neon_arm64.go
+++ b/parquet/internal/utils/bit_packing_neon_arm64.go
@@ -33,12 +33,13 @@ func _unpack32_neon(in, out unsafe.Pointer, batchSize, nbits int) (num int)
 
 func unpack32NEON(in io.Reader, out []uint32, nbits int) int {
 	batch := len(out) / 32 * 32
-	if batch <= 0 {
+	// Guard on the byte count rather than the batch size: n is zero whenever
+	// nbits is zero, even when batch is positive.
+	n := batch * nbits / 8
+	if n <= 0 {
 		return 0
 	}
 
-	n := batch * nbits / 8
-
 	buffer := bufferPool.Get().(*bytes.Buffer)
 	defer bufferPool.Put(buffer)
 	buffer.Reset()
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index 393215b0..c9a83b8c 100755
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/arrow-go/v18/arrow/array"
 	"github.com/apache/arrow-go/v18/arrow/arrio"
 	"github.com/apache/arrow-go/v18/arrow/memory"
+	"github.com/apache/arrow-go/v18/internal/utils"
 	"github.com/apache/arrow-go/v18/parquet"
 	"github.com/apache/arrow-go/v18/parquet/file"
 	"github.com/apache/arrow-go/v18/parquet/schema"
@@ -332,6 +333,16 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in
 	for i := 0; i < np; i++ {
 		go func() {
 			defer wg.Done()
+			// Turn a panic in this worker goroutine into an error sent on
+			// results rather than letting it crash the process. Deferred after
+			// wg.Done, so it runs first and sends before the group is released.
+			defer func() {
+				if pErr := recover(); pErr != nil {
+					err := utils.FormatRecoveredError("panic while reading", pErr)
+					results <- resultPair{err: err}
+				}
+			}()
+
 			for {
 				select {
 				case r, ok := <-ch:
diff --git a/parquet/pqarrow/file_reader_test.go b/parquet/pqarrow/file_reader_test.go
index 61100aca..9010927e 100644
--- a/parquet/pqarrow/file_reader_test.go
+++ b/parquet/pqarrow/file_reader_test.go
@@ -373,3 +373,39 @@ func TestFileReaderColumnChunkBoundsErrors(t *testing.T) {
 		assert.ErrorContains(t, tooHighErr, fmt.Sprintf("there are only %d columns", schema.NumFields()))
 	}
 }
+
+// TestReadParquetFile reads ARROW-GH-43605.parquet from the directory named by
+// PARQUET_TEST_BAD_DATA and expects ReadTable to succeed. The test is skipped
+// when the environment variable is unset.
+func TestReadParquetFile(t *testing.T) {
+	dir := os.Getenv("PARQUET_TEST_BAD_DATA")
+	if dir == "" {
+		t.Skip("no path supplied with PARQUET_TEST_BAD_DATA")
+	}
+	require.DirExists(t, dir)
+	filename := filepath.Join(dir, "ARROW-GH-43605.parquet")
+	ctx := context.TODO()
+
+	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+
+	rdr, err := file.OpenParquetFile(
+		filename,
+		false,
+		file.WithReadProps(parquet.NewReaderProperties(mem)),
+	)
+	require.NoError(t, err)
+	defer func() {
+		if err2 := rdr.Close(); err2 != nil {
+			t.Errorf("unexpected error: %v", err2)
+		}
+	}()
+
+	arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
+		Parallel:  false,
+		BatchSize: 0,
+	}, mem)
+	require.NoError(t, err)
+
+	_, err = arrowRdr.ReadTable(ctx)
+	assert.NoError(t, err)
+}