Skip to content

Commit

Permalink
map/struct/list
Browse files Browse the repository at this point in the history
  • Loading branch information
yaqi-zhao committed Oct 26, 2023
1 parent 48a8297 commit c042e46
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 12 deletions.
3 changes: 2 additions & 1 deletion velox/dwio/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,8 @@ createAsyncDecompressor(
case CompressionKind::CompressionKind_GZIP:
return std::make_unique<GzipIAADecompressor>(bufferSize, streamDebugInfo);
default:
LOG(WARNING) << "Asynchronous mode not support for compression codec " << kind;
LOG(WARNING) << "Asynchronous mode not support for compression codec "
<< kind;
return nullptr;
}
return nullptr;
Expand Down
6 changes: 3 additions & 3 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ void PageReader::prefetchDictionary(const thrift::PageHeader& pageHeader) {
return;
}

const bool FOLLY_NONNULL PageReader::iaaDecompressGzip(
const bool PageReader::iaaDecompressGzip(
const char* pageData,
uint32_t compressedSize,
uint32_t uncompressedSize,
Expand Down Expand Up @@ -1218,7 +1218,7 @@ const bool FOLLY_NONNULL PageReader::iaaDecompressGzip(
return true;
}

const bool FOLLY_NONNULL PageReader::getDecompRes(int job_id) {
const bool PageReader::getDecompRes(int job_id) {
auto streamDebugInfo =
fmt::format("Page Reader: Stream {}", inputStream_->getName());
std::unique_ptr<dwio::common::compression::AsyncDecompressor> decompressor =
Expand All @@ -1228,7 +1228,7 @@ const bool FOLLY_NONNULL PageReader::getDecompRes(int job_id) {
}

PageReader::~PageReader() {
if (data_qpl_job_id >= 0 || dict_qpl_job_id >= 0) {
if (data_qpl_job_id > 0 || dict_qpl_job_id > 0) {
auto streamDebugInfo =
fmt::format("Page Reader: Stream {}", inputStream_->getName());
std::unique_ptr<dwio::common::compression::AsyncDecompressor> decompressor =
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ class PageReader {
void prefetchDataPageV1(const thrift::PageHeader& pageHeader);
void prefetchDataPageV2(const thrift::PageHeader& pageHeader);
void prefetchDictionary(const thrift::PageHeader& pageHeader);
const bool FOLLY_NONNULL getDecompRes(int job_id);
const bool getDecompRes(int job_id);
void prefetchNextPage();
bool seekToPreDecompPage(int64_t row);
const bool FOLLY_NONNULL iaaDecompressGzip(
const bool iaaDecompressGzip(
const char* FOLLY_NONNULL pageData,
uint32_t compressedSize,
uint32_t uncompressedSize,
Expand Down
23 changes: 23 additions & 0 deletions velox/dwio/parquet/reader/RepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ void skipUnreadLengthsAndNulls(dwio::common::SelectiveColumnReader& reader) {
}
}

void preDecompChild(
dwio::common::SelectiveColumnReader* reader,
uint32_t index) {
#ifdef VELOX_ENABLE_QPL
auto children = reader->children();
if (children.empty()) {
reader->formatData().as<ParquetData>().preDecompRowGroup(index);
return;
}
for (auto* child : children) {
preDecompChild(child, index);
}
#endif
}

void enqueueChildren(
dwio::common::SelectiveColumnReader* reader,
uint32_t index,
Expand Down Expand Up @@ -135,6 +150,10 @@ void MapColumnReader::enqueueRowGroup(
enqueueChildren(this, index, input);
}

void MapColumnReader::preDecompRowGroup(uint32_t index) {
preDecompChild(this, index);
}

void MapColumnReader::seekToRowGroup(uint32_t index) {
SelectiveMapColumnReader::seekToRowGroup(index);
readOffset_ = 0;
Expand Down Expand Up @@ -241,6 +260,10 @@ void ListColumnReader::enqueueRowGroup(
enqueueChildren(this, index, input);
}

void ListColumnReader::preDecompRowGroup(uint32_t index) {
preDecompChild(this, index);
}

void ListColumnReader::seekToRowGroup(uint32_t index) {
SelectiveListColumnReader::seekToRowGroup(index);
readOffset_ = 0;
Expand Down
5 changes: 2 additions & 3 deletions velox/dwio/parquet/reader/RepeatedColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
void seekToRowGroup(uint32_t index) override;

void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input);
void preDecompRowGroup(uint32_t index);

void read(
vector_size_t offset,
Expand Down Expand Up @@ -128,9 +129,7 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {

void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input);

bool preDecompRowGroup(uint32_t index) {
return true;
}
void preDecompRowGroup(uint32_t index);
void read(
vector_size_t offset,
RowSet rows,
Expand Down
6 changes: 3 additions & 3 deletions velox/dwio/parquet/reader/StructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ bool StructColumnReader::preDecompRowGroup(uint32_t index) {
return false;
}
if (auto structChild = dynamic_cast<StructColumnReader*>(child)) {
continue;
structChild->preDecompRowGroup(index);
} else if (auto listChild = dynamic_cast<ListColumnReader*>(child)) {
continue;
listChild->preDecompRowGroup(index);
} else if (auto mapChild = dynamic_cast<MapColumnReader*>(child)) {
continue;
mapChild->preDecompRowGroup(index);
} else {
needPreDecomp =
child->formatData().as<ParquetData>().preDecompRowGroup(index);
Expand Down

0 comments on commit c042e46

Please sign in to comment.