Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jul 7, 2023
1 parent 94cf286 commit 11dd995
Show file tree
Hide file tree
Showing 20 changed files with 52 additions and 55 deletions.
10 changes: 4 additions & 6 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,12 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr
const io::FileDescription& file_description,
const io::FileReaderOptions& reader_options,
std::shared_ptr<io::FileSystem>* file_system,
io::FileReaderSPtr* file_reader,
RuntimeProfile* profile) {
io::FileReaderSPtr* file_reader, RuntimeProfile* profile) {
TFileType::type type = system_properties.system_type;
switch (type) {
case TFileType::FILE_LOCAL: {
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_description,
reader_options, file_reader));
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_description, reader_options,
file_reader));
break;
}
case TFileType::FILE_S3: {
Expand Down Expand Up @@ -169,8 +168,7 @@ Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params,
const io::FileDescription& fd,
const io::FileReaderOptions& reader_options,
std::shared_ptr<io::FileSystem>* hdfs_file_system,
io::FileReaderSPtr* reader,
RuntimeProfile* profile) {
io::FileReaderSPtr* reader, RuntimeProfile* profile) {
std::shared_ptr<io::HdfsFileSystem> fs;
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs));
RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader));
Expand Down
19 changes: 8 additions & 11 deletions be/src/io/file_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,21 @@ class FileFactory {
std::unique_ptr<io::FileWriter>& file_writer);

/// Create FileReader
static Status create_file_reader(
const io::FileSystemProperties& system_properties,
const io::FileDescription& file_description,
const io::FileReaderOptions& reader_options,
std::shared_ptr<io::FileSystem>* file_system,
io::FileReaderSPtr* file_reader,
RuntimeProfile* profile = nullptr);
static Status create_file_reader(const io::FileSystemProperties& system_properties,
const io::FileDescription& file_description,
const io::FileReaderOptions& reader_options,
std::shared_ptr<io::FileSystem>* file_system,
io::FileReaderSPtr* file_reader,
RuntimeProfile* profile = nullptr);

// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
const TUniqueId& fragment_instance_id);

static Status create_hdfs_reader(const THdfsParams& hdfs_params,
const io::FileDescription& fd,
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd,
const io::FileReaderOptions& reader_options,
std::shared_ptr<io::FileSystem>* hdfs_file_system,
io::FileReaderSPtr* reader,
RuntimeProfile* profile);
io::FileReaderSPtr* reader, RuntimeProfile* profile);

static Status create_s3_reader(const std::map<std::string, std::string>& prop,
const io::FileDescription& fd,
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/fs/benchmark/s3_benchmark.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ class S3PrefetchReadBenchmark : public S3Benchmark {
io::FileReaderOptions reader_options = FileFactory::get_reader_options(nullptr);
IOContext io_ctx;
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
fs_props, fd, reader_options, &fs, &reader, io::DelegateReader::AccessMode::SEQUENTIAL,
&io_ctx));
fs_props, fd, reader_options, &fs, &reader,
io::DelegateReader::AccessMode::SEQUENTIAL, &io_ctx));
return read(state, reader);
}
};
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/broker_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ Status BrokerFileSystem::open_file_internal(const FileDescription& fd, const Pat
(*_client)->openReader(*response, request);
}
} catch (apache::thrift::TException& e) {
return Status::RpcError("failed to open file {}: {}", abs_path.native(), error_msg(e.what()));
return Status::RpcError("failed to open file {}: {}", abs_path.native(),
error_msg(e.what()));
}

