Skip to content

Commit

Permalink
feat: Support viewfs file system in velox (#11811)
Browse files Browse the repository at this point in the history
Summary:
Support the [ViewFs](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/ViewFs.html) file system in Velox.

Pull Request resolved: #11811

Reviewed By: xiaoxmeng

Differential Revision: D67119187

Pulled By: kgpai

fbshipit-source-id: d3b7de723e46532d50bfd041b6af9ac64d84ed00
  • Loading branch information
JkSelf authored and facebook-github-bot committed Dec 17, 2024
1 parent 4c6ab14 commit e937db3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
26 changes: 16 additions & 10 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
namespace facebook::velox::filesystems {
/// URI prefix of plain HDFS paths; compared against the start of a file path.
std::string_view HdfsFileSystem::kScheme("hdfs://");

/// URI prefix of ViewFs paths; compared against the start of a file path.
std::string_view HdfsFileSystem::kViewfsScheme("viewfs://");

class HdfsFileSystem::Impl {
public:
// Keep config here for possible use in the future.
Expand All @@ -35,8 +37,15 @@ class HdfsFileSystem::Impl {

// connect to HDFS with the builder object
hdfsBuilder* builder = driver_->NewBuilder();
driver_->BuilderSetNameNode(builder, endpoint.host.c_str());
driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
if (endpoint.isViewfs) {
// The default NameNode configuration will be used (from the XML
// configuration files). See:
// https://github.com/facebookincubator/velox/blob/main/velox/external/hdfs/hdfs.h#L289
driver_->BuilderSetNameNode(builder, "default");
} else {
driver_->BuilderSetNameNode(builder, endpoint.host.c_str());
driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
}
driver_->BuilderSetForceNewInstance(builder);
hdfsClient_ = driver_->BuilderConnect(builder);
VELOX_CHECK_NOT_NULL(
Expand Down Expand Up @@ -82,13 +91,6 @@ std::string HdfsFileSystem::name() const {
/// Opens 'path' for reading through the HDFS client owned by this file system.
///
/// The path is handed to HdfsReadFile exactly as received, scheme and
/// authority included. It is deliberately not trimmed here: cutting at the
/// first '/' only worked for "hdfs://host:port/..." paths. A "viewfs://..."
/// path would be cut inside the "//" separator, and a path without any '/'
/// would call remove_prefix(npos), which is undefined behaviour.
/// NOTE(review): this relies on the HDFS client accepting fully-qualified
/// URIs — confirm against HdfsReadFile.
///
/// @param path Location of the file to open.
/// @return A ReadFile bound to this file system's HDFS connection.
std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
    std::string_view path,
    const FileOptions& /*unused*/) {
  return std::make_unique<HdfsReadFile>(
      impl_->hdfsShim(), impl_->hdfsClient(), path);
}
Expand All @@ -101,14 +103,18 @@ std::unique_ptr<WriteFile> HdfsFileSystem::openFileForWrite(
}

/// Tells whether 'filePath' is served by this file system.
///
/// @param filePath Path to classify.
/// @return true when the path begins with kScheme ("hdfs://") or
/// kViewfsScheme ("viewfs://"), false otherwise.
bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) {
  // rfind(prefix, 0) can only match at offset 0, so this is a pure prefix
  // test that never scans the rest of the path.
  return filePath.rfind(kScheme, 0) == 0 ||
      filePath.rfind(kViewfsScheme, 0) == 0;
}

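/// Gets the HDFS endpoint from the given file path. A viewfs:// path yields a
/// placeholder endpoint flagged as ViewFs. Otherwise, if the path carries no
/// endpoint, falls back to a fixed one from the configuration.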
HdfsServiceEndpoint HdfsFileSystem::getServiceEndpoint(
const std::string_view filePath,
const config::ConfigBase* config) {
if (filePath.find(kViewfsScheme) == 0) {
return HdfsServiceEndpoint{"viewfs", "", true};
}

auto endOfIdentityInfo = filePath.find('/', kScheme.size());
std::string hdfsIdentity{
filePath.data(), kScheme.size(), endOfIdentityInfo - kScheme.size()};
Expand Down
10 changes: 8 additions & 2 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ class LibHdfsShim;
}

struct HdfsServiceEndpoint {
HdfsServiceEndpoint(const std::string& hdfsHost, const std::string& hdfsPort)
: host(hdfsHost), port(hdfsPort) {}
HdfsServiceEndpoint(
const std::string& hdfsHost,
const std::string& hdfsPort,
bool isViewfs = false)
: host(hdfsHost), port(hdfsPort), isViewfs(isViewfs) {}

/// In HDFS HA mode, the identity is a nameservice ID with no port, e.g.,
/// the identity is nameservice_id for
Expand All @@ -36,6 +39,7 @@ struct HdfsServiceEndpoint {

const std::string host;
const std::string port;
bool isViewfs;
};

/**
Expand Down Expand Up @@ -98,6 +102,8 @@ class HdfsFileSystem : public FileSystem {

static std::string_view kScheme;

static std::string_view kViewfsScheme;

protected:
class Impl;
std::shared_ptr<Impl> impl_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ static const std::string localhost = "localhost";
// hdfs:// URI that spells out host and port ahead of destinationPath.
static const std::string fullDestinationPath =
"hdfs://" + localhost + ":" + hdfsPort + destinationPath;
// hdfs:// scheme followed directly by destinationPath (no host/port part).
static const std::string simpleDestinationPath = "hdfs://" + destinationPath;
// viewfs:// scheme followed directly by destinationPath (no host/port part).
static const std::string viewfsDestinationPath = "viewfs://" + destinationPath;
// Host and port settings passed to the file system via its configuration.
static const std::unordered_map<std::string, std::string> configurationValues(
{{"hive.hdfs.host", localhost}, {"hive.hdfs.port", hdfsPort}});

Expand Down Expand Up @@ -239,9 +240,8 @@ TEST_F(HdfsFileSystemTest, initializeFsWithEndpointInfoInFilePath) {
// Obtains the file system with a config carrying hive.hdfs.host/port, opens
// the test file through it and hands the result to readData().
TEST_F(HdfsFileSystemTest, fallbackToUseConfig) {
  auto config = std::make_shared<const config::ConfigBase>(
      std::unordered_map<std::string, std::string>(configurationValues));
  // The same path is used to resolve the file system and to open the file.
  // Each local is declared exactly once; declaring it twice in this scope
  // does not compile.
  auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
  auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
  readData(readFile.get());
}

Expand All @@ -262,7 +262,7 @@ TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) {
hdfsFileSystem->openFileForRead(
"hdfs://localhost:7777/path/that/does/not/exist"),
error_code::kFileNotFound,
"Unable to get file path info for file: /path/that/does/not/exist. got error: FileNotFoundException: Path /path/that/does/not/exist does not exist.");
"Unable to get file path info for file: hdfs://localhost:7777/path/that/does/not/exist. got error: FileNotFoundException: Path hdfs://localhost:7777/path/that/does/not/exist does not exist.");
}

TEST_F(HdfsFileSystemTest, missingHost) {
Expand Down Expand Up @@ -331,6 +331,10 @@ TEST_F(HdfsFileSystemTest, schemeMatching) {
auto fs = std::dynamic_pointer_cast<filesystems::HdfsFileSystem>(
filesystems::getFileSystem(fullDestinationPath, nullptr));
ASSERT_TRUE(fs->isHdfsFile(fullDestinationPath));

fs = std::dynamic_pointer_cast<filesystems::HdfsFileSystem>(
filesystems::getFileSystem(viewfsDestinationPath, nullptr));
ASSERT_TRUE(fs->isHdfsFile(viewfsDestinationPath));
}

TEST_F(HdfsFileSystemTest, writeNotSupported) {
Expand Down

0 comments on commit e937db3

Please sign in to comment.