Fixes zeroes from reading large Parquet files
drculhane committed Dec 31, 2024
1 parent a77be87 commit ac9c597
Showing 2 changed files with 37 additions and 5 deletions.
17 changes: 12 additions & 5 deletions src/parquet/ReadParquet.cpp
@@ -1,12 +1,14 @@
#include "ReadParquet.h"
#include "UtilParquet.h"

// Returns the number of elements read
template <typename ReaderType, typename ChplType>
int64_t readColumn(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
bool hasNonFloatNulls, int64_t i, int64_t numElems, int64_t batchSize,
int64_t values_read, bool* where_null_chpl) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (ChplType*)chpl_arr;
int64_t num_read = 0;
ReaderType* reader =
static_cast<ReaderType*>(column_reader.get());
startIdx -= reader->Skip(startIdx);
@@ -17,6 +19,7 @@ int64_t readColumn(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::Co
batchSize = numElems - i;
(void)reader->ReadBatch(batchSize, nullptr, nullptr, &chpl_ptr[i], &values_read);
i+=values_read;
num_read += values_read;
}
}
else {
@@ -27,9 +30,10 @@ int64_t readColumn(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::Co
where_null_chpl[i] = true;
}
i++;
num_read++;
}
}
return i;
return num_read;
}

template <typename ReaderType, typename ChplType, typename PqType>
@@ -38,6 +42,7 @@ int64_t readColumnDbFl(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet
int64_t values_read, bool* where_null_chpl) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (ChplType*)chpl_arr;
int64_t num_read = 0;
ReaderType* reader =
static_cast<ReaderType*>(column_reader.get());
startIdx -= reader->Skip(startIdx);
@@ -53,8 +58,10 @@ int64_t readColumnDbFl(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet
chpl_ptr[i] = NAN;
}
i++;
num_read++;
}
return i;
//return i;
return num_read;
}

template <typename ReaderType, typename ChplType, typename PqType>
@@ -174,15 +181,15 @@ int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* co
int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl, const char* colname, int64_t numElems, int64_t startIdx, int64_t batchSize, int64_t byteLength, bool hasNonFloatNulls, char** errMsg) {
try {
int64_t ty = cpp_getType(filename, colname, errMsg);

std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::OpenFile(filename, false);

std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
int num_row_groups = file_metadata->num_row_groups();

int64_t i = 0;
for (int r = 0; r < num_row_groups; r++) {
for (int r = 0; (r < num_row_groups) && (i < numElems); r++) {
std::shared_ptr<parquet::RowGroupReader> row_group_reader =
parquet_reader->RowGroup(r);

@@ -191,7 +198,6 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
std::shared_ptr<parquet::ColumnReader> column_reader;

auto idx = file_metadata -> schema() -> ColumnIndex(colname);
auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed

if(idx < 0) {
std::string dname(colname);
@@ -200,6 +206,7 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
*errMsg = strdup(msg.c_str());
return ARROWERROR;
}
auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed

column_reader = row_group_reader->Column(idx);

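For reference, below is a standalone sketch of the pattern the fix adopts in ReadParquet.cpp: each per-row-group read reports how many elements it wrote (rather than returning the caller's running index), the caller advances its offset by that count, and the row-group loop exits as soon as the requested number of elements has been read. The RowGroup type, the toy readColumn, and the data are illustrative stand-ins, not the Arrow/Parquet API used above.

// Standalone sketch (not the Arkouda source) of the count-based read pattern.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Stand-in for one row group's worth of column data.
using RowGroup = std::vector<std::int64_t>;

// Copies up to (numElems - i) values from rg into out starting at offset i and
// returns the number of elements actually read (the new return convention).
std::int64_t readColumn(std::vector<std::int64_t>& out, const RowGroup& rg,
                        std::int64_t i, std::int64_t numElems) {
  std::int64_t want = numElems - i;
  std::int64_t avail = static_cast<std::int64_t>(rg.size());
  std::int64_t n = std::min(want, avail);
  std::copy_n(rg.begin(), n, out.begin() + i);
  return n;  // count read, not the updated offset
}

int main() {
  // Three "row groups" holding 10 values total.
  std::vector<RowGroup> rowGroups = {{0, 1, 2, 3}, {4, 5, 6}, {7, 8, 9}};
  std::int64_t numElems = 10;
  std::vector<std::int64_t> out(numElems, 0);

  std::int64_t i = 0;
  // Mirrors the fixed loop shape: stop once numElems values are in hand.
  for (std::size_t r = 0; r < rowGroups.size() && i < numElems; r++) {
    i += readColumn(out, rowGroups[r], i, numElems);  // advance by count read
  }

  for (std::int64_t v : out) std::cout << v << ' ';
  std::cout << '\n';  // prints 0 1 2 3 4 5 6 7 8 9 with no zero-filled tail
  return 0;
}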
25 changes: 25 additions & 0 deletions tests/io_test.py
@@ -317,6 +317,31 @@ def test_edge_case_read_write(self, par_test_base_tmp, dtype, comp):
else:
assert (np_edge_case == pq_arr.to_ndarray()).all()

@pytest.mark.parametrize("prob_size", pytest.prob_size)
def test_large_parquet_io(self,par_test_base_tmp,prob_size) :

with tempfile.TemporaryDirectory(dir=par_test_base_tmp) as tmp_dirname:
filename = f"{tmp_dirname}/pq_test_large_parquet"
size = 2**21 + 8 # A problem had been detected with parquet files of > 2**21 entries
bool_array = np.array((size//2)*[True,False]).tolist()
flt_array = np.arange(size).astype(np.float64).tolist()
int_array = np.arange(size).astype(np.int64).tolist()
str_array = np.array(["a"+str(i) for i in np.arange(size)]).tolist()
arrays = [bool_array, int_array, flt_array, str_array]
tuples = list(zip(*arrays))
names = ['first','second','third','fourth']
index = pd.MultiIndex.from_tuples(tuples,names=names)
s = pd.Series(np.random.randn(size),index=index)
df = s.to_frame()
df.to_parquet(filename)
ak_df = ak.DataFrame(ak.read_parquet(filename))
# This check is on all of the random numbers generated in s
assert np.all(ak_df.to_pandas().values[:,0] == s.values)
# This check is on all of the elements of the MultiIndex
for i in range(len(names)) :
assert np.all( df.index.get_level_values(names[i]).to_numpy() == ak_df[names[i]].to_ndarray() )


@pytest.mark.parametrize("dtype", NUMERIC_AND_STR_TYPES)
def test_get_datasets(self, par_test_base_tmp, dtype):
ak_arr = make_ak_arrays(10, dtype)
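Outside the test suite, a minimal round trip exercising the same size threshold might look like the sketch below. It assumes a running arkouda server reachable with ak.connect() and write access to /tmp; the file path and column names are illustrative only, not part of the commit.

# Minimal round-trip sketch (assumes a local arkouda server and pyarrow for to_parquet).
import numpy as np
import pandas as pd
import arkouda as ak

ak.connect()  # default host/port; adjust for your deployment

size = 2**21 + 8  # just past the size at which zero-filled reads had been observed
df = pd.DataFrame({
    "idx": np.arange(size, dtype=np.int64),
    "vals": np.arange(size, dtype=np.float64),
})
df.to_parquet("/tmp/pq_large_check")  # illustrative path

ak_df = ak.DataFrame(ak.read_parquet("/tmp/pq_large_check"))
assert (ak_df["vals"].to_ndarray() == df["vals"].to_numpy()).all()  # no zeroed tail

ak.disconnect()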
