From 184b5e4e6382ea107001a1c2652759f1722a206a Mon Sep 17 00:00:00 2001
From: morningman
Date: Tue, 27 Jun 2023 22:32:29 +0800
Subject: [PATCH] [improvement](kerberos) disable hdfs fs handle cache to renew
 kerberos ticket at a fixed interval #21265

---
 be/src/common/config.cpp          |  2 ++
 be/src/common/config.h            |  5 ++++
 be/src/io/fs/hdfs_file_system.cpp | 49 +++++++++++++++++--------------
 be/src/io/fs/hdfs_file_system.h   | 26 ++++++++++++----
 be/src/io/hdfs_builder.cpp        |  1 +
 5 files changed, 56 insertions(+), 27 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a76c2892104f71..9650c2256f0f6d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1043,6 +1043,8 @@ DEFINE_Bool(enable_set_in_bitmap_value, "false");
 DEFINE_Int64(max_hdfs_file_handle_cache_num, "20000");
 DEFINE_Int64(max_external_file_meta_cache_num, "20000");
 
+DEFINE_mInt64(kerberos_expiration_time_seconds, "43200");
+
 #ifdef BE_TEST
 // test s3
 DEFINE_String(test_s3_resource, "resource");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0cd4e8630215d9..0b661f47568173 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1059,6 +1059,11 @@ DECLARE_Int64(max_hdfs_file_handle_cache_num);
 // max number of meta info of external files, such as parquet footer
 DECLARE_Int64(max_external_file_meta_cache_num);
 
+// the max expiration time of a kerberos ticket.
+// If an hdfs filesystem with kerberos authentication lives longer
+// than this time, it is treated as expired.
+DECLARE_mInt64(kerberos_expiration_time_seconds);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index 745a7736f912f3..775754bd4d91f4 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -78,7 +78,7 @@ class HdfsFileSystemCache {
     HdfsFileSystemCache() = default;
 
     uint64 _hdfs_hash_code(const THdfsParams& hdfs_params);
-    Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs);
+    Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs, bool* is_kerberos);
     void _clean_invalid();
     void _clean_oldest();
 };
@@ -423,9 +423,11 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
 // ************* HdfsFileSystemCache ******************
 int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
 
-Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs) {
+Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs,
+                                       bool* is_kerberos) {
     HDFSCommonBuilder builder;
     RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder));
+    *is_kerberos = builder.is_need_kinit();
     hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
     if (hdfs_fs == nullptr) {
         return Status::IOError("faield to connect to hdfs {}: {}", hdfs_params.fs_name,
@@ -467,30 +469,33 @@ Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params,
         auto it = _cache.find(hash_code);
         if (it != _cache.end()) {
             HdfsFileSystemHandle* handle = it->second.get();
-            if (handle->invalid()) {
-                hdfsFS hdfs_fs = nullptr;
-                RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
-                *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
-            } else {
+            if (!handle->invalid()) {
                 handle->inc_ref();
                 *fs_handle = handle;
+                return Status::OK();
             }
+            // fs handle is invalid, erase it.
+            _cache.erase(it);
+            LOG(INFO) << "erase the hdfs handle, fs name: " << hdfs_params.fs_name;
+        }
+
+        // not found in cache, or the fs handle is invalid:
+        // create a new one and try to put it into the cache
+        hdfsFS hdfs_fs = nullptr;
+        bool is_kerberos = false;
+        RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs, &is_kerberos));
+        if (_cache.size() >= MAX_CACHE_HANDLE) {
+            _clean_invalid();
+            _clean_oldest();
+        }
+        if (_cache.size() < MAX_CACHE_HANDLE) {
+            std::unique_ptr<HdfsFileSystemHandle> handle =
+                    std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true, is_kerberos);
+            handle->inc_ref();
+            *fs_handle = handle.get();
+            _cache[hash_code] = std::move(handle);
         } else {
-            hdfsFS hdfs_fs = nullptr;
-            RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
-            if (_cache.size() >= MAX_CACHE_HANDLE) {
-                _clean_invalid();
-                _clean_oldest();
-            }
-            if (_cache.size() < MAX_CACHE_HANDLE) {
-                std::unique_ptr<HdfsFileSystemHandle> handle =
-                        std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true);
-                handle->inc_ref();
-                *fs_handle = handle.get();
-                _cache[hash_code] = std::move(handle);
-            } else {
-                *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
-            }
+            *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false, is_kerberos);
         }
     }
     return Status::OK();
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index d542cd1ba7955e..bd28ec73c2dc7f 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -27,6 +27,7 @@
 #include
 #include
 
+#include "common/config.h"
 #include "common/status.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/hdfs.h"
@@ -41,14 +42,21 @@ struct FileInfo;
 class HdfsFileSystemHandle {
 public:
-    HdfsFileSystemHandle(hdfsFS fs, bool cached)
-            : hdfs_fs(fs), from_cache(cached), _ref_cnt(0), _last_access_time(0), _invalid(false) {}
+    HdfsFileSystemHandle(hdfsFS fs, bool cached, bool is_kerberos)
+            : hdfs_fs(fs),
+              from_cache(cached),
+              _is_kerberos(is_kerberos),
+              _ref_cnt(0),
+              _create_time(_now()),
+              _last_access_time(0),
+              _invalid(false) {}
 
     ~HdfsFileSystemHandle() {
         DCHECK(_ref_cnt == 0);
         if (hdfs_fs != nullptr) {
-            // Even if there is an error, the resources associated with the hdfsFS will be freed.
-            hdfsDisconnect(hdfs_fs);
+            // DO NOT call hdfsDisconnect(), or we will hit "Filesystem closed" errors
+            // even after creating a new handle.
+            // hdfsDisconnect(hdfs_fs);
         }
         hdfs_fs = nullptr;
     }
 
@@ -67,7 +75,11 @@ class HdfsFileSystemHandle {
 
     int ref_cnt() { return _ref_cnt; }
 
-    bool invalid() { return _invalid; }
+    bool invalid() {
+        return _invalid ||
+               (_is_kerberos &&
+                _now() - _create_time.load() > config::kerberos_expiration_time_seconds * 1000);
+    }
 
     void set_invalid() { _invalid = true; }
 
@@ -77,8 +89,12 @@ class HdfsFileSystemHandle {
     const bool from_cache;
 
 private:
+    const bool _is_kerberos;
     // the number of referenced client
     std::atomic _ref_cnt;
+    // For kerberos authentication, save the creation time so that
+    // we can tell whether the kerberos ticket has expired.
+    std::atomic _create_time;
     // HdfsFileSystemCache try to remove the oldest handler when the cache is full
     std::atomic _last_access_time;
     // Client will set invalid if error thrown, and HdfsFileSystemCache will not reuse this handler
diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp
index 73edc326c30feb..19986f76e41a8b 100644
--- a/be/src/io/hdfs_builder.cpp
+++ b/be/src/io/hdfs_builder.cpp
@@ -72,6 +72,7 @@ Status HDFSCommonBuilder::run_kinit() {
 #endif
 
     hdfsBuilderConfSetStr(hdfs_builder, "hadoop.security.kerberos.ticket.cache.path",
                           ticket_path.c_str());
+    LOG(INFO) << "finished running kinit command: " << fmt::to_string(kinit_command);
     return Status::OK();
 }
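
Note (reviewer sketch, not part of the patch): the core of this change is the expiration check
added to HdfsFileSystemHandle::invalid(), which makes the cache drop a kerberos-authenticated
handle once its ticket lifetime has elapsed so the next access reconnects and re-runs kinit.
The minimal, self-contained C++ sketch below shows the same idea in isolation, assuming a
millisecond-precision clock; the names KerberosAwareHandle and now_ms are hypothetical
stand-ins for the handle class and its _now() helper, not code from this patch.

#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>

// Milliseconds since epoch, playing the role of the handle's _now() helper.
static uint64_t now_ms() {
    using namespace std::chrono;
    return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}

class KerberosAwareHandle {
public:
    KerberosAwareHandle(bool is_kerberos, int64_t expiration_seconds)
            : _is_kerberos(is_kerberos),
              _expiration_ms(expiration_seconds * 1000),
              _create_time(now_ms()),
              _invalid(false) {}

    // A kerberos-authenticated handle is treated as invalid once its ticket
    // lifetime has elapsed, so a cache holding it would evict it and build a
    // fresh connection (triggering a new kinit) on the next access.
    bool invalid() const {
        return _invalid.load() ||
               (_is_kerberos &&
                now_ms() - _create_time.load() > static_cast<uint64_t>(_expiration_ms));
    }

    void set_invalid() { _invalid = true; }

private:
    const bool _is_kerberos;
    const int64_t _expiration_ms;
    std::atomic<uint64_t> _create_time;
    std::atomic<bool> _invalid;
};

int main() {
    // 43200 seconds (12 hours) mirrors the default value of
    // config::kerberos_expiration_time_seconds introduced by this patch.
    KerberosAwareHandle handle(/*is_kerberos=*/true, /*expiration_seconds=*/43200);
    std::cout << std::boolalpha << "expired yet: " << handle.invalid() << std::endl;
    return 0;
}

With this check in place, a freshly created handle reports invalid() == false, and the same
object starts reporting true once the configured ticket lifetime has passed, without any
explicit set_invalid() call.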