Skip to content

Commit

Permalink
[Enhancement](multi-catalog) Add hdfs read statistics profile.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaka11chen authored and morningman committed Jul 3, 2023
1 parent 2c5b617 commit 08159e5
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 22 deletions.
9 changes: 5 additions & 4 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
case TFileType::FILE_HDFS: {
THdfsParams hdfs_params = parse_properties(properties);
std::shared_ptr<io::HdfsFileSystem> fs;
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
RETURN_IF_ERROR(fs->create_file(path, &file_writer));
break;
}
Expand Down Expand Up @@ -129,7 +129,7 @@ Status FileFactory::create_file_reader(RuntimeProfile* profile,
}
case TFileType::FILE_HDFS: {
RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path,
file_system, file_reader, reader_options));
file_system, file_reader, reader_options, profile));
break;
}
case TFileType::FILE_BROKER: {
Expand Down Expand Up @@ -169,9 +169,10 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
std::shared_ptr<io::FileSystem>* hdfs_file_system,
io::FileReaderSPtr* reader,
const io::FileReaderOptions& reader_options) {
const io::FileReaderOptions& reader_options,
RuntimeProfile* profile) {
std::shared_ptr<io::HdfsFileSystem> fs;
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs));
RETURN_IF_ERROR(fs->open_file(path, reader_options, reader));
*hdfs_file_system = std::move(fs);
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/file_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class FileFactory {
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
std::shared_ptr<io::FileSystem>* hdfs_file_system,
io::FileReaderSPtr* reader,
const io::FileReaderOptions& reader_options);
const io::FileReaderOptions& reader_options,
RuntimeProfile* profile);

static Status create_s3_reader(const std::map<std::string, std::string>& prop,
const std::string& path,
Expand Down
10 changes: 5 additions & 5 deletions be/src/io/fs/benchmark/hdfs_benchmark.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ class HdfsOpenReadBenchmark : public BaseBenchmark {
io::FileReaderSPtr reader;
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
THdfsParams hdfs_params = parse_properties(_conf_map);
RETURN_IF_ERROR(
FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts));
RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader,
reader_opts, nullptr));
auto end = std::chrono::high_resolution_clock::now();
auto elapsed_seconds =
std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
Expand Down Expand Up @@ -94,7 +94,7 @@ class HdfsCreateWriteBenchmark : public BaseBenchmark {
std::shared_ptr<io::HdfsFileSystem> fs;
io::FileWriterPtr writer;
THdfsParams hdfs_params = parse_properties(_conf_map);
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
RETURN_IF_ERROR(fs->create_file(file_path, &writer));
return write(state, writer.get());
}
Expand All @@ -115,7 +115,7 @@ class HdfsRenameBenchmark : public BaseBenchmark {
auto new_file_path = file_path + "_new";
THdfsParams hdfs_params = parse_properties(_conf_map);
std::shared_ptr<io::HdfsFileSystem> fs;
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));

auto start = std::chrono::high_resolution_clock::now();
RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
Expand All @@ -142,7 +142,7 @@ class HdfsExistsBenchmark : public BaseBenchmark {

std::shared_ptr<io::HdfsFileSystem> fs;
THdfsParams hdfs_params = parse_properties(_conf_map);
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));

auto start = std::chrono::high_resolution_clock::now();
bool res = false;
Expand Down
36 changes: 34 additions & 2 deletions be/src/io/fs/hdfs_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,27 @@ namespace doris {
namespace io {

HdfsFileReader::HdfsFileReader(Path path, const std::string& name_node,
FileHandleCache::Accessor accessor)
: _path(std::move(path)), _name_node(name_node), _accessor(std::move(accessor)) {
FileHandleCache::Accessor accessor, RuntimeProfile* profile)
: _path(std::move(path)),
_name_node(name_node),
_accessor(std::move(accessor)),
_profile(profile) {
_handle = _accessor.get();

DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
if (_profile != nullptr) {
const char* hdfs_profile_name = "HdfsIO";
ADD_TIMER(_profile, hdfs_profile_name);
_hdfs_profile.total_bytes_read =
ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, hdfs_profile_name);
_hdfs_profile.total_local_bytes_read =
ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead", TUnit::BYTES, hdfs_profile_name);
_hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER(
_profile, "TotalShortCircuitBytesRead", TUnit::BYTES, hdfs_profile_name);
_hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER(
_profile, "TotalZeroCopyBytesRead", TUnit::BYTES, hdfs_profile_name);
}
}

HdfsFileReader::~HdfsFileReader() {
Expand All @@ -52,6 +67,23 @@ Status HdfsFileReader::close() {
bool expected = false;
if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
if (_profile != nullptr) {
struct hdfsReadStatistics* hdfs_statistics = nullptr;
auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics);
if (r != 0) {
return Status::InternalError(
fmt::format("Failed to run hdfsFileGetReadStatistics(): {}", r));
}
COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead);
COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read,
hdfs_statistics->totalLocalBytesRead);
COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
hdfs_statistics->totalShortCircuitBytesRead);
COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
hdfs_statistics->totalZeroCopyBytesRead);
hdfsFileFreeReadStatistics(hdfs_statistics);
hdfsFileClearReadStatistics(_handle->file());
}
}
return Status::OK();
}
Expand Down
12 changes: 11 additions & 1 deletion be/src/io/fs/hdfs_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class IOContext;

