Skip to content

Commit

Permalink
[improvement](kerberos) disable hdfs fs handle cache to renew kerbero…
Browse files Browse the repository at this point in the history
…s ticket at fix interval apache#21265
  • Loading branch information
morningman committed Jul 3, 2023
1 parent 5a37534 commit 184b5e4
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 27 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,8 @@ DEFINE_Bool(enable_set_in_bitmap_value, "false");
DEFINE_Int64(max_hdfs_file_handle_cache_num, "20000");
DEFINE_Int64(max_external_file_meta_cache_num, "20000");

DEFINE_mInt64(kerberos_expiration_time_seconds, "43200");

#ifdef BE_TEST
// test s3
DEFINE_String(test_s3_resource, "resource");
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,11 @@ DECLARE_Int64(max_hdfs_file_handle_cache_num);
// max number of meta info of external files, such as parquet footer
DECLARE_Int64(max_external_file_meta_cache_num);

// the max expiration time of kerberos ticket.
// If an hdfs filesystem with kerberos authentication lives longer
// than this time, it will be expired.
DECLARE_mInt64(kerberos_expiration_time_seconds);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
49 changes: 27 additions & 22 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class HdfsFileSystemCache {
HdfsFileSystemCache() = default;

uint64 _hdfs_hash_code(const THdfsParams& hdfs_params);
Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs);
Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs, bool* is_kerberos);
void _clean_invalid();
void _clean_oldest();
};
Expand Down Expand Up @@ -423,9 +423,11 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
// ************* HdfsFileSystemCache ******************
int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;

Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs) {
Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs,
bool* is_kerberos) {
HDFSCommonBuilder builder;
RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder));
*is_kerberos = builder.is_need_kinit();
hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
if (hdfs_fs == nullptr) {
return Status::IOError("faield to connect to hdfs {}: {}", hdfs_params.fs_name,
Expand Down Expand Up @@ -467,30 +469,33 @@ Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params,
auto it = _cache.find(hash_code);
if (it != _cache.end()) {
HdfsFileSystemHandle* handle = it->second.get();
if (handle->invalid()) {
hdfsFS hdfs_fs = nullptr;
RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
*fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
} else {
if (!handle->invalid()) {
handle->inc_ref();
*fs_handle = handle;
return Status::OK();
}
// fs handle is invalid, erase it.
_cache.erase(it);
LOG(INFO) << "erase the hdfs handle, fs name: " << hdfs_params.fs_name;
}

// not found in cache, or the fs handle is invalid:
// create a new one and try to put it into the cache
hdfsFS hdfs_fs = nullptr;
bool is_kerberos = false;
RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs, &is_kerberos));
if (_cache.size() >= MAX_CACHE_HANDLE) {
_clean_invalid();
_clean_oldest();
}
if (_cache.size() < MAX_CACHE_HANDLE) {
std::unique_ptr<HdfsFileSystemHandle> handle =
std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true, is_kerberos);
handle->inc_ref();
*fs_handle = handle.get();
_cache[hash_code] = std::move(handle);
} else {
hdfsFS hdfs_fs = nullptr;
RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
if (_cache.size() >= MAX_CACHE_HANDLE) {
_clean_invalid();
_clean_oldest();
}
if (_cache.size() < MAX_CACHE_HANDLE) {
std::unique_ptr<HdfsFileSystemHandle> handle =
std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true);
handle->inc_ref();
*fs_handle = handle.get();
_cache[hash_code] = std::move(handle);
} else {
*fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
}
*fs_handle = new HdfsFileSystemHandle(hdfs_fs, false, is_kerberos);
}
}
return Status::OK();
Expand Down
26 changes: 21 additions & 5 deletions be/src/io/fs/hdfs_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <string>
#include <vector>

#include "common/config.h"
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/hdfs.h"
Expand All @@ -41,14 +42,21 @@ struct FileInfo;

class HdfsFileSystemHandle {
public:
HdfsFileSystemHandle(hdfsFS fs, bool cached)
: hdfs_fs(fs), from_cache(cached), _ref_cnt(0), _last_access_time(0), _invalid(false) {}
// Builds a handle around an hdfsFS connection.
//   fs          - the hdfs connection stored in hdfs_fs.
//   cached      - stored in from_cache.
//   is_kerberos - stored in _is_kerberos; invalid() only applies the
//                 age-based expiration check when this is true.
// The reference count starts at 0, the creation time is captured from _now()
// at construction, the last access time starts at 0, and the handle starts
// out not marked invalid.
HdfsFileSystemHandle(hdfsFS fs, bool cached, bool is_kerberos)
: hdfs_fs(fs),
from_cache(cached),
_is_kerberos(is_kerberos),
_ref_cnt(0),
_create_time(_now()),
_last_access_time(0),
_invalid(false) {}

~HdfsFileSystemHandle() {
DCHECK(_ref_cnt == 0);
if (hdfs_fs != nullptr) {
// Even if there is an error, the resources associated with the hdfsFS will be freed.
hdfsDisconnect(hdfs_fs);
// DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed"
// even if we create a new one
// hdfsDisconnect(hdfs_fs);
}
hdfs_fs = nullptr;
}
Expand All @@ -67,7 +75,11 @@ class HdfsFileSystemHandle {

int ref_cnt() { return _ref_cnt; }

bool invalid() { return _invalid; }
// Returns true when this handle should not be reused: either it has been
// explicitly marked invalid (see set_invalid()), or it is a kerberos handle
// whose age (_now() minus its creation time) exceeds
// config::kerberos_expiration_time_seconds * 1000.
// NOTE(review): the "* 1000" suggests _now() reports milliseconds - confirm
// against the definition of _now().
bool invalid() {
return _invalid ||
(_is_kerberos &&
_now() - _create_time.load() > config::kerberos_expiration_time_seconds * 1000);
}

void set_invalid() { _invalid = true; }

Expand All @@ -77,8 +89,12 @@ class HdfsFileSystemHandle {
const bool from_cache;

private:
const bool _is_kerberos;
// the number of referenced client
std::atomic<int> _ref_cnt;
// For kerberos authentication, we need to save create time so that
// we can know if the kerberos ticket is expired.
std::atomic<uint64_t> _create_time;
// HdfsFileSystemCache tries to remove the oldest handle when the cache is full
std::atomic<uint64_t> _last_access_time;
// Client will set invalid if an error is thrown, and HdfsFileSystemCache will not reuse this handle
Expand Down
1 change: 1 addition & 0 deletions be/src/io/hdfs_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Status HDFSCommonBuilder::run_kinit() {
#endif
hdfsBuilderConfSetStr(hdfs_builder, "hadoop.security.kerberos.ticket.cache.path",
ticket_path.c_str());
LOG(INFO) << "finished to run kinit command: " << fmt::to_string(kinit_command);
return Status::OK();
}

Expand Down

0 comments on commit 184b5e4

Please sign in to comment.