Skip to content

Commit

Permalink
Add support OrcReader based on DwrfReader (facebookincubator#10194)
Browse files Browse the repository at this point in the history
Summary:
The ORC file format is used by many companies. Currently, the DWRF Reader in Velox can be used to read ORC files. This PR implements `OrcReaderFactory` based on `DwrfReader` and registers it in `HiveConnectorFactory#initialize()`. In this way, we can get a `Reader` that can read the ORC file format through `dwio::common::getReaderFactory(FileFormat::ORC)->createReader(..)`.

CC: Yuhta

Pull Request resolved: facebookincubator#10194

Reviewed By: Yuhta

Differential Revision: D58738201

Pulled By: kagamiori

fbshipit-source-id: e84f1cc049aeb994d002702f5e04336bd36559d7
  • Loading branch information
wypb authored and facebook-github-bot committed Jun 19, 2024
1 parent 11bdeb8 commit 1a50a8a
Show file tree
Hide file tree
Showing 17 changed files with 375 additions and 150 deletions.
1 change: 1 addition & 0 deletions velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ target_link_libraries(
velox_dwio_catalog_fbhive
velox_dwio_dwrf_reader
velox_dwio_dwrf_writer
velox_dwio_orc_reader
velox_dwio_parquet_reader
velox_dwio_parquet_writer
velox_file
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#endif
#include "velox/dwio/dwrf/reader/DwrfReader.h"
#include "velox/dwio/dwrf/writer/Writer.h"
#include "velox/dwio/orc/reader/OrcReader.h"
// Meta's buck build system needs this check.
#ifdef VELOX_ENABLE_PARQUET
#include "velox/dwio/parquet/RegisterParquetReader.h" // @manual
Expand Down Expand Up @@ -133,6 +134,7 @@ void HiveConnectorFactory::initialize() {
dwio::common::registerFileSinks();
dwrf::registerDwrfReaderFactory();
dwrf::registerDwrfWriterFactory();
orc::registerOrcReaderFactory();
// Meta's buck build system needs this check.
#ifdef VELOX_ENABLE_PARQUET
parquet::registerParquetReaderFactory();
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ target_link_libraries(
add_subdirectory(common)
add_subdirectory(catalog)
add_subdirectory(dwrf)
add_subdirectory(orc)
add_subdirectory(parquet)
7 changes: 7 additions & 0 deletions velox/dwio/dwrf/test/OrcTest.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ inline std::string getExampleFilePath(const std::string& fileName) {
"velox/dwio/dwrf/test", "examples/" + fileName);
}

std::unique_ptr<dwio::common::BufferedInput> createFileBufferedInput(
const std::string& path,
memory::MemoryPool& pool) {
return std::make_unique<dwio::common::BufferedInput>(
std::make_shared<LocalReadFile>(path), pool);
}

class MockStripeStreams : public StripeStreams {
public:
MockStripeStreams() : pool_{memory::memoryManager()->addLeafPool()} {};
Expand Down
150 changes: 0 additions & 150 deletions velox/dwio/dwrf/test/ReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <velox/buffer/Buffer.h>
#include "folly/Random.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "folly/lang/Assume.h"
Expand All @@ -28,7 +27,6 @@
#include "velox/dwio/dwrf/reader/DwrfReader.h"
#include "velox/dwio/dwrf/test/OrcTest.h"
#include "velox/dwio/dwrf/test/utils/E2EWriterTestUtil.h"
#include "velox/type/Type.h"
#include "velox/type/fbhive/HiveTypeParser.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
Expand Down Expand Up @@ -129,13 +127,6 @@ TEST_F(TestReader, testWriterVersions) {
"future - 99", writerVersionToString(static_cast<WriterVersion>(99)));
}

std::unique_ptr<BufferedInput> createFileBufferedInput(
const std::string& path,
memory::MemoryPool& pool) {
return std::make_unique<BufferedInput>(
std::make_shared<LocalReadFile>(path), pool);
}

// This relies on schema and data inside of our fm_small and fm_large orc files,
// and is not composeable with other schema/datas
void verifyFlatMapReading(
Expand Down Expand Up @@ -2029,67 +2020,6 @@ TEST_F(TestReader, testFlatmapAsMapFieldLifeCycle) {
testFlatmapAsMapFieldLifeCycle(pool(), schema, config, rng, batchSize, true);
}

TEST_F(TestReader, testOrcDecimal) {
const std::string simpleTest(getExampleFilePath("orc_decimal.orc"));
const std::shared_ptr<const RowType> expectedType =
std::dynamic_pointer_cast<const RowType>(
HiveTypeParser().parse("struct<i:decimal(38,6),j:decimal(9,2)>"));
dwio::common::ReaderOptions readerOpts{pool()};
// To make DwrfReader reads ORC file, setFileFormat to FileFormat::ORC
readerOpts.setFileFormat(dwio::common::FileFormat::ORC);
auto reader = DwrfReader::create(
createFileBufferedInput(simpleTest, readerOpts.memoryPool()), readerOpts);
// Check schema
auto rowType = reader->rowType();
EXPECT_TRUE(rowType->equivalent(*expectedType));

RowReaderOptions rowReaderOptions;
auto rowReader = reader->createRowReader(rowReaderOptions);

VectorPtr batch;
while (rowReader->next(500, batch)) {
auto rowVector = batch->as<RowVector>();
auto longDecimalCol = rowVector->childAt(0)->as<SimpleVector<int128_t>>();
auto shortDecimalCol = rowVector->childAt(1)->as<SimpleVector<int64_t>>();
auto longDecimalType = rowVector->type()->childAt(0);
auto shortDecimalType = rowVector->type()->childAt(1);
EXPECT_EQ(
DecimalUtil::toString(longDecimalCol->valueAt(0), longDecimalType),
"1242141234.123456");
EXPECT_EQ(
DecimalUtil::toString(shortDecimalCol->valueAt(0), shortDecimalType),
"321423.21");
}
}

TEST_F(TestReader, testOrcReaderSimple) {
const std::string simpleTest(
getExampleFilePath("TestStringDictionary.testRowIndex.orc"));
dwio::common::ReaderOptions readerOpts{pool()};
// To make DwrfReader reads ORC file, setFileFormat to FileFormat::ORC
readerOpts.setFileFormat(dwio::common::FileFormat::ORC);
auto reader = DwrfReader::create(
createFileBufferedInput(simpleTest, readerOpts.memoryPool()), readerOpts);

RowReaderOptions rowReaderOptions;
auto rowReader = reader->createRowReader(rowReaderOptions);

VectorPtr batch;
const std::string stringPrefix{"row "};
size_t rowNumber = 0;
while (rowReader->next(500, batch)) {
auto rowVector = batch->as<RowVector>();
auto strings = rowVector->childAt(0)->as<SimpleVector<StringView>>();
for (size_t i = 0; i < rowVector->size(); ++i) {
std::stringstream stream;
stream << std::setfill('0') << std::setw(6) << rowNumber;
EXPECT_EQ(stringPrefix + stream.str(), strings->valueAt(i).str());
rowNumber++;
}
}
EXPECT_EQ(rowNumber, 32768);
}

TEST_F(TestReader, testFooterWrapper) {
proto::Footer impl;
FooterWrapper wrapper(&impl);
Expand All @@ -2116,86 +2046,6 @@ TEST_F(TestReader, testOrcAndDwrfRowIndexStride) {
ASSERT_TRUE(dwrfFooterWrapper.hasRowIndexStride());
EXPECT_EQ(dwrfFooterWrapper.rowIndexStride(), 100);
}

TEST_F(TestReader, testOrcReaderComplexTypes) {
const std::string icebergOrc(getExampleFilePath("complextypes_iceberg.orc"));
const std::shared_ptr<const RowType> expectedType =
std::dynamic_pointer_cast<const RowType>(HiveTypeParser().parse("struct<\
id:bigint,int_array:array<int>,int_array_array:array<array<int>>,\
int_map:map<string,int>,int_map_array:array<map<string,int>>,\
nested_struct:struct<\
a:int,b:array<int>,c:struct<\
d:array<array<struct<\
e:int,f:string>>>>,\
g:map<string,struct<\
h:struct<\
i:array<double>>>>>>"));
dwio::common::ReaderOptions readerOpts{pool()};
readerOpts.setFileFormat(dwio::common::FileFormat::ORC);
auto reader = DwrfReader::create(
createFileBufferedInput(icebergOrc, readerOpts.memoryPool()), readerOpts);
auto rowType = reader->rowType();
EXPECT_TRUE(rowType->equivalent(*expectedType));
}

TEST_F(TestReader, testOrcReaderVarchar) {
const std::string varcharOrc(getExampleFilePath("orc_index_int_string.orc"));
dwio::common::ReaderOptions readerOpts{pool()};
readerOpts.setFileFormat(dwio::common::FileFormat::ORC);
auto reader = DwrfReader::create(
createFileBufferedInput(varcharOrc, readerOpts.memoryPool()), readerOpts);

RowReaderOptions rowReaderOptions;
auto rowReader = reader->createRowReader(rowReaderOptions);

VectorPtr batch;
int counter = 0;
while (rowReader->next(500, batch)) {
auto rowVector = batch->as<RowVector>();
auto ints = rowVector->childAt(0)->as<SimpleVector<int32_t>>();
auto strings = rowVector->childAt(1)->as<SimpleVector<StringView>>();
for (size_t i = 0; i < rowVector->size(); ++i) {
counter++;
EXPECT_EQ(counter, ints->valueAt(i));
std::stringstream stream;
stream << counter;
if (counter < 1000) {
stream << "a";
}
EXPECT_EQ(stream.str(), strings->valueAt(i).str());
}
}
EXPECT_EQ(counter, 6000);
}

TEST_F(TestReader, testOrcReaderDate) {
const std::string dateOrc(getExampleFilePath("TestOrcFile.testDate1900.orc"));
dwio::common::ReaderOptions readerOpts{pool()};
readerOpts.setFileFormat(dwio::common::FileFormat::ORC);
auto reader = DwrfReader::create(
createFileBufferedInput(dateOrc, readerOpts.memoryPool()), readerOpts);

RowReaderOptions rowReaderOptions;
auto rowReader = reader->createRowReader(rowReaderOptions);

VectorPtr batch;
int year = 1900;
while (rowReader->next(1000, batch)) {
auto rowVector = batch->as<RowVector>();
auto dates = rowVector->childAt(1)->as<SimpleVector<int32_t>>();

std::stringstream stream;
stream << year << "-12-25";
EXPECT_EQ(stream.str(), DATE()->toString(dates->valueAt(0)));

for (size_t i = 1; i < rowVector->size(); ++i) {
EXPECT_EQ(dates->valueAt(0), dates->valueAt(i));
}

year++;
}
}

namespace {

/*
Expand Down
Binary file removed velox/dwio/dwrf/test/examples/orc_decimal.orc
Binary file not shown.
19 changes: 19 additions & 0 deletions velox/dwio/orc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_subdirectory(reader)

if(${VELOX_BUILD_TESTING})
add_subdirectory(test)
endif()
17 changes: 17 additions & 0 deletions velox/dwio/orc/reader/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(velox_dwio_orc_reader OrcReader.cpp)

target_link_libraries(velox_dwio_orc_reader velox_dwio_dwrf_reader)
27 changes: 27 additions & 0 deletions velox/dwio/orc/reader/OrcReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/orc/reader/OrcReader.h"

namespace facebook::velox::orc {
void registerOrcReaderFactory() {
dwio::common::registerReaderFactory(std::make_shared<OrcReaderFactory>());
}

void unregisterOrcReaderFactory() {
dwio::common::unregisterReaderFactory(dwio::common::FileFormat::ORC);
}
} // namespace facebook::velox::orc
39 changes: 39 additions & 0 deletions velox/dwio/orc/reader/OrcReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/common/ReaderFactory.h"
#include "velox/dwio/dwrf/reader/DwrfReader.h"

namespace facebook::velox::orc {

class OrcReaderFactory : public dwio::common::ReaderFactory {
public:
OrcReaderFactory() : ReaderFactory(dwio::common::FileFormat::ORC) {}

std::unique_ptr<dwio::common::Reader> createReader(
std::unique_ptr<dwio::common::BufferedInput> input,
const dwio::common::ReaderOptions& options) override {
return velox::dwrf::DwrfReader::create(std::move(input), options);
}
};

void registerOrcReaderFactory();

void unregisterOrcReaderFactory();

} // namespace facebook::velox::orc
22 changes: 22 additions & 0 deletions velox/dwio/orc/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(velox_dwio_orc_reader_test ReaderTest.cpp)
add_test(
NAME velox_dwio_orc_reader_test
COMMAND velox_dwio_orc_reader_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})

target_link_libraries(velox_dwio_orc_reader_test velox_dwrf_test_utils
velox_dwio_common_test_utils gtest gtest_main gmock)
Loading

0 comments on commit 1a50a8a

Please sign in to comment.