diff --git a/src/parquet/ReadParquet.cpp b/src/parquet/ReadParquet.cpp
index 244276d1b1..5eac6a1a81 100644
--- a/src/parquet/ReadParquet.cpp
+++ b/src/parquet/ReadParquet.cpp
@@ -1,12 +1,14 @@
 #include "ReadParquet.h"
 #include "UtilParquet.h"
 
+// Returns the number of elements read
 template <typename ReaderType, typename ChplType>
 int64_t readColumn(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
                    bool hasNonFloatNulls, int64_t i, int64_t numElems,
                    int64_t batchSize, int64_t values_read, bool* where_null_chpl) {
   int16_t definition_level; // nullable type and only reading single records in batch
   auto chpl_ptr = (ChplType*)chpl_arr;
+  int64_t num_read = 0;
   ReaderType* reader =
     static_cast<ReaderType*>(column_reader.get());
   startIdx -= reader->Skip(startIdx);
@@ -17,6 +19,7 @@ int64_t readColumn(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::Co
     while (reader->HasNext() && i < numElems) {
       (void)reader->ReadBatch(batchSize, nullptr, nullptr, &chpl_ptr[i], &values_read);
       i+=values_read;
+      num_read += values_read;
     }
   }
   else {
@@ -27,9 +30,10 @@ int64_t readColumn(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::Co
         where_null_chpl[i] = true;
       }
       i++;
+      num_read++;
     }
   }
-  return i;
+  return num_read;
 }
 
 template <typename ReaderType>
@@ -38,6 +42,7 @@ int64_t readColumnDbFl(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet
                        int64_t batchSize, int64_t values_read, bool* where_null_chpl) {
   int16_t definition_level; // nullable type and only reading single records in batch
   auto chpl_ptr = (double*)chpl_arr;
+  int64_t num_read = 0;
   ReaderType* reader =
     static_cast<ReaderType*>(column_reader.get());
   startIdx -= reader->Skip(startIdx);
@@ -53,8 +58,10 @@ int64_t readColumnDbFl(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet
       chpl_ptr[i] = NAN;
     }
     i++;
+    num_read++;
   }
-  return i;
+
+  return num_read;
 }
 
 template <typename ReaderType, typename ChplType>
@@ -174,7 +181,7 @@ int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* co
 int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl, const char* colname, int64_t numElems, int64_t startIdx, int64_t batchSize, int64_t byteLength, bool hasNonFloatNulls, char** errMsg) {
   try {
     int64_t ty = cpp_getType(filename, colname, errMsg);
-    
+
     std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
       parquet::ParquetFileReader::OpenFile(filename, false);
 
@@ -182,7 +189,7 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
     int num_row_groups = file_metadata->num_row_groups();
 
     int64_t i = 0;
-    for (int r = 0; r < num_row_groups; r++) {
+    for (int r = 0; (r < num_row_groups) && (i < numElems); r++) {
       std::shared_ptr<parquet::RowGroupReader> row_group_reader =
         parquet_reader->RowGroup(r);
 
@@ -191,7 +198,6 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
       std::shared_ptr<parquet::ColumnReader> column_reader;
 
       auto idx = file_metadata -> schema() -> ColumnIndex(colname);
-      auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed
 
       if(idx < 0) {
         std::string dname(colname);
@@ -200,6 +206,7 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
         *errMsg = strdup(msg.c_str());
         return ARROWERROR;
       }
+      auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed
 
       column_reader = row_group_reader->Column(idx);
 
diff --git a/tests/io_test.py b/tests/io_test.py
index c1c9cfb7ed..beddce5882 100644
--- a/tests/io_test.py
+++ b/tests/io_test.py
@@ -317,6 +317,31 @@ def test_edge_case_read_write(self, par_test_base_tmp, dtype, comp):
         else:
             assert (np_edge_case == pq_arr.to_ndarray()).all()
 
+    @pytest.mark.parametrize("prob_size", pytest.prob_size)
+    def test_large_parquet_io(self, par_test_base_tmp, prob_size):
+
+        with tempfile.TemporaryDirectory(dir=par_test_base_tmp) as tmp_dirname:
+            filename = f"{tmp_dirname}/pq_test_large_parquet"
+            size = 2**21 + 8  # A problem had been detected with parquet files of > 2**21 entries
+            bool_array = np.array((size // 2) * [True, False]).tolist()
+            flt_array = np.arange(size).astype(np.float64).tolist()
+            int_array = np.arange(size).astype(np.int64).tolist()
+            str_array = np.array(["a" + str(i) for i in np.arange(size)]).tolist()
+            arrays = [bool_array, int_array, flt_array, str_array]
+            tuples = list(zip(*arrays))
+            names = ['first', 'second', 'third', 'fourth']
+            index = pd.MultiIndex.from_tuples(tuples, names=names)
+            s = pd.Series(np.random.randn(size), index=index)
+            df = s.to_frame()
+            df.to_parquet(filename)
+            ak_df = ak.DataFrame(ak.read_parquet(filename))
+            # This check covers all of the random numbers generated in s
+            assert np.all(ak_df.to_pandas().values[:, 0] == s.values)
+            # This check covers all of the elements of the MultiIndex
+            for i in range(len(names)):
+                assert np.all(df.index.get_level_values(names[i]).to_numpy() == ak_df[names[i]].to_ndarray())
+
+
     @pytest.mark.parametrize("dtype", NUMERIC_AND_STR_TYPES)
     def test_get_datasets(self, par_test_base_tmp, dtype):
         ak_arr = make_ak_arrays(10, dtype)
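For context on the row-group layout the new `(i < numElems)` loop guard and the `num_read` accounting have to handle: a pandas/pyarrow Parquet file of this size is written as several row groups, so a reader that stops after the first group, or that returns an absolute offset instead of a per-group count, silently drops the tail of the column. The following is a minimal sketch (not part of the patch) for inspecting that layout; the output path and the explicit `row_group_size` are illustrative choices, used here only to guarantee a multi-group file regardless of pyarrow's writer defaults.

    import numpy as np
    import pandas as pd
    import pyarrow.parquet as pq

    size = 2**21 + 8
    df = pd.DataFrame({"a": np.arange(size, dtype=np.int64)})
    # Force multiple row groups; depending on the pyarrow version, default
    # writer settings may also split a file of this size.
    df.to_parquet("/tmp/pq_rowgroup_demo", row_group_size=2**20)

    md = pq.ParquetFile("/tmp/pq_rowgroup_demo").metadata
    # Expect 3 row groups (2**20 + 2**20 + the remaining 8 rows);
    # cpp_readColumnByName must keep iterating across all of them
    # until numElems elements have been accumulated.
    print(md.num_row_groups, md.num_rows)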