if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/broker_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class BrokerFileSystem final : public RemoteFileSystem {
protected:
Status connect_impl() override;
Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
Status open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) override;
Status open_file_internal(const FileDescription& fd, const Path& abs_path,
FileReaderSPtr* reader) override;
Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override;
Status delete_file_impl(const Path& file) override;
Status delete_directory_impl(const Path& dir) override;
Expand Down
11 changes: 4 additions & 7 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,10 @@ class DelegateReader {
static constexpr size_t IN_MEMORY_FILE_SIZE = 8 * 1024 * 1024;

static Status create_file_reader(
RuntimeProfile* profile,
const FileSystemProperties& system_properties,
const FileDescription& file_description,
const io::FileReaderOptions& reader_options,
std::shared_ptr<io::FileSystem>* file_system,
io::FileReaderSPtr* file_reader, AccessMode access_mode = SEQUENTIAL,
const IOContext* io_ctx = nullptr,
RuntimeProfile* profile, const FileSystemProperties& system_properties,
const FileDescription& file_description, const io::FileReaderOptions& reader_options,
std::shared_ptr<io::FileSystem>* file_system, io::FileReaderSPtr* file_reader,
AccessMode access_mode = SEQUENTIAL, const IOContext* io_ctx = nullptr,
const PrefetchRange file_range = PrefetchRange(0, 0));
};

Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class FileSystem : public std::enable_shared_from_this<FileSystem> {
virtual Status create_file_impl(const Path& file, FileWriterPtr* writer) = 0;

/// open file and return a FileReader
virtual Status open_file_impl(const FileDescription& fd, const Path& abs_file, const FileReaderOptions& reader_options,
virtual Status open_file_impl(const FileDescription& fd, const Path& abs_file,
const FileReaderOptions& reader_options,
FileReaderSPtr* reader) = 0;

/// create directory recursively
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ Status HdfsFileSystem::open_file_internal(const FileDescription& fd, const Path&

FileHandleCache::Accessor accessor;
RETURN_IF_ERROR(HdfsFileHandleCache::instance()->get_file(
std::static_pointer_cast<HdfsFileSystem>(shared_from_this()), real_path, fd.mtime, fd.file_size,
&accessor));
std::static_pointer_cast<HdfsFileSystem>(shared_from_this()), real_path, fd.mtime,
fd.file_size, &accessor));

*reader = std::make_shared<HdfsFileReader>(abs_path, _namenode, std::move(accessor), _profile);
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/hdfs_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ class HdfsFileSystem final : public RemoteFileSystem {
protected:
Status connect_impl() override;
Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
Status open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) override;
Status open_file_internal(const FileDescription& fd, const Path& abs_path,
FileReaderSPtr* reader) override;
Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override;
Status delete_file_impl(const Path& file) override;
Status delete_directory_impl(const Path& dir) override;
Expand Down
3 changes: 1 addition & 2 deletions be/src/io/fs/local_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ class LocalFileSystem final : public FileSystem {
protected:
Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
Status open_file_impl(const FileDescription& file_desc, const Path& abs_path,
const FileReaderOptions& reader_options,
FileReaderSPtr* reader) override;
const FileReaderOptions& reader_options, FileReaderSPtr* reader) override;
Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override;
Status delete_file_impl(const Path& file) override;
Status delete_directory_impl(const Path& dir) override;
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/remote_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ Status RemoteFileSystem::connect() {
FILESYSTEM_M(connect_impl());
}

Status RemoteFileSystem::open_file_impl(const FileDescription& fd, const Path& abs_path, const FileReaderOptions& reader_options,
Status RemoteFileSystem::open_file_impl(const FileDescription& fd, const Path& abs_path,
const FileReaderOptions& reader_options,
FileReaderSPtr* reader) {
FileReaderSPtr raw_reader;
RETURN_IF_ERROR(open_file_internal(fd, abs_path, &raw_reader));
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/remote_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class RemoteFileSystem : public FileSystem {
/// connect to remote file system
virtual Status connect_impl() = 0;

virtual Status open_file_impl(const FileDescription& fd, const Path& abs_path, const FileReaderOptions& reader_options,
virtual Status open_file_impl(const FileDescription& fd, const Path& abs_path,
const FileReaderOptions& reader_options,
FileReaderSPtr* reader) override;
/// upload local_file to remote remote_file
/// local_file should be an absolute path on local filesystem.
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class S3FileSystem final : public RemoteFileSystem {
protected:
Status connect_impl() override;
Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
Status open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) override;
Status open_file_internal(const FileDescription& fd, const Path& abs_path,
FileReaderSPtr* reader) override;
Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override;
Status delete_file_impl(const Path& file) override;
Status delete_directory_impl(const Path& dir) override;
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t se
#else
// BE unit tests use the local file reader instead of the remote file reader when the remote cache is in use
if (!config::file_cache_type.empty()) {
RETURN_IF_ERROR(
io::global_local_filesystem()->open_file(fd, reader_options, &file_reader));
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(fd, reader_options, &file_reader));
} else {
RETURN_IF_ERROR(fs->open_file(fd, reader_options, &file_reader));
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/util/os_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ Status get_thread_stats(int64_t tid, ThreadStats* stats) {
}
std::string buf;
RETURN_IF_ERROR(io::global_local_filesystem()->read_file_to_string(
strings::Substitute("/proc/self/task/$0/stat", tid),
&buf));
strings::Substitute("/proc/self/task/$0/stat", tid), &buf));
return parse_stat(buf, nullptr, stats);
}
void disable_core_dumps() {
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/core/block_spill_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ Status BlockSpillReader::open() {
file_description.path = file_path_;
io::FileReaderOptions reader_options = io::FileReaderOptions::DEFAULT;
RETURN_IF_ERROR(FileFactory::create_file_reader(system_properties, file_description,
reader_options,
&file_system, &file_reader_));
reader_options, &file_system, &file_reader_));

size_t file_size = file_reader_->size();

Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ Status CsvReader::init_reader(bool is_load) {
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options, &_file_system, &_file_reader,
io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
_profile, _system_properties, _file_description, reader_options, &_file_system,
&_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
io::PrefetchRange(_range.start_offset, _range.size)));
}
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
Expand Down Expand Up @@ -659,8 +659,8 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
_file_description.start_offset = start_offset;
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description, reader_options,
&_file_system, &_file_reader));
RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description,
reader_options, &_file_system, &_file_reader));
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
_params.file_type != TFileType::FILE_BROKER) {
return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ Status NewJsonReader::_open_file_reader() {
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options, &_file_system, &_file_reader,
io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
_profile, _system_properties, _file_description, reader_options, &_file_system,
&_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
io::PrefetchRange(_range.start_offset, _range.size)));
}
return Status::OK();
Expand Down
7 changes: 4 additions & 3 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,11 @@ Status OrcReader::_create_file_reader() {
if (_file_input_stream == nullptr) {
io::FileReaderSPtr inner_reader;
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
_file_description.mtime = _scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
_file_description.mtime =
_scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options, &_file_system, &inner_reader,
io::DelegateReader::AccessMode::RANDOM, _io_ctx));
_profile, _system_properties, _file_description, reader_options, &_file_system,
&inner_reader, io::DelegateReader::AccessMode::RANDOM, _io_ctx));
_file_input_stream.reset(new ORCFileInputStream(_scan_range.path, inner_reader,
&_statistics, _io_ctx, _profile));
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,11 @@ Status ParquetReader::_open_file() {
SCOPED_RAW_TIMER(&_statistics.open_file_time);
++_statistics.open_file_num;
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
_file_description.mtime = _scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
_file_description.mtime =
_scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options, &_file_system, &_file_reader,
io::DelegateReader::AccessMode::RANDOM, _io_ctx));
_profile, _system_properties, _file_description, reader_options, &_file_system,
&_file_reader, io::DelegateReader::AccessMode::RANDOM, _io_ctx));
}
if (_file_metadata == nullptr) {
SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
Expand Down

0 comments on commit 11dd995

Please sign in to comment.