From 10cdf6fd8ca372b11779431ef77a177551c01700 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Thu, 24 Oct 2024 10:08:20 -0700 Subject: [PATCH] Support jvm version libhdfs in velox (#9835) Summary: Currently, Gluten throws HDFS connection failures when executing queries if the user's HDFS system employs Kerberos authentication and Viewfs. This is because the existing libhdfs3 API does not support Kerberos authentication, whereas the JVM version of libhdfs can invoke APIs that do. If the `HADOOP_HOME` environment variable is set on the user's system, the JVM version of libhdfs is used when compiling Gluten; if it is not set, the default libhdfs3 is used instead. Pull Request resolved: https://github.com/facebookincubator/velox/pull/9835 Reviewed By: xiaoxmeng, DanielHunte Differential Revision: D64872596 Pulled By: pedroerp fbshipit-source-id: 995ee73a9f8474f8a6467b926a56246073fee75e --- .github/workflows/linux-build.yml | 1 + CMakeLists.txt | 8 +- NOTICE.txt | 100 ++ velox/CMakeLists.txt | 1 + .../hive/storage_adapters/hdfs/CMakeLists.txt | 7 +- .../storage_adapters/hdfs/HdfsFileSystem.cpp | 33 +- .../storage_adapters/hdfs/HdfsFileSystem.h | 5 + .../storage_adapters/hdfs/HdfsReadFile.cpp | 70 +- .../hive/storage_adapters/hdfs/HdfsReadFile.h | 47 +- .../storage_adapters/hdfs/HdfsWriteFile.cpp | 31 +- .../storage_adapters/hdfs/HdfsWriteFile.h | 8 +- .../hdfs/RegisterHdfsFileSystem.cpp | 6 +- .../hdfs/tests/CMakeLists.txt | 30 +- .../hdfs/tests/HdfsFileSystemTest.cpp | 132 +- .../hdfs/tests/InsertIntoHdfsTest.cpp | 4 +- velox/external/hdfs/ArrowHdfsInternal.cpp | 641 ++++++++++ velox/external/hdfs/ArrowHdfsInternal.h | 260 ++++ velox/external/hdfs/CMakeLists.txt | 19 + velox/external/hdfs/hdfs.h | 1078 +++++++++++++++++ 19 files changed, 2342 insertions(+), 139 deletions(-) create mode 100644 velox/external/hdfs/ArrowHdfsInternal.cpp create mode 100644 velox/external/hdfs/ArrowHdfsInternal.h create mode 100644 velox/external/hdfs/CMakeLists.txt create mode 100644 velox/external/hdfs/hdfs.h diff --git a/.github/workflows/linux-build.yml b/.github/workflows/linux-build.yml index b4e26478702b..b897a345f562 100644 --- a/.github/workflows/linux-build.yml +++ b/.github/workflows/linux-build.yml @@ -134,6 +134,7 @@ jobs: LIBHDFS3_CONF: "${{ github.workspace }}/scripts/hdfs-client.xml" working-directory: _build/release run: | + export CLASSPATH=`/usr/local/hadoop/bin/hdfs classpath --glob` ctest -j 8 --output-on-failure --no-tests=error ubuntu-debug: diff --git a/CMakeLists.txt b/CMakeLists.txt index 2408c8c89d05..1dd401d3e147 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -259,11 +259,9 @@ if(VELOX_ENABLE_ABFS) endif() if(VELOX_ENABLE_HDFS) - find_library( - LIBHDFS3 - NAMES libhdfs3.so libhdfs3.dylib - HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) - add_definitions(-DVELOX_ENABLE_HDFS3) + add_definitions(-DVELOX_ENABLE_HDFS) + # JVM libhdfs requires arrow dependency. + set(VELOX_ENABLE_ARROW ON) endif() if(VELOX_ENABLE_PARQUET) diff --git a/NOTICE.txt b/NOTICE.txt index 58655beb3ca7..8b812aa41ab2 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -9,3 +9,103 @@ This product includes software from the QT project (BSD, 3-clause). This product includes software from HowardHinnant's date library (MIT License). * https://github.com/HowardHinnant/date/tree/master + +This product includes software from the Arrow project.
+* https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/arrow/io/hdfs_internal.h +* https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/arrow/io/hdfs_internal.cc +Which contain the following NOTICE file: +------- + Apache Arrow + Copyright 2016-2024 The Apache Software Foundation + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + This product includes software from the SFrame project (BSD, 3-clause). + * Copyright (C) 2015 Dato, Inc. + * Copyright (c) 2009 Carnegie Mellon University. + This product includes software from the Feather project (Apache 2.0) + https://github.com/wesm/feather + This product includes software from the DyND project (BSD 2-clause) + https://github.com/libdynd + This product includes software from the LLVM project + * distributed under the University of Illinois Open Source + This product includes software from the google-lint project + * Copyright (c) 2009 Google Inc. All rights reserved. + This product includes software from the mman-win32 project + * Copyright https://code.google.com/p/mman-win32/ + * Licensed under the MIT License; + This product includes software from the LevelDB project + * Copyright (c) 2011 The LevelDB Authors. All rights reserved. + * Use of this source code is governed by a BSD-style license that can be + * Moved from Kudu http://github.com/cloudera/kudu + This product includes software from the CMake project + * Copyright 2001-2009 Kitware, Inc. + * Copyright 2012-2014 Continuum Analytics, Inc. + * All rights reserved. + This product includes software from https://github.com/matthew-brett/multibuild (BSD 2-clause) + * Copyright (c) 2013-2016, Matt Terry and Matthew Brett; all rights reserved. + This product includes software from the Ibis project (Apache 2.0) + * Copyright (c) 2015 Cloudera, Inc. + * https://github.com/cloudera/ibis + This product includes software from Dremio (Apache 2.0) + * Copyright (C) 2017-2018 Dremio Corporation + * https://github.com/dremio/dremio-oss + This product includes software from Google Guava (Apache 2.0) + * Copyright (C) 2007 The Guava Authors + * https://github.com/google/guava + This product include software from CMake (BSD 3-Clause) + * CMake - Cross Platform Makefile Generator + * Copyright 2000-2019 Kitware, Inc. and Contributors + The web site includes files generated by Jekyll. + -------------------------------------------------------------------------------- + This product includes code from Apache Kudu, which includes the following in + its NOTICE file: + Apache Kudu + Copyright 2016 The Apache Software Foundation + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + Portions of this software were developed at + Cloudera, Inc (http://www.cloudera.com/). + -------------------------------------------------------------------------------- + This product includes code from Apache ORC, which includes the following in + its NOTICE file: + Apache ORC + Copyright 2013-2019 The Apache Software Foundation + This product includes software developed by The Apache Software + Foundation (http://www.apache.org/). + This product includes software developed by Hewlett-Packard: + (c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P +------- + +This product includes software from the The Hadoop project. 
+* https://github.com/apache/hadoop/blob/release-3.3.0-RC0/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h +Which contains the following NOTICE file: +---- + Apache Hadoop + Copyright 2006 and onwards The Apache Software Foundation. + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + Export Control Notice + --------------------- + This distribution includes cryptographic software. The country in + which you currently reside may have restrictions on the import, + possession, use, and/or re-export to another country, of + encryption software. BEFORE using any encryption software, please + check your country's laws, regulations and policies concerning the + import, possession, or use, and re-export of encryption software, to + see if this is permitted. See for more + information. + The U.S. Government Department of Commerce, Bureau of Industry and + Security (BIS), has classified this software as Export Commodity + Control Number (ECCN) 5D002.C.1, which includes information security + software using or performing cryptographic functions with asymmetric + algorithms. The form and manner of this Apache Software Foundation + distribution makes it eligible for export under the License Exception + ENC Technology Software Unrestricted (TSU) exception (see the BIS + Export Administration Regulations, Section 740.13) for both object + code and source code. + The following provides more details on the included cryptographic software: + This software uses the SSL libraries from the Jetty project written + by mortbay.org. + Hadoop Yarn Server Web Proxy uses the BouncyCastle Java + cryptography APIs written by the Legion of the Bouncy Castle Inc. +---- diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index 00c969ccce7b..06ae8bf1c053 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -24,6 +24,7 @@ add_subdirectory(row) add_subdirectory(flag_definitions) add_subdirectory(external/date) add_subdirectory(external/md5) +add_subdirectory(external/hdfs) # # examples depend on expression diff --git a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt index 6c1e84aec404..44aa7be3489c 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt @@ -23,7 +23,12 @@ if(VELOX_ENABLE_HDFS) HdfsFileSystem.cpp HdfsReadFile.cpp HdfsWriteFile.cpp) - velox_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd) + velox_link_libraries( + velox_hdfs + velox_external_hdfs + velox_dwio_common + Folly::folly + xsimd) if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp index 4e7b9ddc0ec5..9d3f6e30a67a 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp @@ -14,11 +14,11 @@ * limitations under the License. 
*/ #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" -#include #include #include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h" +#include "velox/external/hdfs/ArrowHdfsInternal.h" namespace facebook::velox::filesystems { std::string_view HdfsFileSystem::kScheme("hdfs://"); @@ -29,21 +29,27 @@ class HdfsFileSystem::Impl { explicit Impl( const config::ConfigBase* config, const HdfsServiceEndpoint& endpoint) { - auto builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(builder, endpoint.host.c_str()); - hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data())); - hdfsClient_ = hdfsBuilderConnect(builder); - hdfsFreeBuilder(builder); + auto status = filesystems::arrow::io::internal::ConnectLibHdfs(&driver_); + if (!status.ok()) { + LOG(ERROR) << "ConnectLibHdfs failed "; + } + + // connect to HDFS with the builder object + hdfsBuilder* builder = driver_->NewBuilder(); + driver_->BuilderSetNameNode(builder, endpoint.host.c_str()); + driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data())); + driver_->BuilderSetForceNewInstance(builder); + hdfsClient_ = driver_->BuilderConnect(builder); VELOX_CHECK_NOT_NULL( hdfsClient_, "Unable to connect to HDFS: {}, got error: {}.", endpoint.identity(), - hdfsGetLastError()); + driver_->GetLastExceptionRootCause()); } ~Impl() { LOG(INFO) << "Disconnecting HDFS file system"; - int disconnectResult = hdfsDisconnect(hdfsClient_); + int disconnectResult = driver_->Disconnect(hdfsClient_); if (disconnectResult != 0) { LOG(WARNING) << "hdfs disconnect failure in HdfsReadFile close: " << errno; @@ -54,8 +60,13 @@ class HdfsFileSystem::Impl { return hdfsClient_; } + filesystems::arrow::io::internal::LibHdfsShim* hdfsShim() { + return driver_; + } + private: hdfsFS hdfsClient_; + filesystems::arrow::io::internal::LibHdfsShim* driver_; }; HdfsFileSystem::HdfsFileSystem( @@ -79,13 +90,15 @@ std::unique_ptr HdfsFileSystem::openFileForRead( path.remove_prefix(index); } - return std::make_unique(impl_->hdfsClient(), path); + return std::make_unique( + impl_->hdfsShim(), impl_->hdfsClient(), path); } std::unique_ptr HdfsFileSystem::openFileForWrite( std::string_view path, const FileOptions& /*unused*/) { - return std::make_unique(impl_->hdfsClient(), path); + return std::make_unique( + impl_->hdfsShim(), impl_->hdfsClient(), path); } bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) { diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h index 25602a470f16..1ae7d28b94e5 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h @@ -15,7 +15,12 @@ */ #include "velox/common/file/FileSystems.h" +namespace velox::filesystems::arrow::io::internal { +class LibHdfsShim; +} + namespace facebook::velox::filesystems { + struct HdfsServiceEndpoint { HdfsServiceEndpoint(const std::string& hdfsHost, const std::string& hdfsPort) : host(hdfsHost), port(hdfsPort) {} diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp index dedc2bb4a4c9..d48dd373d344 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp @@ -16,20 +16,65 @@ #include "HdfsReadFile.h" #include -#include +#include 
"velox/external/hdfs/ArrowHdfsInternal.h" namespace facebook::velox { -HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path) - : hdfsClient_(hdfs), filePath_(path) { - fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data()); +struct HdfsFile { + filesystems::arrow::io::internal::LibHdfsShim* driver_; + hdfsFS client_; + hdfsFile handle_; + + HdfsFile() : driver_(nullptr), client_(nullptr), handle_(nullptr) {} + ~HdfsFile() { + if (handle_ && driver_->CloseFile(client_, handle_) == -1) { + LOG(ERROR) << "Unable to close file, errno: " << errno; + } + } + + void open( + filesystems::arrow::io::internal::LibHdfsShim* driver, + hdfsFS client, + const std::string& path) { + driver_ = driver; + client_ = client; + handle_ = driver->OpenFile(client, path.data(), O_RDONLY, 0, 0, 0); + VELOX_CHECK_NOT_NULL( + handle_, + "Unable to open file {}. got error: {}", + path, + driver_->GetLastExceptionRootCause()); + } + + void seek(uint64_t offset) const { + VELOX_CHECK_EQ( + driver_->Seek(client_, handle_, offset), + 0, + "Cannot seek through HDFS file, error is : {}", + driver_->GetLastExceptionRootCause()); + } + + int32_t read(char* pos, uint64_t length) const { + auto bytesRead = driver_->Read(client_, handle_, pos, length); + VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal."); + return bytesRead; + } +}; + +HdfsReadFile::HdfsReadFile( + filesystems::arrow::io::internal::LibHdfsShim* driver, + hdfsFS hdfs, + const std::string_view path) + : driver_(driver), hdfsClient_(hdfs), filePath_(path) { + fileInfo_ = driver_->GetPathInfo(hdfsClient_, filePath_.data()); if (fileInfo_ == nullptr) { - auto error = hdfsGetLastError(); + auto error = fmt::format( + "FileNotFoundException: Path {} does not exist.", filePath_); auto errMsg = fmt::format( "Unable to get file path info for file: {}. got error: {}", filePath_, error); - if (std::strstr(error, "FileNotFoundException") != nullptr) { + if (error.find("FileNotFoundException") != std::string::npos) { VELOX_FILE_NOT_FOUND_ERROR(errMsg); } VELOX_FAIL(errMsg); @@ -38,19 +83,22 @@ HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path) HdfsReadFile::~HdfsReadFile() { // should call hdfsFreeFileInfo to avoid memory leak - hdfsFreeFileInfo(fileInfo_, 1); + if (fileInfo_) { + driver_->FreeFileInfo(fileInfo_, 1); + } } void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos) const { checkFileReadParameters(offset, length); - if (!file_->handle_) { - file_->open(hdfsClient_, filePath_); + folly::ThreadLocal file; + if (!file->handle_) { + file->open(driver_, hdfsClient_, filePath_); } - file_->seek(offset); + file->seek(offset); uint64_t totalBytesRead = 0; while (totalBytesRead < length) { - auto bytesRead = file_->read(pos, length - totalBytesRead); + auto bytesRead = file->read(pos, length - totalBytesRead); totalBytesRead += bytesRead; pos += bytesRead; } diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h index 1d531956f0ee..b63c2dd933dd 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h @@ -14,53 +14,24 @@ * limitations under the License. 
*/ -#include #include "velox/common/file/File.h" +#include "velox/external/hdfs/hdfs.h" namespace facebook::velox { -struct HdfsFile { - hdfsFS client_; - hdfsFile handle_; - - HdfsFile() : client_(nullptr), handle_(nullptr) {} - ~HdfsFile() { - if (handle_ && hdfsCloseFile(client_, handle_) == -1) { - LOG(ERROR) << "Unable to close file, errno: " << errno; - } - } - - void open(hdfsFS client, const std::string& path) { - client_ = client; - handle_ = hdfsOpenFile(client, path.data(), O_RDONLY, 0, 0, 0); - VELOX_CHECK_NOT_NULL( - handle_, - "Unable to open file {}. got error: {}", - path, - hdfsGetLastError()); - } - - void seek(uint64_t offset) const { - VELOX_CHECK_EQ( - hdfsSeek(client_, handle_, offset), - 0, - "Cannot seek through HDFS file, error is : {}", - std::string(hdfsGetLastError())); - } - - int32_t read(char* pos, uint64_t length) const { - auto bytesRead = hdfsRead(client_, handle_, pos, length); - VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal."); - return bytesRead; - } -}; +namespace filesystems::arrow::io::internal { +class LibHdfsShim; +} /** * Implementation of hdfs read file. */ class HdfsReadFile final : public ReadFile { public: - explicit HdfsReadFile(hdfsFS hdfs, std::string_view path); + explicit HdfsReadFile( + filesystems::arrow::io::internal::LibHdfsShim* driver, + hdfsFS hdfs, + std::string_view path); ~HdfsReadFile() override; std::string_view pread(uint64_t offset, uint64_t length, void* buf) @@ -86,10 +57,10 @@ class HdfsReadFile final : public ReadFile { void preadInternal(uint64_t offset, uint64_t length, char* pos) const; void checkFileReadParameters(uint64_t offset, uint64_t length) const; + filesystems::arrow::io::internal::LibHdfsShim* driver_; hdfsFS hdfsClient_; hdfsFileInfo* fileInfo_; std::string filePath_; - folly::ThreadLocal file_; }; } // namespace facebook::velox diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp index 60f98a88c972..be668a3133e1 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp @@ -15,23 +15,24 @@ */ #include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h" -#include +#include "velox/external/hdfs/ArrowHdfsInternal.h" namespace facebook::velox { HdfsWriteFile::HdfsWriteFile( + filesystems::arrow::io::internal::LibHdfsShim* driver, hdfsFS hdfsClient, std::string_view path, int bufferSize, short replication, int blockSize) - : hdfsClient_(hdfsClient), filePath_(path) { + : driver_(driver), hdfsClient_(hdfsClient), filePath_(path) { auto pos = filePath_.rfind("/"); auto parentDir = filePath_.substr(0, pos + 1); - if (hdfsExists(hdfsClient_, parentDir.c_str()) == -1) { - hdfsCreateDirectory(hdfsClient_, parentDir.c_str()); + if (driver_->Exists(hdfsClient_, parentDir.c_str()) == -1) { + driver_->MakeDirectory(hdfsClient_, parentDir.c_str()); } - hdfsFile_ = hdfsOpenFile( + hdfsFile_ = driver_->OpenFile( hdfsClient_, filePath_.c_str(), O_WRONLY, @@ -42,7 +43,7 @@ HdfsWriteFile::HdfsWriteFile( hdfsFile_, "Failed to open hdfs file: {}, with error: {}", filePath_, - std::string(hdfsGetLastError())); + driver_->GetLastExceptionRootCause()); } HdfsWriteFile::~HdfsWriteFile() { @@ -52,12 +53,12 @@ HdfsWriteFile::~HdfsWriteFile() { } void HdfsWriteFile::close() { - int success = hdfsCloseFile(hdfsClient_, hdfsFile_); + int success = driver_->CloseFile(hdfsClient_, hdfsFile_); VELOX_CHECK_EQ( success, 0, "Failed to close 
hdfs file: {}", - std::string(hdfsGetLastError())); + driver_->GetLastExceptionRootCause()); hdfsFile_ = nullptr; } @@ -66,9 +67,9 @@ void HdfsWriteFile::flush() { hdfsFile_, "Cannot flush HDFS file because file handle is null, file path: {}", filePath_); - int success = hdfsFlush(hdfsClient_, hdfsFile_); + int success = driver_->Flush(hdfsClient_, hdfsFile_); VELOX_CHECK_EQ( - success, 0, "Hdfs flush error: {}", std::string(hdfsGetLastError())); + success, 0, "Hdfs flush error: {}", driver_->GetLastExceptionRootCause()); } void HdfsWriteFile::append(std::string_view data) { @@ -79,20 +80,20 @@ void HdfsWriteFile::append(std::string_view data) { hdfsFile_, "Cannot append to HDFS file because file handle is null, file path: {}", filePath_); - int64_t totalWrittenBytes = - hdfsWrite(hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size()); + int64_t totalWrittenBytes = driver_->Write( + hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size()); VELOX_CHECK_EQ( totalWrittenBytes, data.size(), "Write failure in HDFSWriteFile::append {}", - std::string(hdfsGetLastError())); + driver_->GetLastExceptionRootCause()); } uint64_t HdfsWriteFile::size() const { - auto fileInfo = hdfsGetPathInfo(hdfsClient_, filePath_.c_str()); + auto fileInfo = driver_->GetPathInfo(hdfsClient_, filePath_.c_str()); uint64_t size = fileInfo->mSize; // should call hdfsFreeFileInfo to avoid memory leak - hdfsFreeFileInfo(fileInfo, 1); + driver_->FreeFileInfo(fileInfo, 1); return size; } diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h index 7ed1819cd61f..fb311b1a6c3d 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h @@ -15,11 +15,15 @@ */ #pragma once -#include #include "velox/common/file/File.h" +#include "velox/external/hdfs/hdfs.h" namespace facebook::velox { +namespace filesystems::arrow::io::internal { +class LibHdfsShim; +} + /// Implementation of hdfs write file. Nothing written to the file should be /// read back until it is closed. class HdfsWriteFile : public WriteFile { @@ -34,6 +38,7 @@ class HdfsWriteFile : public WriteFile { /// @param blockSize Size of block - pass 0 if you want to use the /// default configured values. HdfsWriteFile( + filesystems::arrow::io::internal::LibHdfsShim* driver, hdfsFS hdfsClient, std::string_view path, int bufferSize = 0, @@ -55,6 +60,7 @@ class HdfsWriteFile : public WriteFile { void close() override; private: + filesystems::arrow::io::internal::LibHdfsShim* driver_; /// The configured hdfs filesystem handle. hdfsFS hdfsClient_; /// The hdfs file handle for write. diff --git a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp index bdff4a7a4fdc..1f23179f0a72 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -#ifdef VELOX_ENABLE_HDFS3 +#ifdef VELOX_ENABLE_HDFS #include "folly/concurrency/ConcurrentHashMap.h" #include "velox/common/config/Config.h" @@ -25,7 +25,7 @@ namespace facebook::velox::filesystems { -#ifdef VELOX_ENABLE_HDFS3 +#ifdef VELOX_ENABLE_HDFS std::mutex mtx; std::function #include -#include #include #include #include "HdfsMiniCluster.h" #include "gtest/gtest.h" +#include "velox/common/base/Exceptions.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h" #include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" #include "velox/core/QueryConfig.h" #include "velox/exec/tests/utils/TempFilePath.h" +#include "velox/external/hdfs/ArrowHdfsInternal.h" + +#include using namespace facebook::velox; +using filesystems::arrow::io::internal::LibHdfsShim; + constexpr int kOneMB = 1 << 20; static const std::string destinationPath = "/test_file.txt"; static const std::string hdfsPort = "7878"; static const std::string localhost = "localhost"; static const std::string fullDestinationPath = "hdfs://" + localhost + ":" + hdfsPort + destinationPath; -static const std::string simpleDestinationPath = "hdfs:///" + destinationPath; +static const std::string simpleDestinationPath = "hdfs://" + destinationPath; static const std::unordered_map configurationValues( {{"hive.hdfs.host", localhost}, {"hive.hdfs.port", hdfsPort}}); @@ -55,6 +60,7 @@ class HdfsFileSystemTest : public testing::Test { if (!miniCluster->isRunning()) { miniCluster->start(); } + filesystems::registerHdfsFileSystem(); } static void TearDownTestSuite() { @@ -119,6 +125,7 @@ void checkReadErrorMessages( } catch (VeloxException const& error) { EXPECT_THAT(error.message(), testing::HasSubstr(errorMessage)); } + try { auto buf = std::make_unique(8); readFile->pread(10 + kOneMB, endpoint, buf.get()); @@ -128,9 +135,18 @@ void checkReadErrorMessages( } } -void verifyFailures(hdfsFS hdfs) { - HdfsReadFile readFile(hdfs, destinationPath); - HdfsReadFile readFile2(hdfs, destinationPath); +bool checkMiniClusterStop(ReadFile* readFile, const std::string& errorMessage) { + try { + readFile->pread(0, 1); + return false; + } catch (const VeloxException& error) { + return error.message().find(errorMessage) != std::string::npos; + } +} + +void verifyFailures(LibHdfsShim* driver, hdfsFS hdfs) { + HdfsReadFile readFile(driver, hdfs, destinationPath); + HdfsReadFile readFile2(driver, hdfs, destinationPath); auto startPoint = 10 + kOneMB; auto size = 15 + kOneMB; auto endpoint = 10 + 2 * kOneMB; @@ -139,43 +155,60 @@ void verifyFailures(hdfsFS hdfs) { "(%d vs. %d) Cannot read HDFS file beyond its size: %d, offset: %d, end point: %d") % size % endpoint % size % startPoint % endpoint) .str(); - auto serverAddress = (boost::format("%s:%s") % localhost % hdfsPort).str(); + auto readFailErrorMessage = (boost::format( - "Unable to open file %s. 
got error: HdfsIOException: InputStreamImpl: cannot open file: %s.\t" - "Caused by: Hdfs::HdfsRpcException: HdfsFailoverException: Failed to invoke RPC call \"getBlockLocations\" on server \"%s\"\t\t" - "Caused by: HdfsNetworkConnectException: Connect to \"%s\" failed") % - destinationPath % destinationPath % serverAddress % serverAddress) - .str(); - auto builderErrorMessage = - (boost::format( - "Unable to connect to HDFS: %s, got error: Hdfs::HdfsRpcException: HdfsFailoverException: " - "Failed to invoke RPC call \"getFsStats\" on server \"%s\"\tCaused by: " - "HdfsNetworkConnectException: Connect to \"%s\" failed") % - serverAddress % serverAddress % serverAddress) + "Unable to open file %s. got error: ConnectException: Connection refused") % + destinationPath) .str(); + checkReadErrorMessages(&readFile, offsetErrorMessage, kOneMB); HdfsFileSystemTest::miniCluster->stop(); - checkReadErrorMessages(&readFile2, readFailErrorMessage, 1); - try { - auto config = std::make_shared( - std::unordered_map(configurationValues)); - filesystems::HdfsFileSystem hdfsFileSystem( - config, - filesystems::HdfsFileSystem::getServiceEndpoint( - simpleDestinationPath, config.get())); - FAIL() << "expected VeloxException"; - } catch (VeloxException const& error) { - EXPECT_THAT(error.message(), testing::HasSubstr(builderErrorMessage)); + + constexpr auto kMaxRetries = 10; + int retries = 0; + while (true) { + if (checkMiniClusterStop(&readFile2, readFailErrorMessage)) { + checkReadErrorMessages(&readFile2, readFailErrorMessage, 1); + break; + } else { + if (retries >= kMaxRetries) { + FAIL() << "MiniCluster doesn't stop after kMaxRetries try"; + } else { + sleep(1); + retries++; + } + } } } +hdfsFS connectHdfsDriver( + filesystems::arrow::io::internal::LibHdfsShim** driver) { + filesystems::arrow::io::internal::LibHdfsShim* libhdfs_shim; + auto status = filesystems::arrow::io::internal::ConnectLibHdfs(&libhdfs_shim); + if (!status.ok()) { + LOG(ERROR) << "ConnectLibHdfs failed "; + } + + // Connect to HDFS with the builder object + hdfsBuilder* builder = libhdfs_shim->NewBuilder(); + libhdfs_shim->BuilderSetNameNode(builder, localhost.c_str()); + libhdfs_shim->BuilderSetNameNodePort(builder, 7878); + libhdfs_shim->BuilderSetForceNewInstance(builder); + + auto hdfs = libhdfs_shim->BuilderConnect(builder); + VELOX_CHECK_NOT_NULL( + hdfs, + "Unable to connect to HDFS: {}, got error", + std::string(localhost.c_str()) + ":7878"); + *driver = libhdfs_shim; + return hdfs; +} + TEST_F(HdfsFileSystemTest, read) { - struct hdfsBuilder* builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(builder, localhost.c_str()); - hdfsBuilderSetNameNodePort(builder, 7878); - auto hdfs = hdfsBuilderConnect(builder); - HdfsReadFile readFile(hdfs, destinationPath); + filesystems::arrow::io::internal::LibHdfsShim* driver; + auto hdfs = connectHdfsDriver(&driver); + HdfsReadFile readFile(driver, hdfs, destinationPath); readData(&readFile); } @@ -224,6 +257,7 @@ TEST_F(HdfsFileSystemTest, missingFileViaFileSystem) { auto config = std::make_shared( std::unordered_map(configurationValues)); auto hdfsFileSystem = filesystems::getFileSystem(fullDestinationPath, config); + VELOX_ASSERT_RUNTIME_THROW_CODE( hdfsFileSystem->openFileForRead( "hdfs://localhost:7777/path/that/does/not/exist"), @@ -271,11 +305,9 @@ TEST_F(HdfsFileSystemTest, missingPort) { TEST_F(HdfsFileSystemTest, missingFileViaReadFile) { try { - struct hdfsBuilder* builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(builder, localhost.c_str()); - 
hdfsBuilderSetNameNodePort(builder, std::stoi(hdfsPort)); - auto hdfs = hdfsBuilderConnect(builder); - HdfsReadFile readFile(hdfs, "/path/that/does/not/exist"); + filesystems::arrow::io::internal::LibHdfsShim* driver; + auto hdfs = connectHdfsDriver(&driver); + HdfsReadFile readFile(driver, hdfs, "/path/that/does/not/exist"); FAIL() << "expected VeloxException"; } catch (VeloxException const& error) { EXPECT_THAT( @@ -288,13 +320,13 @@ TEST_F(HdfsFileSystemTest, missingFileViaReadFile) { TEST_F(HdfsFileSystemTest, schemeMatching) { try { auto fs = std::dynamic_pointer_cast( - filesystems::getFileSystem("/", nullptr)); + filesystems::getFileSystem("file://", nullptr)); FAIL() << "expected VeloxException"; } catch (VeloxException const& error) { EXPECT_THAT( error.message(), testing::HasSubstr( - "No registered file system matched with file path '/'")); + "No registered file system matched with file path 'file://'")); } auto fs = std::dynamic_pointer_cast( filesystems::getFileSystem(fullDestinationPath, nullptr)); @@ -327,11 +359,10 @@ TEST_F(HdfsFileSystemTest, removeNotSupported) { TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) { startThreads = false; - struct hdfsBuilder* builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(builder, localhost.c_str()); - hdfsBuilderSetNameNodePort(builder, 7878); - auto hdfs = hdfsBuilderConnect(builder); - HdfsReadFile readFile(hdfs, destinationPath); + + filesystems::arrow::io::internal::LibHdfsShim* driver; + auto hdfs = connectHdfsDriver(&driver); + std::vector threads; std::mt19937 generator(std::random_device{}()); std::vector sleepTimesInMicroseconds = {0, 500, 50000}; @@ -339,13 +370,14 @@ TEST_F(HdfsFileSystemTest, multipleThreadsWithReadFile) { 0, sleepTimesInMicroseconds.size() - 1); for (int i = 0; i < 25; i++) { auto thread = std::thread( - [&readFile, &distribution, &generator, &sleepTimesInMicroseconds] { + [&driver, &hdfs, &distribution, &generator, &sleepTimesInMicroseconds] { int index = distribution(generator); while (!HdfsFileSystemTest::startThreads) { std::this_thread::yield(); } std::this_thread::sleep_for( std::chrono::microseconds(sleepTimesInMicroseconds[index])); + HdfsReadFile readFile(driver, hdfs, destinationPath); readData(&readFile); }); threads.emplace_back(std::move(thread)); @@ -441,9 +473,7 @@ TEST_F(HdfsFileSystemTest, writeWithParentDirNotExist) { } TEST_F(HdfsFileSystemTest, readFailures) { - struct hdfsBuilder* builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(builder, localhost.c_str()); - hdfsBuilderSetNameNodePort(builder, stoi(hdfsPort)); - auto hdfs = hdfsBuilderConnect(builder); - verifyFailures(hdfs); + filesystems::arrow::io::internal::LibHdfsShim* driver; + auto hdfs = connectHdfsDriver(&driver); + verifyFailures(driver, hdfs); } diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp b/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp index 2765d73142e2..bc0f56b9423d 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/tests/InsertIntoHdfsTest.cpp @@ -17,6 +17,7 @@ #include #include "gtest/gtest.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" +#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" #include "velox/connectors/hive/storage_adapters/hdfs/tests/HdfsMiniCluster.h" #include "velox/exec/TableWriter.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" @@ -36,6 +37,7 @@ class InsertIntoHdfsTest : 
public HiveConnectorTestBase { public: void SetUp() override { HiveConnectorTestBase::SetUp(); + filesystems::registerHdfsFileSystem(); if (miniCluster == nullptr) { miniCluster = std::make_shared(); miniCluster->start(); @@ -104,7 +106,7 @@ TEST_F(InsertIntoHdfsTest, insertIntoHdfsTest) { plan = PlanBuilder().tableScan(rowType_).planNode(); auto splits = HiveConnectorTestBase::makeHiveConnectorSplits( - fmt::format("{}/{}", outputDirectory, writeFileName), + fmt::format("{}{}", outputDirectory, writeFileName), 1, dwio::common::FileFormat::DWRF); auto copy = AssertQueryBuilder(plan).split(splits[0]).copyResults(pool()); diff --git a/velox/external/hdfs/ArrowHdfsInternal.cpp b/velox/external/hdfs/ArrowHdfsInternal.cpp new file mode 100644 index 000000000000..c9a0c7dd81f0 --- /dev/null +++ b/velox/external/hdfs/ArrowHdfsInternal.cpp @@ -0,0 +1,641 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This shim interface to libhdfs (for runtime shared library loading) has been +// adapted from the SFrame project, released under the ASF-compatible 3-clause +// BSD license +// +// Using this required having the $JAVA_HOME and $HADOOP_HOME environment +// variables set, so that libjvm and libhdfs can be located easily + +// Copyright (C) 2015 Dato, Inc. +// All rights reserved. +// +// This software may be modified and distributed under the terms +// of the BSD license. See the LICENSE file for details. 
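For orientation, the header comment above describes how this shim loads libjvm and libhdfs at runtime instead of linking them. The following is a minimal, self-contained sketch of that dlopen/dlsym pattern; the hard-coded library name is an assumption (the real code below searches HADOOP_HOME/lib/native and ARROW_LIBHDFS_DIR for the OS-specific file name), and this sketch is not part of the patch.

```cpp
// Illustrative sketch of the runtime-loading pattern used by LibHdfsShim:
// dlopen() the hdfs library, then dlsym() each hdfs* entry point into a
// function pointer, as the GET_SYMBOL_REQUIRED macro below does.
#include <dlfcn.h>
#include <cstdio>

struct hdfsBuilder; // opaque, as declared in hdfs.h

int main() {
  void* handle = dlopen("libhdfs.so", RTLD_NOW | RTLD_LOCAL);
  if (handle == nullptr) {
    std::printf("dlopen failed: %s\n", dlerror());
    return 1;
  }
  using NewBuilderFn = hdfsBuilder* (*)();
  auto newBuilder =
      reinterpret_cast<NewBuilderFn>(dlsym(handle, "hdfsNewBuilder"));
  std::printf("hdfsNewBuilder %s\n", newBuilder ? "resolved" : "missing");
  dlclose(handle);
  return 0;
}
```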
+ +#include "ArrowHdfsInternal.h" + +#include +#include +#include +#include // IWYU pragma: keep +#include +#include +#include + +#ifndef _WIN32 +#include +#endif + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" + +namespace facebook::velox::filesystems::arrow { + +using ::arrow::internal::GetEnvVarNative; +using ::arrow::internal::PlatformFilename; +#ifdef _WIN32 +using internal::WinErrorMessage; +#endif + +namespace io { +namespace internal { + +namespace { + +void* GetLibrarySymbol(LibraryHandle handle, const char* symbol) { + if (handle == NULL) { + printf("handle is null\n"); + return NULL; + } + +#ifndef _WIN32 + return dlsym(handle, symbol); +#else + + void* ret = reinterpret_cast(GetProcAddress(, symbol)); + if (ret == NULL) { + printf("ret is null\n"); + // logstream(LOG_INFO) << "GetProcAddress error: " + // << get_last_err_str(GetLastError()) << std::endl; + } + return ret; +#endif +} + +#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME) \ + do { \ + if (!SHIM->SYMBOL_NAME) { \ + *reinterpret_cast(&SHIM->SYMBOL_NAME) = \ + GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ + } \ + if (!SHIM->SYMBOL_NAME) \ + return ::arrow::Status::IOError("Getting symbol " #SYMBOL_NAME \ + "failed"); \ + } while (0) + +#define GET_SYMBOL(SHIM, SYMBOL_NAME) \ + if (!SHIM->SYMBOL_NAME) { \ + *reinterpret_cast(&SHIM->SYMBOL_NAME) = \ + GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ + } + +LibraryHandle libjvm_handle = nullptr; + +// Helper functions for dlopens +::arrow::Result> get_potential_libjvm_paths(); +::arrow::Result> get_potential_libhdfs_paths(); +::arrow::Result try_dlopen( + const std::vector& potential_paths, + const char* name); + +::arrow::Result> MakeFilenameVector( + const std::vector& names) { + std::vector filenames(names.size()); + for (size_t i = 0; i < names.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(filenames[i], PlatformFilename::FromString(names[i])); + } + return filenames; +} + +void AppendEnvVarFilename( + const char* var_name, + std::vector* filenames) { + auto maybe_env_var = GetEnvVarNative(var_name); + if (maybe_env_var.ok()) { + filenames->emplace_back(std::move(*maybe_env_var)); + } +} + +void AppendEnvVarFilename( + const char* var_name, + const char* suffix, + std::vector* filenames) { + auto maybe_env_var = GetEnvVarNative(var_name); + if (maybe_env_var.ok()) { + auto maybe_env_var_with_suffix = + PlatformFilename(std::move(*maybe_env_var)).Join(suffix); + if (maybe_env_var_with_suffix.ok()) { + filenames->emplace_back(std::move(*maybe_env_var_with_suffix)); + } + } +} + +void InsertEnvVarFilename( + const char* var_name, + std::vector* filenames) { + auto maybe_env_var = GetEnvVarNative(var_name); + if (maybe_env_var.ok()) { + filenames->emplace( + filenames->begin(), PlatformFilename(std::move(*maybe_env_var))); + } +} + +::arrow::Result> get_potential_libhdfs_paths() { + std::vector potential_paths; + std::string file_name; + +// OS-specific file name +#ifdef _WIN32 + file_name = "hdfs.dll"; +#elif __APPLE__ + file_name = "libhdfs.dylib"; +#else + file_name = "libhdfs.so"; +#endif + + // Common paths + ARROW_ASSIGN_OR_RAISE(auto search_paths, MakeFilenameVector({"", "."})); + + // Path from environment variable + AppendEnvVarFilename("HADOOP_HOME", "lib/native", &search_paths); + AppendEnvVarFilename("ARROW_LIBHDFS_DIR", &search_paths); + + // All paths with file name + for (const auto& path : search_paths) { + ARROW_ASSIGN_OR_RAISE(auto full_path, path.Join(file_name)); + 
potential_paths.push_back(std::move(full_path)); + } + + return potential_paths; +} + +::arrow::Result> get_potential_libjvm_paths() { + std::vector potential_paths; + + std::vector search_prefixes; + std::vector search_suffixes; + std::string file_name; + +// From heuristics +#ifdef _WIN32 + ARROW_ASSIGN_OR_RAISE(search_prefixes, MakeFilenameVector({""})); + ARROW_ASSIGN_OR_RAISE( + search_suffixes, MakeFilenameVector({"/jre/bin/server", "/bin/server"})); + file_name = "jvm.dll"; +#elif __APPLE__ + ARROW_ASSIGN_OR_RAISE(search_prefixes, MakeFilenameVector({""})); + ARROW_ASSIGN_OR_RAISE( + search_suffixes, MakeFilenameVector({"/jre/lib/server", "/lib/server"})); + file_name = "libjvm.dylib"; + +// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are +// expecting users to set an environment variable +#else +#if defined(__aarch64__) + const std::string prefix_arch{"arm64"}; + const std::string suffix_arch{"aarch64"}; +#else + const std::string prefix_arch{"amd64"}; + const std::string suffix_arch{"amd64"}; +#endif + ARROW_ASSIGN_OR_RAISE( + search_prefixes, + MakeFilenameVector({ + "/usr/lib/jvm/default-java", // ubuntu / debian distros + "/usr/lib/jvm/java", // rhel6 + "/usr/lib/jvm", // centos6 + "/usr/lib64/jvm", // opensuse 13 + "/usr/local/lib/jvm/default-java", // alt ubuntu / debian distros + "/usr/local/lib/jvm/java", // alt rhel6 + "/usr/local/lib/jvm", // alt centos6 + "/usr/local/lib64/jvm", // alt opensuse 13 + "/usr/local/lib/jvm/java-8-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-8-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/local/lib/jvm/java-7-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-7-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/local/lib/jvm/java-6-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-6-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-7-oracle", // alt ubuntu + "/usr/lib/jvm/java-8-oracle", // alt ubuntu + "/usr/lib/jvm/java-6-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-7-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-8-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-6-oracle", // alt ubuntu + "/usr/lib/jvm/default", // alt centos + "/usr/java/latest" // alt centos + })); + ARROW_ASSIGN_OR_RAISE( + search_suffixes, + MakeFilenameVector( + {"", + "/lib/server", + "/jre/lib/" + suffix_arch + "/server", + "/lib/" + suffix_arch + "/server"})); + file_name = "libjvm.so"; +#endif + + // From direct environment variable + InsertEnvVarFilename("JAVA_HOME", &search_prefixes); + + // Generate cross product between search_prefixes, search_suffixes, and + // file_name + for (auto& prefix : search_prefixes) { + for (auto& suffix : search_suffixes) { + ARROW_ASSIGN_OR_RAISE(auto path, prefix.Join(suffix).Join(file_name)); + potential_paths.push_back(std::move(path)); + } + } + + return potential_paths; +} + +#ifndef _WIN32 +::arrow::Result try_dlopen( + const std::vector& potential_paths, + const char* name) { + std::string error_message = "unknown error"; + LibraryHandle handle; + + for (const auto& p : potential_paths) { + handle = dlopen(p.ToNative().c_str(), RTLD_NOW | RTLD_LOCAL); + + if (handle != NULL) { + return handle; + } else { + const char* err_msg = dlerror(); + if (err_msg != NULL) { + error_message = err_msg; + } + } + } + + return ::arrow::Status::IOError("Unable to load ", name, ": ", error_message); +} + +#else +::arrow::Result try_dlopen( + const 
std::vector& potential_paths, + const char* name) { + std::string error_message; + LibraryHandle handle; + + for (const auto& p : potential_paths) { + handle = LoadLibraryW(p.ToNative().c_str()); + if (handle != NULL) { + return handle; + } else { + error_message = WinErrorMessage(GetLastError()); + } + } + + return ::arrow::Status::IOError("Unable to load ", name, ": ", error_message); +} +#endif // _WIN32 + +LibHdfsShim libhdfs_shim; + +} // namespace + +::arrow::Status LibHdfsShim::GetRequiredSymbols() { + GET_SYMBOL_REQUIRED(this, hdfsNewBuilder); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNode); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNodePort); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetForceNewInstance); + GET_SYMBOL_REQUIRED(this, hdfsBuilderConfSetStr); + GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect); + GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory); + GET_SYMBOL_REQUIRED(this, hdfsDelete); + GET_SYMBOL_REQUIRED(this, hdfsDisconnect); + GET_SYMBOL_REQUIRED(this, hdfsExists); + GET_SYMBOL_REQUIRED(this, hdfsFreeFileInfo); + GET_SYMBOL_REQUIRED(this, hdfsGetCapacity); + GET_SYMBOL_REQUIRED(this, hdfsGetUsed); + GET_SYMBOL_REQUIRED(this, hdfsGetPathInfo); + GET_SYMBOL_REQUIRED(this, hdfsListDirectory); + GET_SYMBOL_REQUIRED(this, hdfsChown); + GET_SYMBOL_REQUIRED(this, hdfsChmod); + + // File methods + GET_SYMBOL_REQUIRED(this, hdfsCloseFile); + GET_SYMBOL_REQUIRED(this, hdfsFlush); + GET_SYMBOL_REQUIRED(this, hdfsOpenFile); + GET_SYMBOL_REQUIRED(this, hdfsRead); + GET_SYMBOL_REQUIRED(this, hdfsSeek); + GET_SYMBOL_REQUIRED(this, hdfsTell); + GET_SYMBOL_REQUIRED(this, hdfsWrite); + + return ::arrow::Status::OK(); +} + +::arrow::Status ConnectLibHdfs(LibHdfsShim** driver) { + static std::mutex lock; + std::lock_guard guard(lock); + + LibHdfsShim* shim = &libhdfs_shim; + + static bool shim_attempted = false; + if (!shim_attempted) { + shim_attempted = true; + + shim->Initialize(); + + ARROW_ASSIGN_OR_RAISE( + auto libjvm_potential_paths, get_potential_libjvm_paths()); + ARROW_ASSIGN_OR_RAISE( + libjvm_handle, try_dlopen(libjvm_potential_paths, "libjvm")); + + ARROW_ASSIGN_OR_RAISE( + auto libhdfs_potential_paths, get_potential_libhdfs_paths()); + ARROW_ASSIGN_OR_RAISE( + shim->handle, try_dlopen(libhdfs_potential_paths, "libhdfs")); + } else if (shim->handle == nullptr) { + return ::arrow::Status::IOError("Prior attempt to load libhdfs failed"); + } + + *driver = shim; + return shim->GetRequiredSymbols(); +} + +/////////////////////////////////////////////////////////////////////////// +// HDFS thin wrapper methods + +hdfsBuilder* LibHdfsShim::NewBuilder(void) { + return this->hdfsNewBuilder(); +} + +void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) { + this->hdfsBuilderSetNameNode(bld, nn); +} + +void LibHdfsShim::BuilderSetNameNodePort(hdfsBuilder* bld, tPort port) { + this->hdfsBuilderSetNameNodePort(bld, port); +} + +void LibHdfsShim::BuilderSetUserName(hdfsBuilder* bld, const char* userName) { + this->hdfsBuilderSetUserName(bld, userName); +} + +void LibHdfsShim::BuilderSetKerbTicketCachePath( + hdfsBuilder* bld, + const char* kerbTicketCachePath) { + this->hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath); +} + +void LibHdfsShim::BuilderSetForceNewInstance(hdfsBuilder* bld) { + this->hdfsBuilderSetForceNewInstance(bld); +} + +hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) { + return this->hdfsBuilderConnect(bld); +} + 
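These builder wrappers are what HdfsFileSystem::Impl and the test helper connectHdfsDriver call to establish a connection. A minimal sketch, assuming JAVA_HOME/HADOOP_HOME are set so libjvm and libhdfs can be located, and a namenode at localhost:7878 (the values used by the tests); connectExample is an illustrative name, not part of the patch.

```cpp
// Sketch only: connect through the shim the way HdfsFileSystem::Impl does.
#include "velox/external/hdfs/ArrowHdfsInternal.h"

namespace shim = facebook::velox::filesystems::arrow::io::internal;

hdfsFS connectExample() {
  shim::LibHdfsShim* driver = nullptr;
  auto status = shim::ConnectLibHdfs(&driver); // loads libjvm and libhdfs
  if (!status.ok()) {
    return nullptr;
  }
  hdfsBuilder* builder = driver->NewBuilder();
  driver->BuilderSetNameNode(builder, "localhost");
  driver->BuilderSetNameNodePort(builder, 7878);
  driver->BuilderSetForceNewInstance(builder);
  return driver->BuilderConnect(builder); // nullptr on failure
}
```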
+int LibHdfsShim::BuilderConfSetStr( + hdfsBuilder* bld, + const char* key, + const char* val) { + return this->hdfsBuilderConfSetStr(bld, key, val); +} + +int LibHdfsShim::Disconnect(hdfsFS fs) { + return this->hdfsDisconnect(fs); +} + +hdfsFile LibHdfsShim::OpenFile( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize) { // NOLINT + return this->hdfsOpenFile( + fs, path, flags, bufferSize, replication, blocksize); +} + +int LibHdfsShim::CloseFile(hdfsFS fs, hdfsFile file) { + return this->hdfsCloseFile(fs, file); +} + +int LibHdfsShim::Exists(hdfsFS fs, const char* path) { + return this->hdfsExists(fs, path); +} + +int LibHdfsShim::Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { + return this->hdfsSeek(fs, file, desiredPos); +} + +tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) { + return this->hdfsTell(fs, file); +} + +tSize LibHdfsShim::Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { + return this->hdfsRead(fs, file, buffer, length); +} + +bool LibHdfsShim::HasPread() { + GET_SYMBOL(this, hdfsPread); + return this->hdfsPread != nullptr; +} + +tSize LibHdfsShim::Pread( + hdfsFS fs, + hdfsFile file, + tOffset position, + void* buffer, + tSize length) { + GET_SYMBOL(this, hdfsPread); + DCHECK(this->hdfsPread); + return this->hdfsPread(fs, file, position, buffer, length); +} + +tSize LibHdfsShim::Write( + hdfsFS fs, + hdfsFile file, + const void* buffer, + tSize length) { + return this->hdfsWrite(fs, file, buffer, length); +} + +int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { + return this->hdfsFlush(fs, file); +} + +int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) { + GET_SYMBOL(this, hdfsAvailable); + if (this->hdfsAvailable) + return this->hdfsAvailable(fs, file); + else + return 0; +} + +int LibHdfsShim::Copy( + hdfsFS srcFS, + const char* src, + hdfsFS dstFS, + const char* dst) { + GET_SYMBOL(this, hdfsCopy); + if (this->hdfsCopy) + return this->hdfsCopy(srcFS, src, dstFS, dst); + else + return 0; +} + +int LibHdfsShim::Move( + hdfsFS srcFS, + const char* src, + hdfsFS dstFS, + const char* dst) { + GET_SYMBOL(this, hdfsMove); + if (this->hdfsMove) + return this->hdfsMove(srcFS, src, dstFS, dst); + else + return 0; +} + +int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) { + return this->hdfsDelete(fs, path, recursive); +} + +int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) { + GET_SYMBOL(this, hdfsRename); + if (this->hdfsRename) + return this->hdfsRename(fs, oldPath, newPath); + else + return 0; +} + +char* LibHdfsShim::GetWorkingDirectory( + hdfsFS fs, + char* buffer, + size_t bufferSize) { + GET_SYMBOL(this, hdfsGetWorkingDirectory); + if (this->hdfsGetWorkingDirectory) { + return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize); + } else { + return NULL; + } +} + +int LibHdfsShim::SetWorkingDirectory(hdfsFS fs, const char* path) { + GET_SYMBOL(this, hdfsSetWorkingDirectory); + if (this->hdfsSetWorkingDirectory) { + return this->hdfsSetWorkingDirectory(fs, path); + } else { + return 0; + } +} + +int LibHdfsShim::MakeDirectory(hdfsFS fs, const char* path) { + return this->hdfsCreateDirectory(fs, path); +} + +int LibHdfsShim::SetReplication( + hdfsFS fs, + const char* path, + int16_t replication) { + GET_SYMBOL(this, hdfsSetReplication); + if (this->hdfsSetReplication) { + return this->hdfsSetReplication(fs, path, replication); + } else { + return 0; + } +} + +hdfsFileInfo* +LibHdfsShim::ListDirectory(hdfsFS fs, const char* path, int* numEntries) { + 
return this->hdfsListDirectory(fs, path, numEntries); +} + +hdfsFileInfo* LibHdfsShim::GetPathInfo(hdfsFS fs, const char* path) { + return this->hdfsGetPathInfo(fs, path); +} + +void LibHdfsShim::FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) { + this->hdfsFreeFileInfo(hdfsFileInfo, numEntries); +} + +char*** LibHdfsShim::GetHosts( + hdfsFS fs, + const char* path, + tOffset start, + tOffset length) { + GET_SYMBOL(this, hdfsGetHosts); + if (this->hdfsGetHosts) { + return this->hdfsGetHosts(fs, path, start, length); + } else { + return NULL; + } +} + +void LibHdfsShim::FreeHosts(char*** blockHosts) { + GET_SYMBOL(this, hdfsFreeHosts); + if (this->hdfsFreeHosts) { + this->hdfsFreeHosts(blockHosts); + } +} + +tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) { + GET_SYMBOL(this, hdfsGetDefaultBlockSize); + if (this->hdfsGetDefaultBlockSize) { + return this->hdfsGetDefaultBlockSize(fs); + } else { + return 0; + } +} + +tOffset LibHdfsShim::GetCapacity(hdfsFS fs) { + return this->hdfsGetCapacity(fs); +} + +tOffset LibHdfsShim::GetUsed(hdfsFS fs) { + return this->hdfsGetUsed(fs); +} + +int LibHdfsShim::Chown( + hdfsFS fs, + const char* path, + const char* owner, + const char* group) { + return this->hdfsChown(fs, path, owner, group); +} + +int LibHdfsShim::Chmod(hdfsFS fs, const char* path, short mode) { // NOLINT + return this->hdfsChmod(fs, path, mode); +} + +int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { + GET_SYMBOL(this, hdfsUtime); + if (this->hdfsUtime) { + return this->hdfsUtime(fs, path, mtime, atime); + } else { + return 0; + } +} + +char* LibHdfsShim::GetLastExceptionRootCause() { + GET_SYMBOL(this, hdfsGetLastExceptionRootCause); + if (this->hdfsGetLastExceptionRootCause) { + return this->hdfsGetLastExceptionRootCause(); + } else { + return strdup("GetLastExceptionRootCause return null"); + } +} + +} // namespace internal +} // namespace io +} // namespace facebook::velox::filesystems::arrow diff --git a/velox/external/hdfs/ArrowHdfsInternal.h b/velox/external/hdfs/ArrowHdfsInternal.h new file mode 100644 index 000000000000..1ee57516fd17 --- /dev/null +++ b/velox/external/hdfs/ArrowHdfsInternal.h @@ -0,0 +1,260 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Adapted from Apache Arrow. 
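The declarations below back the thin wrappers used by the connector code. A minimal read sketch, assuming an already-connected hdfsFS and mirroring the HdfsFile helper in HdfsReadFile.cpp; readWholeFile is an illustrative name and its error handling is simplified, so it is not part of the patch.

```cpp
// Sketch only: read a file through the shim wrappers (GetPathInfo, OpenFile,
// Read, CloseFile), the same calls HdfsReadFile makes.
#include "velox/external/hdfs/ArrowHdfsInternal.h"

#include <vector>

using facebook::velox::filesystems::arrow::io::internal::LibHdfsShim;

std::vector<char> readWholeFile(LibHdfsShim* driver, hdfsFS fs, const char* path) {
  hdfsFileInfo* info = driver->GetPathInfo(fs, path);
  if (info == nullptr) {
    // GetLastExceptionRootCause() would give the JVM-side root cause here.
    return {};
  }
  std::vector<char> buf(info->mSize);
  driver->FreeFileInfo(info, 1);

  hdfsFile file = driver->OpenFile(fs, path, O_RDONLY, 0, 0, 0);
  if (file == nullptr) {
    return {};
  }
  uint64_t total = 0;
  while (total < buf.size()) {
    auto n = driver->Read(
        fs, file, buf.data() + total, static_cast<tSize>(buf.size() - total));
    if (n <= 0) {
      break;
    }
    total += n;
  }
  driver->CloseFile(fs, file);
  return buf;
}
```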
+ +#pragma once + +#include +#include + +#include "hdfs.h" + +#include + +#include "arrow/util/visibility.h" +#include "arrow/util/windows_compatibility.h" // IWYU pragma: keep + +using std::size_t; + +struct hdfsBuilder; + +namespace facebook::velox::filesystems::arrow { + +namespace io { +namespace internal { + +#ifndef _WIN32 +typedef void* LibraryHandle; +#else +typedef HINSTANCE LibraryHandle; +#endif + +// NOTE(wesm): cpplint does not like use of short and other imprecise C types +struct LibHdfsShim { + LibraryHandle handle; + + hdfsBuilder* (*hdfsNewBuilder)(void); + void (*hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn); + void (*hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port); + void (*hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName); + void (*hdfsBuilderSetKerbTicketCachePath)( + hdfsBuilder* bld, + const char* kerbTicketCachePath); + void (*hdfsBuilderSetForceNewInstance)(hdfsBuilder* bld); + hdfsFS (*hdfsBuilderConnect)(hdfsBuilder* bld); + int (*hdfsBuilderConfSetStr)( + hdfsBuilder* bld, + const char* key, + const char* val); + + int (*hdfsDisconnect)(hdfsFS fs); + + hdfsFile (*hdfsOpenFile)( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize); // NOLINT + + int (*hdfsCloseFile)(hdfsFS fs, hdfsFile file); + int (*hdfsExists)(hdfsFS fs, const char* path); + int (*hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos); + tOffset (*hdfsTell)(hdfsFS fs, hdfsFile file); + tSize (*hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + tSize (*hdfsPread)( + hdfsFS fs, + hdfsFile file, + tOffset position, + void* buffer, + tSize length); + tSize ( + *hdfsWrite)(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); + int (*hdfsFlush)(hdfsFS fs, hdfsFile file); + int (*hdfsAvailable)(hdfsFS fs, hdfsFile file); + int (*hdfsCopy)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + int (*hdfsMove)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + int (*hdfsDelete)(hdfsFS fs, const char* path, int recursive); + int (*hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath); + char* (*hdfsGetWorkingDirectory)(hdfsFS fs, char* buffer, size_t bufferSize); + int (*hdfsSetWorkingDirectory)(hdfsFS fs, const char* path); + int (*hdfsCreateDirectory)(hdfsFS fs, const char* path); + int (*hdfsSetReplication)(hdfsFS fs, const char* path, int16_t replication); + hdfsFileInfo* ( + *hdfsListDirectory)(hdfsFS fs, const char* path, int* numEntries); + hdfsFileInfo* (*hdfsGetPathInfo)(hdfsFS fs, const char* path); + void (*hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries); + char*** (*hdfsGetHosts)( + hdfsFS fs, + const char* path, + tOffset start, + tOffset length); + void (*hdfsFreeHosts)(char*** blockHosts); + tOffset (*hdfsGetDefaultBlockSize)(hdfsFS fs); + tOffset (*hdfsGetCapacity)(hdfsFS fs); + tOffset (*hdfsGetUsed)(hdfsFS fs); + int (*hdfsChown)( + hdfsFS fs, + const char* path, + const char* owner, + const char* group); + int (*hdfsChmod)(hdfsFS fs, const char* path, short mode); // NOLINT + int (*hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime); + char* (*hdfsGetLastExceptionStackTrace)(); + char* (*hdfsGetLastExceptionRootCause)(); + + void Initialize() { + this->handle = nullptr; + this->hdfsNewBuilder = nullptr; + this->hdfsBuilderSetNameNode = nullptr; + this->hdfsBuilderSetNameNodePort = nullptr; + this->hdfsBuilderSetUserName = nullptr; + this->hdfsBuilderSetKerbTicketCachePath = nullptr; + 
this->hdfsBuilderSetForceNewInstance = nullptr; + this->hdfsBuilderConfSetStr = nullptr; + this->hdfsBuilderConnect = nullptr; + this->hdfsDisconnect = nullptr; + this->hdfsOpenFile = nullptr; + this->hdfsCloseFile = nullptr; + this->hdfsExists = nullptr; + this->hdfsSeek = nullptr; + this->hdfsTell = nullptr; + this->hdfsRead = nullptr; + this->hdfsPread = nullptr; + this->hdfsWrite = nullptr; + this->hdfsFlush = nullptr; + this->hdfsAvailable = nullptr; + this->hdfsCopy = nullptr; + this->hdfsMove = nullptr; + this->hdfsDelete = nullptr; + this->hdfsRename = nullptr; + this->hdfsGetWorkingDirectory = nullptr; + this->hdfsSetWorkingDirectory = nullptr; + this->hdfsCreateDirectory = nullptr; + this->hdfsSetReplication = nullptr; + this->hdfsListDirectory = nullptr; + this->hdfsGetPathInfo = nullptr; + this->hdfsFreeFileInfo = nullptr; + this->hdfsGetHosts = nullptr; + this->hdfsFreeHosts = nullptr; + this->hdfsGetDefaultBlockSize = nullptr; + this->hdfsGetCapacity = nullptr; + this->hdfsGetUsed = nullptr; + this->hdfsChown = nullptr; + this->hdfsChmod = nullptr; + this->hdfsUtime = nullptr; + this->hdfsGetLastExceptionStackTrace = nullptr; + this->hdfsGetLastExceptionRootCause = nullptr; + } + + hdfsBuilder* NewBuilder(void); + + void BuilderSetNameNode(hdfsBuilder* bld, const char* nn); + + void BuilderSetNameNodePort(hdfsBuilder* bld, tPort port); + + void BuilderSetUserName(hdfsBuilder* bld, const char* userName); + + void BuilderSetKerbTicketCachePath( + hdfsBuilder* bld, + const char* kerbTicketCachePath); + + void BuilderSetForceNewInstance(hdfsBuilder* bld); + + int BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val); + + hdfsFS BuilderConnect(hdfsBuilder* bld); + + int Disconnect(hdfsFS fs); + + hdfsFile OpenFile( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize); // NOLINT + + int CloseFile(hdfsFS fs, hdfsFile file); + + int Exists(hdfsFS fs, const char* path); + + int Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos); + + tOffset Tell(hdfsFS fs, hdfsFile file); + + tSize Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + + bool HasPread(); + + tSize + Pread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length); + + tSize Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); + + int Flush(hdfsFS fs, hdfsFile file); + + int Available(hdfsFS fs, hdfsFile file); + + int Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + int Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + int Delete(hdfsFS fs, const char* path, int recursive); + + int Rename(hdfsFS fs, const char* oldPath, const char* newPath); + + char* GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize); + + int SetWorkingDirectory(hdfsFS fs, const char* path); + + int MakeDirectory(hdfsFS fs, const char* path); + + int SetReplication(hdfsFS fs, const char* path, int16_t replication); + + hdfsFileInfo* ListDirectory(hdfsFS fs, const char* path, int* numEntries); + + hdfsFileInfo* GetPathInfo(hdfsFS fs, const char* path); + + void FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries); + + char*** GetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length); + + void FreeHosts(char*** blockHosts); + + tOffset GetDefaultBlockSize(hdfsFS fs); + tOffset GetCapacity(hdfsFS fs); + + tOffset GetUsed(hdfsFS fs); + + int Chown(hdfsFS fs, const char* path, const char* owner, const char* group); + + int Chmod(hdfsFS fs, const char* path, short mode); // NOLINT + + 
int Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
+
+  char* GetLastExceptionRootCause();
+
+  ::arrow::Status GetRequiredSymbols();
+};
+
+// TODO(wesm): Remove these exports when we are linking statically
+ARROW_EXPORT ::arrow::Status ConnectLibHdfs(LibHdfsShim** driver);
+
+} // namespace internal
+} // namespace io
+} // namespace facebook::velox::filesystems::arrow
diff --git a/velox/external/hdfs/CMakeLists.txt b/velox/external/hdfs/CMakeLists.txt
new file mode 100644
index 000000000000..a35f728224c7
--- /dev/null
+++ b/velox/external/hdfs/CMakeLists.txt
@@ -0,0 +1,19 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if(${VELOX_ENABLE_HDFS})
+  velox_add_library(velox_external_hdfs ArrowHdfsInternal.cpp)
+  velox_link_libraries(
+    velox_external_hdfs
+    PRIVATE
+    arrow)
+endif()
diff --git a/velox/external/hdfs/hdfs.h b/velox/external/hdfs/hdfs.h
new file mode 100644
index 000000000000..d9c3a058ec22
--- /dev/null
+++ b/velox/external/hdfs/hdfs.h
@@ -0,0 +1,1078 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Apache Hadoop.
+
+#ifndef LIBHDFS_HDFS_H
+#define LIBHDFS_HDFS_H
+
+#include <errno.h> /* for EINTERNAL, etc. */
+#include <fcntl.h> /* for O_RDONLY, O_WRONLY */
+#include <stdint.h> /* for uint64_t, etc. */
+#include <time.h> /* for time_t */
+
+/*
+ * Support export of DLL symbols during libhdfs build, and import of DLL symbols
+ * during client application build. A client application may optionally define
+ * symbol LIBHDFS_DLL_IMPORT in its build. This is not strictly required, but
+ * the compiler can produce more efficient code with it.
+ */
+#ifdef WIN32
+#ifdef LIBHDFS_DLL_EXPORT
+#define LIBHDFS_EXTERNAL __declspec(dllexport)
+#elif LIBHDFS_DLL_IMPORT
+#define LIBHDFS_EXTERNAL __declspec(dllimport)
+#else
+#define LIBHDFS_EXTERNAL
+#endif
+#else
+#ifdef LIBHDFS_DLL_EXPORT
+#define LIBHDFS_EXTERNAL __attribute__((visibility("default")))
+#elif LIBHDFS_DLL_IMPORT
+#define LIBHDFS_EXTERNAL __attribute__((visibility("default")))
+#else
+#define LIBHDFS_EXTERNAL
+#endif
+#endif
+
+#ifndef O_RDONLY
+#define O_RDONLY 1
+#endif
+
+#ifndef O_WRONLY
+#define O_WRONLY 2
+#endif
+
+#ifndef EINTERNAL
+#define EINTERNAL 255
+#endif
+
+#define ELASTIC_BYTE_BUFFER_POOL_CLASS \
+  "org/apache/hadoop/io/ElasticByteBufferPool"
+
+/** All APIs set errno to meaningful values */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+/**
+ * Some utility decls used in libhdfs.
+ */ +struct hdfsBuilder; +typedef int32_t tSize; /// size of data for read/write io ops +typedef time_t tTime; /// time type in seconds +typedef int64_t tOffset; /// offset within the file +typedef uint16_t tPort; /// port +typedef enum tObjectKind { + kObjectKindFile = 'F', + kObjectKindDirectory = 'D', +} tObjectKind; +struct hdfsStreamBuilder; + +/** + * The C reflection of org.apache.org.hadoop.FileSystem . + */ +struct hdfs_internal; +typedef struct hdfs_internal* hdfsFS; + +struct hdfsFile_internal; +typedef struct hdfsFile_internal* hdfsFile; + +struct hadoopRzOptions; + +struct hadoopRzBuffer; + +/** + * Determine if a file is open for read. + * + * @param file The HDFS file + * @return 1 if the file is open for read; 0 otherwise + */ +LIBHDFS_EXTERNAL +int hdfsFileIsOpenForRead(hdfsFile file); + +/** + * Determine if a file is open for write. + * + * @param file The HDFS file + * @return 1 if the file is open for write; 0 otherwise + */ +LIBHDFS_EXTERNAL +int hdfsFileIsOpenForWrite(hdfsFile file); + +struct hdfsReadStatistics { + uint64_t totalBytesRead; + uint64_t totalLocalBytesRead; + uint64_t totalShortCircuitBytesRead; + uint64_t totalZeroCopyBytesRead; +}; + +/** + * Get read statistics about a file. This is only applicable to files + * opened for reading. + * + * @param file The HDFS file + * @param stats (out parameter) on a successful return, the read + * statistics. Unchanged otherwise. You must free the + * returned statistics with hdfsFileFreeReadStatistics. + * @return 0 if the statistics were successfully returned, + * -1 otherwise. On a failure, please check errno against + * ENOTSUP. webhdfs, LocalFilesystem, and so forth may + * not support read statistics. + */ +LIBHDFS_EXTERNAL +int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics** stats); + +/** + * @param stats HDFS read statistics for a file. + * + * @return the number of remote bytes read. + */ +LIBHDFS_EXTERNAL +int64_t hdfsReadStatisticsGetRemoteBytesRead( + const struct hdfsReadStatistics* stats); + +/** + * Clear the read statistics for a file. + * + * @param file The file to clear the read statistics of. + * + * @return 0 on success; the error code otherwise. + * EINVAL: the file is not open for reading. + * ENOTSUP: the file does not support clearing the read + * statistics. + * Errno will also be set to this code on failure. + */ +LIBHDFS_EXTERNAL +int hdfsFileClearReadStatistics(hdfsFile file); + +/** + * Free some HDFS read statistics. + * + * @param stats The HDFS read statistics to free. + */ +LIBHDFS_EXTERNAL +void hdfsFileFreeReadStatistics(struct hdfsReadStatistics* stats); + +struct hdfsHedgedReadMetrics { + uint64_t hedgedReadOps; + uint64_t hedgedReadOpsWin; + uint64_t hedgedReadOpsInCurThread; +}; + +/** + * Get cluster wide hedged read metrics. + * + * @param fs The configured filesystem handle + * @param metrics (out parameter) on a successful return, the hedged read + * metrics. Unchanged otherwise. You must free the returned + * statistics with hdfsFreeHedgedReadMetrics. + * @return 0 if the metrics were successfully returned, -1 otherwise. + * On a failure, please check errno against + * ENOTSUP. webhdfs, LocalFilesystem, and so forth may + * not support hedged read metrics. + */ +LIBHDFS_EXTERNAL +int hdfsGetHedgedReadMetrics(hdfsFS fs, struct hdfsHedgedReadMetrics** metrics); + +/** + * Free HDFS Hedged read metrics. 
+ *
+ * @param metrics The HDFS Hedged read metrics to free
+ */
+LIBHDFS_EXTERNAL
+void hdfsFreeHedgedReadMetrics(struct hdfsHedgedReadMetrics* metrics);
+
+/**
+ * hdfsConnectAsUser - Connect to a hdfs file system as a specific user
+ * Connect to the hdfs.
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @param user the user name (this is hadoop domain user). Or NULL is equivalent
+ * to hdfsConnect(host, port)
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char* user);
+
+/**
+ * hdfsConnect - Connect to a hdfs file system.
+ * Connect to the hdfs.
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsConnect(const char* nn, tPort port);
+
+/**
+ * hdfsConnect - Connect to an hdfs file system.
+ *
+ * Forces a new instance to be created
+ *
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @param user The user name to use when connecting
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS
+hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char* user);
+
+/**
+ * hdfsConnect - Connect to an hdfs file system.
+ *
+ * Forces a new instance to be created
+ *
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsConnectNewInstance(const char* nn, tPort port);
+
+/**
+ * Connect to HDFS using the parameters defined by the builder.
+ *
+ * The HDFS builder will be freed, whether or not the connection was
+ * successful.
+ *
+ * Every successful call to hdfsBuilderConnect should be matched with a call
+ * to hdfsDisconnect, when the hdfsFS is no longer needed.
+ *
+ * @param bld The HDFS builder
+ * @return Returns a handle to the filesystem, or NULL on error.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsBuilderConnect(struct hdfsBuilder* bld);
+
+/**
+ * Create an HDFS builder.
+ *
+ * @return The HDFS builder, or NULL on error.
+ */
+LIBHDFS_EXTERNAL
+struct hdfsBuilder* hdfsNewBuilder(void);
+
+/**
+ * Force the builder to always create a new instance of the FileSystem,
+ * rather than possibly finding one in the cache.
+ *
+ * @param bld The HDFS builder
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetForceNewInstance(struct hdfsBuilder* bld);
+
+/**
+ * Set the HDFS NameNode to connect to.
+ *
+ * @param bld The HDFS builder
+ * @param nn The NameNode to use.
+ *
+ * If the string given is 'default', the default NameNode
+ * configuration will be used (from the XML configuration files)
+ *
+ * If NULL is given, a LocalFileSystem will be created.
+ *
+ * If the string starts with a protocol type such as file:// or
+ * hdfs://, this protocol type will be used. If not, the
+ * hdfs:// protocol type will be used.
+ *
+ * You may specify a NameNode port in the usual way by
+ * passing a string of the format hdfs://<hostname>:<port>.
+ * Alternately, you may set the port with + * hdfsBuilderSetNameNodePort. However, you must not pass the + * port in two different ways. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetNameNode(struct hdfsBuilder* bld, const char* nn); + +/** + * Set the port of the HDFS NameNode to connect to. + * + * @param bld The HDFS builder + * @param port The port. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetNameNodePort(struct hdfsBuilder* bld, tPort port); + +/** + * Set the username to use when connecting to the HDFS cluster. + * + * @param bld The HDFS builder + * @param userName The user name. The string will be shallow-copied. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetUserName(struct hdfsBuilder* bld, const char* userName); + +/** + * Set the path to the Kerberos ticket cache to use when connecting to + * the HDFS cluster. + * + * @param bld The HDFS builder + * @param kerbTicketCachePath The Kerberos ticket cache path. The string + * will be shallow-copied. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetKerbTicketCachePath( + struct hdfsBuilder* bld, + const char* kerbTicketCachePath); + +/** + * Free an HDFS builder. + * + * It is normally not necessary to call this function since + * hdfsBuilderConnect frees the builder. + * + * @param bld The HDFS builder + */ +LIBHDFS_EXTERNAL +void hdfsFreeBuilder(struct hdfsBuilder* bld); + +/** + * Set a configuration string for an HdfsBuilder. + * + * @param key The key to set. + * @param val The value, or NULL to set no value. + * This will be shallow-copied. You are responsible for + * ensuring that it remains valid until the builder is + * freed. + * + * @return 0 on success; nonzero error code otherwise. + */ +LIBHDFS_EXTERNAL +int hdfsBuilderConfSetStr( + struct hdfsBuilder* bld, + const char* key, + const char* val); + +/** + * Get a configuration string. + * + * @param key The key to find + * @param val (out param) The value. This will be set to NULL if the + * key isn't found. You must free this string with + * hdfsConfStrFree. + * + * @return 0 on success; nonzero error code otherwise. + * Failure to find the key is not an error. + */ +LIBHDFS_EXTERNAL +int hdfsConfGetStr(const char* key, char** val); + +/** + * Get a configuration integer. + * + * @param key The key to find + * @param val (out param) The value. This will NOT be changed if the + * key isn't found. + * + * @return 0 on success; nonzero error code otherwise. + * Failure to find the key is not an error. + */ +LIBHDFS_EXTERNAL +int hdfsConfGetInt(const char* key, int32_t* val); + +/** + * Free a configuration string found with hdfsConfGetStr. + * + * @param val A configuration string obtained from hdfsConfGetStr + */ +LIBHDFS_EXTERNAL +void hdfsConfStrFree(char* val); + +/** + * hdfsDisconnect - Disconnect from the hdfs file system. + * Disconnect from hdfs. + * @param fs The configured filesystem handle. + * @return Returns 0 on success, -1 on error. + * Even if there is an error, the resources associated with the + * hdfsFS will be freed. + */ +LIBHDFS_EXTERNAL +int hdfsDisconnect(hdfsFS fs); + +/** + * hdfsOpenFile - Open a hdfs file in given mode. + * @deprecated Use the hdfsStreamBuilder functions instead. + * This function does not support setting block sizes bigger than 2 GB. + * + * @param fs The configured filesystem handle. + * @param path The full path to the file. + * @param flags - an | of bits/fcntl.h file flags - supported flags are + * O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT), + * O_WRONLY|O_APPEND. 
Other flags are generally ignored other than (O_RDWR || + * (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP. + * @param bufferSize Size of buffer for read/write - pass 0 if you want + * to use the default configured values. + * @param replication Block replication - pass 0 if you want to use + * the default configured values. + * @param blocksize Size of block - pass 0 if you want to use the + * default configured values. Note that if you want a block size bigger + * than 2 GB, you must use the hdfsStreamBuilder API rather than this + * deprecated function. + * @return Returns the handle to the open file or NULL on error. + */ +LIBHDFS_EXTERNAL +hdfsFile hdfsOpenFile( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize); + +/** + * hdfsStreamBuilderAlloc - Allocate an HDFS stream builder. + * + * @param fs The configured filesystem handle. + * @param path The full path to the file. Will be deep-copied. + * @param flags The open flags, as in hdfsOpenFile. + * @return Returns the hdfsStreamBuilder, or NULL on error. + */ +LIBHDFS_EXTERNAL +struct hdfsStreamBuilder* +hdfsStreamBuilderAlloc(hdfsFS fs, const char* path, int flags); + +/** + * hdfsStreamBuilderFree - Free an HDFS file builder. + * + * It is normally not necessary to call this function since + * hdfsStreamBuilderBuild frees the builder. + * + * @param bld The hdfsStreamBuilder to free. + */ +LIBHDFS_EXTERNAL +void hdfsStreamBuilderFree(struct hdfsStreamBuilder* bld); + +/** + * hdfsStreamBuilderSetBufferSize - Set the stream buffer size. + * + * @param bld The hdfs stream builder. + * @param bufferSize The buffer size to set. + * + * @return 0 on success, or -1 on error. Errno will be set on error. + */ +LIBHDFS_EXTERNAL +int hdfsStreamBuilderSetBufferSize( + struct hdfsStreamBuilder* bld, + int32_t bufferSize); + +/** + * hdfsStreamBuilderSetReplication - Set the replication for the stream. + * This is only relevant for output streams, which will create new blocks. + * + * @param bld The hdfs stream builder. + * @param replication The replication to set. + * + * @return 0 on success, or -1 on error. Errno will be set on error. + * If you call this on an input stream builder, you will get + * EINVAL, because this configuration is not relevant to input + * streams. + */ +LIBHDFS_EXTERNAL +int hdfsStreamBuilderSetReplication( + struct hdfsStreamBuilder* bld, + int16_t replication); + +/** + * hdfsStreamBuilderSetDefaultBlockSize - Set the default block size for + * the stream. This is only relevant for output streams, which will create + * new blocks. + * + * @param bld The hdfs stream builder. + * @param defaultBlockSize The default block size to set. + * + * @return 0 on success, or -1 on error. Errno will be set on error. + * If you call this on an input stream builder, you will get + * EINVAL, because this configuration is not relevant to input + * streams. + */ +LIBHDFS_EXTERNAL +int hdfsStreamBuilderSetDefaultBlockSize( + struct hdfsStreamBuilder* bld, + int64_t defaultBlockSize); + +/** + * hdfsStreamBuilderBuild - Build the stream by calling open or create. + * + * @param bld The hdfs stream builder. This pointer will be freed, whether + * or not the open succeeds. + * + * @return the stream pointer on success, or NULL on error. Errno will be + * set on error. + */ +LIBHDFS_EXTERNAL +hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder* bld); + +/** + * hdfsTruncateFile - Truncate a hdfs file to given lenght. 
+ * @param fs The configured filesystem handle. + * @param path The full path to the file. + * @param newlength The size the file is to be truncated to + * @return 1 if the file has been truncated to the desired newlength + * and is immediately available to be reused for write operations + * such as append. + * 0 if a background process of adjusting the length of the last + * block has been started, and clients should wait for it to + * complete before proceeding with further file updates. + * -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength); + +/** + * hdfsUnbufferFile - Reduce the buffering done on a file. + * + * @param file The file to unbuffer. + * @return 0 on success + * ENOTSUP if the file does not support unbuffering + * Errno will also be set to this value. + */ +LIBHDFS_EXTERNAL +int hdfsUnbufferFile(hdfsFile file); + +/** + * hdfsCloseFile - Close an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + * On error, errno will be set appropriately. + * If the hdfs file was valid, the memory associated with it will + * be freed at the end of this call, even if there was an I/O + * error. + */ +LIBHDFS_EXTERNAL +int hdfsCloseFile(hdfsFS fs, hdfsFile file); + +/** + * hdfsExists - Checks if a given path exsits on the filesystem + * @param fs The configured filesystem handle. + * @param path The path to look for + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsExists(hdfsFS fs, const char* path); + +/** + * hdfsSeek - Seek to given offset in file. + * This works only for files opened in read-only mode. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param desiredPos Offset into the file to seek into. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos); + +/** + * hdfsTell - Get the current offset in the file, in bytes. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Current offset, -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsTell(hdfsFS fs, hdfsFile file); + +/** + * hdfsRead - Read data from an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return On success, a positive number indicating how many bytes + * were read. + * On end-of-file, 0. + * On error, -1. Errno will be set to the error code. + * Just like the POSIX read function, hdfsRead will return -1 + * and set errno to EINTR if data is temporarily unavailable, + * but we are not yet at the end of the file. + */ +LIBHDFS_EXTERNAL +tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + +/** + * hdfsPread - Positional read of data from an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param position Position from which to read + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return See hdfsRead + */ +LIBHDFS_EXTERNAL +tSize hdfsPread( + hdfsFS fs, + hdfsFile file, + tOffset position, + void* buffer, + tSize length); + +/** + * hdfsWrite - Write data into an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param buffer The data. + * @param length The no. of bytes to write. 
+ * @return Returns the number of bytes written, -1 on error. + */ +LIBHDFS_EXTERNAL +tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); + +/** + * hdfsWrite - Flush the data. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsFlush(hdfsFS fs, hdfsFile file); + +/** + * hdfsHFlush - Flush out the data in client's user buffer. After the + * return of this call, new readers will see the data. + * @param fs configured filesystem handle + * @param file file handle + * @return 0 on success, -1 on error and sets errno + */ +LIBHDFS_EXTERNAL +int hdfsHFlush(hdfsFS fs, hdfsFile file); + +/** + * hdfsHSync - Similar to posix fsync, Flush out the data in client's + * user buffer. all the way to the disk device (but the disk may have + * it in its cache). + * @param fs configured filesystem handle + * @param file file handle + * @return 0 on success, -1 on error and sets errno + */ +LIBHDFS_EXTERNAL +int hdfsHSync(hdfsFS fs, hdfsFile file); + +/** + * hdfsAvailable - Number of bytes that can be read from this + * input stream without blocking. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns available bytes; -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsAvailable(hdfsFS fs, hdfsFile file); + +/** + * hdfsCopy - Copy file from one filesystem to another. + * @param srcFS The handle to source filesystem. + * @param src The path of source file. + * @param dstFS The handle to destination filesystem. + * @param dst The path of destination file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + +/** + * hdfsMove - Move file from one filesystem to another. + * @param srcFS The handle to source filesystem. + * @param src The path of source file. + * @param dstFS The handle to destination filesystem. + * @param dst The path of destination file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + +/** + * hdfsDelete - Delete file. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @param recursive if path is a directory and set to + * non-zero, the directory is deleted else throws an exception. In + * case of a file the recursive argument is irrelevant. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsDelete(hdfsFS fs, const char* path, int recursive); + +/** + * hdfsRename - Rename file. + * @param fs The configured filesystem handle. + * @param oldPath The path of the source file. + * @param newPath The path of the destination file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath); + +/** + * hdfsGetWorkingDirectory - Get the current working directory for + * the given filesystem. + * @param fs The configured filesystem handle. + * @param buffer The user-buffer to copy path of cwd into. + * @param bufferSize The length of user-buffer. + * @return Returns buffer, NULL on error. + */ +LIBHDFS_EXTERNAL +char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize); + +/** + * hdfsSetWorkingDirectory - Set the working directory. All relative + * paths will be resolved relative to it. + * @param fs The configured filesystem handle. 
+ * @param path The path of the new 'cwd'. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsSetWorkingDirectory(hdfsFS fs, const char* path); + +/** + * hdfsCreateDirectory - Make the given file and all non-existent + * parents into directories. + * @param fs The configured filesystem handle. + * @param path The path of the directory. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsCreateDirectory(hdfsFS fs, const char* path); + +/** + * hdfsSetReplication - Set the replication of the specified + * file to the supplied value + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication); + +/** + * hdfsFileInfo - Information about a file/directory. + */ +typedef struct { + tObjectKind mKind; /* file or directory */ + char* mName; /* the name of the file */ + tTime mLastMod; /* the last modification time for the file in seconds */ + tOffset mSize; /* the size of the file in bytes */ + short mReplication; /* the count of replicas */ + tOffset mBlockSize; /* the block size for the file */ + char* mOwner; /* the owner of the file */ + char* mGroup; /* the group associated with the file */ + short mPermissions; /* the permissions associated with the file */ + tTime mLastAccess; /* the last access time for the file in seconds */ +} hdfsFileInfo; + +/** + * hdfsListDirectory - Get list of files/directories for a given + * directory-path. hdfsFreeFileInfo should be called to deallocate memory. + * @param fs The configured filesystem handle. + * @param path The path of the directory. + * @param numEntries Set to the number of files/directories in path. + * @return Returns a dynamically-allocated array of hdfsFileInfo + * objects; NULL on error or empty directory. + * errno is set to non-zero on error or zero on success. + */ +LIBHDFS_EXTERNAL +hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int* numEntries); + +/** + * hdfsGetPathInfo - Get information about a path as a (dynamically + * allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be + * called when the pointer is no longer needed. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns a dynamically-allocated hdfsFileInfo object; + * NULL on error. + */ +LIBHDFS_EXTERNAL +hdfsFileInfo* hdfsGetPathInfo(hdfsFS fs, const char* path); + +/** + * hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields) + * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo + * objects. + * @param numEntries The size of the array. + */ +LIBHDFS_EXTERNAL +void hdfsFreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries); + +/** + * hdfsFileIsEncrypted: determine if a file is encrypted based on its + * hdfsFileInfo. + * @return -1 if there was an error (errno will be set), 0 if the file is + * not encrypted, 1 if the file is encrypted. + */ +LIBHDFS_EXTERNAL +int hdfsFileIsEncrypted(hdfsFileInfo* hdfsFileInfo); + +/** + * hdfsGetHosts - Get hostnames where a particular block (determined by + * pos & blocksize) of a file is stored. The last element in the array + * is NULL. Due to replication, a single block could be present on + * multiple hosts. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @param start The start of the block. + * @param length The length of the block. 
+ * @return Returns a dynamically-allocated 2-d array of blocks-hosts; + * NULL on error. + */ +LIBHDFS_EXTERNAL +char*** +hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length); + +/** + * hdfsFreeHosts - Free up the structure returned by hdfsGetHosts + * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo + * objects. + * @param numEntries The size of the array. + */ +LIBHDFS_EXTERNAL +void hdfsFreeHosts(char*** blockHosts); + +/** + * hdfsGetDefaultBlockSize - Get the default blocksize. + * + * @param fs The configured filesystem handle. + * @deprecated Use hdfsGetDefaultBlockSizeAtPath instead. + * + * @return Returns the default blocksize, or -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetDefaultBlockSize(hdfsFS fs); + +/** + * hdfsGetDefaultBlockSizeAtPath - Get the default blocksize at the + * filesystem indicated by a given path. + * + * @param fs The configured filesystem handle. + * @param path The given path will be used to locate the actual + * filesystem. The full path does not have to exist. + * + * @return Returns the default blocksize, or -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char* path); + +/** + * hdfsGetCapacity - Return the raw capacity of the filesystem. + * @param fs The configured filesystem handle. + * @return Returns the raw-capacity; -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetCapacity(hdfsFS fs); + +/** + * hdfsGetUsed - Return the total raw size of all files in the filesystem. + * @param fs The configured filesystem handle. + * @return Returns the total-size; -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetUsed(hdfsFS fs); + +/** + * Change the user and/or group of a file or directory. + * + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param owner User string. Set to NULL for 'no change' + * @param group Group string. Set to NULL for 'no change' + * @return 0 on success else -1 + */ +LIBHDFS_EXTERNAL +int hdfsChown( + hdfsFS fs, + const char* path, + const char* owner, + const char* group); + +/** + * hdfsChmod + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param mode the bitmask to set it to + * @return 0 on success else -1 + */ +LIBHDFS_EXTERNAL +int hdfsChmod(hdfsFS fs, const char* path, short mode); + +/** + * hdfsUtime + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param mtime new modification time or -1 for no change + * @param atime new access time or -1 for no change + * @return 0 on success else -1 + */ +LIBHDFS_EXTERNAL +int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime); + +/** + * Allocate a zero-copy options structure. + * + * You must free all options structures allocated with this function using + * hadoopRzOptionsFree. + * + * @return A zero-copy options structure, or NULL if one could + * not be allocated. If NULL is returned, errno will + * contain the error number. + */ +LIBHDFS_EXTERNAL +struct hadoopRzOptions* hadoopRzOptionsAlloc(void); + +/** + * Determine whether we should skip checksums in read0. + * + * @param opts The options structure. + * @param skip Nonzero to skip checksums sometimes; zero to always + * check them. + * + * @return 0 on success; -1 plus errno on failure. + */ +LIBHDFS_EXTERNAL +int hadoopRzOptionsSetSkipChecksum(struct hadoopRzOptions* opts, int skip); + +/** + * Set the ByteBufferPool to use with read0. 
+ * + * @param opts The options structure. + * @param className If this is NULL, we will not use any + * ByteBufferPool. If this is non-NULL, it will be + * treated as the name of the pool class to use. + * For example, you can use + * ELASTIC_BYTE_BUFFER_POOL_CLASS. + * + * @return 0 if the ByteBufferPool class was found and + * instantiated; + * -1 plus errno otherwise. + */ +LIBHDFS_EXTERNAL +int hadoopRzOptionsSetByteBufferPool( + struct hadoopRzOptions* opts, + const char* className); + +/** + * Free a hadoopRzOptionsFree structure. + * + * @param opts The options structure to free. + * Any associated ByteBufferPool will also be freed. + */ +LIBHDFS_EXTERNAL +void hadoopRzOptionsFree(struct hadoopRzOptions* opts); + +/** + * Perform a byte buffer read. + * If possible, this will be a zero-copy (mmap) read. + * + * @param file The file to read from. + * @param opts An options structure created by hadoopRzOptionsAlloc. + * @param maxLength The maximum length to read. We may read fewer bytes + * than this length. + * + * @return On success, we will return a new hadoopRzBuffer. + * This buffer will continue to be valid and readable + * until it is released by readZeroBufferFree. Failure to + * release a buffer will lead to a memory leak. + * You can access the data within the hadoopRzBuffer with + * hadoopRzBufferGet. If you have reached EOF, the data + * within the hadoopRzBuffer will be NULL. You must still + * free hadoopRzBuffer instances containing NULL. + * + * On failure, we will return NULL plus an errno code. + * errno = EOPNOTSUPP indicates that we could not do a + * zero-copy read, and there was no ByteBufferPool + * supplied. + */ +LIBHDFS_EXTERNAL +struct hadoopRzBuffer* +hadoopReadZero(hdfsFile file, struct hadoopRzOptions* opts, int32_t maxLength); + +/** + * Determine the length of the buffer returned from readZero. + * + * @param buffer a buffer returned from readZero. + * @return the length of the buffer. + */ +LIBHDFS_EXTERNAL +int32_t hadoopRzBufferLength(const struct hadoopRzBuffer* buffer); + +/** + * Get a pointer to the raw buffer returned from readZero. + * + * To find out how many bytes this buffer contains, call + * hadoopRzBufferLength. + * + * @param buffer a buffer returned from readZero. + * @return a pointer to the start of the buffer. This will be + * NULL when end-of-file has been reached. + */ +LIBHDFS_EXTERNAL +const void* hadoopRzBufferGet(const struct hadoopRzBuffer* buffer); + +/** + * Release a buffer obtained through readZero. + * + * @param file The hdfs stream that created this buffer. This must be + * the same stream you called hadoopReadZero on. + * @param buffer The buffer to release. + */ +LIBHDFS_EXTERNAL +void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer* buffer); + +/** + * Get the last exception root cause that happened in the context of the + * current thread, i.e. the thread that called into libHDFS. + * + * The pointer returned by this function is guaranteed to be valid until + * the next call into libHDFS by the current thread. + * Users of this function should not free the pointer. + * + * A NULL will be returned if no exception information could be retrieved + * for the previous call. + * + * @return The root cause as a C-string. + */ +LIBHDFS_EXTERNAL +char* hdfsGetLastExceptionRootCause(); + +/** + * Get the last exception stack trace that happened in the context of the + * current thread, i.e. the thread that called into libHDFS. 
+ * + * The pointer returned by this function is guaranteed to be valid until + * the next call into libHDFS by the current thread. + * Users of this function should not free the pointer. + * + * A NULL will be returned if no exception information could be retrieved + * for the previous call. + * + * @return The stack trace as a C-string. + */ +LIBHDFS_EXTERNAL +char* hdfsGetLastExceptionStackTrace(); + +#ifdef __cplusplus +} +#endif + +#undef LIBHDFS_EXTERNAL +#endif /*LIBHDFS_HDFS_H*/ + +/** + * vim: ts=4: sw=4: et + */
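To make the shim's calling convention concrete, here is a minimal sketch of a client that loads the JVM libhdfs through ConnectLibHdfs and then drives the driver's builder API. It is an illustration only: the include path for the shim header, the NameNode host and port, and the file path are placeholders, and error handling is reduced to printing the root cause exposed by the driver.

    #include <cstdio>
    #include <vector>

    #include "velox/external/hdfs/ArrowHdfsInternal.h" // assumed location of the shim declarations
    #include "velox/external/hdfs/hdfs.h"

    namespace hdfs_shim = facebook::velox::filesystems::arrow::io::internal;

    int main() {
      hdfs_shim::LibHdfsShim* driver = nullptr;
      // Locate the JVM libhdfs and resolve the function pointers listed in the shim.
      ::arrow::Status status = hdfs_shim::ConnectLibHdfs(&driver);
      if (!status.ok()) {
        std::printf("libhdfs not available: %s\n", status.ToString().c_str());
        return 1;
      }

      // Builder-based connection; host and port are placeholders.
      hdfsBuilder* builder = driver->NewBuilder();
      driver->BuilderSetNameNode(builder, "nn.example.com");
      driver->BuilderSetNameNodePort(builder, 8020);
      hdfsFS fs = driver->BuilderConnect(builder); // hdfsBuilderConnect frees the builder
      if (fs == nullptr) {
        std::printf("connect failed: %s\n", driver->GetLastExceptionRootCause());
        return 1;
      }

      // Positional read of the first 4 KB of a placeholder file.
      hdfsFile file = driver->OpenFile(fs, "/tmp/example.txt", O_RDONLY, 0, 0, 0);
      if (file != nullptr) {
        std::vector<char> buf(4096);
        tSize n =
            driver->Pread(fs, file, 0, buf.data(), static_cast<tSize>(buf.size()));
        std::printf("read %d bytes\n", static_cast<int>(n));
        driver->CloseFile(fs, file);
      }
      driver->Disconnect(fs);
      return 0;
    }

As with any JVM-backed libhdfs client, the process typically also needs a Java runtime and a CLASSPATH containing the Hadoop jars before the connect call can succeed.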
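The write side follows the same pattern through the plain C entry points declared above. Below is a hedged sketch that assumes an fs handle obtained as in the previous example and a placeholder path; per the documentation above, hdfsHFlush makes the data visible to new readers, while hdfsHSync additionally pushes it toward the DataNode devices.

    #include <cstring>

    #include "velox/external/hdfs/hdfs.h"

    // Write a short message and make it visible to new readers. `fs` is assumed
    // to come from hdfsBuilderConnect as in the sketch above.
    int writeHello(hdfsFS fs) {
      hdfsFile out = hdfsOpenFile(fs, "/tmp/hello.txt", O_WRONLY, 0, 0, 0);
      if (out == nullptr) {
        return -1;
      }
      const char* msg = "hello from libhdfs\n";
      tSize written = hdfsWrite(fs, out, msg, static_cast<tSize>(std::strlen(msg)));
      hdfsHFlush(fs, out); // visible to new readers; hdfsHSync would go to the disks
      int rc = hdfsCloseFile(fs, out);
      return (written < 0 || rc != 0) ? -1 : 0;
    }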
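Finally, a sketch of the zero-copy read path declared near the end of hdfs.h, assuming the backing stream supports hadoopReadZero. Per the comments above, a failed call returns NULL with an errno code, and EOPNOTSUPP means the read could not be done zero-copy and no usable ByteBufferPool was available; the sketch then falls back to a plain hdfsRead.

    #include <cerrno>
    #include <cstdint>
    #include <vector>

    #include "velox/external/hdfs/hdfs.h"

    // Read up to `maxLength` bytes with the zero-copy API, falling back to a
    // buffered hdfsRead when zero-copy is not supported for this stream.
    std::vector<char> readSome(hdfsFS fs, hdfsFile file, int32_t maxLength) {
      std::vector<char> out;
      hadoopRzOptions* opts = hadoopRzOptionsAlloc();
      if (opts == nullptr) {
        return out;
      }
      // Supply a buffer pool so reads that cannot be mmap-ed can still avoid copies.
      hadoopRzOptionsSetByteBufferPool(opts, ELASTIC_BYTE_BUFFER_POOL_CLASS);

      hadoopRzBuffer* buffer = hadoopReadZero(file, opts, maxLength);
      if (buffer != nullptr) {
        const char* data = static_cast<const char*>(hadoopRzBufferGet(buffer));
        int32_t length = hadoopRzBufferLength(buffer);
        if (data != nullptr && length > 0) { // data is NULL at end-of-file
          out.assign(data, data + length);
        }
        hadoopRzBufferFree(file, buffer);
      } else if (errno == EOPNOTSUPP) {
        out.resize(maxLength);
        tSize n = hdfsRead(fs, file, out.data(), maxLength);
        out.resize(n > 0 ? static_cast<size_t>(n) : 0);
      }
      hadoopRzOptionsFree(opts);
      return out;
    }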