diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index fc91c4c9dd90f7..79d06a3fe74022 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -97,7 +97,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env, case TFileType::FILE_HDFS: { THdfsParams hdfs_params = parse_properties(properties); std::shared_ptr fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); RETURN_IF_ERROR(fs->create_file(path, &file_writer)); break; } @@ -129,7 +129,7 @@ Status FileFactory::create_file_reader(RuntimeProfile* profile, } case TFileType::FILE_HDFS: { RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path, - file_system, file_reader, reader_options)); + file_system, file_reader, reader_options, profile)); break; } case TFileType::FILE_BROKER: { @@ -169,9 +169,10 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, std::shared_ptr* hdfs_file_system, io::FileReaderSPtr* reader, - const io::FileReaderOptions& reader_options) { + const io::FileReaderOptions& reader_options, + RuntimeProfile* profile) { std::shared_ptr fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs)); RETURN_IF_ERROR(fs->open_file(path, reader_options, reader)); *hdfs_file_system = std::move(fs); return Status::OK(); diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 5f7360c372b5ac..7a83abe5fabc42 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -82,7 +82,8 @@ class FileFactory { static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, std::shared_ptr* hdfs_file_system, io::FileReaderSPtr* reader, - const io::FileReaderOptions& 
reader_options); + const io::FileReaderOptions& reader_options, + RuntimeProfile* profile); static Status create_s3_reader(const std::map& prop, const std::string& path, diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp b/be/src/io/fs/benchmark/hdfs_benchmark.hpp index b508e14a24aa30..d6ad059b77ddc7 100644 --- a/be/src/io/fs/benchmark/hdfs_benchmark.hpp +++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp @@ -50,8 +50,8 @@ class HdfsOpenReadBenchmark : public BaseBenchmark { io::FileReaderSPtr reader; io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr); THdfsParams hdfs_params = parse_properties(_conf_map); - RETURN_IF_ERROR( - FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts)); + RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, + reader_opts, nullptr)); auto end = std::chrono::high_resolution_clock::now(); auto elapsed_seconds = std::chrono::duration_cast>(end - start); @@ -94,7 +94,7 @@ class HdfsCreateWriteBenchmark : public BaseBenchmark { std::shared_ptr fs; io::FileWriterPtr writer; THdfsParams hdfs_params = parse_properties(_conf_map); - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); RETURN_IF_ERROR(fs->create_file(file_path, &writer)); return write(state, writer.get()); } @@ -115,7 +115,7 @@ class HdfsRenameBenchmark : public BaseBenchmark { auto new_file_path = file_path + "_new"; THdfsParams hdfs_params = parse_properties(_conf_map); std::shared_ptr fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); auto start = std::chrono::high_resolution_clock::now(); RETURN_IF_ERROR(fs->rename(file_path, new_file_path)); @@ -142,7 +142,7 @@ class HdfsExistsBenchmark : public BaseBenchmark { std::shared_ptr fs; THdfsParams hdfs_params = parse_properties(_conf_map); - 
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); auto start = std::chrono::high_resolution_clock::now(); bool res = false; diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 402cdb3fafbb13..03c607f4cc8b3f 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -36,12 +36,27 @@ namespace doris { namespace io { HdfsFileReader::HdfsFileReader(Path path, const std::string& name_node, - FileHandleCache::Accessor accessor) - : _path(std::move(path)), _name_node(name_node), _accessor(std::move(accessor)) { + FileHandleCache::Accessor accessor, RuntimeProfile* profile) + : _path(std::move(path)), + _name_node(name_node), + _accessor(std::move(accessor)), + _profile(profile) { _handle = _accessor.get(); DorisMetrics::instance()->hdfs_file_open_reading->increment(1); DorisMetrics::instance()->hdfs_file_reader_total->increment(1); + if (_profile != nullptr) { + const char* hdfs_profile_name = "HdfsIO"; + ADD_TIMER(_profile, hdfs_profile_name); + _hdfs_profile.total_bytes_read = + ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, hdfs_profile_name); + _hdfs_profile.total_local_bytes_read = + ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead", TUnit::BYTES, hdfs_profile_name); + _hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER( + _profile, "TotalShortCircuitBytesRead", TUnit::BYTES, hdfs_profile_name); + _hdfs_profile.total_zero_copy_bytes_read = ADD_CHILD_COUNTER( + _profile, "TotalZeroCopyBytesRead", TUnit::BYTES, hdfs_profile_name); + } } HdfsFileReader::~HdfsFileReader() { @@ -52,6 +67,23 @@ Status HdfsFileReader::close() { bool expected = false; if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { DorisMetrics::instance()->hdfs_file_open_reading->increment(-1); + if (_profile != nullptr) { + struct hdfsReadStatistics* hdfs_statistics = nullptr; + 
auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics); + if (r != 0) { + return Status::InternalError( + fmt::format("Failed to run hdfsFileGetReadStatistics(): {}", r)); + } + COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, + hdfs_statistics->totalLocalBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read, + hdfs_statistics->totalShortCircuitBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_zero_copy_bytes_read, + hdfs_statistics->totalZeroCopyBytesRead); + hdfsFileFreeReadStatistics(hdfs_statistics); + hdfsFileClearReadStatistics(_handle->file()); + } } return Status::OK(); } diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h index efff1bfcd63ca6..c4ac2e37765ea3 100644 --- a/be/src/io/fs/hdfs_file_reader.h +++ b/be/src/io/fs/hdfs_file_reader.h @@ -38,7 +38,8 @@ class IOContext; class HdfsFileReader : public FileReader { public: - HdfsFileReader(Path path, const std::string& name_node, FileHandleCache::Accessor accessor); + HdfsFileReader(Path path, const std::string& name_node, FileHandleCache::Accessor accessor, + RuntimeProfile* profile); ~HdfsFileReader() override; @@ -57,11 +58,20 @@ class HdfsFileReader : public FileReader { const IOContext* io_ctx) override; private: + struct HDFSProfile { // counters registered in the constructor only when a profile is supplied; nullptr otherwise + RuntimeProfile::Counter* total_bytes_read = nullptr; + RuntimeProfile::Counter* total_local_bytes_read = nullptr; + RuntimeProfile::Counter* total_short_circuit_bytes_read = nullptr; + RuntimeProfile::Counter* total_zero_copy_bytes_read = nullptr; + }; + Path _path; const std::string& _name_node; FileHandleCache::Accessor _accessor; CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle std::atomic _closed = false; + RuntimeProfile* _profile; + HDFSProfile _hdfs_profile; }; } // namespace io } // namespace doris diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 775754bd4d91f4..5e90f04dbacb40 
100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -123,7 +123,7 @@ Status HdfsFileHandleCache::get_file(const std::shared_ptr& fs, } Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& path, - std::shared_ptr* fs) { + RuntimeProfile* profile, std::shared_ptr* fs) { #ifdef USE_HADOOP_HDFS if (!config::enable_java_support) { return Status::InternalError( @@ -131,14 +131,16 @@ Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& "true."); } #endif - (*fs).reset(new HdfsFileSystem(hdfs_params, path)); + (*fs).reset(new HdfsFileSystem(hdfs_params, path, profile)); return (*fs)->connect(); } -HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path) +HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path, + RuntimeProfile* profile) : RemoteFileSystem(path, "", FileSystemType::HDFS), _hdfs_params(hdfs_params), - _fs_handle(nullptr) { + _fs_handle(nullptr), + _profile(profile) { _namenode = _hdfs_params.fs_name; } @@ -175,7 +177,7 @@ Status HdfsFileSystem::open_file_internal(const Path& file, int64_t file_size, std::static_pointer_cast(shared_from_this()), real_path, 0, file_size, &accessor)); - *reader = std::make_shared(file, _namenode, std::move(accessor)); + *reader = std::make_shared(file, _namenode, std::move(accessor), _profile); return Status::OK(); } diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index bd28ec73c2dc7f..6a45a92b37c384 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -33,6 +33,7 @@ #include "io/fs/hdfs.h" #include "io/fs/path.h" #include "io/fs/remote_file_system.h" +#include "util/runtime_profile.h" namespace doris { class THdfsParams; @@ -111,7 +112,7 @@ class HdfsFileHandleCache; class HdfsFileSystem final : public RemoteFileSystem { public: static Status create(const THdfsParams& hdfs_params, const std::string& path, - 
std::shared_ptr* fs); + RuntimeProfile* profile, std::shared_ptr* fs); ~HdfsFileSystem() override; @@ -148,12 +149,14 @@ class HdfsFileSystem final : public RemoteFileSystem { private: friend class HdfsFileWriter; - HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path); + HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path, + RuntimeProfile* profile); const THdfsParams& _hdfs_params; std::string _namenode; // do not use std::shared_ptr or std::unique_ptr // _fs_handle is managed by HdfsFileSystemCache HdfsFileSystemHandle* _fs_handle; + RuntimeProfile* _profile; }; } // namespace io } // namespace doris diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index f1b58fa454c2f9..3ff8229bc38235 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -86,7 +86,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l } else if (TStorageBackendType::type::HDFS == type) { THdfsParams hdfs_params = parse_properties(_prop); std::shared_ptr fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); _remote_fs = std::move(fs); } else if (TStorageBackendType::type::BROKER == type) { std::shared_ptr fs; diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp index ed408e5f7af748..8977cd0c47c31e 100644 --- a/be/src/vec/runtime/vfile_result_writer.cpp +++ b/be/src/vec/runtime/vfile_result_writer.cpp @@ -595,7 +595,7 @@ Status VFileResultWriter::_delete_dir() { case TStorageBackendType::HDFS: { THdfsParams hdfs_params = parse_properties(_file_opts->broker_properties); std::shared_ptr hdfs_fs = nullptr; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &hdfs_fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &hdfs_fs)); file_system = hdfs_fs; break; }