diff --git a/.github/workflows/linux-build.yml b/.github/workflows/linux-build.yml
index b4e26478702b..b897a345f562 100644
--- a/.github/workflows/linux-build.yml
+++ b/.github/workflows/linux-build.yml
@@ -134,6 +134,7 @@ jobs:
LIBHDFS3_CONF: "${{ github.workspace }}/scripts/hdfs-client.xml"
working-directory: _build/release
run: |
+ export CLASSPATH=`/usr/local/hadoop/bin/hdfs classpath --glob`
ctest -j 8 --output-on-failure --no-tests=error
ubuntu-debug:
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2408c8c89d05..1dd401d3e147 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -259,11 +259,9 @@ if(VELOX_ENABLE_ABFS)
endif()
if(VELOX_ENABLE_HDFS)
- find_library(
- LIBHDFS3
- NAMES libhdfs3.so libhdfs3.dylib
- HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
- add_definitions(-DVELOX_ENABLE_HDFS3)
+ add_definitions(-DVELOX_ENABLE_HDFS)
+ # The JVM libhdfs shim (velox/external/hdfs) uses Arrow headers, so Arrow must be enabled.
+ set(VELOX_ENABLE_ARROW ON)
endif()
if(VELOX_ENABLE_PARQUET)
diff --git a/NOTICE.txt b/NOTICE.txt
index 58655beb3ca7..8b812aa41ab2 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -9,3 +9,103 @@ This product includes software from the QT project (BSD, 3-clause).
This product includes software from HowardHinnant's date library (MIT License).
* https://github.com/HowardHinnant/date/tree/master
+
+This product includes software from the Arrow project.
+* https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/arrow/io/hdfs_internal.h
+* https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/arrow/io/hdfs_internal.cc
+Which contain the following NOTICE file:
+-------
+ Apache Arrow
+ Copyright 2016-2024 The Apache Software Foundation
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+ This product includes software from the SFrame project (BSD, 3-clause).
+ * Copyright (C) 2015 Dato, Inc.
+ * Copyright (c) 2009 Carnegie Mellon University.
+ This product includes software from the Feather project (Apache 2.0)
+ https://github.com/wesm/feather
+ This product includes software from the DyND project (BSD 2-clause)
+ https://github.com/libdynd
+ This product includes software from the LLVM project
+ * distributed under the University of Illinois Open Source
+ This product includes software from the google-lint project
+ * Copyright (c) 2009 Google Inc. All rights reserved.
+ This product includes software from the mman-win32 project
+ * Copyright https://code.google.com/p/mman-win32/
+ * Licensed under the MIT License;
+ This product includes software from the LevelDB project
+ * Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+ * Use of this source code is governed by a BSD-style license that can be
+ * Moved from Kudu http://github.com/cloudera/kudu
+ This product includes software from the CMake project
+ * Copyright 2001-2009 Kitware, Inc.
+ * Copyright 2012-2014 Continuum Analytics, Inc.
+ * All rights reserved.
+ This product includes software from https://github.com/matthew-brett/multibuild (BSD 2-clause)
+ * Copyright (c) 2013-2016, Matt Terry and Matthew Brett; all rights reserved.
+ This product includes software from the Ibis project (Apache 2.0)
+ * Copyright (c) 2015 Cloudera, Inc.
+ * https://github.com/cloudera/ibis
+ This product includes software from Dremio (Apache 2.0)
+ * Copyright (C) 2017-2018 Dremio Corporation
+ * https://github.com/dremio/dremio-oss
+ This product includes software from Google Guava (Apache 2.0)
+ * Copyright (C) 2007 The Guava Authors
+ * https://github.com/google/guava
+ This product include software from CMake (BSD 3-Clause)
+ * CMake - Cross Platform Makefile Generator
+ * Copyright 2000-2019 Kitware, Inc. and Contributors
+ The web site includes files generated by Jekyll.
+ --------------------------------------------------------------------------------
+ This product includes code from Apache Kudu, which includes the following in
+ its NOTICE file:
+ Apache Kudu
+ Copyright 2016 The Apache Software Foundation
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+ Portions of this software were developed at
+ Cloudera, Inc (http://www.cloudera.com/).
+ --------------------------------------------------------------------------------
+ This product includes code from Apache ORC, which includes the following in
+ its NOTICE file:
+ Apache ORC
+ Copyright 2013-2019 The Apache Software Foundation
+ This product includes software developed by The Apache Software
+ Foundation (http://www.apache.org/).
+ This product includes software developed by Hewlett-Packard:
+ (c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P
+-------
+
+This product includes software from the Hadoop project.
+* https://github.com/apache/hadoop/blob/release-3.3.0-RC0/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h
+Which contains the following NOTICE file:
+----
+ Apache Hadoop
+ Copyright 2006 and onwards The Apache Software Foundation.
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+ Export Control Notice
+ ---------------------
+ This distribution includes cryptographic software. The country in
+ which you currently reside may have restrictions on the import,
+ possession, use, and/or re-export to another country, of
+ encryption software. BEFORE using any encryption software, please
+ check your country's laws, regulations and policies concerning the
+ import, possession, or use, and re-export of encryption software, to
+ see if this is permitted. See <http://www.wassenaar.org/> for more
+ information.
+ The U.S. Government Department of Commerce, Bureau of Industry and
+ Security (BIS), has classified this software as Export Commodity
+ Control Number (ECCN) 5D002.C.1, which includes information security
+ software using or performing cryptographic functions with asymmetric
+ algorithms. The form and manner of this Apache Software Foundation
+ distribution makes it eligible for export under the License Exception
+ ENC Technology Software Unrestricted (TSU) exception (see the BIS
+ Export Administration Regulations, Section 740.13) for both object
+ code and source code.
+ The following provides more details on the included cryptographic software:
+ This software uses the SSL libraries from the Jetty project written
+ by mortbay.org.
+ Hadoop Yarn Server Web Proxy uses the BouncyCastle Java
+ cryptography APIs written by the Legion of the Bouncy Castle Inc.
+----
diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt
index 00c969ccce7b..06ae8bf1c053 100644
--- a/velox/CMakeLists.txt
+++ b/velox/CMakeLists.txt
@@ -24,6 +24,7 @@ add_subdirectory(row)
add_subdirectory(flag_definitions)
add_subdirectory(external/date)
add_subdirectory(external/md5)
+add_subdirectory(external/hdfs)
#
# examples depend on expression
diff --git a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt
index 6c1e84aec404..44aa7be3489c 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt
+++ b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt
@@ -23,7 +23,12 @@ if(VELOX_ENABLE_HDFS)
HdfsFileSystem.cpp
HdfsReadFile.cpp
HdfsWriteFile.cpp)
- velox_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd)
+ velox_link_libraries(
+ velox_hdfs
+ velox_external_hdfs
+ velox_dwio_common
+ Folly::folly
+ xsimd)
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
index 4e7b9ddc0ec5..9d3f6e30a67a 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
@@ -14,11 +14,11 @@
* limitations under the License.
*/
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
-#include
#include
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h"
+#include "velox/external/hdfs/ArrowHdfsInternal.h"
namespace facebook::velox::filesystems {
std::string_view HdfsFileSystem::kScheme("hdfs://");
@@ -29,21 +29,27 @@ class HdfsFileSystem::Impl {
explicit Impl(
const config::ConfigBase* config,
const HdfsServiceEndpoint& endpoint) {
- auto builder = hdfsNewBuilder();
- hdfsBuilderSetNameNode(builder, endpoint.host.c_str());
- hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
- hdfsClient_ = hdfsBuilderConnect(builder);
- hdfsFreeBuilder(builder);
+ // Load the libhdfs shim. Fail fast if loading fails: driver_ has no
+ // initializer and is dereferenced right below.
+ auto status = filesystems::arrow::io::internal::ConnectLibHdfs(&driver_);
+ VELOX_CHECK(status.ok(), "ConnectLibHdfs failed: {}", status.ToString());
+
+ // Connect to HDFS with the builder object.
+ hdfsBuilder* builder = driver_->NewBuilder();
+ driver_->BuilderSetNameNode(builder, endpoint.host.c_str());
+ driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
+ driver_->BuilderSetForceNewInstance(builder);
+ hdfsClient_ = driver_->BuilderConnect(builder);
VELOX_CHECK_NOT_NULL(
hdfsClient_,
"Unable to connect to HDFS: {}, got error: {}.",
endpoint.identity(),
- hdfsGetLastError());
+ driver_->GetLastExceptionRootCause());
}
~Impl() {
LOG(INFO) << "Disconnecting HDFS file system";
- int disconnectResult = hdfsDisconnect(hdfsClient_);
+ int disconnectResult = driver_->Disconnect(hdfsClient_);
if (disconnectResult != 0) {
LOG(WARNING) << "hdfs disconnect failure in HdfsReadFile close: "
<< errno;
@@ -54,8 +60,13 @@ class HdfsFileSystem::Impl {
return hdfsClient_;
}
+ filesystems::arrow::io::internal::LibHdfsShim* hdfsShim() {
+ return driver_;
+ }
+
private:
hdfsFS hdfsClient_;
+ filesystems::arrow::io::internal::LibHdfsShim* driver_;
};
HdfsFileSystem::HdfsFileSystem(
@@ -79,13 +90,15 @@ std::unique_ptr HdfsFileSystem::openFileForRead(
path.remove_prefix(index);
}
- return std::make_unique(impl_->hdfsClient(), path);
+ return std::make_unique(
+ impl_->hdfsShim(), impl_->hdfsClient(), path);
}
std::unique_ptr HdfsFileSystem::openFileForWrite(
std::string_view path,
const FileOptions& /*unused*/) {
- return std::make_unique(impl_->hdfsClient(), path);
+ return std::make_unique(
+ impl_->hdfsShim(), impl_->hdfsClient(), path);
}
bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) {
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
index 25602a470f16..1ae7d28b94e5 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
@@ -15,7 +15,12 @@
*/
#include "velox/common/file/FileSystems.h"
+// Forward declaration of the libhdfs shim. It must be declared in the same
+// namespace as its definition (facebook::velox::filesystems::arrow), otherwise
+// it introduces a second, unrelated type.
+namespace facebook::velox::filesystems::arrow::io::internal {
+class LibHdfsShim;
+}
+
namespace facebook::velox::filesystems {
+
struct HdfsServiceEndpoint {
HdfsServiceEndpoint(const std::string& hdfsHost, const std::string& hdfsPort)
: host(hdfsHost), port(hdfsPort) {}
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
index dedc2bb4a4c9..d48dd373d344 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
@@ -16,20 +16,65 @@
#include "HdfsReadFile.h"
#include
-#include
+#include "velox/external/hdfs/ArrowHdfsInternal.h"
namespace facebook::velox {
-HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)
- : hdfsClient_(hdfs), filePath_(path) {
- fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data());
+/// Bundles an open HDFS file handle with the driver and client that were used
+/// to open it. The handle, if any, is closed on destruction.
+struct HdfsFile {
+ filesystems::arrow::io::internal::LibHdfsShim* driver_;
+ hdfsFS client_;
+ hdfsFile handle_;
+
+ HdfsFile() : driver_(nullptr), client_(nullptr), handle_(nullptr) {}
+ ~HdfsFile() {
+ // handle_ is only assigned in open(), which sets driver_ first.
+ if (handle_ && driver_->CloseFile(client_, handle_) == -1) {
+ LOG(ERROR) << "Unable to close file, errno: " << errno;
+ }
+ }
+
+ /// Opens 'path' read-only through 'driver' and 'client' and remembers both
+ /// for later seek/read/close calls. Throws if the file cannot be opened.
+ void open(
+ filesystems::arrow::io::internal::LibHdfsShim* driver,
+ hdfsFS client,
+ const std::string& path) {
+ driver_ = driver;
+ client_ = client;
+ handle_ = driver->OpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
+ VELOX_CHECK_NOT_NULL(
+ handle_,
+ "Unable to open file {}. got error: {}",
+ path,
+ driver_->GetLastExceptionRootCause());
+ }
+
+ /// Moves the read position of the open file to 'offset'. Throws if the
+ /// driver reports a non-zero result.
+ void seek(uint64_t offset) const {
+ VELOX_CHECK_EQ(
+ driver_->Seek(client_, handle_, offset),
+ 0,
+ "Cannot seek through HDFS file, error is : {}",
+ driver_->GetLastExceptionRootCause());
+ }
+
+ /// Reads up to 'length' bytes into 'pos' and returns the number of bytes
+ /// read. The byte count is 32-bit, so a single call requests at most
+ /// INT32_MAX bytes; callers loop until they have read everything they need.
+ int32_t read(char* pos, uint64_t length) const {
+ // Clamp instead of letting a large uint64_t narrow to a negative or
+ // truncated 32-bit length.
+ constexpr uint64_t kMaxReadLength = 0x7fffffff;
+ const auto toRead =
+ static_cast<int32_t>(length > kMaxReadLength ? kMaxReadLength : length);
+ auto bytesRead = driver_->Read(client_, handle_, pos, toRead);
+ VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.");
+ return bytesRead;
+ }
+};
+
+HdfsReadFile::HdfsReadFile(
+ filesystems::arrow::io::internal::LibHdfsShim* driver,
+ hdfsFS hdfs,
+ const std::string_view path)
+ : driver_(driver), hdfsClient_(hdfs), filePath_(path) {
+ fileInfo_ = driver_->GetPathInfo(hdfsClient_, filePath_.data());
if (fileInfo_ == nullptr) {
- auto error = hdfsGetLastError();
+ auto error = fmt::format(
+ "FileNotFoundException: Path {} does not exist.", filePath_);
auto errMsg = fmt::format(
"Unable to get file path info for file: {}. got error: {}",
filePath_,
error);
- if (std::strstr(error, "FileNotFoundException") != nullptr) {
+ if (error.find("FileNotFoundException") != std::string::npos) {
VELOX_FILE_NOT_FOUND_ERROR(errMsg);
}
VELOX_FAIL(errMsg);
@@ -38,19 +83,22 @@ HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)
HdfsReadFile::~HdfsReadFile() {
// should call hdfsFreeFileInfo to avoid memory leak
- hdfsFreeFileInfo(fileInfo_, 1);
+ if (fileInfo_) {
+ driver_->FreeFileInfo(fileInfo_, 1);
+ }
}
void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
const {
checkFileReadParameters(offset, length);
- if (!file_->handle_) {
- file_->open(hdfsClient_, filePath_);
+ folly::ThreadLocal file;
+ if (!file->handle_) {
+ file->open(driver_, hdfsClient_, filePath_);
}
- file_->seek(offset);
+ file->seek(offset);
uint64_t totalBytesRead = 0;
while (totalBytesRead < length) {
- auto bytesRead = file_->read(pos, length - totalBytesRead);
+ auto bytesRead = file->read(pos, length - totalBytesRead);
totalBytesRead += bytesRead;
pos += bytesRead;
}
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
index 1d531956f0ee..b63c2dd933dd 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
@@ -14,53 +14,24 @@
* limitations under the License.
*/
-#include
#include "velox/common/file/File.h"
+#include "velox/external/hdfs/hdfs.h"
namespace facebook::velox {
-struct HdfsFile {
- hdfsFS client_;
- hdfsFile handle_;
-
- HdfsFile() : client_(nullptr), handle_(nullptr) {}
- ~HdfsFile() {
- if (handle_ && hdfsCloseFile(client_, handle_) == -1) {
- LOG(ERROR) << "Unable to close file, errno: " << errno;
- }
- }
-
- void open(hdfsFS client, const std::string& path) {
- client_ = client;
- handle_ = hdfsOpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
- VELOX_CHECK_NOT_NULL(
- handle_,
- "Unable to open file {}. got error: {}",
- path,
- hdfsGetLastError());
- }
-
- void seek(uint64_t offset) const {
- VELOX_CHECK_EQ(
- hdfsSeek(client_, handle_, offset),
- 0,
- "Cannot seek through HDFS file, error is : {}",
- std::string(hdfsGetLastError()));
- }
-
- int32_t read(char* pos, uint64_t length) const {
- auto bytesRead = hdfsRead(client_, handle_, pos, length);
- VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.");
- return bytesRead;
- }
-};
+namespace filesystems::arrow::io::internal {
+class LibHdfsShim;
+}
/**
* Implementation of hdfs read file.
*/
class HdfsReadFile final : public ReadFile {
public:
- explicit HdfsReadFile(hdfsFS hdfs, std::string_view path);
+ explicit HdfsReadFile(
+ filesystems::arrow::io::internal::LibHdfsShim* driver,
+ hdfsFS hdfs,
+ std::string_view path);
~HdfsReadFile() override;
std::string_view pread(uint64_t offset, uint64_t length, void* buf)
@@ -86,10 +57,10 @@ class HdfsReadFile final : public ReadFile {
void preadInternal(uint64_t offset, uint64_t length, char* pos) const;
void checkFileReadParameters(uint64_t offset, uint64_t length) const;
+ filesystems::arrow::io::internal::LibHdfsShim* driver_;
hdfsFS hdfsClient_;
hdfsFileInfo* fileInfo_;
std::string filePath_;
- folly::ThreadLocal file_;
};
} // namespace facebook::velox
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp
index 60f98a88c972..be668a3133e1 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp
@@ -15,23 +15,24 @@
*/
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h"
-#include
+#include "velox/external/hdfs/ArrowHdfsInternal.h"
namespace facebook::velox {
HdfsWriteFile::HdfsWriteFile(
+ filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfsClient,
std::string_view path,
int bufferSize,
short replication,
int blockSize)
- : hdfsClient_(hdfsClient), filePath_(path) {
+ : driver_(driver), hdfsClient_(hdfsClient), filePath_(path) {
auto pos = filePath_.rfind("/");
auto parentDir = filePath_.substr(0, pos + 1);
- if (hdfsExists(hdfsClient_, parentDir.c_str()) == -1) {
- hdfsCreateDirectory(hdfsClient_, parentDir.c_str());
+ if (driver_->Exists(hdfsClient_, parentDir.c_str()) == -1) {
+ driver_->MakeDirectory(hdfsClient_, parentDir.c_str());
}
- hdfsFile_ = hdfsOpenFile(
+ hdfsFile_ = driver_->OpenFile(
hdfsClient_,
filePath_.c_str(),
O_WRONLY,
@@ -42,7 +43,7 @@ HdfsWriteFile::HdfsWriteFile(
hdfsFile_,
"Failed to open hdfs file: {}, with error: {}",
filePath_,
- std::string(hdfsGetLastError()));
+ driver_->GetLastExceptionRootCause());
}
HdfsWriteFile::~HdfsWriteFile() {
@@ -52,12 +53,12 @@ HdfsWriteFile::~HdfsWriteFile() {
}
void HdfsWriteFile::close() {
- int success = hdfsCloseFile(hdfsClient_, hdfsFile_);
+ int success = driver_->CloseFile(hdfsClient_, hdfsFile_);
VELOX_CHECK_EQ(
success,
0,
"Failed to close hdfs file: {}",
- std::string(hdfsGetLastError()));
+ driver_->GetLastExceptionRootCause());
hdfsFile_ = nullptr;
}
@@ -66,9 +67,9 @@ void HdfsWriteFile::flush() {
hdfsFile_,
"Cannot flush HDFS file because file handle is null, file path: {}",
filePath_);
- int success = hdfsFlush(hdfsClient_, hdfsFile_);
+ int success = driver_->Flush(hdfsClient_, hdfsFile_);
VELOX_CHECK_EQ(
- success, 0, "Hdfs flush error: {}", std::string(hdfsGetLastError()));
+ success, 0, "Hdfs flush error: {}", driver_->GetLastExceptionRootCause());
}
void HdfsWriteFile::append(std::string_view data) {
@@ -79,20 +80,20 @@ void HdfsWriteFile::append(std::string_view data) {
hdfsFile_,
"Cannot append to HDFS file because file handle is null, file path: {}",
filePath_);
- int64_t totalWrittenBytes =
- hdfsWrite(hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size());
+ int64_t totalWrittenBytes = driver_->Write(
+ hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size());
VELOX_CHECK_EQ(
totalWrittenBytes,
data.size(),
"Write failure in HDFSWriteFile::append {}",
- std::string(hdfsGetLastError()));
+ driver_->GetLastExceptionRootCause());
}
uint64_t HdfsWriteFile::size() const {
- auto fileInfo = hdfsGetPathInfo(hdfsClient_, filePath_.c_str());
+ auto fileInfo = driver_->GetPathInfo(hdfsClient_, filePath_.c_str());
+ // GetPathInfo can return null; fail with a clear error instead of
+ // dereferencing a null pointer.
+ VELOX_CHECK_NOT_NULL(
+ fileInfo, "Unable to get file path info for file: {}", filePath_);
uint64_t size = fileInfo->mSize;
// should call hdfsFreeFileInfo to avoid memory leak
- hdfsFreeFileInfo(fileInfo, 1);
+ driver_->FreeFileInfo(fileInfo, 1);
return size;
}
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h
index 7ed1819cd61f..fb311b1a6c3d 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h
@@ -15,11 +15,15 @@
*/
#pragma once
-#include
#include "velox/common/file/File.h"
+#include "velox/external/hdfs/hdfs.h"
namespace facebook::velox {
+namespace filesystems::arrow::io::internal {
+class LibHdfsShim;
+}
+
/// Implementation of hdfs write file. Nothing written to the file should be
/// read back until it is closed.
class HdfsWriteFile : public WriteFile {
@@ -34,6 +38,7 @@ class HdfsWriteFile : public WriteFile {
/// @param blockSize Size of block - pass 0 if you want to use the
/// default configured values.
HdfsWriteFile(
+ filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfsClient,
std::string_view path,
int bufferSize = 0,
@@ -55,6 +60,7 @@ class HdfsWriteFile : public WriteFile {
void close() override;
private:
+ filesystems::arrow::io::internal::LibHdfsShim* driver_;
/// The configured hdfs filesystem handle.
hdfsFS hdfsClient_;
/// The hdfs file handle for write.
diff --git a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp
index bdff4a7a4fdc..1f23179f0a72 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp
+++ b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-#ifdef VELOX_ENABLE_HDFS3
+#ifdef VELOX_ENABLE_HDFS
#include "folly/concurrency/ConcurrentHashMap.h"
#include "velox/common/config/Config.h"
@@ -25,7 +25,7 @@
namespace facebook::velox::filesystems {
-#ifdef VELOX_ENABLE_HDFS3
+#ifdef VELOX_ENABLE_HDFS
std::mutex mtx;
std::function
#include
-#include
#include
#include
#include "HdfsMiniCluster.h"
#include "gtest/gtest.h"
+#include "velox/common/base/Exceptions.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h"
#include "velox/core/QueryConfig.h"
#include "velox/exec/tests/utils/TempFilePath.h"
+#include "velox/external/hdfs/ArrowHdfsInternal.h"
+
+#include
using namespace facebook::velox;
+using filesystems::arrow::io::internal::LibHdfsShim;
+
constexpr int kOneMB = 1 << 20;
static const std::string destinationPath = "/test_file.txt";
static const std::string hdfsPort = "7878";
static const std::string localhost = "localhost";
static const std::string fullDestinationPath =
"hdfs://" + localhost + ":" + hdfsPort + destinationPath;
-static const std::string simpleDestinationPath = "hdfs:///" + destinationPath;
+static const std::string simpleDestinationPath = "hdfs://" + destinationPath;
static const std::unordered_map configurationValues(
{{"hive.hdfs.host", localhost}, {"hive.hdfs.port", hdfsPort}});
@@ -55,6 +60,7 @@ class HdfsFileSystemTest : public testing::Test {
if (!miniCluster->isRunning()) {
miniCluster->start();
}
+ filesystems::registerHdfsFileSystem();
}
static void TearDownTestSuite() {
@@ -119,6 +125,7 @@ void checkReadErrorMessages(
} catch (VeloxException const& error) {
EXPECT_THAT(error.message(), testing::HasSubstr(errorMessage));
}
+
try {
auto buf = std::make_unique(8);
readFile->pread(10 + kOneMB, endpoint, buf.get());
@@ -128,9 +135,18 @@ void checkReadErrorMessages(
}
}
-void verifyFailures(hdfsFS hdfs) {
- HdfsReadFile readFile(hdfs, destinationPath);
- HdfsReadFile readFile2(hdfs, destinationPath);
+// Probes 'readFile' with a one-byte read at offset 0. Returns true only when
+// the read throws a VeloxException whose message contains 'errorMessage';
+// returns false when the read succeeds or the message does not match.
+bool checkMiniClusterStop(ReadFile* readFile, const std::string& errorMessage) {
+ bool sawExpectedError = false;
+ try {
+ readFile->pread(0, 1);
+ } catch (const VeloxException& e) {
+ sawExpectedError = e.message().find(errorMessage) != std::string::npos;
+ }
+ return sawExpectedError;
+}
+
+void verifyFailures(LibHdfsShim* driver, hdfsFS hdfs) {
+ HdfsReadFile readFile(driver, hdfs, destinationPath);
+ HdfsReadFile readFile2(driver, hdfs, destinationPath);
auto startPoint = 10 + kOneMB;
auto size = 15 + kOneMB;
auto endpoint = 10 + 2 * kOneMB;
@@ -139,43 +155,60 @@ void verifyFailures(hdfsFS hdfs) {
"(%d vs. %d) Cannot read HDFS file beyond its size: %d, offset: %d, end point: %d") %
size % endpoint % size % startPoint % endpoint)
.str();
- auto serverAddress = (boost::format("%s:%s") % localhost % hdfsPort).str();
+
auto readFailErrorMessage =
(boost::format(
- "Unable to open file %s. got error: HdfsIOException: InputStreamImpl: cannot open file: %s.\t"
- "Caused by: Hdfs::HdfsRpcException: HdfsFailoverException: Failed to invoke RPC call \"getBlockLocations\" on server \"%s\"\t\t"
- "Caused by: HdfsNetworkConnectException: Connect to \"%s\" failed") %
- destinationPath % destinationPath % serverAddress % serverAddress)
- .str();
- auto builderErrorMessage =
- (boost::format(
- "Unable to connect to HDFS: %s, got error: Hdfs::HdfsRpcException: HdfsFailoverException: "
- "Failed to invoke RPC call \"getFsStats\" on server \"%s\"\tCaused by: "
- "HdfsNetworkConnectException: Connect to \"%s\" failed") %
- serverAddress % serverAddress % serverAddress)
+ "Unable to open file %s. got error: ConnectException: Connection refused") %
+ destinationPath)
.str();
+
checkReadErrorMessages(&readFile, offsetErrorMessage, kOneMB);
HdfsFileSystemTest::miniCluster->stop();
- checkReadErrorMessages(&readFile2, readFailErrorMessage, 1);
- try {
- auto config = std::make_shared(
- std::unordered_map(configurationValues));
- filesystems::HdfsFileSystem hdfsFileSystem(
- config,
- filesystems::HdfsFileSystem::getServiceEndpoint(
- simpleDestinationPath, config.get()));
- FAIL() << "expected VeloxException";
- } catch (VeloxException const& error) {
- EXPECT_THAT(error.message(), testing::HasSubstr(builderErrorMessage));
+
+ constexpr auto kMaxRetries = 10;
+ int retries = 0;
+ while (true) {
+ if (checkMiniClusterStop(&readFile2, readFailErrorMessage)) {
+ checkReadErrorMessages(&readFile2, readFailErrorMessage, 1);
+ break;
+ } else {
+ if (retries >= kMaxRetries) {
+ FAIL() << "MiniCluster doesn't stop after kMaxRetries try";
+ } else {
+ sleep(1);
+ retries++;
+ }
+ }
}
}
+// Loads the libhdfs shim and connects to the test cluster at
+// localhost:hdfsPort. On success stores the shim in '*driver' and returns the
+// connected file system handle. Throws if the shim cannot be loaded or the
+// connection fails.
+hdfsFS connectHdfsDriver(
+ filesystems::arrow::io::internal::LibHdfsShim** driver) {
+ filesystems::arrow::io::internal::LibHdfsShim* libhdfs_shim = nullptr;
+ auto status = filesystems::arrow::io::internal::ConnectLibHdfs(&libhdfs_shim);
+ // Do not continue with an unusable shim pointer.
+ VELOX_CHECK(status.ok(), "ConnectLibHdfs failed: {}", status.ToString());
+
+ // Connect to HDFS with the builder object
+ hdfsBuilder* builder = libhdfs_shim->NewBuilder();
+ libhdfs_shim->BuilderSetNameNode(builder, localhost.c_str());
+ libhdfs_shim->BuilderSetNameNodePort(builder, std::stoi(hdfsPort));
+ libhdfs_shim->BuilderSetForceNewInstance(builder);
+
+ auto hdfs = libhdfs_shim->BuilderConnect(builder);
+ VELOX_CHECK_NOT_NULL(
+ hdfs,
+ "Unable to connect to HDFS: {}, got error: {}",
+ localhost + ":" + hdfsPort,
+ libhdfs_shim->GetLastExceptionRootCause());
+ *driver = libhdfs_shim;
+ return hdfs;
+}
+
TEST_F(HdfsFileSystemTest, read) {
- struct hdfsBuilder* builder = hdfsNewBuilder();
- hdfsBuilderSetNameNode(builder, localhost.c_str());
- hdfsBuilderSetNameNodePort(builder, 7878);
- auto hdfs = hdfsBuilderConnect(builder);
- HdfsReadFile readFile(hdfs, destinationPath);
+ filesystems::arrow::io::internal::LibHdfsShim* driver;
+ auto hdfs = connectHdfsDriver(&driver);
+ HdfsReadFile readFile(driver, hdfs, destinationPath);
readData(&readFile);
}
@@ -224,6 +257,7 @@ TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) {
auto config = std::make_shared(
std::unordered_map(configurationValues));
auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config);
+
VELOX_ASSERT_RUNTIME_THROW_CODE(
hdfsFileSystem->openFileForRead(
"hdfs://localhost:7777/path/that/does/not/exist"),
@@ -271,11 +305,9 @@ TEST_F(HdfsFileSystemTest, missingPort) {
TEST_F(HdfsFileSystemTest, missingFileViaReadFile) {
try {
- struct hdfsBuilder* builder = hdfsNewBuilder();
- hdfsBuilderSetNameNode(builder, localhost.c_str());
- hdfsBuilderSetNameNodePort(builder, std::stoi(hdfsPort));
- auto hdfs = hdfsBuilderConnect(builder);
- HdfsReadFile readFile(hdfs, "/path/that/does/not/exist");
+ filesystems::arrow::io::internal::LibHdfsShim* driver;
+ auto hdfs = connectHdfsDriver(&driver);
+ HdfsReadFile readFile(driver, hdfs, "/path/that/does/not/exist");
FAIL() << "expected VeloxException";
} catch (VeloxException const& error) {
EXPECT_THAT(
@@ -288,13 +320,13 @@ TEST_F(HdfsFileSystemTest, missingFileViaReadFile) {
TEST_F(HdfsFileSystemTest, schemeMatching) {
try {
auto fs = std::dynamic_pointer_cast(
- filesystems::getFileSystem("/", nullptr));
+ filesystems::getFileSystem("file://", nullptr));
FAIL() << "expected VeloxException";
} catch (VeloxException const& error) {
EXPECT_THAT(
error.message(),
testing::HasSubstr(
- "No registered file system matched with file path '/'"));
+ "No registered file system matched with file path 'file://'"));
}
auto fs = std::dynamic_pointer_cast(
filesystems::getFileSystem(fullDestinationPath, nullptr));
@@ -327,11 +359,10 @@ TEST_F(HdfsFileSystemTest, removeNotSupported) {
TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) {
startThreads = false;
- struct hdfsBuilder* builder = hdfsNewBuilder();
- hdfsBuilderSetNameNode(builder, localhost.c_str());
- hdfsBuilderSetNameNodePort(builder, 7878);
- auto hdfs = hdfsBuilderConnect(builder);
- HdfsReadFile readFile(hdfs, destinationPath);
+
+ filesystems::arrow::io::internal::LibHdfsShim* driver;
+ auto hdfs = connectHdfsDriver(&driver);
+
std::vector threads;
std::mt19937 generator(std::random_device{}());
std::vector sleepTimesInMicroseconds = {0, 500, 50000};
@@ -339,13 +370,14 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) {
0, sleepTimesInMicroseconds.size() - 1);
for (int i = 0; i < 25; i++) {
auto thread = std::thread(
- [&readFile, &distribution, &generator, &sleepTimesInMicroseconds] {
+ [&driver, &hdfs, &distribution, &generator, &sleepTimesInMicroseconds] {
int index = distribution(generator);
while (!HdfsFileSystemTest::startThreads) {
std::this_thread::yield();
}
std::this_thread::sleep_for(
std::chrono::microseconds(sleepTimesInMicroseconds[index]));
+ HdfsReadFile readFile(driver, hdfs, destinationPath);
readData(&readFile);
});
threads.emplace_back(std::move(thread));
@@ -441,9 +473,7 @@ TEST_F(HdfsFileSystemTest, writeWithParentDirNotExist) {
}
TEST_F(HdfsFileSystemTest, readFailures) {
- struct hdfsBuilder* builder = hdfsNewBuilder();
- hdfsBuilderSetNameNode(builder, localhost.c_str());
- hdfsBuilderSetNameNodePort(builder, stoi(hdfsPort));
- auto hdfs = hdfsBuilderConnect(builder);
- verifyFailures(hdfs);
+ filesystems::arrow::io::internal::LibHdfsShim* driver;
+ auto hdfs = connectHdfsDriver(&driver);
+ verifyFailures(driver, hdfs);
}
diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp b/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp
index 2765d73142e2..bc0f56b9423d 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp
+++ b/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp
@@ -17,6 +17,7 @@
#include
#include "gtest/gtest.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
+#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
@@ -36,6 +37,7 @@ class InsertIntoHdfsTest : public HiveConnectorTestBase {
public:
void SetUp() override {
HiveConnectorTestBase::SetUp();
+ filesystems::registerHdfsFileSystem();
if (miniCluster == nullptr) {
miniCluster = std::make_shared();
miniCluster->start();
@@ -104,7 +106,7 @@ TEST_F(InsertIntoHdfsTest, insertIntoHdfsTest) {
plan = PlanBuilder().tableScan(rowType_).planNode();
auto splits = HiveConnectorTestBase::makeHiveConnectorSplits(
- fmt::format("{}/{}", outputDirectory, writeFileName),
+ fmt::format("{}{}", outputDirectory, writeFileName),
1,
dwio::common::FileFormat::DWRF);
auto copy = AssertQueryBuilder(plan).split(splits[0]).copyResults(pool());
diff --git a/velox/external/hdfs/ArrowHdfsInternal.cpp b/velox/external/hdfs/ArrowHdfsInternal.cpp
new file mode 100644
index 000000000000..c9a0c7dd81f0
--- /dev/null
+++ b/velox/external/hdfs/ArrowHdfsInternal.cpp
@@ -0,0 +1,641 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This shim interface to libhdfs (for runtime shared library loading) has been
+// adapted from the SFrame project, released under the ASF-compatible 3-clause
+// BSD license
+//
+// Using this requires the $JAVA_HOME and $HADOOP_HOME environment
+// variables to be set, so that libjvm and libhdfs can be located easily
+
+// Copyright (C) 2015 Dato, Inc.
+// All rights reserved.
+//
+// This software may be modified and distributed under the terms
+// of the BSD license. See the LICENSE file for details.
+
+#include "ArrowHdfsInternal.h"
+
+#include
+#include
+#include
+#include // IWYU pragma: keep
+#include
+#include
+#include
+
+#ifndef _WIN32
+#include
+#endif
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/io_util.h"
+#include "arrow/util/logging.h"
+
+namespace facebook::velox::filesystems::arrow {
+
+using ::arrow::internal::GetEnvVarNative;
+using ::arrow::internal::PlatformFilename;
+#ifdef _WIN32
+using internal::WinErrorMessage;
+#endif
+
+namespace io {
+namespace internal {
+
+namespace {
+
+void* GetLibrarySymbol(LibraryHandle handle, const char* symbol) {
+  if (handle == NULL) {
+    printf("handle is null\n");
+    return NULL;
+  }
+  // Look `symbol` up in `handle` with the platform loader; NULL means absent.
+#ifndef _WIN32
+  return dlsym(handle, symbol);
+#else
+  // GetProcAddress returns a FARPROC; convert it to the generic void*.
+  void* ret = reinterpret_cast<void*>(GetProcAddress(handle, symbol));
+  if (ret == NULL) {
+    printf("ret is null\n");
+    // logstream(LOG_INFO) << "GetProcAddress error: "
+    //                     << get_last_err_str(GetLastError()) << std::endl;
+  }
+  return ret;
+#endif
+}
+
+#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME)                          \
+  do { /* Resolve once and cache in SHIM; IOError if still missing. */  \
+    if (!SHIM->SYMBOL_NAME) {                                           \
+      *reinterpret_cast<void**>(&SHIM->SYMBOL_NAME) =                   \
+          GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME);              \
+    }                                                                   \
+    if (!SHIM->SYMBOL_NAME)                                             \
+      return ::arrow::Status::IOError("Getting symbol " #SYMBOL_NAME    \
+                                      " failed");                       \
+  } while (0)
+
+#define GET_SYMBOL(SHIM, SYMBOL_NAME) \
+ if (!SHIM->SYMBOL_NAME) { \
+ *reinterpret_cast(&SHIM->SYMBOL_NAME) = \
+ GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \
+ }
+
+LibraryHandle libjvm_handle = nullptr;
+
+// Helper functions for dlopens
+::arrow::Result> get_potential_libjvm_paths();
+::arrow::Result> get_potential_libhdfs_paths();
+::arrow::Result try_dlopen(
+ const std::vector& potential_paths,
+ const char* name);
+
+::arrow::Result> MakeFilenameVector(
+ const std::vector& names) {
+ std::vector filenames(names.size());
+ for (size_t i = 0; i < names.size(); ++i) {
+ ARROW_ASSIGN_OR_RAISE(filenames[i], PlatformFilename::FromString(names[i]));
+ }
+ return filenames;
+}
+
+void AppendEnvVarFilename(
+ const char* var_name,
+ std::vector* filenames) {
+ auto maybe_env_var = GetEnvVarNative(var_name);
+ if (maybe_env_var.ok()) {
+ filenames->emplace_back(std::move(*maybe_env_var));
+ }
+}
+
+void AppendEnvVarFilename(
+ const char* var_name,
+ const char* suffix,
+ std::vector* filenames) {
+ auto maybe_env_var = GetEnvVarNative(var_name);
+ if (maybe_env_var.ok()) {
+ auto maybe_env_var_with_suffix =
+ PlatformFilename(std::move(*maybe_env_var)).Join(suffix);
+ if (maybe_env_var_with_suffix.ok()) {
+ filenames->emplace_back(std::move(*maybe_env_var_with_suffix));
+ }
+ }
+}
+
+void InsertEnvVarFilename(
+ const char* var_name,
+ std::vector* filenames) {
+ auto maybe_env_var = GetEnvVarNative(var_name);
+ if (maybe_env_var.ok()) {
+ filenames->emplace(
+ filenames->begin(), PlatformFilename(std::move(*maybe_env_var)));
+ }
+}
+
+::arrow::Result> get_potential_libhdfs_paths() {
+ std::vector potential_paths;
+ std::string file_name;
+
+// OS-specific file name
+#ifdef _WIN32
+ file_name = "hdfs.dll";
+#elif __APPLE__
+ file_name = "libhdfs.dylib";
+#else
+ file_name = "libhdfs.so";
+#endif
+
+ // Common paths
+ ARROW_ASSIGN_OR_RAISE(auto search_paths, MakeFilenameVector({"", "."}));
+
+ // Path from environment variable
+ AppendEnvVarFilename("HADOOP_HOME", "lib/native", &search_paths);
+ AppendEnvVarFilename("ARROW_LIBHDFS_DIR", &search_paths);
+
+ // All paths with file name
+ for (const auto& path : search_paths) {
+ ARROW_ASSIGN_OR_RAISE(auto full_path, path.Join(file_name));
+ potential_paths.push_back(std::move(full_path));
+ }
+
+ return potential_paths;
+}
+
+::arrow::Result> get_potential_libjvm_paths() {
+ std::vector potential_paths;
+
+ std::vector search_prefixes;
+ std::vector search_suffixes;
+ std::string file_name;
+
+// From heuristics
+#ifdef _WIN32
+ ARROW_ASSIGN_OR_RAISE(search_prefixes, MakeFilenameVector({""}));
+ ARROW_ASSIGN_OR_RAISE(
+ search_suffixes, MakeFilenameVector({"/jre/bin/server", "/bin/server"}));
+ file_name = "jvm.dll";
+#elif __APPLE__
+ ARROW_ASSIGN_OR_RAISE(search_prefixes, MakeFilenameVector({""}));
+ ARROW_ASSIGN_OR_RAISE(
+ search_suffixes, MakeFilenameVector({"/jre/lib/server", "/lib/server"}));
+ file_name = "libjvm.dylib";
+
+// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are
+// expecting users to set an environment variable
+#else
+#if defined(__aarch64__)
+ const std::string prefix_arch{"arm64"};
+ const std::string suffix_arch{"aarch64"};
+#else
+ const std::string prefix_arch{"amd64"};
+ const std::string suffix_arch{"amd64"};
+#endif
+ ARROW_ASSIGN_OR_RAISE(
+ search_prefixes,
+ MakeFilenameVector({
+ "/usr/lib/jvm/default-java", // ubuntu / debian distros
+ "/usr/lib/jvm/java", // rhel6
+ "/usr/lib/jvm", // centos6
+ "/usr/lib64/jvm", // opensuse 13
+ "/usr/local/lib/jvm/default-java", // alt ubuntu / debian distros
+ "/usr/local/lib/jvm/java", // alt rhel6
+ "/usr/local/lib/jvm", // alt centos6
+ "/usr/local/lib64/jvm", // alt opensuse 13
+ "/usr/local/lib/jvm/java-8-openjdk-" +
+ prefix_arch, // alt ubuntu / debian distros
+ "/usr/lib/jvm/java-8-openjdk-" +
+ prefix_arch, // alt ubuntu / debian distros
+ "/usr/local/lib/jvm/java-7-openjdk-" +
+ prefix_arch, // alt ubuntu / debian distros
+ "/usr/lib/jvm/java-7-openjdk-" +
+ prefix_arch, // alt ubuntu / debian distros
+ "/usr/local/lib/jvm/java-6-openjdk-" +
+ prefix_arch, // alt ubuntu / debian distros
+ "/usr/lib/jvm/java-6-openjdk-" +
+ prefix_arch, // alt ubuntu / debian distros
+ "/usr/lib/jvm/java-7-oracle", // alt ubuntu
+ "/usr/lib/jvm/java-8-oracle", // alt ubuntu
+ "/usr/lib/jvm/java-6-oracle", // alt ubuntu
+ "/usr/local/lib/jvm/java-7-oracle", // alt ubuntu
+ "/usr/local/lib/jvm/java-8-oracle", // alt ubuntu
+ "/usr/local/lib/jvm/java-6-oracle", // alt ubuntu
+ "/usr/lib/jvm/default", // alt centos
+ "/usr/java/latest" // alt centos
+ }));
+ ARROW_ASSIGN_OR_RAISE(
+ search_suffixes,
+ MakeFilenameVector(
+ {"",
+ "/lib/server",
+ "/jre/lib/" + suffix_arch + "/server",
+ "/lib/" + suffix_arch + "/server"}));
+ file_name = "libjvm.so";
+#endif
+
+ // From direct environment variable
+ InsertEnvVarFilename("JAVA_HOME", &search_prefixes);
+
+ // Generate cross product between search_prefixes, search_suffixes, and
+ // file_name
+ for (auto& prefix : search_prefixes) {
+ for (auto& suffix : search_suffixes) {
+ ARROW_ASSIGN_OR_RAISE(auto path, prefix.Join(suffix).Join(file_name));
+ potential_paths.push_back(std::move(path));
+ }
+ }
+
+ return potential_paths;
+}
+
+#ifndef _WIN32
+::arrow::Result try_dlopen(
+ const std::vector& potential_paths,
+ const char* name) {
+ std::string error_message = "unknown error";
+ LibraryHandle handle;
+
+ for (const auto& p : potential_paths) {
+ handle = dlopen(p.ToNative().c_str(), RTLD_NOW | RTLD_LOCAL);
+
+ if (handle != NULL) {
+ return handle;
+ } else {
+ const char* err_msg = dlerror();
+ if (err_msg != NULL) {
+ error_message = err_msg;
+ }
+ }
+ }
+
+ return ::arrow::Status::IOError("Unable to load ", name, ": ", error_message);
+}
+
+#else
+::arrow::Result<LibraryHandle> try_dlopen(
+    const std::vector<PlatformFilename>& potential_paths,
+    const char* name) {
+  std::string error_message = "unknown error";
+  LibraryHandle handle;
+  // Try each candidate in order; the first library that loads wins.
+  for (const auto& p : potential_paths) {
+    handle = LoadLibraryW(p.ToNative().c_str());
+    if (handle != NULL) {
+      return handle;
+    } else {
+      error_message = WinErrorMessage(GetLastError());
+    }
+  }
+  // Nothing loaded: report the last loader error (or the default above).
+  return ::arrow::Status::IOError("Unable to load ", name, ": ", error_message);
+}
+#endif // _WIN32
+
+LibHdfsShim libhdfs_shim;
+
+} // namespace
+
+::arrow::Status LibHdfsShim::GetRequiredSymbols() {
+ GET_SYMBOL_REQUIRED(this, hdfsNewBuilder);
+ GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNode);
+ GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNodePort);
+ GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName);
+ GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath);
+ GET_SYMBOL_REQUIRED(this, hdfsBuilderSetForceNewInstance);
+ GET_SYMBOL_REQUIRED(this, hdfsBuilderConfSetStr);
+ GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect);
+ GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory);
+ GET_SYMBOL_REQUIRED(this, hdfsDelete);
+ GET_SYMBOL_REQUIRED(this, hdfsDisconnect);
+ GET_SYMBOL_REQUIRED(this, hdfsExists);
+ GET_SYMBOL_REQUIRED(this, hdfsFreeFileInfo);
+ GET_SYMBOL_REQUIRED(this, hdfsGetCapacity);
+ GET_SYMBOL_REQUIRED(this, hdfsGetUsed);
+ GET_SYMBOL_REQUIRED(this, hdfsGetPathInfo);
+ GET_SYMBOL_REQUIRED(this, hdfsListDirectory);
+ GET_SYMBOL_REQUIRED(this, hdfsChown);
+ GET_SYMBOL_REQUIRED(this, hdfsChmod);
+
+ // File methods
+ GET_SYMBOL_REQUIRED(this, hdfsCloseFile);
+ GET_SYMBOL_REQUIRED(this, hdfsFlush);
+ GET_SYMBOL_REQUIRED(this, hdfsOpenFile);
+ GET_SYMBOL_REQUIRED(this, hdfsRead);
+ GET_SYMBOL_REQUIRED(this, hdfsSeek);
+ GET_SYMBOL_REQUIRED(this, hdfsTell);
+ GET_SYMBOL_REQUIRED(this, hdfsWrite);
+
+ return ::arrow::Status::OK();
+}
+
+::arrow::Status ConnectLibHdfs(LibHdfsShim** driver) {
+ static std::mutex lock;
+ std::lock_guard guard(lock);
+
+ LibHdfsShim* shim = &libhdfs_shim;
+
+ static bool shim_attempted = false;
+ if (!shim_attempted) {
+ shim_attempted = true;
+
+ shim->Initialize();
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto libjvm_potential_paths, get_potential_libjvm_paths());
+ ARROW_ASSIGN_OR_RAISE(
+ libjvm_handle, try_dlopen(libjvm_potential_paths, "libjvm"));
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto libhdfs_potential_paths, get_potential_libhdfs_paths());
+ ARROW_ASSIGN_OR_RAISE(
+ shim->handle, try_dlopen(libhdfs_potential_paths, "libhdfs"));
+ } else if (shim->handle == nullptr) {
+ return ::arrow::Status::IOError("Prior attempt to load libhdfs failed");
+ }
+
+ *driver = shim;
+ return shim->GetRequiredSymbols();
+}
+
+///////////////////////////////////////////////////////////////////////////
+// HDFS thin wrapper methods
+
+hdfsBuilder* LibHdfsShim::NewBuilder(void) {
+ return this->hdfsNewBuilder();
+}
+
+void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) {
+ this->hdfsBuilderSetNameNode(bld, nn);
+}
+
+void LibHdfsShim::BuilderSetNameNodePort(hdfsBuilder* bld, tPort port) {
+ this->hdfsBuilderSetNameNodePort(bld, port);
+}
+
+void LibHdfsShim::BuilderSetUserName(hdfsBuilder* bld, const char* userName) {
+ this->hdfsBuilderSetUserName(bld, userName);
+}
+
+void LibHdfsShim::BuilderSetKerbTicketCachePath(
+ hdfsBuilder* bld,
+ const char* kerbTicketCachePath) {
+ this->hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath);
+}
+
+void LibHdfsShim::BuilderSetForceNewInstance(hdfsBuilder* bld) {
+ this->hdfsBuilderSetForceNewInstance(bld);
+}
+
+hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) {
+ return this->hdfsBuilderConnect(bld);
+}
+
+int LibHdfsShim::BuilderConfSetStr(
+ hdfsBuilder* bld,
+ const char* key,
+ const char* val) {
+ return this->hdfsBuilderConfSetStr(bld, key, val);
+}
+
+int LibHdfsShim::Disconnect(hdfsFS fs) {
+ return this->hdfsDisconnect(fs);
+}
+
+hdfsFile LibHdfsShim::OpenFile(
+ hdfsFS fs,
+ const char* path,
+ int flags,
+ int bufferSize,
+ short replication,
+ tSize blocksize) { // NOLINT
+ return this->hdfsOpenFile(
+ fs, path, flags, bufferSize, replication, blocksize);
+}
+
+int LibHdfsShim::CloseFile(hdfsFS fs, hdfsFile file) {
+ return this->hdfsCloseFile(fs, file);
+}
+
+int LibHdfsShim::Exists(hdfsFS fs, const char* path) {
+ return this->hdfsExists(fs, path);
+}
+
+int LibHdfsShim::Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
+ return this->hdfsSeek(fs, file, desiredPos);
+}
+
+tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) {
+ return this->hdfsTell(fs, file);
+}
+
+tSize LibHdfsShim::Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length) {
+ return this->hdfsRead(fs, file, buffer, length);
+}
+
+bool LibHdfsShim::HasPread() {
+ GET_SYMBOL(this, hdfsPread);
+ return this->hdfsPread != nullptr;
+}
+
+tSize LibHdfsShim::Pread(
+ hdfsFS fs,
+ hdfsFile file,
+ tOffset position,
+ void* buffer,
+ tSize length) {
+ GET_SYMBOL(this, hdfsPread);
+ DCHECK(this->hdfsPread);
+ return this->hdfsPread(fs, file, position, buffer, length);
+}
+
+tSize LibHdfsShim::Write(
+ hdfsFS fs,
+ hdfsFile file,
+ const void* buffer,
+ tSize length) {
+ return this->hdfsWrite(fs, file, buffer, length);
+}
+
+int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) {
+ return this->hdfsFlush(fs, file);
+}
+
+int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) {
+ GET_SYMBOL(this, hdfsAvailable);
+ if (this->hdfsAvailable)
+ return this->hdfsAvailable(fs, file);
+ else
+ return 0;
+}
+
+int LibHdfsShim::Copy(
+ hdfsFS srcFS,
+ const char* src,
+ hdfsFS dstFS,
+ const char* dst) {
+ GET_SYMBOL(this, hdfsCopy);
+ if (this->hdfsCopy)
+ return this->hdfsCopy(srcFS, src, dstFS, dst);
+ else
+ return 0;
+}
+
+int LibHdfsShim::Move(
+ hdfsFS srcFS,
+ const char* src,
+ hdfsFS dstFS,
+ const char* dst) {
+ GET_SYMBOL(this, hdfsMove);
+ if (this->hdfsMove)
+ return this->hdfsMove(srcFS, src, dstFS, dst);
+ else
+ return 0;
+}
+
+int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) {
+ return this->hdfsDelete(fs, path, recursive);
+}
+
+int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) {
+ GET_SYMBOL(this, hdfsRename);
+ if (this->hdfsRename)
+ return this->hdfsRename(fs, oldPath, newPath);
+ else
+ return 0;
+}
+
+char* LibHdfsShim::GetWorkingDirectory(
+ hdfsFS fs,
+ char* buffer,
+ size_t bufferSize) {
+ GET_SYMBOL(this, hdfsGetWorkingDirectory);
+ if (this->hdfsGetWorkingDirectory) {
+ return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize);
+ } else {
+ return NULL;
+ }
+}
+
+int LibHdfsShim::SetWorkingDirectory(hdfsFS fs, const char* path) {
+ GET_SYMBOL(this, hdfsSetWorkingDirectory);
+ if (this->hdfsSetWorkingDirectory) {
+ return this->hdfsSetWorkingDirectory(fs, path);
+ } else {
+ return 0;
+ }
+}
+
+int LibHdfsShim::MakeDirectory(hdfsFS fs, const char* path) {
+ return this->hdfsCreateDirectory(fs, path);
+}
+
+int LibHdfsShim::SetReplication(
+ hdfsFS fs,
+ const char* path,
+ int16_t replication) {
+ GET_SYMBOL(this, hdfsSetReplication);
+ if (this->hdfsSetReplication) {
+ return this->hdfsSetReplication(fs, path, replication);
+ } else {
+ return 0;
+ }
+}
+
+hdfsFileInfo*
+LibHdfsShim::ListDirectory(hdfsFS fs, const char* path, int* numEntries) {
+ return this->hdfsListDirectory(fs, path, numEntries);
+}
+
+hdfsFileInfo* LibHdfsShim::GetPathInfo(hdfsFS fs, const char* path) {
+ return this->hdfsGetPathInfo(fs, path);
+}
+
+void LibHdfsShim::FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) {
+ this->hdfsFreeFileInfo(hdfsFileInfo, numEntries);
+}
+
+char*** LibHdfsShim::GetHosts(
+ hdfsFS fs,
+ const char* path,
+ tOffset start,
+ tOffset length) {
+ GET_SYMBOL(this, hdfsGetHosts);
+ if (this->hdfsGetHosts) {
+ return this->hdfsGetHosts(fs, path, start, length);
+ } else {
+ return NULL;
+ }
+}
+
+void LibHdfsShim::FreeHosts(char*** blockHosts) {
+ GET_SYMBOL(this, hdfsFreeHosts);
+ if (this->hdfsFreeHosts) {
+ this->hdfsFreeHosts(blockHosts);
+ }
+}
+
+tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) {
+ GET_SYMBOL(this, hdfsGetDefaultBlockSize);
+ if (this->hdfsGetDefaultBlockSize) {
+ return this->hdfsGetDefaultBlockSize(fs);
+ } else {
+ return 0;
+ }
+}
+
+tOffset LibHdfsShim::GetCapacity(hdfsFS fs) {
+ return this->hdfsGetCapacity(fs);
+}
+
+tOffset LibHdfsShim::GetUsed(hdfsFS fs) {
+ return this->hdfsGetUsed(fs);
+}
+
+int LibHdfsShim::Chown(
+ hdfsFS fs,
+ const char* path,
+ const char* owner,
+ const char* group) {
+ return this->hdfsChown(fs, path, owner, group);
+}
+
+int LibHdfsShim::Chmod(hdfsFS fs, const char* path, short mode) { // NOLINT
+ return this->hdfsChmod(fs, path, mode);
+}
+
+int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
+ GET_SYMBOL(this, hdfsUtime);
+ if (this->hdfsUtime) {
+ return this->hdfsUtime(fs, path, mtime, atime);
+ } else {
+ return 0;
+ }
+}
+
+char* LibHdfsShim::GetLastExceptionRootCause() {
+  GET_SYMBOL(this, hdfsGetLastExceptionRootCause);
+  if (this->hdfsGetLastExceptionRootCause)
+    return this->hdfsGetLastExceptionRootCause();
+  // Static storage, not strdup(): neither path hands out memory to free.
+  static char kMissing[] = "GetLastExceptionRootCause return null";
+  return kMissing;
+}
+
+} // namespace internal
+} // namespace io
+} // namespace facebook::velox::filesystems::arrow
diff --git a/velox/external/hdfs/ArrowHdfsInternal.h b/velox/external/hdfs/ArrowHdfsInternal.h
new file mode 100644
index 000000000000..1ee57516fd17
--- /dev/null
+++ b/velox/external/hdfs/ArrowHdfsInternal.h
@@ -0,0 +1,260 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Apache Arrow.
+
+#pragma once
+
+#include
+#include
+
+#include "hdfs.h"
+
+#include
+
+#include "arrow/util/visibility.h"
+#include "arrow/util/windows_compatibility.h" // IWYU pragma: keep
+
+using std::size_t;
+
+struct hdfsBuilder;
+
+namespace facebook::velox::filesystems::arrow {
+
+namespace io {
+namespace internal {
+
+#ifndef _WIN32
+typedef void* LibraryHandle;
+#else
+typedef HINSTANCE LibraryHandle;
+#endif
+
+// NOTE(wesm): cpplint does not like use of short and other imprecise C types
+struct LibHdfsShim {
+ LibraryHandle handle;
+
+ hdfsBuilder* (*hdfsNewBuilder)(void);
+ void (*hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn);
+ void (*hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port);
+ void (*hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName);
+ void (*hdfsBuilderSetKerbTicketCachePath)(
+ hdfsBuilder* bld,
+ const char* kerbTicketCachePath);
+ void (*hdfsBuilderSetForceNewInstance)(hdfsBuilder* bld);
+ hdfsFS (*hdfsBuilderConnect)(hdfsBuilder* bld);
+ int (*hdfsBuilderConfSetStr)(
+ hdfsBuilder* bld,
+ const char* key,
+ const char* val);
+
+ int (*hdfsDisconnect)(hdfsFS fs);
+
+ hdfsFile (*hdfsOpenFile)(
+ hdfsFS fs,
+ const char* path,
+ int flags,
+ int bufferSize,
+ short replication,
+ tSize blocksize); // NOLINT
+
+ int (*hdfsCloseFile)(hdfsFS fs, hdfsFile file);
+ int (*hdfsExists)(hdfsFS fs, const char* path);
+ int (*hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos);
+ tOffset (*hdfsTell)(hdfsFS fs, hdfsFile file);
+ tSize (*hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
+ tSize (*hdfsPread)(
+ hdfsFS fs,
+ hdfsFile file,
+ tOffset position,
+ void* buffer,
+ tSize length);
+ tSize (
+ *hdfsWrite)(hdfsFS fs, hdfsFile file, const void* buffer, tSize length);
+ int (*hdfsFlush)(hdfsFS fs, hdfsFile file);
+ int (*hdfsAvailable)(hdfsFS fs, hdfsFile file);
+ int (*hdfsCopy)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+ int (*hdfsMove)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+ int (*hdfsDelete)(hdfsFS fs, const char* path, int recursive);
+ int (*hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath);
+ char* (*hdfsGetWorkingDirectory)(hdfsFS fs, char* buffer, size_t bufferSize);
+ int (*hdfsSetWorkingDirectory)(hdfsFS fs, const char* path);
+ int (*hdfsCreateDirectory)(hdfsFS fs, const char* path);
+ int (*hdfsSetReplication)(hdfsFS fs, const char* path, int16_t replication);
+ hdfsFileInfo* (
+ *hdfsListDirectory)(hdfsFS fs, const char* path, int* numEntries);
+ hdfsFileInfo* (*hdfsGetPathInfo)(hdfsFS fs, const char* path);
+ void (*hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries);
+ char*** (*hdfsGetHosts)(
+ hdfsFS fs,
+ const char* path,
+ tOffset start,
+ tOffset length);
+ void (*hdfsFreeHosts)(char*** blockHosts);
+ tOffset (*hdfsGetDefaultBlockSize)(hdfsFS fs);
+ tOffset (*hdfsGetCapacity)(hdfsFS fs);
+ tOffset (*hdfsGetUsed)(hdfsFS fs);
+ int (*hdfsChown)(
+ hdfsFS fs,
+ const char* path,
+ const char* owner,
+ const char* group);
+ int (*hdfsChmod)(hdfsFS fs, const char* path, short mode); // NOLINT
+ int (*hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime);
+ char* (*hdfsGetLastExceptionStackTrace)();
+ char* (*hdfsGetLastExceptionRootCause)();
+
+ void Initialize() {
+ this->handle = nullptr;
+ this->hdfsNewBuilder = nullptr;
+ this->hdfsBuilderSetNameNode = nullptr;
+ this->hdfsBuilderSetNameNodePort = nullptr;
+ this->hdfsBuilderSetUserName = nullptr;
+ this->hdfsBuilderSetKerbTicketCachePath = nullptr;
+ this->hdfsBuilderSetForceNewInstance = nullptr;
+ this->hdfsBuilderConfSetStr = nullptr;
+ this->hdfsBuilderConnect = nullptr;
+ this->hdfsDisconnect = nullptr;
+ this->hdfsOpenFile = nullptr;
+ this->hdfsCloseFile = nullptr;
+ this->hdfsExists = nullptr;
+ this->hdfsSeek = nullptr;
+ this->hdfsTell = nullptr;
+ this->hdfsRead = nullptr;
+ this->hdfsPread = nullptr;
+ this->hdfsWrite = nullptr;
+ this->hdfsFlush = nullptr;
+ this->hdfsAvailable = nullptr;
+ this->hdfsCopy = nullptr;
+ this->hdfsMove = nullptr;
+ this->hdfsDelete = nullptr;
+ this->hdfsRename = nullptr;
+ this->hdfsGetWorkingDirectory = nullptr;
+ this->hdfsSetWorkingDirectory = nullptr;
+ this->hdfsCreateDirectory = nullptr;
+ this->hdfsSetReplication = nullptr;
+ this->hdfsListDirectory = nullptr;
+ this->hdfsGetPathInfo = nullptr;
+ this->hdfsFreeFileInfo = nullptr;
+ this->hdfsGetHosts = nullptr;
+ this->hdfsFreeHosts = nullptr;
+ this->hdfsGetDefaultBlockSize = nullptr;
+ this->hdfsGetCapacity = nullptr;
+ this->hdfsGetUsed = nullptr;
+ this->hdfsChown = nullptr;
+ this->hdfsChmod = nullptr;
+ this->hdfsUtime = nullptr;
+ this->hdfsGetLastExceptionStackTrace = nullptr;
+ this->hdfsGetLastExceptionRootCause = nullptr;
+ }
+
+ hdfsBuilder* NewBuilder(void);
+
+ void BuilderSetNameNode(hdfsBuilder* bld, const char* nn);
+
+ void BuilderSetNameNodePort(hdfsBuilder* bld, tPort port);
+
+ void BuilderSetUserName(hdfsBuilder* bld, const char* userName);
+
+ void BuilderSetKerbTicketCachePath(
+ hdfsBuilder* bld,
+ const char* kerbTicketCachePath);
+
+ void BuilderSetForceNewInstance(hdfsBuilder* bld);
+
+ int BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val);
+
+ hdfsFS BuilderConnect(hdfsBuilder* bld);
+
+ int Disconnect(hdfsFS fs);
+
+ hdfsFile OpenFile(
+ hdfsFS fs,
+ const char* path,
+ int flags,
+ int bufferSize,
+ short replication,
+ tSize blocksize); // NOLINT
+
+ int CloseFile(hdfsFS fs, hdfsFile file);
+
+ int Exists(hdfsFS fs, const char* path);
+
+ int Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos);
+
+ tOffset Tell(hdfsFS fs, hdfsFile file);
+
+ tSize Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
+
+ bool HasPread();
+
+ tSize
+ Pread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length);
+
+ tSize Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length);
+
+ int Flush(hdfsFS fs, hdfsFile file);
+
+ int Available(hdfsFS fs, hdfsFile file);
+
+ int Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+
+ int Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+
+ int Delete(hdfsFS fs, const char* path, int recursive);
+
+ int Rename(hdfsFS fs, const char* oldPath, const char* newPath);
+
+ char* GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize);
+
+ int SetWorkingDirectory(hdfsFS fs, const char* path);
+
+ int MakeDirectory(hdfsFS fs, const char* path);
+
+ int SetReplication(hdfsFS fs, const char* path, int16_t replication);
+
+ hdfsFileInfo* ListDirectory(hdfsFS fs, const char* path, int* numEntries);
+
+ hdfsFileInfo* GetPathInfo(hdfsFS fs, const char* path);
+
+ void FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries);
+
+ char*** GetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length);
+
+ void FreeHosts(char*** blockHosts);
+
+ tOffset GetDefaultBlockSize(hdfsFS fs);
+ tOffset GetCapacity(hdfsFS fs);
+
+ tOffset GetUsed(hdfsFS fs);
+
+ int Chown(hdfsFS fs, const char* path, const char* owner, const char* group);
+
+ int Chmod(hdfsFS fs, const char* path, short mode); // NOLINT
+
+ int Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
+
+ char* GetLastExceptionRootCause();
+
+ ::arrow::Status GetRequiredSymbols();
+};
+
+// TODO(wesm): Remove these exports when we are linking statically
+ARROW_EXPORT ::arrow::Status ConnectLibHdfs(LibHdfsShim** driver);
+
+} // namespace internal
+} // namespace io
+} // namespace facebook::velox::filesystems::arrow
diff --git a/velox/external/hdfs/CMakeLists.txt b/velox/external/hdfs/CMakeLists.txt
new file mode 100644
index 000000000000..a35f728224c7
--- /dev/null
+++ b/velox/external/hdfs/CMakeLists.txt
@@ -0,0 +1,19 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if(${VELOX_ENABLE_HDFS})
+ velox_add_library(velox_external_hdfs ArrowHdfsInternal.cpp)
+ velox_link_libraries(
+ velox_external_hdfs
+ PRIVATE
+ arrow)
+endif()
diff --git a/velox/external/hdfs/hdfs.h b/velox/external/hdfs/hdfs.h
new file mode 100644
index 000000000000..d9c3a058ec22
--- /dev/null
+++ b/velox/external/hdfs/hdfs.h
@@ -0,0 +1,1078 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Apache Hadoop.
+
+#ifndef LIBHDFS_HDFS_H
+#define LIBHDFS_HDFS_H
+
+#include <errno.h> /* for EINTERNAL, etc. */
+#include <fcntl.h> /* for O_RDONLY, O_WRONLY */
+#include <stdint.h> /* for uint64_t, etc. */
+#include <time.h> /* for time_t */
+
+/*
+ * Support export of DLL symbols during libhdfs build, and import of DLL symbols
+ * during client application build. A client application may optionally define
+ * symbol LIBHDFS_DLL_IMPORT in its build. This is not strictly required, but
+ * the compiler can produce more efficient code with it.
+ */
+#ifdef WIN32
+#ifdef LIBHDFS_DLL_EXPORT
+#define LIBHDFS_EXTERNAL __declspec(dllexport)
+#elif LIBHDFS_DLL_IMPORT
+#define LIBHDFS_EXTERNAL __declspec(dllimport)
+#else
+#define LIBHDFS_EXTERNAL
+#endif
+#else
+#ifdef LIBHDFS_DLL_EXPORT
+#define LIBHDFS_EXTERNAL __attribute__((visibility("default")))
+#elif LIBHDFS_DLL_IMPORT
+#define LIBHDFS_EXTERNAL __attribute__((visibility("default")))
+#else
+#define LIBHDFS_EXTERNAL
+#endif
+#endif
+
+#ifndef O_RDONLY
+#define O_RDONLY 1
+#endif
+
+#ifndef O_WRONLY
+#define O_WRONLY 2
+#endif
+
+#ifndef EINTERNAL
+#define EINTERNAL 255
+#endif
+
+#define ELASTIC_BYTE_BUFFER_POOL_CLASS \
+ "org/apache/hadoop/io/ElasticByteBufferPool"
+
+/** All APIs set errno to meaningful values */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+/**
+ * Some utility decls used in libhdfs.
+ */
+struct hdfsBuilder;
+typedef int32_t tSize; /// size of data for read/write io ops
+typedef time_t tTime; /// time type in seconds
+typedef int64_t tOffset; /// offset within the file
+typedef uint16_t tPort; /// port
+typedef enum tObjectKind { /// kind tag stored in hdfsFileInfo::mKind
+ kObjectKindFile = 'F', /// the object is a file
+ kObjectKindDirectory = 'D', /// the object is a directory
+} tObjectKind;
+struct hdfsStreamBuilder;
+
+/**
+ * The C reflection of org.apache.hadoop.fs.FileSystem.
+ */
+struct hdfs_internal;
+typedef struct hdfs_internal* hdfsFS;
+
+struct hdfsFile_internal;
+typedef struct hdfsFile_internal* hdfsFile;
+
+struct hadoopRzOptions;
+
+struct hadoopRzBuffer;
+
+/**
+ * Determine if a file is open for read.
+ *
+ * @param file The HDFS file
+ * @return 1 if the file is open for read; 0 otherwise
+ */
+LIBHDFS_EXTERNAL
+int hdfsFileIsOpenForRead(hdfsFile file);
+
+/**
+ * Determine if a file is open for write.
+ *
+ * @param file The HDFS file
+ * @return 1 if the file is open for write; 0 otherwise
+ */
+LIBHDFS_EXTERNAL
+int hdfsFileIsOpenForWrite(hdfsFile file);
+
+struct hdfsReadStatistics { /* out-param type of hdfsFileGetReadStatistics */
+ uint64_t totalBytesRead; /* presumably every byte read on the file -- confirm */
+ uint64_t totalLocalBytesRead; /* presumably the locally served subset -- confirm */
+ uint64_t totalShortCircuitBytesRead; /* presumably short-circuit reads only -- confirm */
+ uint64_t totalZeroCopyBytesRead; /* presumably zero-copy reads only -- confirm */
+};
+
+/**
+ * Get read statistics about a file. This is only applicable to files
+ * opened for reading.
+ *
+ * @param file The HDFS file
+ * @param stats (out parameter) on a successful return, the read
+ * statistics. Unchanged otherwise. You must free the
+ * returned statistics with hdfsFileFreeReadStatistics.
+ * @return 0 if the statistics were successfully returned,
+ * -1 otherwise. On a failure, please check errno against
+ * ENOTSUP. webhdfs, LocalFilesystem, and so forth may
+ * not support read statistics.
+ */
+LIBHDFS_EXTERNAL
+int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics** stats);
+
+/**
+ * @param stats HDFS read statistics for a file.
+ *
+ * @return the number of remote bytes read.
+ */
+LIBHDFS_EXTERNAL
+int64_t hdfsReadStatisticsGetRemoteBytesRead(
+ const struct hdfsReadStatistics* stats);
+
+/**
+ * Clear the read statistics for a file.
+ *
+ * @param file The file to clear the read statistics of.
+ *
+ * @return 0 on success; the error code otherwise.
+ * EINVAL: the file is not open for reading.
+ * ENOTSUP: the file does not support clearing the read
+ * statistics.
+ * Errno will also be set to this code on failure.
+ */
+LIBHDFS_EXTERNAL
+int hdfsFileClearReadStatistics(hdfsFile file);
+
+/**
+ * Free some HDFS read statistics.
+ *
+ * @param stats The HDFS read statistics to free.
+ */
+LIBHDFS_EXTERNAL
+void hdfsFileFreeReadStatistics(struct hdfsReadStatistics* stats);
+
+struct hdfsHedgedReadMetrics { /* out-param type of hdfsGetHedgedReadMetrics */
+ uint64_t hedgedReadOps; /* presumably the count of hedged reads issued -- confirm */
+ uint64_t hedgedReadOpsWin; /* presumably hedged reads that finished first -- confirm */
+ uint64_t hedgedReadOpsInCurThread; /* presumably hedged reads run in the calling thread -- confirm */
+};
+
+/**
+ * Get cluster wide hedged read metrics.
+ *
+ * @param fs The configured filesystem handle
+ * @param metrics (out parameter) on a successful return, the hedged read
+ * metrics. Unchanged otherwise. You must free the returned
+ * statistics with hdfsFreeHedgedReadMetrics.
+ * @return 0 if the metrics were successfully returned, -1 otherwise.
+ * On a failure, please check errno against
+ * ENOTSUP. webhdfs, LocalFilesystem, and so forth may
+ * not support hedged read metrics.
+ */
+LIBHDFS_EXTERNAL
+int hdfsGetHedgedReadMetrics(hdfsFS fs, struct hdfsHedgedReadMetrics** metrics);
+
+/**
+ * Free HDFS Hedged read metrics.
+ *
+ * @param metrics The HDFS Hedged read metrics to free
+ */
+LIBHDFS_EXTERNAL
+void hdfsFreeHedgedReadMetrics(struct hdfsHedgedReadMetrics* metrics);
+
+/**
+ * hdfsConnectAsUser - Connect to a hdfs file system as a specific user
+ * Connect to the hdfs.
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @param user the user name (this is hadoop domain user). NULL is equivalent
+ * to hdfsConnect(nn, port)
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char* user);
+
+/**
+ * hdfsConnect - Connect to a hdfs file system.
+ * Connect to the hdfs.
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsConnect(const char* nn, tPort port);
+
+/**
+ * hdfsConnectAsUserNewInstance - Connect to an hdfs file system.
+ *
+ * Forces a new instance to be created
+ *
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @param user The user name to use when connecting
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS
+hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char* user);
+
+/**
+ * hdfsConnectNewInstance - Connect to an hdfs file system.
+ *
+ * Forces a new instance to be created
+ *
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsConnectNewInstance(const char* nn, tPort port);
+
+/**
+ * Connect to HDFS using the parameters defined by the builder.
+ *
+ * The HDFS builder will be freed, whether or not the connection was
+ * successful.
+ *
+ * Every successful call to hdfsBuilderConnect should be matched with a call
+ * to hdfsDisconnect, when the hdfsFS is no longer needed.
+ *
+ * @param bld The HDFS builder
+ * @return Returns a handle to the filesystem, or NULL on error.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsBuilderConnect(struct hdfsBuilder* bld);
+
+/**
+ * Create an HDFS builder.
+ *
+ * @return The HDFS builder, or NULL on error.
+ */
+LIBHDFS_EXTERNAL
+struct hdfsBuilder* hdfsNewBuilder(void);
+
+/**
+ * Force the builder to always create a new instance of the FileSystem,
+ * rather than possibly finding one in the cache.
+ *
+ * @param bld The HDFS builder
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetForceNewInstance(struct hdfsBuilder* bld);
+
+/**
+ * Set the HDFS NameNode to connect to.
+ *
+ * @param bld The HDFS builder
+ * @param nn The NameNode to use.
+ *
+ * If the string given is 'default', the default NameNode
+ * configuration will be used (from the XML configuration files)
+ *
+ * If NULL is given, a LocalFileSystem will be created.
+ *
+ * If the string starts with a protocol type such as file:// or
+ * hdfs://, this protocol type will be used. If not, the
+ * hdfs:// protocol type will be used.
+ *
+ * You may specify a NameNode port in the usual way by
+ * passing a string of the format hdfs://<hostname>:<port>.
+ * Alternately, you may set the port with
+ * hdfsBuilderSetNameNodePort. However, you must not pass the
+ * port in two different ways.
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetNameNode(struct hdfsBuilder* bld, const char* nn);
+
+/**
+ * Set the port of the HDFS NameNode to connect to.
+ *
+ * @param bld The HDFS builder
+ * @param port The port.
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetNameNodePort(struct hdfsBuilder* bld, tPort port);
+
+/**
+ * Set the username to use when connecting to the HDFS cluster.
+ *
+ * @param bld The HDFS builder
+ * @param userName The user name. The string will be shallow-copied.
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetUserName(struct hdfsBuilder* bld, const char* userName);
+
+/**
+ * Set the path to the Kerberos ticket cache to use when connecting to
+ * the HDFS cluster.
+ *
+ * @param bld The HDFS builder
+ * @param kerbTicketCachePath The Kerberos ticket cache path. The string
+ * will be shallow-copied.
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetKerbTicketCachePath(
+ struct hdfsBuilder* bld,
+ const char* kerbTicketCachePath);
+
+/**
+ * Free an HDFS builder.
+ *
+ * It is normally not necessary to call this function since
+ * hdfsBuilderConnect frees the builder.
+ *
+ * @param bld The HDFS builder
+ */
+LIBHDFS_EXTERNAL
+void hdfsFreeBuilder(struct hdfsBuilder* bld);
+
+/**
+ * Set a configuration string for an HdfsBuilder.
+ *
+ * @param key The key to set.
+ * @param val The value, or NULL to set no value.
+ * This will be shallow-copied. You are responsible for
+ * ensuring that it remains valid until the builder is
+ * freed.
+ *
+ * @return 0 on success; nonzero error code otherwise.
+ */
+LIBHDFS_EXTERNAL
+int hdfsBuilderConfSetStr(
+ struct hdfsBuilder* bld,
+ const char* key,
+ const char* val);
+
+/**
+ * Get a configuration string.
+ *
+ * @param key The key to find
+ * @param val (out param) The value. This will be set to NULL if the
+ * key isn't found. You must free this string with
+ * hdfsConfStrFree.
+ *
+ * @return 0 on success; nonzero error code otherwise.
+ * Failure to find the key is not an error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsConfGetStr(const char* key, char** val);
+
+/**
+ * Get a configuration integer.
+ *
+ * @param key The key to find
+ * @param val (out param) The value. This will NOT be changed if the
+ * key isn't found.
+ *
+ * @return 0 on success; nonzero error code otherwise.
+ * Failure to find the key is not an error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsConfGetInt(const char* key, int32_t* val);
+
+/**
+ * Free a configuration string found with hdfsConfGetStr.
+ *
+ * @param val A configuration string obtained from hdfsConfGetStr
+ */
+LIBHDFS_EXTERNAL
+void hdfsConfStrFree(char* val);
+
+/**
+ * hdfsDisconnect - Disconnect from the hdfs file system.
+ * Disconnect from hdfs.
+ * @param fs The configured filesystem handle.
+ * @return Returns 0 on success, -1 on error.
+ * Even if there is an error, the resources associated with the
+ * hdfsFS will be freed.
+ */
+LIBHDFS_EXTERNAL
+int hdfsDisconnect(hdfsFS fs);
+
+/**
+ * hdfsOpenFile - Open a hdfs file in given mode.
+ * @deprecated Use the hdfsStreamBuilder functions instead.
+ * This function does not support setting block sizes bigger than 2 GB.
+ *
+ * @param fs The configured filesystem handle.
+ * @param path The full path to the file.
+ * @param flags - an | of bits/fcntl.h file flags - supported flags are
+ * O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNC),
+ * O_WRONLY|O_APPEND. Other flags are generally ignored other than (O_RDWR ||
+ * (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP.
+ * @param bufferSize Size of buffer for read/write - pass 0 if you want
+ * to use the default configured values.
+ * @param replication Block replication - pass 0 if you want to use
+ * the default configured values.
+ * @param blocksize Size of block - pass 0 if you want to use the
+ * default configured values. Note that if you want a block size bigger
+ * than 2 GB, you must use the hdfsStreamBuilder API rather than this
+ * deprecated function.
+ * @return Returns the handle to the open file or NULL on error.
+ */
+LIBHDFS_EXTERNAL
+hdfsFile hdfsOpenFile(
+ hdfsFS fs,
+ const char* path,
+ int flags,
+ int bufferSize,
+ short replication,
+ tSize blocksize);
+
+/**
+ * hdfsStreamBuilderAlloc - Allocate an HDFS stream builder.
+ *
+ * @param fs The configured filesystem handle.
+ * @param path The full path to the file. Will be deep-copied.
+ * @param flags The open flags, as in hdfsOpenFile.
+ * @return Returns the hdfsStreamBuilder, or NULL on error.
+ */
+LIBHDFS_EXTERNAL
+struct hdfsStreamBuilder*
+hdfsStreamBuilderAlloc(hdfsFS fs, const char* path, int flags);
+
+/**
+ * hdfsStreamBuilderFree - Free an HDFS file builder.
+ *
+ * It is normally not necessary to call this function since
+ * hdfsStreamBuilderBuild frees the builder.
+ *
+ * @param bld The hdfsStreamBuilder to free.
+ */
+LIBHDFS_EXTERNAL
+void hdfsStreamBuilderFree(struct hdfsStreamBuilder* bld);
+
+/**
+ * hdfsStreamBuilderSetBufferSize - Set the stream buffer size.
+ *
+ * @param bld The hdfs stream builder.
+ * @param bufferSize The buffer size to set.
+ *
+ * @return 0 on success, or -1 on error. Errno will be set on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsStreamBuilderSetBufferSize(
+ struct hdfsStreamBuilder* bld,
+ int32_t bufferSize);
+
+/**
+ * hdfsStreamBuilderSetReplication - Set the replication for the stream.
+ * This is only relevant for output streams, which will create new blocks.
+ *
+ * @param bld The hdfs stream builder.
+ * @param replication The replication to set.
+ *
+ * @return 0 on success, or -1 on error. Errno will be set on error.
+ * If you call this on an input stream builder, you will get
+ * EINVAL, because this configuration is not relevant to input
+ * streams.
+ */
+LIBHDFS_EXTERNAL
+int hdfsStreamBuilderSetReplication(
+ struct hdfsStreamBuilder* bld,
+ int16_t replication);
+
+/**
+ * hdfsStreamBuilderSetDefaultBlockSize - Set the default block size for
+ * the stream. This is only relevant for output streams, which will create
+ * new blocks.
+ *
+ * @param bld The hdfs stream builder.
+ * @param defaultBlockSize The default block size to set.
+ *
+ * @return 0 on success, or -1 on error. Errno will be set on error.
+ * If you call this on an input stream builder, you will get
+ * EINVAL, because this configuration is not relevant to input
+ * streams.
+ */
+LIBHDFS_EXTERNAL
+int hdfsStreamBuilderSetDefaultBlockSize(
+ struct hdfsStreamBuilder* bld,
+ int64_t defaultBlockSize);
+
+/**
+ * hdfsStreamBuilderBuild - Build the stream by calling open or create.
+ *
+ * @param bld The hdfs stream builder. This pointer will be freed, whether
+ * or not the open succeeds.
+ *
+ * @return the stream pointer on success, or NULL on error. Errno will be
+ * set on error.
+ */
+LIBHDFS_EXTERNAL
+hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder* bld);
+
+/**
+ * hdfsTruncateFile - Truncate a hdfs file to given length.
+ * @param fs The configured filesystem handle.
+ * @param path The full path to the file.
+ * @param newlength The size the file is to be truncated to
+ * @return 1 if the file has been truncated to the desired newlength
+ * and is immediately available to be reused for write operations
+ * such as append.
+ * 0 if a background process of adjusting the length of the last
+ * block has been started, and clients should wait for it to
+ * complete before proceeding with further file updates.
+ * -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength);
+
+/**
+ * hdfsUnbufferFile - Reduce the buffering done on a file.
+ *
+ * @param file The file to unbuffer.
+ * @return 0 on success
+ * ENOTSUP if the file does not support unbuffering
+ * Errno will also be set to this value.
+ */
+LIBHDFS_EXTERNAL
+int hdfsUnbufferFile(hdfsFile file);
+
+/**
+ * hdfsCloseFile - Close an open file.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @return Returns 0 on success, -1 on error.
+ * On error, errno will be set appropriately.
+ * If the hdfs file was valid, the memory associated with it will
+ * be freed at the end of this call, even if there was an I/O
+ * error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsCloseFile(hdfsFS fs, hdfsFile file);
+
+/**
+ * hdfsExists - Checks if a given path exists on the filesystem
+ * @param fs The configured filesystem handle.
+ * @param path The path to look for
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsExists(hdfsFS fs, const char* path);
+
+/**
+ * hdfsSeek - Seek to given offset in file.
+ * This works only for files opened in read-only mode.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @param desiredPos Offset into the file to seek into.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos);
+
+/**
+ * hdfsTell - Get the current offset in the file, in bytes.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @return Current offset, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tOffset hdfsTell(hdfsFS fs, hdfsFile file);
+
+/**
+ * hdfsRead - Read data from an open file.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @param buffer The buffer to copy read bytes into.
+ * @param length The length of the buffer.
+ * @return On success, a positive number indicating how many bytes
+ * were read.
+ * On end-of-file, 0.
+ * On error, -1. Errno will be set to the error code.
+ * Just like the POSIX read function, hdfsRead will return -1
+ * and set errno to EINTR if data is temporarily unavailable,
+ * but we are not yet at the end of the file.
+ */
+LIBHDFS_EXTERNAL
+tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
+
+/**
+ * hdfsPread - Positional read of data from an open file.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @param position Position from which to read
+ * @param buffer The buffer to copy read bytes into.
+ * @param length The length of the buffer.
+ * @return See hdfsRead
+ */
+LIBHDFS_EXTERNAL
+tSize hdfsPread(
+ hdfsFS fs,
+ hdfsFile file,
+ tOffset position,
+ void* buffer,
+ tSize length);
+
+/**
+ * hdfsWrite - Write data into an open file.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @param buffer The data.
+ * @param length The no. of bytes to write.
+ * @return Returns the number of bytes written, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length);
+
+/**
+ * hdfsFlush - Flush the data.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsFlush(hdfsFS fs, hdfsFile file);
+
+/**
+ * hdfsHFlush - Flush out the data in client's user buffer. After the
+ * return of this call, new readers will see the data.
+ * @param fs configured filesystem handle
+ * @param file file handle
+ * @return 0 on success, -1 on error and sets errno
+ */
+LIBHDFS_EXTERNAL
+int hdfsHFlush(hdfsFS fs, hdfsFile file);
+
+/**
+ * hdfsHSync - Similar to posix fsync: flush out the data in client's
+ * user buffer all the way to the disk device (but the disk may have
+ * it in its cache).
+ * @param fs configured filesystem handle
+ * @param file file handle
+ * @return 0 on success, -1 on error and sets errno
+ */
+LIBHDFS_EXTERNAL
+int hdfsHSync(hdfsFS fs, hdfsFile file);
+
+/**
+ * hdfsAvailable - Number of bytes that can be read from this
+ * input stream without blocking.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @return Returns available bytes; -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsAvailable(hdfsFS fs, hdfsFile file);
+
+/**
+ * hdfsCopy - Copy file from one filesystem to another.
+ * @param srcFS The handle to source filesystem.
+ * @param src The path of source file.
+ * @param dstFS The handle to destination filesystem.
+ * @param dst The path of destination file.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+
+/**
+ * hdfsMove - Move file from one filesystem to another.
+ * @param srcFS The handle to source filesystem.
+ * @param src The path of source file.
+ * @param dstFS The handle to destination filesystem.
+ * @param dst The path of destination file.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+
+/**
+ * hdfsDelete - Delete file.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the file.
+ * @param recursive if path is a directory and set to
+ * non-zero, the directory is deleted else throws an exception. In
+ * case of a file the recursive argument is irrelevant.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsDelete(hdfsFS fs, const char* path, int recursive);
+
+/**
+ * hdfsRename - Rename file.
+ * @param fs The configured filesystem handle.
+ * @param oldPath The path of the source file.
+ * @param newPath The path of the destination file.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath);
+
+/**
+ * hdfsGetWorkingDirectory - Get the current working directory for
+ * the given filesystem.
+ * @param fs The configured filesystem handle.
+ * @param buffer The user-buffer to copy path of cwd into.
+ * @param bufferSize The length of user-buffer.
+ * @return Returns buffer, NULL on error.
+ */
+LIBHDFS_EXTERNAL
+char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize);
+
+/**
+ * hdfsSetWorkingDirectory - Set the working directory. All relative
+ * paths will be resolved relative to it.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the new 'cwd'.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsSetWorkingDirectory(hdfsFS fs, const char* path);
+
+/**
+ * hdfsCreateDirectory - Make the given file and all non-existent
+ * parents into directories.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the directory.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsCreateDirectory(hdfsFS fs, const char* path);
+
+/**
+ * hdfsSetReplication - Set the replication of the specified file.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the file.
+ * @param replication The replication value to set for the file.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication);
+
+/**
+ * hdfsFileInfo - Information about a file/directory.
+ */
+typedef struct { /* returned by hdfsListDirectory / hdfsGetPathInfo; release with hdfsFreeFileInfo */
+ tObjectKind mKind; /* kObjectKindFile or kObjectKindDirectory */
+ char* mName; /* the name of the file */
+ tTime mLastMod; /* the last modification time for the file in seconds */
+ tOffset mSize; /* the size of the file in bytes */
+ short mReplication; /* the count of replicas */
+ tOffset mBlockSize; /* the block size for the file */
+ char* mOwner; /* the owner of the file */
+ char* mGroup; /* the group associated with the file */
+ short mPermissions; /* the permissions associated with the file */
+ tTime mLastAccess; /* the last access time for the file in seconds */
+} hdfsFileInfo;
+
+/**
+ * hdfsListDirectory - Get list of files/directories for a given
+ * directory-path. hdfsFreeFileInfo should be called to deallocate memory.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the directory.
+ * @param numEntries Set to the number of files/directories in path.
+ * @return Returns a dynamically-allocated array of hdfsFileInfo
+ * objects; NULL on error or empty directory.
+ * errno is set to non-zero on error or zero on success.
+ */
+LIBHDFS_EXTERNAL
+hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int* numEntries);
+
+/**
+ * hdfsGetPathInfo - Get information about a path as a (dynamically
+ * allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be
+ * called when the pointer is no longer needed.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the file.
+ * @return Returns a dynamically-allocated hdfsFileInfo object;
+ * NULL on error.
+ */
+LIBHDFS_EXTERNAL
+hdfsFileInfo* hdfsGetPathInfo(hdfsFS fs, const char* path);
+
+/**
+ * hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields)
+ * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo
+ * objects.
+ * @param numEntries The size of the array.
+ */
+LIBHDFS_EXTERNAL
+void hdfsFreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries);
+
+/**
+ * hdfsFileIsEncrypted: determine if a file is encrypted based on its
+ * hdfsFileInfo.
+ * @return -1 if there was an error (errno will be set), 0 if the file is
+ * not encrypted, 1 if the file is encrypted.
+ */
+LIBHDFS_EXTERNAL
+int hdfsFileIsEncrypted(hdfsFileInfo* hdfsFileInfo);
+
+/**
+ * hdfsGetHosts - Get hostnames where a particular block (determined by
+ * pos & blocksize) of a file is stored. The last element in the array
+ * is NULL. Due to replication, a single block could be present on
+ * multiple hosts.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the file.
+ * @param start The start of the block.
+ * @param length The length of the block.
+ * @return Returns a dynamically-allocated 2-d array of blocks-hosts;
+ * NULL on error.
+ */
+LIBHDFS_EXTERNAL
+char***
+hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length);
+
+/**
+ * hdfsFreeHosts - Free up the structure returned by hdfsGetHosts
+ * @param blockHosts The dynamically-allocated 2-d array of blocks-hosts
+ * returned by hdfsGetHosts; per that function's contract
+ * the last element in the array is NULL.
+ */
+LIBHDFS_EXTERNAL
+void hdfsFreeHosts(char*** blockHosts);
+
+/**
+ * hdfsGetDefaultBlockSize - Get the default blocksize.
+ *
+ * @param fs The configured filesystem handle.
+ * @deprecated Use hdfsGetDefaultBlockSizeAtPath instead.
+ *
+ * @return Returns the default blocksize, or -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tOffset hdfsGetDefaultBlockSize(hdfsFS fs);
+
+/**
+ * hdfsGetDefaultBlockSizeAtPath - Get the default blocksize at the
+ * filesystem indicated by a given path.
+ *
+ * @param fs The configured filesystem handle.
+ * @param path The given path will be used to locate the actual
+ * filesystem. The full path does not have to exist.
+ *
+ * @return Returns the default blocksize, or -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char* path);
+
+/**
+ * hdfsGetCapacity - Return the raw capacity of the filesystem.
+ * @param fs The configured filesystem handle.
+ * @return Returns the raw-capacity; -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tOffset hdfsGetCapacity(hdfsFS fs);
+
+/**
+ * hdfsGetUsed - Return the total raw size of all files in the filesystem.
+ * @param fs The configured filesystem handle.
+ * @return Returns the total-size; -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tOffset hdfsGetUsed(hdfsFS fs);
+
+/**
+ * Change the user and/or group of a file or directory.
+ *
+ * @param fs The configured filesystem handle.
+ * @param path the path to the file or directory
+ * @param owner User string. Set to NULL for 'no change'
+ * @param group Group string. Set to NULL for 'no change'
+ * @return 0 on success else -1
+ */
+LIBHDFS_EXTERNAL
+int hdfsChown(
+ hdfsFS fs,
+ const char* path,
+ const char* owner,
+ const char* group);
+
+/**
+ * hdfsChmod
+ * @param fs The configured filesystem handle.
+ * @param path the path to the file or directory
+ * @param mode the bitmask to set it to
+ * @return 0 on success else -1
+ */
+LIBHDFS_EXTERNAL
+int hdfsChmod(hdfsFS fs, const char* path, short mode);
+
+/**
+ * hdfsUtime
+ * @param fs The configured filesystem handle.
+ * @param path the path to the file or directory
+ * @param mtime new modification time or -1 for no change
+ * @param atime new access time or -1 for no change
+ * @return 0 on success else -1
+ */
+LIBHDFS_EXTERNAL
+int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
+
+/**
+ * Allocate a zero-copy options structure.
+ *
+ * You must free all options structures allocated with this function using
+ * hadoopRzOptionsFree.
+ *
+ * @return A zero-copy options structure, or NULL if one could
+ * not be allocated. If NULL is returned, errno will
+ * contain the error number.
+ */
+LIBHDFS_EXTERNAL
+struct hadoopRzOptions* hadoopRzOptionsAlloc(void);
+
+/**
+ * Determine whether we should skip checksums in read0.
+ *
+ * @param opts The options structure.
+ * @param skip Nonzero to skip checksums sometimes; zero to always
+ * check them.
+ *
+ * @return 0 on success; -1 plus errno on failure.
+ */
+LIBHDFS_EXTERNAL
+int hadoopRzOptionsSetSkipChecksum(struct hadoopRzOptions* opts, int skip);
+
+/**
+ * Set the ByteBufferPool to use with read0.
+ *
+ * @param opts The options structure.
+ * @param className If this is NULL, we will not use any
+ * ByteBufferPool. If this is non-NULL, it will be
+ * treated as the name of the pool class to use.
+ * For example, you can use
+ * ELASTIC_BYTE_BUFFER_POOL_CLASS.
+ *
+ * @return 0 if the ByteBufferPool class was found and
+ * instantiated;
+ * -1 plus errno otherwise.
+ */
+LIBHDFS_EXTERNAL
+int hadoopRzOptionsSetByteBufferPool(
+ struct hadoopRzOptions* opts,
+ const char* className);
+
+/**
+ * Free a hadoopRzOptions structure.
+ *
+ * @param opts The options structure to free.
+ * Any associated ByteBufferPool will also be freed.
+ */
+LIBHDFS_EXTERNAL
+void hadoopRzOptionsFree(struct hadoopRzOptions* opts);
+
+/**
+ * Perform a byte buffer read.
+ * If possible, this will be a zero-copy (mmap) read.
+ *
+ * @param file The file to read from.
+ * @param opts An options structure created by hadoopRzOptionsAlloc.
+ * @param maxLength The maximum length to read. We may read fewer bytes
+ * than this length.
+ *
+ * @return On success, we will return a new hadoopRzBuffer.
+ * This buffer will continue to be valid and readable
+ * until it is released by hadoopRzBufferFree. Failure to
+ * release a buffer will lead to a memory leak.
+ * You can access the data within the hadoopRzBuffer with
+ * hadoopRzBufferGet. If you have reached EOF, the data
+ * within the hadoopRzBuffer will be NULL. You must still
+ * free hadoopRzBuffer instances containing NULL.
+ *
+ * On failure, we will return NULL plus an errno code.
+ * errno = EOPNOTSUPP indicates that we could not do a
+ * zero-copy read, and there was no ByteBufferPool
+ * supplied.
+ */
+LIBHDFS_EXTERNAL
+struct hadoopRzBuffer*
+hadoopReadZero(hdfsFile file, struct hadoopRzOptions* opts, int32_t maxLength);
+
+/**
+ * Determine the length of the buffer returned from hadoopReadZero.
+ *
+ * @param buffer a buffer returned from hadoopReadZero.
+ * @return the length of the buffer.
+ */
+LIBHDFS_EXTERNAL
+int32_t hadoopRzBufferLength(const struct hadoopRzBuffer* buffer);
+
+/**
+ * Get a pointer to the raw buffer returned from hadoopReadZero.
+ *
+ * To find out how many bytes this buffer contains, call
+ * hadoopRzBufferLength.
+ *
+ * @param buffer a buffer returned from hadoopReadZero.
+ * @return a pointer to the start of the buffer. This will be
+ * NULL when end-of-file has been reached.
+ */
+LIBHDFS_EXTERNAL
+const void* hadoopRzBufferGet(const struct hadoopRzBuffer* buffer);
+
+/**
+ * Release a buffer obtained through hadoopReadZero.
+ *
+ * @param file The hdfs stream that created this buffer. This must be
+ * the same stream you called hadoopReadZero on.
+ * @param buffer The buffer to release.
+ */
+LIBHDFS_EXTERNAL
+void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer* buffer);
+
+/**
+ * Get the root cause of the last exception that happened in the context of
+ * the current thread, i.e. the thread that called into libHDFS.
+ *
+ * The pointer returned by this function is guaranteed to be valid until
+ * the next call into libHDFS by the current thread.
+ * Callers must not free the returned pointer.
+ *
+ * NULL is returned if no exception information could be retrieved
+ * for the previous call.
+ *
+ * @return The root cause as a C-string, or NULL if none is available.
+ */
+LIBHDFS_EXTERNAL
+char* hdfsGetLastExceptionRootCause();
+
+/**
+ * Get the stack trace of the last exception that happened in the context of
+ * the current thread, i.e. the thread that called into libHDFS.
+ *
+ * The pointer returned by this function is guaranteed to be valid until
+ * the next call into libHDFS by the current thread.
+ * Callers must not free the returned pointer.
+ *
+ * NULL is returned if no exception information could be retrieved
+ * for the previous call.
+ *
+ * @return The stack trace as a C-string, or NULL if none is available.
+ */
+LIBHDFS_EXTERNAL
+char* hdfsGetLastExceptionStackTrace();
+
+#ifdef __cplusplus
+}
+#endif
+
+#undef LIBHDFS_EXTERNAL
+#endif /*LIBHDFS_HDFS_H*/
+
+/**
+ * vim: ts=4: sw=4: et
+ */