From 11dd99593157833f635ea3212f3a11032bf67ebf Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 7 Jul 2023 23:01:44 +0800 Subject: [PATCH] 1 --- be/src/io/file_factory.cpp | 10 ++++------ be/src/io/file_factory.h | 19 ++++++++----------- be/src/io/fs/benchmark/s3_benchmark.hpp | 4 ++-- be/src/io/fs/broker_file_system.cpp | 3 ++- be/src/io/fs/broker_file_system.h | 3 ++- be/src/io/fs/buffered_reader.h | 11 ++++------- be/src/io/fs/file_system.h | 3 ++- be/src/io/fs/hdfs_file_system.cpp | 4 ++-- be/src/io/fs/hdfs_file_system.h | 3 ++- be/src/io/fs/local_file_system.h | 3 +-- be/src/io/fs/remote_file_system.cpp | 3 ++- be/src/io/fs/remote_file_system.h | 3 ++- be/src/io/fs/s3_file_system.h | 3 ++- be/src/olap/rowset/segment_v2/segment.cpp | 3 +-- be/src/util/os_util.cpp | 3 +-- be/src/vec/core/block_spill_reader.cpp | 3 +-- be/src/vec/exec/format/csv/csv_reader.cpp | 8 ++++---- .../vec/exec/format/json/new_json_reader.cpp | 4 ++-- be/src/vec/exec/format/orc/vorc_reader.cpp | 7 ++++--- .../exec/format/parquet/vparquet_reader.cpp | 7 ++++--- 20 files changed, 52 insertions(+), 55 deletions(-) diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 92e4574921299b9..adc02a0aa2e19de 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -112,13 +112,12 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr const io::FileDescription& file_description, const io::FileReaderOptions& reader_options, std::shared_ptr* file_system, - io::FileReaderSPtr* file_reader, - RuntimeProfile* profile) { + io::FileReaderSPtr* file_reader, RuntimeProfile* profile) { TFileType::type type = system_properties.system_type; switch (type) { case TFileType::FILE_LOCAL: { - RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_description, - reader_options, file_reader)); + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_description, reader_options, + file_reader)); break; } case TFileType::FILE_S3: { @@ -169,8 +168,7 @@ Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd, const io::FileReaderOptions& reader_options, std::shared_ptr* hdfs_file_system, - io::FileReaderSPtr* reader, - RuntimeProfile* profile) { + io::FileReaderSPtr* reader, RuntimeProfile* profile) { std::shared_ptr fs; RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs)); RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader)); diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index a0eec6bd74e99be..42589df531a5b67 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -57,24 +57,21 @@ class FileFactory { std::unique_ptr& file_writer); /// Create FileReader - static Status create_file_reader( - const io::FileSystemProperties& system_properties, - const io::FileDescription& file_description, - const io::FileReaderOptions& reader_options, - std::shared_ptr* file_system, - io::FileReaderSPtr* file_reader, - RuntimeProfile* profile = nullptr); + static Status create_file_reader(const io::FileSystemProperties& system_properties, + const io::FileDescription& file_description, + const io::FileReaderOptions& reader_options, + std::shared_ptr* file_system, + io::FileReaderSPtr* file_reader, + RuntimeProfile* profile = nullptr); // Create FileReader for stream load pipe static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader, const TUniqueId& fragment_instance_id); - static Status create_hdfs_reader(const THdfsParams& hdfs_params, - const io::FileDescription& fd, + static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd, const io::FileReaderOptions& reader_options, std::shared_ptr* hdfs_file_system, - io::FileReaderSPtr* reader, - RuntimeProfile* profile); + io::FileReaderSPtr* reader, RuntimeProfile* profile); static Status create_s3_reader(const std::map& prop, const io::FileDescription& fd, diff --git a/be/src/io/fs/benchmark/s3_benchmark.hpp b/be/src/io/fs/benchmark/s3_benchmark.hpp index 8be755561d65990..d18c0606c41e8dc 100644 --- a/be/src/io/fs/benchmark/s3_benchmark.hpp +++ b/be/src/io/fs/benchmark/s3_benchmark.hpp @@ -118,8 +118,8 @@ class S3PrefetchReadBenchmark : public S3Benchmark { io::FileReaderOptions reader_options = FileFactory::get_reader_options(nullptr); IOContext io_ctx; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( - fs_props, fd, reader_options, &fs, &reader, io::DelegateReader::AccessMode::SEQUENTIAL, - &io_ctx)); + fs_props, fd, reader_options, &fs, &reader, + io::DelegateReader::AccessMode::SEQUENTIAL, &io_ctx)); return read(state, reader); } }; diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index e7182ed6eeaf708..5a4342027b11ad2 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -127,7 +127,8 @@ Status BrokerFileSystem::open_file_internal(const FileDescription& fd, const Pat (*_client)->openReader(*response, request); } } catch (apache::thrift::TException& e) { - return Status::RpcError("failed to open file {}: {}", abs_path.native(), error_msg(e.what())); + return Status::RpcError("failed to open file {}: {}", abs_path.native(), + error_msg(e.what())); } if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) { diff --git a/be/src/io/fs/broker_file_system.h b/be/src/io/fs/broker_file_system.h index 641e898ef3782c2..a015f5c1f53338d 100644 --- a/be/src/io/fs/broker_file_system.h +++ b/be/src/io/fs/broker_file_system.h @@ -49,7 +49,8 @@ class BrokerFileSystem final : public RemoteFileSystem { protected: Status connect_impl() override; Status create_file_impl(const Path& file, FileWriterPtr* writer) override; - Status open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) override; + Status open_file_internal(const FileDescription& fd, const Path& abs_path, + FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; Status delete_file_impl(const Path& file) override; Status delete_directory_impl(const Path& dir) override; diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 98a7843aa521f50..b0728e6af1684b4 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -249,13 +249,10 @@ class DelegateReader { static constexpr size_t IN_MEMORY_FILE_SIZE = 8 * 1024 * 1024; static Status create_file_reader( - RuntimeProfile* profile, - const FileSystemProperties& system_properties, - const FileDescription& file_description, - const io::FileReaderOptions& reader_options, - std::shared_ptr* file_system, - io::FileReaderSPtr* file_reader, AccessMode access_mode = SEQUENTIAL, - const IOContext* io_ctx = nullptr, + RuntimeProfile* profile, const FileSystemProperties& system_properties, + const FileDescription& file_description, const io::FileReaderOptions& reader_options, + std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, + AccessMode access_mode = SEQUENTIAL, const IOContext* io_ctx = nullptr, const PrefetchRange file_range = PrefetchRange(0, 0)); }; diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index f9d01c129470b05..04e2615fb0bbdf7 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -118,7 +118,8 @@ class FileSystem : public std::enable_shared_from_this { virtual Status create_file_impl(const Path& file, FileWriterPtr* writer) = 0; /// open file and return a FileReader - virtual Status open_file_impl(const FileDescription& fd, const Path& abs_file, const FileReaderOptions& reader_options, + virtual Status open_file_impl(const FileDescription& fd, const Path& abs_file, + const FileReaderOptions& reader_options, FileReaderSPtr* reader) = 0; /// create directory recursively diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 511a0a46a586799..4473ea2a3aeb1f4 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -169,8 +169,8 @@ Status HdfsFileSystem::open_file_internal(const FileDescription& fd, const Path& FileHandleCache::Accessor accessor; RETURN_IF_ERROR(HdfsFileHandleCache::instance()->get_file( - std::static_pointer_cast(shared_from_this()), real_path, fd.mtime, fd.file_size, - &accessor)); + std::static_pointer_cast(shared_from_this()), real_path, fd.mtime, + fd.file_size, &accessor)); *reader = std::make_shared(abs_path, _namenode, std::move(accessor), _profile); return Status::OK(); diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index d724bdab18d406e..f365891aa96c7b0 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -123,7 +123,8 @@ class HdfsFileSystem final : public RemoteFileSystem { protected: Status connect_impl() override; Status create_file_impl(const Path& file, FileWriterPtr* writer) override; - Status open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) override; + Status open_file_internal(const FileDescription& fd, const Path& abs_path, + FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; Status delete_file_impl(const Path& file) override; Status delete_directory_impl(const Path& dir) override; diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index 21e997d535cfb65..d9c0ec96c853bf4 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -75,8 +75,7 @@ class LocalFileSystem final : public FileSystem { protected: Status create_file_impl(const Path& file, FileWriterPtr* writer) override; Status open_file_impl(const FileDescription& file_desc, const Path& abs_path, - const FileReaderOptions& reader_options, - FileReaderSPtr* reader) override; + const FileReaderOptions& reader_options, FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; Status delete_file_impl(const Path& file) override; Status delete_directory_impl(const Path& dir) override; diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp index a13094e27b587b9..c16977765387620 100644 --- a/be/src/io/fs/remote_file_system.cpp +++ b/be/src/io/fs/remote_file_system.cpp @@ -72,7 +72,8 @@ Status RemoteFileSystem::connect() { FILESYSTEM_M(connect_impl()); } -Status RemoteFileSystem::open_file_impl(const FileDescription& fd, const Path& abs_path, const FileReaderOptions& reader_options, +Status RemoteFileSystem::open_file_impl(const FileDescription& fd, const Path& abs_path, + const FileReaderOptions& reader_options, FileReaderSPtr* reader) { FileReaderSPtr raw_reader; RETURN_IF_ERROR(open_file_internal(fd, abs_path, &raw_reader)); diff --git a/be/src/io/fs/remote_file_system.h b/be/src/io/fs/remote_file_system.h index 221b2165012a526..559890d5ee40dd8 100644 --- a/be/src/io/fs/remote_file_system.h +++ b/be/src/io/fs/remote_file_system.h @@ -54,7 +54,8 @@ class RemoteFileSystem : public FileSystem { /// connect to remote file system virtual Status connect_impl() = 0; - virtual Status open_file_impl(const FileDescription& fd, const Path& abs_path, const FileReaderOptions& reader_options, + virtual Status open_file_impl(const FileDescription& fd, const Path& abs_path, + const FileReaderOptions& reader_options, FileReaderSPtr* reader) override; /// upload load_file to remote remote_file /// local_file should be an absolute path on local filesystem. diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index 666a7b41e98614b..d2570a10588951f 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -69,7 +69,8 @@ class S3FileSystem final : public RemoteFileSystem { protected: Status connect_impl() override; Status create_file_impl(const Path& file, FileWriterPtr* writer) override; - Status open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) override; + Status open_file_internal(const FileDescription& fd, const Path& abs_path, + FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; Status delete_file_impl(const Path& file) override; Status delete_directory_impl(const Path& dir) override; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index b72a89ebe56d1a3..c6d472d0358baeb 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -85,8 +85,7 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t se #else // be ut use local file reader instead of remote file reader while use remote cache if (!config::file_cache_type.empty()) { - RETURN_IF_ERROR( - io::global_local_filesystem()->open_file(fd, reader_options, &file_reader)); + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(fd, reader_options, &file_reader)); } else { RETURN_IF_ERROR(fs->open_file(fd, reader_options, &file_reader)); } diff --git a/be/src/util/os_util.cpp b/be/src/util/os_util.cpp index 3d4d21bf004ac22..84cc364aba74f21 100644 --- a/be/src/util/os_util.cpp +++ b/be/src/util/os_util.cpp @@ -104,8 +104,7 @@ Status get_thread_stats(int64_t tid, ThreadStats* stats) { } std::string buf; RETURN_IF_ERROR(io::global_local_filesystem()->read_file_to_string( - strings::Substitute("/proc/self/task/$0/stat", tid), - &buf)); + strings::Substitute("/proc/self/task/$0/stat", tid), &buf)); return parse_stat(buf, nullptr, stats); } void disable_core_dumps() { diff --git a/be/src/vec/core/block_spill_reader.cpp b/be/src/vec/core/block_spill_reader.cpp index 4336d8f096de0d8..8d2d48122964054 100644 --- a/be/src/vec/core/block_spill_reader.cpp +++ b/be/src/vec/core/block_spill_reader.cpp @@ -54,8 +54,7 @@ Status BlockSpillReader::open() { file_description.path = file_path_; io::FileReaderOptions reader_options = io::FileReaderOptions::DEFAULT; RETURN_IF_ERROR(FileFactory::create_file_reader(system_properties, file_description, - reader_options, - &file_system, &file_reader_)); + reader_options, &file_system, &file_reader_)); size_t file_size = file_reader_->size(); diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 8bab81a6ac26067..8db857940206cd3 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -172,8 +172,8 @@ Status CsvReader::init_reader(bool is_load) { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( - _profile, _system_properties, _file_description, reader_options, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, + _profile, _system_properties, _file_description, reader_options, &_file_system, + &_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && @@ -659,8 +659,8 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { _file_description.start_offset = start_offset; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; - RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description, reader_options, - &_file_system, &_file_reader)); + RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description, + reader_options, &_file_system, &_file_reader)); if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && _params.file_type != TFileType::FILE_BROKER) { return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path); diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 3fc22b4ba8b9805..157b8a63e957492 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -384,8 +384,8 @@ Status NewJsonReader::_open_file_reader() { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( - _profile, _system_properties, _file_description, reader_options, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, + _profile, _system_properties, _file_description, reader_options, &_file_system, + &_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } return Status::OK(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index d93e222fe80e36c..7f87f77590d553b 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -211,10 +211,11 @@ Status OrcReader::_create_file_reader() { if (_file_input_stream == nullptr) { io::FileReaderSPtr inner_reader; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); - _file_description.mtime = _scan_range.__isset.modification_time ? _scan_range.modification_time : 0; + _file_description.mtime = + _scan_range.__isset.modification_time ? _scan_range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( - _profile, _system_properties, _file_description, reader_options, &_file_system, &inner_reader, - io::DelegateReader::AccessMode::RANDOM, _io_ctx)); + _profile, _system_properties, _file_description, reader_options, &_file_system, + &inner_reader, io::DelegateReader::AccessMode::RANDOM, _io_ctx)); _file_input_stream.reset(new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics, _io_ctx, _profile)); } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 614110b52e5592b..9b179384e25f198 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -213,10 +213,11 @@ Status ParquetReader::_open_file() { SCOPED_RAW_TIMER(&_statistics.open_file_time); ++_statistics.open_file_num; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); - _file_description.mtime = _scan_range.__isset.modification_time ? _scan_range.modification_time : 0; + _file_description.mtime = + _scan_range.__isset.modification_time ? _scan_range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( - _profile, _system_properties, _file_description, reader_options, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::RANDOM, _io_ctx)); + _profile, _system_properties, _file_description, reader_options, &_file_system, + &_file_reader, io::DelegateReader::AccessMode::RANDOM, _io_ctx)); } if (_file_metadata == nullptr) { SCOPED_RAW_TIMER(&_statistics.parse_footer_time);