Add viewfs:// support to the HDFS storage adapter.

What the patch does:
  * HdfsFileSystem.cpp: recognises the viewfs:// scheme in isHdfsFile() and
    getServiceEndpoint(), connects through the "default" NameNode for viewfs
    endpoints, and no longer strips the scheme/authority from the path in
    openFileForRead() (the path is forwarded unmodified).
  * HdfsFileSystem.h: HdfsServiceEndpoint gains an isViewfs flag (defaulted to
    false, so existing two-argument callers keep compiling); HdfsFileSystem
    gains the kViewfsScheme constant.
  * HdfsFileSystemTest.cpp: adds a viewfs path fixture and updates the
    expectations that depended on the stripped path.

Review notes:
  * Scheme checks on added lines use rfind(prefix, 0) == 0 instead of
    find(prefix) == 0: same result, but only offset 0 is inspected.
  * isViewfs is declared const to match the neighbouring host/port members.
  * This copy of the patch was re-flowed from a whitespace-collapsed paste in
    which template argument lists were missing. The following were restored
    and should be confirmed against the base files: std::unique_ptr<ReadFile>,
    std::unique_ptr<WriteFile>, std::make_unique<HdfsReadFile>,
    std::shared_ptr<Impl>, std::unordered_map<std::string, std::string>,
    std::make_shared<const config::ConfigBase>,
    std::dynamic_pointer_cast<filesystems::HdfsFileSystem>.
    Context-line indentation was likewise reconstructed; verify before applying.
  * The post-image blob ids on the "index" lines of the .cpp and .h sections
    predate the edits described above.
  * NOTE(review): fallbackToUseConfig now passes a path that carries host:port,
    so it presumably no longer exercises the config fallback. Confirm whether a
    dedicated fallback test is still needed.

diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
index d1bbcb483e01..9479279ee54a 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
@@ -22,6 +22,8 @@ namespace facebook::velox::filesystems {
 
 std::string_view HdfsFileSystem::kScheme("hdfs://");
 
+std::string_view HdfsFileSystem::kViewfsScheme("viewfs://");
+
 class HdfsFileSystem::Impl {
  public:
   // Keep config here for possible use in the future.
@@ -35,8 +37,15 @@ class HdfsFileSystem::Impl {
 
     // connect to HDFS with the builder object
     hdfsBuilder* builder = driver_->NewBuilder();
-    driver_->BuilderSetNameNode(builder, endpoint.host.c_str());
-    driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
+    if (endpoint.isViewfs) {
+      // The default NameNode configuration will be used (from the XML
+      // configuration files). See:
+      // https://github.com/facebookincubator/velox/blob/main/velox/external/hdfs/hdfs.h#L289
+      driver_->BuilderSetNameNode(builder, "default");
+    } else {
+      driver_->BuilderSetNameNode(builder, endpoint.host.c_str());
+      driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
+    }
     driver_->BuilderSetForceNewInstance(builder);
     hdfsClient_ = driver_->BuilderConnect(builder);
     VELOX_CHECK_NOT_NULL(
@@ -82,13 +91,6 @@ std::string HdfsFileSystem::name() const {
 std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
     std::string_view path,
     const FileOptions& /*unused*/) {
-  if (path.find(kScheme) == 0) {
-    path.remove_prefix(kScheme.length());
-  }
-  if (auto index = path.find('/')) {
-    path.remove_prefix(index);
-  }
-
   return std::make_unique<HdfsReadFile>(
       impl_->hdfsShim(), impl_->hdfsClient(), path);
 }
@@ -101,7 +103,10 @@ std::unique_ptr<WriteFile> HdfsFileSystem::openFileForWrite(
 }
 
 bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) {
-  return filePath.find(kScheme) == 0;
+  // rfind(prefix, 0) can only match at offset 0, so non-matching paths are
+  // rejected without the full-string scan that find(prefix) == 0 performs.
+  return (filePath.rfind(kScheme, 0) == 0) ||
+      (filePath.rfind(kViewfsScheme, 0) == 0);
 }
 
 /// Gets hdfs endpoint from a given file path. If not found, fall back to get a
@@ -109,6 +114,12 @@ bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) {
 HdfsServiceEndpoint HdfsFileSystem::getServiceEndpoint(
     const std::string_view filePath,
     const config::ConfigBase* config) {
+  // viewfs:// paths are not parsed for a host/port: return a marker endpoint
+  // so that the connection uses the "default" NameNode configuration.
+  if (filePath.rfind(kViewfsScheme, 0) == 0) {
+    return HdfsServiceEndpoint{"viewfs", "", true};
+  }
+
   auto endOfIdentityInfo = filePath.find('/', kScheme.size());
   std::string hdfsIdentity{
       filePath.data(), kScheme.size(), endOfIdentityInfo - kScheme.size()};
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
index 4c93b125fb2e..fed15034b3c1 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
@@ -22,8 +22,13 @@ class LibHdfsShim;
 }
 
 struct HdfsServiceEndpoint {
-  HdfsServiceEndpoint(const std::string& hdfsHost, const std::string& hdfsPort)
-      : host(hdfsHost), port(hdfsPort) {}
+  /// 'isViewfs' marks an endpoint created for a viewfs:// path; 'hdfsHost' and
+  /// 'hdfsPort' are then not used to select the NameNode when connecting.
+  HdfsServiceEndpoint(
+      const std::string& hdfsHost,
+      const std::string& hdfsPort,
+      bool isViewfs = false)
+      : host(hdfsHost), port(hdfsPort), isViewfs(isViewfs) {}
 
   /// In HDFS HA mode, the identity is a nameservice ID with no port, e.g.,
   /// the identity is nameservice_id for
@@ -36,6 +41,8 @@ struct HdfsServiceEndpoint {
 
   const std::string host;
   const std::string port;
+  /// True when the endpoint was created for a viewfs:// path.
+  const bool isViewfs;
 };
 
 /**
@@ -98,6 +105,8 @@ class HdfsFileSystem : public FileSystem {
 
   static std::string_view kScheme;
 
+  static std::string_view kViewfsScheme;
+
  protected:
   class Impl;
   std::shared_ptr<Impl> impl_;
diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp
index 33dcac02393d..a9c85854b12b 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp
+++ b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp
@@ -41,6 +41,7 @@ static const std::string localhost = "localhost";
 static const std::string fullDestinationPath =
     "hdfs://" + localhost + ":" + hdfsPort + destinationPath;
 static const std::string simpleDestinationPath = "hdfs://" + destinationPath;
+static const std::string viewfsDestinationPath = "viewfs://" + destinationPath;
 static const std::unordered_map<std::string, std::string> configurationValues(
     {{"hive.hdfs.host", localhost}, {"hive.hdfs.port", hdfsPort}});
 
@@ -239,9 +240,8 @@ TEST_F(HdfsFileSystemTest, initializeFsWithEndpointInfoInFilePath) {
 TEST_F(HdfsFileSystemTest, fallbackToUseConfig) {
   auto config = std::make_shared<const config::ConfigBase>(
       std::unordered_map<std::string, std::string>(configurationValues));
-  auto hdfsFileSystem =
-      filesystems::getFileSystem(simpleDestinationPath, config);
-  auto readFile = hdfsFileSystem->openFileForRead(simpleDestinationPath);
+  auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
+  auto readFile = hdfsFileSystem->openFileForRead(fullDestinationPath);
   readData(readFile.get());
 }
 
@@ -262,7 +262,7 @@ TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) {
       hdfsFileSystem->openFileForRead(
           "hdfs://localhost:7777/path/that/does/not/exist"),
       error_code::kFileNotFound,
-      "Unable to get file path info for file: /path/that/does/not/exist. got error: FileNotFoundException: Path /path/that/does/not/exist does not exist.");
+      "Unable to get file path info for file: hdfs://localhost:7777/path/that/does/not/exist. got error: FileNotFoundException: Path hdfs://localhost:7777/path/that/does/not/exist does not exist.");
 }
 
 TEST_F(HdfsFileSystemTest, missingHost) {
@@ -331,6 +331,10 @@ TEST_F(HdfsFileSystemTest, schemeMatching) {
   auto fs = std::dynamic_pointer_cast<filesystems::HdfsFileSystem>(
       filesystems::getFileSystem(fullDestinationPath, nullptr));
   ASSERT_TRUE(fs->isHdfsFile(fullDestinationPath));
+
+  fs = std::dynamic_pointer_cast<filesystems::HdfsFileSystem>(
+      filesystems::getFileSystem(viewfsDestinationPath, nullptr));
+  ASSERT_TRUE(fs->isHdfsFile(viewfsDestinationPath));
 }
 
 TEST_F(HdfsFileSystemTest, writeNotSupported) {