Skip to content

Commit

Permalink
Back out "Add runner for local distributed execution" (#11475)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11475

Reverting PR : #11261 due to broken OSS CI.

Original commit changeset: c882d6abe7a8

Original Phabricator Diff: D64694474

Reviewed By: xiaoxmeng

Differential Revision: D65631783

fbshipit-source-id: 6cf86c88d7c81bbfa5f93ed39011700718fc9fe2
  • Loading branch information
Krishna Pai authored and facebook-github-bot committed Nov 8, 2024
1 parent e0939ea commit c5232cd
Show file tree
Hide file tree
Showing 28 changed files with 120 additions and 2,170 deletions.
8 changes: 5 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,11 @@ if(${VELOX_BUILD_TESTING})

# 'gRPC_CARES_PROVIDER' is set as 'package', which means c-ares library needs
# to be installed on the system, instead of being built by gRPC.
# set_source(c-ares) resolve_dependency(c-ares)
set_source(c-ares)
resolve_dependency(c-ares)

# set_source(gRPC) resolve_dependency(gRPC)
set_source(gRPC)
resolve_dependency(gRPC)
endif()

if(VELOX_ENABLE_REMOTE_FUNCTIONS)
Expand Down Expand Up @@ -604,7 +606,7 @@ endif()
set_source(xsimd)
resolve_dependency(xsimd 10.0.0)

set(stemmer_SOURCE BUNDLED)
set_source(stemmer)
resolve_dependency(stemmer)

if(VELOX_BUILD_TESTING)
Expand Down
1 change: 0 additions & 1 deletion velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ add_subdirectory(connectors)

if(${VELOX_ENABLE_EXEC})
add_subdirectory(exec)
add_subdirectory(runner)
endif()

if(${VELOX_ENABLE_DUCKDB})
Expand Down
108 changes: 0 additions & 108 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,112 +109,4 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
static void registerSerDe();
};

class HiveConnectorSplitBuilder {
public:
explicit HiveConnectorSplitBuilder(std::string filePath)
: filePath_{std::move(filePath)} {
infoColumns_["$path"] = filePath_;
}

HiveConnectorSplitBuilder& start(uint64_t start) {
start_ = start;
return *this;
}

HiveConnectorSplitBuilder& length(uint64_t length) {
length_ = length;
return *this;
}

HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) {
splitWeight_ = splitWeight;
return *this;
}

HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) {
fileFormat_ = format;
return *this;
}

HiveConnectorSplitBuilder& infoColumn(
const std::string& name,
const std::string& value) {
infoColumns_.emplace(std::move(name), std::move(value));
return *this;
}

HiveConnectorSplitBuilder& partitionKey(
std::string name,
std::optional<std::string> value) {
partitionKeys_.emplace(std::move(name), std::move(value));
return *this;
}

HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) {
tableBucketNumber_ = bucket;
infoColumns_["$bucket"] = std::to_string(bucket);
return *this;
}

HiveConnectorSplitBuilder& customSplitInfo(
const std::unordered_map<std::string, std::string>& customSplitInfo) {
customSplitInfo_ = customSplitInfo;
return *this;
}

HiveConnectorSplitBuilder& extraFileInfo(
const std::shared_ptr<std::string>& extraFileInfo) {
extraFileInfo_ = extraFileInfo;
return *this;
}

HiveConnectorSplitBuilder& serdeParameters(
const std::unordered_map<std::string, std::string>& serdeParameters) {
serdeParameters_ = serdeParameters;
return *this;
}

HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) {
connectorId_ = connectorId;
return *this;
}

HiveConnectorSplitBuilder& fileProperties(FileProperties fileProperties) {
fileProperties_ = fileProperties;
return *this;
}

std::shared_ptr<connector::hive::HiveConnectorSplit> build() const {
return std::make_shared<connector::hive::HiveConnectorSplit>(
connectorId_,
filePath_,
fileFormat_,
start_,
length_,
partitionKeys_,
tableBucketNumber_,
customSplitInfo_,
extraFileInfo_,
serdeParameters_,
splitWeight_,
infoColumns_,
fileProperties_);
}

private:
const std::string filePath_;
dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF};
uint64_t start_{0};
uint64_t length_{std::numeric_limits<uint64_t>::max()};
std::unordered_map<std::string, std::optional<std::string>> partitionKeys_;
std::optional<int32_t> tableBucketNumber_;
std::unordered_map<std::string, std::string> customSplitInfo_ = {};
std::shared_ptr<std::string> extraFileInfo_ = {};
std::unordered_map<std::string, std::string> serdeParameters_ = {};
std::unordered_map<std::string, std::string> infoColumns_ = {};
std::string connectorId_;
int64_t splitWeight_{0};
std::optional<FileProperties> fileProperties_;
};

} // namespace facebook::velox::connector::hive
1 change: 0 additions & 1 deletion velox/connectors/hive/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ add_executable(
HiveConnectorSerDeTest.cpp
HivePartitionFunctionTest.cpp
HivePartitionUtilTest.cpp
HiveSplitTest.cpp
PartitionIdGeneratorTest.cpp
TableHandleTest.cpp)
add_test(velox_hive_connector_test velox_hive_connector_test)
Expand Down
64 changes: 0 additions & 64 deletions velox/connectors/hive/tests/HiveSplitTest.cpp

This file was deleted.

