Orc hdfs #1674

Open · wants to merge 3 commits into master
5 changes: 3 additions & 2 deletions WORKSPACE
@@ -590,8 +590,9 @@ http_archive(
 http_archive(
     name = "liborc",
     build_file = "//third_party:liborc.BUILD",
-    patch_cmds = [
-        "tar -xzf c++/libs/libhdfspp/libhdfspp.tar.gz -C c++/libs/libhdfspp",
+    patch_args = ["-p1"],
+    patches = [
+        "//third_party:liborc.patch",
     ],
     sha256 = "39d983f4c7feb8ea1e8ab8e3e53e9afc643282b7a500b3a93c91aa6490f65c17",
     strip_prefix = "orc-rel-release-1.6.14",
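(The replaced patch_cmds step only unpacked the bundled libhdfspp headers; it is superseded by a real patch file, which Bazel applies with patch -p1 semantics against the extracted orc-rel-release-1.6.14 tree.)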
23 changes: 3 additions & 20 deletions third_party/liborc.BUILD
@@ -39,6 +39,7 @@ cc_library(
     ],
     copts = [],
     defines = [],
+    local_defines = ["BUILD_LIBHDFSPP"],
     includes = [
         "c++/include",
         "c++/src",
@@ -49,34 +50,16 @@
     linkopts = [],
     visibility = ["//visibility:public"],
     deps = [
-        ":libhdfspp",
         ":orc_cc_proto",
+        "@local_config_tf//:libtensorflow_framework",
+        "@local_config_tf//:tf_header_lib",
         "@lz4",
         "@snappy",
         "@zlib",
         "@zstd",
     ],
 )
 
-cc_library(
-    name = "libhdfspp",
-    srcs = glob(
-        [
-            "c++/libs/libhdfspp/include/hdfspp/*.h",
-        ],
-        exclude = [
-        ],
-    ),
-    hdrs = [
-    ],
-    copts = [],
-    defines = [],
-    includes = [
-        "c++/libs/libhdfspp/include",
-    ],
-    deps = [],
-)
-
 proto_library(
     name = "orc_proto",
     srcs = ["proto/orc_proto.proto"],
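For context, BUILD_LIBHDFSPP is the same define ORC's own build uses to compile in HDFS support: it gates the hdfs:// branch of the generic readFile() entry point, and the new @local_config_tf deps provide the TensorFlow headers and framework library that the patched OrcHdfsFile.cc (below) needs. A paraphrased sketch of that dispatch, adapted from ORC's c++/src/OrcFile.cc (the exact code in orc-rel-release-1.6.14 may differ):

// Paraphrased from ORC's c++/src/OrcFile.cc, not verbatim from the
// pinned release. BUILD_LIBHDFSPP compiles in the hdfs:// branch,
// which the patch below reroutes through tensorflow::Env.
#include <cstring>
#include <memory>
#include <string>

#include "orc/Exceptions.hh"
#include "orc/OrcFile.hh"

namespace orc {

std::unique_ptr<InputStream> readFile(const std::string& path) {
  if (std::strncmp(path.c_str(), "hdfs://", 7) == 0) {
#ifdef BUILD_LIBHDFSPP
    return readHdfsFile(path);  // now backed by tensorflow::RandomAccessFile
#else
    throw NotImplementedYet("HDFS support was not compiled in");
#endif
  }
  return readLocalFile(path);
}

}  // namespace orc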
195 changes: 195 additions & 0 deletions third_party/liborc.patch
@@ -0,0 +1,195 @@
--- a/c++/src/OrcHdfsFile.cc 2022-04-11 04:30:41.000000000 +0800
+++ b/c++/src/OrcHdfsFile.cc 2022-04-11 19:56:37.206680217 +0800
@@ -1,4 +1,5 @@
/**
+ * 1
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -29,145 +30,57 @@
#include <sys/types.h>
#include <unistd.h>

-#include "hdfspp/hdfspp.h"
+#include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/file_system.h"
+#include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/status.h"
+#include "tensorflow/core/platform/types.h"

namespace orc {

- class HdfsFileInputStream : public InputStream {
- private:
- std::string filename;
- std::unique_ptr<hdfs::FileHandle> file;
- std::unique_ptr<hdfs::FileSystem> file_system;
- uint64_t totalLength;
- const uint64_t READ_SIZE = 1024 * 1024; //1 MB
-
- public:
- HdfsFileInputStream(std::string _filename) {
- filename = _filename ;
-
- //Building a URI object from the given uri_path
- hdfs::URI uri;
- try {
- uri = hdfs::URI::parse_from_string(filename);
- } catch (const hdfs::uri_parse_error&) {
- throw ParseError("Malformed URI: " + filename);
- }
-
- //This sets conf path to default "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
- //and loads configs core-site.xml and hdfs-site.xml from the conf path
- hdfs::ConfigParser parser;
- if(!parser.LoadDefaultResources()){
- throw ParseError("Could not load default resources. ");
- }
- auto stats = parser.ValidateResources();
- //validating core-site.xml
- if(!stats[0].second.ok()){
- throw ParseError(stats[0].first + " is invalid: " + stats[0].second.ToString());
- }
- //validating hdfs-site.xml
- if(!stats[1].second.ok()){
- throw ParseError(stats[1].first + " is invalid: " + stats[1].second.ToString());
- }
- hdfs::Options options;
- if(!parser.get_options(options)){
- throw ParseError("Could not load Options object. ");
- }
- hdfs::IoService * io_service = hdfs::IoService::New();
- //Wrapping file_system into a unique pointer to guarantee deletion
- file_system = std::unique_ptr<hdfs::FileSystem>(
- hdfs::FileSystem::New(io_service, "", options));
- if (file_system.get() == nullptr) {
- throw ParseError("Can't create FileSystem object. ");
- }
- hdfs::Status status;
- //Checking if the user supplied the host
- if(!uri.get_host().empty()){
- //Using port if supplied, otherwise using "" to look up port in configs
- std::string port = uri.has_port() ?
- std::to_string(uri.get_port()) : "";
- status = file_system->Connect(uri.get_host(), port);
- if (!status.ok()) {
- throw ParseError("Can't connect to " + uri.get_host()
- + ":" + port + ". " + status.ToString());
- }
- } else {
- status = file_system->ConnectToDefaultFs();
- if (!status.ok()) {
- if(!options.defaultFS.get_host().empty()){
- throw ParseError("Error connecting to " +
- options.defaultFS.str() + ". " + status.ToString());
- } else {
- throw ParseError(
- "Error connecting to the cluster: defaultFS is empty. "
- + status.ToString());
- }
- }
- }
-
- if (file_system.get() == nullptr) {
- throw ParseError("Can't connect the file system. ");
- }
-
- hdfs::FileHandle *file_raw = nullptr;
- status = file_system->Open(uri.get_path(), &file_raw);
- if (!status.ok()) {
- throw ParseError("Can't open "
- + uri.get_path() + ". " + status.ToString());
- }
- //Wrapping file_raw into a unique pointer to guarantee deletion
- file.reset(file_raw);
-
- hdfs::StatInfo stat_info;
- status = file_system->GetFileInfo(uri.get_path(), stat_info);
- if (!status.ok()) {
- throw ParseError("Can't stat "
- + uri.get_path() + ". " + status.ToString());
- }
- totalLength = stat_info.length;
+class HdfsFileInputStream : public InputStream {
+ private:
+ std::string filename_;
+ std::unique_ptr<tensorflow::RandomAccessFile> file_;
+ uint64_t total_length_;
+ const uint64_t READ_SIZE = 1024 * 1024; // 1 MB
+
+ public:
+ HdfsFileInputStream(std::string filename) {
+ filename_ = filename;
+ tensorflow::Status status =
+ tensorflow::Env::Default()->NewRandomAccessFile(filename_, &file_);
+ if (!status.ok()) {
+ LOG(FATAL) << status.ToString();
}

- uint64_t getLength() const override {
- return totalLength;
- }
+ tensorflow::Env::Default()->GetFileSize(filename_, &total_length_);
+ }

- uint64_t getNaturalReadSize() const override {
- return READ_SIZE;
- }
+ uint64_t getLength() const override { return total_length_; }

- void read(void* buf,
- uint64_t length,
- uint64_t offset) override {
-
- if (!buf) {
- throw ParseError("Buffer is null");
- }
-
- hdfs::Status status;
- size_t total_bytes_read = 0;
- size_t last_bytes_read = 0;
-
- do {
- status = file->PositionRead(buf,
- static_cast<size_t>(length) - total_bytes_read,
- static_cast<off_t>(offset + total_bytes_read), &last_bytes_read);
- if(!status.ok()) {
- throw ParseError("Error reading the file: " + status.ToString());
- }
- total_bytes_read += last_bytes_read;
- } while (total_bytes_read < length);
- }
+ uint64_t getNaturalReadSize() const override { return READ_SIZE; }

- const std::string& getName() const override {
- return filename;
+ void read(void* buf, uint64_t length, uint64_t offset) override {
+ if (!buf) {
+ LOG(FATAL) << " Null buf";
+ }
+ tensorflow::StringPiece sp;
+ tensorflow::Status s =
+ file_->Read(offset, length, &sp, static_cast<char*>(buf));
+ if (!(s.ok() || tensorflow::errors::IsOutOfRange(s))) {
+ LOG(FATAL) << s.ToString();
}
+ }

- ~HdfsFileInputStream() override;
- };
+ const std::string& getName() const override { return filename_; }

- HdfsFileInputStream::~HdfsFileInputStream() {
- }
+ ~HdfsFileInputStream() override;
+};

- std::unique_ptr<InputStream> readHdfsFile(const std::string& path) {
- return std::unique_ptr<InputStream>(new HdfsFileInputStream(path));
- }
+HdfsFileInputStream::~HdfsFileInputStream() {}
+
+std::unique_ptr<InputStream> readHdfsFile(const std::string& path) {
+ return std::unique_ptr<InputStream>(new HdfsFileInputStream(path));
}
+} // namespace orc
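For reference, a minimal caller of the patched entry point: hypothetical driver code, not part of this PR, with a placeholder namenode address and file path. Since HdfsFileInputStream now opens files through tensorflow::Env::Default(), any filesystem scheme registered with TensorFlow should work here, not only hdfs://.

// Hypothetical usage sketch; the path below is a placeholder.
#include <iostream>
#include <memory>
#include <utility>

#include "orc/OrcFile.hh"

int main() {
  // Opens the file via TensorFlow's filesystem layer; note the patched
  // constructor LOG(FATAL)s instead of throwing ParseError on failure.
  std::unique_ptr<orc::InputStream> stream =
      orc::readHdfsFile("hdfs://namenode:8020/warehouse/example.orc");

  orc::ReaderOptions options;
  std::unique_ptr<orc::Reader> reader =
      orc::createReader(std::move(stream), options);

  std::cout << "rows: " << reader->getNumberOfRows() << std::endl;
  return 0;
}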