class HdfsFileReader : public FileReader {
public:
HdfsFileReader(Path path, const std::string& name_node, FileHandleCache::Accessor accessor);
HdfsFileReader(Path path, const std::string& name_node, FileHandleCache::Accessor accessor,
RuntimeProfile* profile);

~HdfsFileReader() override;

Expand All @@ -57,11 +58,20 @@ class HdfsFileReader : public FileReader {
const IOContext* io_ctx) override;

private:
struct HDFSProfile {
RuntimeProfile::Counter* total_bytes_read;
RuntimeProfile::Counter* total_local_bytes_read;
RuntimeProfile::Counter* total_short_circuit_bytes_read;
RuntimeProfile::Counter* total_total_zero_copy_bytes_read;
};

Path _path;
const std::string& _name_node;
FileHandleCache::Accessor _accessor;
CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
std::atomic<bool> _closed = false;
RuntimeProfile* _profile;
HDFSProfile _hdfs_profile;
};
} // namespace io
} // namespace doris
12 changes: 7 additions & 5 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,24 @@ Status HdfsFileHandleCache::get_file(const std::shared_ptr<HdfsFileSystem>& fs,
}

Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& path,
std::shared_ptr<HdfsFileSystem>* fs) {
RuntimeProfile* profile, std::shared_ptr<HdfsFileSystem>* fs) {
#ifdef USE_HADOOP_HDFS
if (!config::enable_java_support) {
return Status::InternalError(
"hdfs file system is not enabled, you can change be config enable_java_support to "
"true.");
}
#endif
(*fs).reset(new HdfsFileSystem(hdfs_params, path));
(*fs).reset(new HdfsFileSystem(hdfs_params, path, profile));
return (*fs)->connect();
}

HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path)
HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path,
RuntimeProfile* profile)
: RemoteFileSystem(path, "", FileSystemType::HDFS),
_hdfs_params(hdfs_params),
_fs_handle(nullptr) {
_fs_handle(nullptr),
_profile(profile) {
_namenode = _hdfs_params.fs_name;
}

Expand Down Expand Up @@ -175,7 +177,7 @@ Status HdfsFileSystem::open_file_internal(const Path& file, int64_t file_size,
std::static_pointer_cast<HdfsFileSystem>(shared_from_this()), real_path, 0, file_size,
&accessor));

*reader = std::make_shared<HdfsFileReader>(file, _namenode, std::move(accessor));
*reader = std::make_shared<HdfsFileReader>(file, _namenode, std::move(accessor), _profile);
return Status::OK();
}

Expand Down
7 changes: 5 additions & 2 deletions be/src/io/fs/hdfs_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "io/fs/hdfs.h"
#include "io/fs/path.h"
#include "io/fs/remote_file_system.h"
#include "util/runtime_profile.h"

namespace doris {
class THdfsParams;
Expand Down Expand Up @@ -111,7 +112,7 @@ class HdfsFileHandleCache;
class HdfsFileSystem final : public RemoteFileSystem {
public:
static Status create(const THdfsParams& hdfs_params, const std::string& path,
std::shared_ptr<HdfsFileSystem>* fs);
RuntimeProfile* profile, std::shared_ptr<HdfsFileSystem>* fs);

~HdfsFileSystem() override;

Expand Down Expand Up @@ -148,12 +149,14 @@ class HdfsFileSystem final : public RemoteFileSystem {

private:
friend class HdfsFileWriter;
HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path);
HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path,
RuntimeProfile* profile);
const THdfsParams& _hdfs_params;
std::string _namenode;
// do not use std::shared_ptr or std::unique_ptr
// _fs_handle is managed by HdfsFileSystemCache
HdfsFileSystemHandle* _fs_handle;
RuntimeProfile* _profile;
};
} // namespace io
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l
} else if (TStorageBackendType::type::HDFS == type) {
THdfsParams hdfs_params = parse_properties(_prop);
std::shared_ptr<io::HdfsFileSystem> fs;
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
_remote_fs = std::move(fs);
} else if (TStorageBackendType::type::BROKER == type) {
std::shared_ptr<io::BrokerFileSystem> fs;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/runtime/vfile_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ Status VFileResultWriter::_delete_dir() {
case TStorageBackendType::HDFS: {
THdfsParams hdfs_params = parse_properties(_file_opts->broker_properties);
std::shared_ptr<io::HdfsFileSystem> hdfs_fs = nullptr;
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &hdfs_fs));
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &hdfs_fs));
file_system = hdfs_fs;
break;
}
Expand Down

0 comments on commit 08159e5

Please sign in to comment.