30 changes: 14 additions & 16 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class TableScanTest : public virtual HiveConnectorTestBase {
const std::string& filePath,
const TypePtr& partitionType,
const std::optional<std::string>& partitionValue) {
auto split = exec::test::HiveConnectorSplitBuilder(filePath)
auto split = HiveConnectorSplitBuilder(filePath)
.partitionKey("pkey", partitionValue)
.build();
auto outputType =
Expand Down Expand Up @@ -411,7 +411,7 @@ TEST_F(TableScanTest, partitionKeyAlias) {
{"a", regularColumn("c0", BIGINT())},
{"ds_alias", partitionKey("ds", VARCHAR())}};

auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
auto split = HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("ds", "2021-12-02")
.build();

Expand Down Expand Up @@ -1806,8 +1806,7 @@ TEST_F(TableScanTest, splitOffsetAndLength) {
}

TEST_F(TableScanTest, fileNotFound) {
auto split =
exec::test::HiveConnectorSplitBuilder("/path/to/nowhere.orc").build();
auto split = HiveConnectorSplitBuilder("/path/to/nowhere.orc").build();
auto assertMissingFile = [&](bool ignoreMissingFiles) {
AssertQueryBuilder(tableScanNode())
.connectorSessionProperty(
Expand All @@ -1830,7 +1829,7 @@ TEST_F(TableScanTest, validFileNoData) {

auto filePath = facebook::velox::test::getDataFilePath(
"velox/exec/tests", "data/emptyPresto.dwrf");
auto split = exec::test::HiveConnectorSplitBuilder(filePath)
auto split = HiveConnectorSplitBuilder(filePath)
.start(0)
.length(fs::file_size(filePath) / 2)
.build();
Expand Down Expand Up @@ -1950,7 +1949,7 @@ TEST_F(TableScanTest, partitionedTableDateKey) {

// Test partition filter on date column.
{
auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
auto split = HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("pkey", partitionValue)
.build();
auto outputType = ROW({"pkey", "c0", "c1"}, {DATE(), BIGINT(), DOUBLE()});
Expand Down Expand Up @@ -2852,10 +2851,9 @@ TEST_F(TableScanTest, bucket) {
writeToFile(filePaths[i]->getPath(), rowVector);
rowVectors.emplace_back(rowVector);

splits.emplace_back(
exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucket)
.build());
splits.emplace_back(HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucket)
.build());
}

createDuckDbTable(rowVectors);
Expand All @@ -2879,7 +2877,7 @@ TEST_F(TableScanTest, bucket) {

for (int i = 0; i < buckets.size(); ++i) {
int bucketValue = buckets[i];
auto hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath())
auto hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucketValue)
.build();

Expand All @@ -2899,7 +2897,7 @@ TEST_F(TableScanTest, bucket) {

// Filter on bucket column, but don't project it out
auto rowTypes = ROW({"c0", "c1"}, {INTEGER(), BIGINT()});
hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath())
hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucketValue)
.build();
op = PlanBuilder()
Expand Down Expand Up @@ -4143,7 +4141,7 @@ TEST_F(TableScanTest, reuseRowVector) {
.tableScan(rowType, {}, "c0 < 5")
.project({"c1.c0"})
.planNode();
auto split = exec::test::HiveConnectorSplitBuilder(file->getPath()).build();
auto split = HiveConnectorSplitBuilder(file->getPath()).build();
auto expected = makeRowVector(
{makeFlatVector<int32_t>(10, [](auto i) { return i % 5; })});
AssertQueryBuilder(plan).splits({split, split}).assertResults(expected);
Expand Down Expand Up @@ -4724,7 +4722,7 @@ TEST_F(TableScanTest, varbinaryPartitionKey) {
{"a", regularColumn("c0", BIGINT())},
{"ds_alias", partitionKey("ds", VARBINARY())}};

auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
auto split = HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("ds", "2021-12-02")
.build();

Expand Down Expand Up @@ -4763,7 +4761,7 @@ TEST_F(TableScanTest, timestampPartitionKey) {
ColumnHandleMap assignments = {{"t", partitionKey("t", TIMESTAMP())}};
std::vector<std::shared_ptr<connector::ConnectorSplit>> splits;
for (auto& t : inputs) {
splits.push_back(exec::test::HiveConnectorSplitBuilder(filePath->getPath())
splits.push_back(HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("t", t)
.build());
}
Expand All @@ -4782,7 +4780,7 @@ TEST_F(TableScanTest, partitionKeyNotMatchPartitionKeysHandle) {
writeToFile(filePath->getPath(), vectors);
createDuckDbTable(vectors);

auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
auto split = HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("ds", "2021-12-02")
.build();

Expand Down
3 changes: 0 additions & 3 deletions velox/exec/tests/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ add_library(
AssertQueryBuilder.cpp
ArbitratorTestUtil.cpp
Cursor.cpp
DistributedPlanBuilder.cpp
HiveConnectorTestBase.cpp
LocalExchangeSource.cpp
LocalRunnerTestBase.cpp
OperatorTestBase.cpp
PlanBuilder.cpp
QueryAssertions.cpp
Expand All @@ -37,7 +35,6 @@ add_library(

target_link_libraries(
velox_exec_test_lib
velox_local_runner
velox_vector_test_lib
velox_temp_path
velox_core
Expand Down
Loading

0 comments on commit c5232cd

Please sign in to comment.