From 53d8028534f5010db69c637777e918a04f7e7ea6 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 13 Sep 2024 15:56:31 +0800 Subject: [PATCH] tmp fix --- be/src/io/fs/buffered_reader.cpp | 34 ++++++++++++++++++++++++----- be/src/io/fs/buffered_reader.h | 6 +++++- be/src/io/io_common.h | 37 +++++++++++++++++++++++--------- 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 15d8f45349b0fed..8526c2825b393a3 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -415,18 +415,34 @@ void PrefetchBuffer::reset_offset(size_t offset, const IOContext* io_ctx) { } else { _exceed = false; } - // First update and reset file cache stats + + // The "io_ctx" in the input parameter belongs to the upper caller. + // Because PrefetchBuffer actually runs in another thread, + // its life cycle may be different from that of the caller. + // Therefore, before submitting PrefetchBuffer to the thread pool, + // we need to copy io_ctx to _owned_io_ctx to ensure that + // the life cycle of IOContext is consistent with that of PrefetchBuffer. + // _update_and_reset_io_context(io_ctx); + _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( + [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); }); +} + +void PrefetchBuffer::_update_and_reset_io_context(const IOContext* io_ctx) { if (io_ctx) { + // If file_cache_stats is set in the input parameter, + // first need to update the last statistics in _owned_cache_stats + // to the file_cache_stats in the input parameter. + // Then reset _owned_cache_stats if (io_ctx->file_cache_stats) { io_ctx->file_cache_stats->update(_owned_cache_stats); + LOG(INFO) << "yy debug _update_and_reset_io_context. old: " + << _owned_cache_stats.debug_string() << ", new: " << io_ctx->debug_string(); _owned_cache_stats.reset(); } + // Copy io_ctx _owned_io_ctx = *io_ctx; _owned_io_ctx.file_cache_stats = &_owned_cache_stats; } - - _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( - [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); }); } // only this function would run concurrently in another thread @@ -466,7 +482,8 @@ void PrefetchBuffer::prefetch_buffer() { { SCOPED_RAW_TIMER(&_statis.read_time); // Use owned io_ctx here - s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, &_owned_io_ctx); + s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, nullptr); + // LOG(INFO) << "yy debug prefetch_buffer: " << _owned_io_ctx.debug_string(); } if (UNLIKELY(s.ok() && buf_size != _len)) { // This indicates that the data size returned by S3 object storage is smaller than what we requested, @@ -609,6 +626,9 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, *bytes_read = read_len; _statis.request_io += 1; _statis.request_bytes += read_len; + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += read_len; + } } if (off + *bytes_read == _offset + _len) { reset_offset(_offset + _whole_buffer_size, io_ctx); @@ -703,6 +723,10 @@ Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* &read_num, io_ctx)); actual_bytes_read += read_num; offset += read_num; + if (io_ctx) { + LOG(INFO) << "yy debug PrefetchBufferedReader::read_at_impl: " + << io_ctx->debug_string(); + } } *bytes_read = actual_bytes_read; return Status::OK(); diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 100f664101eeb72..fd5c157b4521c70 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -309,7 +309,7 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro size_t _whole_buffer_size; io::FileReader* _reader = nullptr; // PrefetchBuffer is running in separate thread. - // MUST use self owned FileCacheStatistics to avoid stack-use-after-scope error. + // MUST use self owned FileCacheStatistics and IOContext to avoid stack-use-after-scope error. // And after reading finish, the caller should update the parent's // FileCacheStatistics by using stats from this one. FileCacheStatistics _owned_cache_stats; @@ -359,6 +359,8 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro size_t merge_small_ranges(size_t off, int range_index) const; void _collect_profile_before_close() override; + + void _update_and_reset_io_context(const IOContext* io_ctx); }; constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB @@ -418,6 +420,8 @@ class PrefetchBufferedReader final : public io::FileReader { int64_t cur_pos = position + i * s_max_pre_buffer_size; int cur_buf_pos = get_buffer_pos(cur_pos); // reset would do all the prefetch work + // reset all buffer is done only once when initializing, + // no need to pass IOContext. _pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos), nullptr); } } diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h index 9f4788f22fa7801..1d885f5f0f9ddc2 100644 --- a/be/src/io/io_common.h +++ b/be/src/io/io_common.h @@ -19,6 +19,8 @@ #include +#include + namespace doris { enum class ReaderType : uint8_t { @@ -47,16 +49,16 @@ struct FileCacheStatistics { int64_t num_skip_cache_io_total = 0; void update(const FileCacheStatistics& other) { - num_local_io_total += num_local_io_total; - num_remote_io_total += num_remote_io_total; - local_io_timer += local_io_timer; - bytes_read_from_local += bytes_read_from_local; - bytes_read_from_remote += bytes_read_from_remote; - remote_io_timer += remote_io_timer; - write_cache_io_timer += write_cache_io_timer; - write_cache_io_timer += write_cache_io_timer; - bytes_write_into_cache += bytes_write_into_cache; - num_skip_cache_io_total += num_skip_cache_io_total; + num_local_io_total += other.num_local_io_total; + num_remote_io_total += other.num_remote_io_total; + local_io_timer += other.local_io_timer; + bytes_read_from_local += other.bytes_read_from_local; + bytes_read_from_remote += other.bytes_read_from_remote; + remote_io_timer += other.remote_io_timer; + write_cache_io_timer += other.write_cache_io_timer; + write_cache_io_timer += other.write_cache_io_timer; + bytes_write_into_cache += other.bytes_write_into_cache; + num_skip_cache_io_total += other.num_skip_cache_io_total; } void reset() { @@ -70,6 +72,13 @@ struct FileCacheStatistics { bytes_write_into_cache = 0; num_skip_cache_io_total = 0; } + + std::string debug_string() const { + std::stringstream ss; + ss << "bytes_read_from_local: " << bytes_read_from_local + << ", bytes_read_from_remote: " << bytes_read_from_remote; + return ss.str(); + } }; struct IOContext { @@ -116,6 +125,14 @@ struct IOContext { file_cache_stats = other.file_cache_stats; return *this; } + + std::string debug_string() const { + if (file_cache_stats != nullptr) { + return file_cache_stats->debug_string(); + } else { + return "no file cache stats"; + } + } }; } // namespace io