Skip to content

Commit

Permalink
Velox side Verax support
Browse files Browse the repository at this point in the history
  • Loading branch information
Orri Erling committed Dec 22, 2024
1 parent 61b0b39 commit 1403fe8
Show file tree
Hide file tree
Showing 28 changed files with 1,024 additions and 783 deletions.
20 changes: 20 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "velox/common/caching/ScanTracker.h"
#include "velox/common/future/VeloxPromise.h"
#include "velox/core/ExpressionEvaluator.h"
#include "velox/type/Subfield.h"
#include "velox/vector/ComplexVector.h"

#include <folly/Synchronized.h>
Expand Down Expand Up @@ -75,6 +76,10 @@ class ColumnHandle : public ISerializable {
public:
virtual ~ColumnHandle() = default;

/// Returns the name of the column this handle refers to. The base-class
/// implementation has no name to report and always raises via
/// VELOX_UNSUPPORTED(); subclasses that carry a name override this.
virtual const std::string& name() const {
VELOX_UNSUPPORTED();
}

folly::dynamic serialize() const override;

protected:
Expand All @@ -98,6 +103,13 @@ class ConnectorTableHandle : public ISerializable {
return connectorId_;
}

/// Returns the connector-dependent table name. Used with
/// ConnectorMetadata. Implementations need to supply a definition
/// to work with metadata. The default implementation always raises via
/// VELOX_UNSUPPORTED().
virtual const std::string& name() const {
VELOX_UNSUPPORTED();
}

virtual folly::dynamic serialize() const override;

protected:
Expand Down Expand Up @@ -404,6 +416,8 @@ class ConnectorQueryCtx {
bool selectiveNimbleReaderEnabled_{false};
};

class ConnectorMetadata;

class Connector {
public:
explicit Connector(const std::string& id) : id_(id) {}
Expand All @@ -425,6 +439,12 @@ class Connector {
return false;
}

/// Returns a ConnectorMetadata for accessing table
/// information. The default implementation always raises via
/// VELOX_UNSUPPORTED(); connectors that can provide metadata override this.
/// The result is a raw pointer.
/// NOTE(review): ownership presumably stays with the connector - confirm
/// against the overriding implementations.
virtual ConnectorMetadata* metadata() const {
VELOX_UNSUPPORTED();
}

virtual std::unique_ptr<DataSource> createDataSource(
const RowTypePtr& outputType,
const std::shared_ptr<ConnectorTableHandle>& tableHandle,
Expand Down
8 changes: 8 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,12 @@ bool HiveConfig::cacheNoRetention(const config::ConfigBase* session) const {
config_->get<bool>(kCacheNoRetention, /*defaultValue=*/false));
}

std::string HiveConfig::hiveLocalDataPath() const {
return config_->get<std::string>(kLocalDataPath, "");
}

std::string HiveConfig::hiveLocalFileFormat() const {
return config_->get<std::string>(kLocalFileFormat, "");
}

} // namespace facebook::velox::connector::hive
11 changes: 11 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ class HiveConfig {

static constexpr const char* kCacheNoRetention = "cache.no_retention";
static constexpr const char* kCacheNoRetentionSession = "cache.no_retention";
static constexpr const char* kLocalDataPath = "hive_local_data_path";
static constexpr const char* kLocalFileFormat = "hive_local_file_format";

InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const config::ConfigBase* session) const;
Expand Down Expand Up @@ -299,6 +301,15 @@ class HiveConfig {
/// locality.
bool cacheNoRetention(const config::ConfigBase* session) const;

/// Returns the file system path containing local data. If non-empty,
/// initializes LocalHiveConnectorMetadata to provide metadata for the tables
/// in the directory.
std::string hiveLocalDataPath() const;

/// Returns the name of the file format to use in interpreting the contents of
/// hiveLocalDataPath().
std::string hiveLocalFileFormat() const;

HiveConfig(std::shared_ptr<const config::ConfigBase> config) {
VELOX_CHECK_NOT_NULL(
config, "Config is null for HiveConfig initialization");
Expand Down
21 changes: 21 additions & 0 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/expression/FieldReference.h"

#include <boost/lexical_cast.hpp>
Expand All @@ -31,6 +32,14 @@ using namespace facebook::velox::dwrf;

namespace facebook::velox::connector::hive {

namespace {
std::vector<std::unique_ptr<HiveConnectorMetadataFactory>>&
hiveConnectorMetadataFactories() {
static std::vector<std::unique_ptr<HiveConnectorMetadataFactory>> factories;
return factories;
}
} // namespace

HiveConnector::HiveConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
Expand All @@ -52,6 +61,12 @@ HiveConnector::HiveConnector(
LOG(INFO) << "Hive connector " << connectorId()
<< " created with file handle cache disabled";
}
for (auto& factory : hiveConnectorMetadataFactories()) {
metadata_ = factory->create(this);
if (metadata_ != nullptr) {
break;
}
}
}

std::unique_ptr<DataSource> HiveConnector::createDataSource(
Expand Down Expand Up @@ -180,4 +195,10 @@ void registerHivePartitionFunctionSerDe() {
"HivePartitionFunctionSpec", HivePartitionFunctionSpec::deserialize);
}

bool registerHiveConnectorMetadataFactory(
std::unique_ptr<HiveConnectorMetadataFactory> factory) {
hiveConnectorMetadataFactories().push_back(std::move(factory));
return true;
}

} // namespace facebook::velox::connector::hive
23 changes: 23 additions & 0 deletions velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class HiveConnector : public Connector {
return true;
}

/// Returns the ConnectorMetadata attached to this connector. The pointer is
/// owned by the connector. Raises if no metadata was attached, i.e. no
/// registered HiveConnectorMetadataFactory produced one at construction.
ConnectorMetadata* metadata() const override {
  // Name the connector and the likely cause so the failure can be diagnosed
  // without a debugger.
  VELOX_CHECK_NOT_NULL(
      metadata_,
      "No ConnectorMetadata available for Hive connector {}: no registered HiveConnectorMetadataFactory created one",
      connectorId());
  return metadata_.get();
}

std::unique_ptr<DataSource> createDataSource(
const RowTypePtr& outputType,
const std::shared_ptr<ConnectorTableHandle>& tableHandle,
Expand Down Expand Up @@ -79,6 +84,7 @@ class HiveConnector : public Connector {
const std::shared_ptr<HiveConfig> hiveConfig_;
FileHandleFactory fileHandleFactory_;
folly::Executor* executor_;
std::shared_ptr<ConnectorMetadata> metadata_;
};

class HiveConnectorFactory : public ConnectorFactory {
Expand Down Expand Up @@ -150,4 +156,21 @@ class HivePartitionFunctionSpec : public core::PartitionFunctionSpec {

void registerHivePartitionFunctionSerDe();

/// Hook for connecting metadata functions to a HiveConnector. Each registered
/// factory is called after initializing a HiveConnector until one of these
/// returns a ConnectorMetadata instance.
class HiveConnectorMetadataFactory {
public:
virtual ~HiveConnectorMetadataFactory() = default;

/// Returns a ConnectorMetadata to complete 'connector' if 'this'
/// recognizes a data source, e.g. local file system or remote metadata
/// service, associated with the configs in 'connector'. Returns nullptr
/// otherwise, in which case the next registered factory is tried.
virtual std::shared_ptr<ConnectorMetadata> create(
HiveConnector* connector) = 0;
};

/// Registers a HiveConnectorMetadataFactory and takes ownership of it.
/// Registered factories are consulted when a HiveConnector is constructed.
/// Returns a bool.
/// NOTE(review): the bool presumably lets the call initialize a static
/// variable - confirm the intended usage.
bool registerHiveConnectorMetadataFactory(
std::unique_ptr<HiveConnectorMetadataFactory>);

} // namespace facebook::velox::connector::hive
6 changes: 5 additions & 1 deletion velox/connectors/hive/TableHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class HiveColumnHandle : public ColumnHandle {
hiveType_->toString());
}

const std::string& name() const {
/// Returns the column name stored in this handle. Overrides the throwing
/// default in ColumnHandle.
const std::string& name() const override {
return name_;
}

Expand Down Expand Up @@ -135,6 +135,10 @@ class HiveTableHandle : public ConnectorTableHandle {
return tableName_;
}

/// Returns the table name; forwards to tableName(). Overrides the throwing
/// default in ConnectorTableHandle.
const std::string& name() const override {
return tableName();
}

bool isFilterPushdownEnabled() const {
return filterPushdownEnabled_;
}
Expand Down
14 changes: 12 additions & 2 deletions velox/dwio/common/Statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,13 @@ class ColumnStatistics {
std::optional<uint64_t> valueCount,
std::optional<bool> hasNull,
std::optional<uint64_t> rawSize,
std::optional<uint64_t> size)
std::optional<uint64_t> size,
std::optional<int64_t> numDistinct = std::nullopt)
: valueCount_(valueCount),
hasNull_(hasNull),
rawSize_(rawSize),
size_(size) {}
size_(size),
numDistinct_(numDistinct) {}

virtual ~ColumnStatistics() = default;

Expand Down Expand Up @@ -123,6 +125,13 @@ class ColumnStatistics {
return size_;
}

/// Returns the number of distinct values recorded for this column, or
/// std::nullopt if no count has been set.
/// NOTE(review): the value is presumably an estimate when it comes from a
/// sketch-based builder - confirm against the producer.
std::optional<uint64_t> numDistinct() const {
return numDistinct_;
}
/// Records 'count' as the number of distinct values, replacing any previous
/// value.
/// NOTE(review): 'count' is signed while the stored value is unsigned, so a
/// negative argument wraps to a large unsigned value. Callers are presumably
/// expected to pass non-negative counts - confirm.
void setNumDistinct(int64_t count) {
  const auto distinctCount = static_cast<uint64_t>(count);
  numDistinct_ = distinctCount;
}

/**
* return string representation of this stats object
*/
Expand All @@ -145,6 +154,7 @@ class ColumnStatistics {
std::optional<bool> hasNull_;
std::optional<uint64_t> rawSize_;
std::optional<uint64_t> size_;
std::optional<uint64_t> numDistinct_;
};

/**
Expand Down
16 changes: 14 additions & 2 deletions velox/dwio/dwrf/test/TestStatisticsBuilderUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <velox/common/memory/HashStringAllocator.h>
#include <velox/common/memory/Memory.h>
#include <cmath>

using namespace facebook::velox::dwio::common;
using namespace facebook::velox;
using namespace facebook::velox::dwrf;

StatisticsBuilderOptions options{16};

template <typename T>
std::shared_ptr<FlatVector<T>> makeFlatVector(
facebook::velox::memory::MemoryPool* pool,
Expand Down Expand Up @@ -58,8 +57,15 @@ class TestStatisticsBuilderUtils : public testing::Test {
memory::MemoryManager::testingSetInstance({});
}

void SetUp() override {
  // Intentionally empty. The builder options used by the tests come from the
  // 'options' fixture member; a local 'options' declared here would only
  // shadow that member and be destroyed unused when SetUp() returns.
}

const std::shared_ptr<memory::MemoryPool> pool_ =
memory::memoryManager()->addLeafPool();
std::unique_ptr<HashStringAllocator> allocator_ =
std::make_unique<HashStringAllocator>(pool_.get());
StatisticsBuilderOptions options{16, 100, true, allocator_.get()};
};

TEST_F(TestStatisticsBuilderUtils, addIntegerValues) {
Expand All @@ -85,6 +91,7 @@ TEST_F(TestStatisticsBuilderUtils, addIntegerValues) {
EXPECT_EQ(10, intStats->getMaximum().value());
EXPECT_EQ(1, intStats->getMinimum().value());
EXPECT_EQ(55, intStats->getSum());
EXPECT_EQ(10, intStats->numDistinct());
}

// add values with null
Expand All @@ -103,6 +110,7 @@ TEST_F(TestStatisticsBuilderUtils, addIntegerValues) {
EXPECT_EQ(10, intStats->getMaximum().value());
EXPECT_EQ(1, intStats->getMinimum().value());
EXPECT_EQ(106, intStats->getSum().value());
EXPECT_EQ(10, intStats->numDistinct());
}
}

Expand All @@ -129,6 +137,7 @@ TEST_F(TestStatisticsBuilderUtils, addDoubleValues) {
EXPECT_EQ(10, doubleStats->getMaximum().value());
EXPECT_EQ(1, doubleStats->getMinimum().value());
EXPECT_EQ(55, doubleStats->getSum());
EXPECT_EQ(10, doubleStats->numDistinct().value());
}

// add values with null
Expand All @@ -147,6 +156,7 @@ TEST_F(TestStatisticsBuilderUtils, addDoubleValues) {
EXPECT_EQ(10, doubleStats->getMaximum().value());
EXPECT_EQ(1, doubleStats->getMinimum().value());
EXPECT_EQ(106, doubleStats->getSum());
EXPECT_EQ(10, doubleStats->numDistinct().value());
}
}

Expand Down Expand Up @@ -174,6 +184,7 @@ TEST_F(TestStatisticsBuilderUtils, addStringValues) {
EXPECT_EQ("j", strStats->getMaximum().value());
EXPECT_EQ("a", strStats->getMinimum().value());
EXPECT_EQ(10, strStats->getTotalLength());
EXPECT_EQ(10, strStats->numDistinct());
}

// add values with null
Expand All @@ -191,6 +202,7 @@ TEST_F(TestStatisticsBuilderUtils, addStringValues) {
EXPECT_EQ("j", strStats->getMaximum().value());
EXPECT_EQ("a", strStats->getMinimum().value());
EXPECT_EQ(19, strStats->getTotalLength().value());
EXPECT_EQ(10, strStats->numDistinct());
}
}

Expand Down
1 change: 1 addition & 0 deletions velox/dwio/dwrf/writer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ velox_add_library(

velox_link_libraries(
velox_dwio_dwrf_writer
velox_common_hyperloglog
velox_dwio_common
velox_dwio_dwrf_common
velox_dwio_dwrf_utils
Expand Down
15 changes: 13 additions & 2 deletions velox/dwio/dwrf/writer/StatisticsBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ void StatisticsBuilder::merge(
// Merge size
mergeCount(size_, other.getSize());
}
if (hll_) {
if (auto* otherBuilder = dynamic_cast<const StatisticsBuilder*>(&other)) {
if (otherBuilder->hll_) {
hll_->mergeWith(*otherBuilder->hll_);
}
}
}
}

void StatisticsBuilder::toProto(proto::ColumnStatistics& stats) const {
Expand All @@ -115,8 +122,12 @@ std::unique_ptr<dwio::common::ColumnStatistics> StatisticsBuilder::build()
proto::ColumnStatistics stats;
toProto(stats);
StatsContext context{WriterVersion_CURRENT};
return buildColumnStatisticsFromProto(
ColumnStatisticsWrapper(&stats), context);
auto result =
buildColumnStatisticsFromProto(ColumnStatisticsWrapper(&stats), context);
if (hll_) {
result->setNumDistinct(hll_->cardinality());
}
return result;
}

std::unique_ptr<StatisticsBuilder> StatisticsBuilder::create(
Expand Down
Loading

0 comments on commit 1403fe8

Please sign in to comment.