Skip to content

Commit

Permalink
tmp fix
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Sep 13, 2024
1 parent 57e247a commit 53d8028
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 16 deletions.
34 changes: 29 additions & 5 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,18 +415,34 @@ void PrefetchBuffer::reset_offset(size_t offset, const IOContext* io_ctx) {
} else {
_exceed = false;
}
// First update and reset file cache stats

// The "io_ctx" in the input parameter belongs to the upper caller.
// Because PrefetchBuffer actually runs in another thread,
// its life cycle may be different from that of the caller.
// Therefore, before submitting PrefetchBuffer to the thread pool,
// we need to copy io_ctx to _owned_io_ctx to ensure that
// the life cycle of IOContext is consistent with that of PrefetchBuffer.
// _update_and_reset_io_context(io_ctx);
_prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
[buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); });
}

void PrefetchBuffer::_update_and_reset_io_context(const IOContext* io_ctx) {
if (io_ctx) {
// If file_cache_stats is set in the input parameter,
// first need to update the last statistics in _owned_cache_stats
// to the file_cache_stats in the input parameter.
// Then reset _owned_cache_stats
if (io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->update(_owned_cache_stats);
LOG(INFO) << "yy debug _update_and_reset_io_context. old: "
<< _owned_cache_stats.debug_string() << ", new: " << io_ctx->debug_string();
_owned_cache_stats.reset();
}
// Copy io_ctx
_owned_io_ctx = *io_ctx;
_owned_io_ctx.file_cache_stats = &_owned_cache_stats;
}

_prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
[buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); });
}

// only this function would run concurrently in another thread
Expand Down Expand Up @@ -466,7 +482,8 @@ void PrefetchBuffer::prefetch_buffer() {
{
SCOPED_RAW_TIMER(&_statis.read_time);
// Use owned io_ctx here
s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, &_owned_io_ctx);
s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, nullptr);
// LOG(INFO) << "yy debug prefetch_buffer: " << _owned_io_ctx.debug_string();
}
if (UNLIKELY(s.ok() && buf_size != _len)) {
// This indicates that the data size returned by S3 object storage is smaller than what we requested,
Expand Down Expand Up @@ -609,6 +626,9 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
*bytes_read = read_len;
_statis.request_io += 1;
_statis.request_bytes += read_len;
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += read_len;
}
}
if (off + *bytes_read == _offset + _len) {
reset_offset(_offset + _whole_buffer_size, io_ctx);
Expand Down Expand Up @@ -703,6 +723,10 @@ Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t*
&read_num, io_ctx));
actual_bytes_read += read_num;
offset += read_num;
if (io_ctx) {
LOG(INFO) << "yy debug PrefetchBufferedReader::read_at_impl: "
<< io_ctx->debug_string();
}
}
*bytes_read = actual_bytes_read;
return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
size_t _whole_buffer_size;
io::FileReader* _reader = nullptr;
// PrefetchBuffer is running in separate thread.
// MUST use self owned FileCacheStatistics to avoid stack-use-after-scope error.
// MUST use self owned FileCacheStatistics and IOContext to avoid stack-use-after-scope error.
// And after reading finish, the caller should update the parent's
// FileCacheStatistics by using stats from this one.
FileCacheStatistics _owned_cache_stats;
Expand Down Expand Up @@ -359,6 +359,8 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
size_t merge_small_ranges(size_t off, int range_index) const;

void _collect_profile_before_close() override;

void _update_and_reset_io_context(const IOContext* io_ctx);
};

constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB
Expand Down Expand Up @@ -418,6 +420,8 @@ class PrefetchBufferedReader final : public io::FileReader {
int64_t cur_pos = position + i * s_max_pre_buffer_size;
int cur_buf_pos = get_buffer_pos(cur_pos);
// reset would do all the prefetch work
// reset all buffer is done only once when initializing,
// no need to pass IOContext.
_pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos), nullptr);
}
}
Expand Down
37 changes: 27 additions & 10 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <gen_cpp/Types_types.h>

#include <sstream>

namespace doris {

enum class ReaderType : uint8_t {
Expand Down Expand Up @@ -47,16 +49,16 @@ struct FileCacheStatistics {
int64_t num_skip_cache_io_total = 0;

void update(const FileCacheStatistics& other) {
num_local_io_total += num_local_io_total;
num_remote_io_total += num_remote_io_total;
local_io_timer += local_io_timer;
bytes_read_from_local += bytes_read_from_local;
bytes_read_from_remote += bytes_read_from_remote;
remote_io_timer += remote_io_timer;
write_cache_io_timer += write_cache_io_timer;
write_cache_io_timer += write_cache_io_timer;
bytes_write_into_cache += bytes_write_into_cache;
num_skip_cache_io_total += num_skip_cache_io_total;
num_local_io_total += other.num_local_io_total;
num_remote_io_total += other.num_remote_io_total;
local_io_timer += other.local_io_timer;
bytes_read_from_local += other.bytes_read_from_local;
bytes_read_from_remote += other.bytes_read_from_remote;
remote_io_timer += other.remote_io_timer;
write_cache_io_timer += other.write_cache_io_timer;
write_cache_io_timer += other.write_cache_io_timer;
bytes_write_into_cache += other.bytes_write_into_cache;
num_skip_cache_io_total += other.num_skip_cache_io_total;
}

void reset() {
Expand All @@ -70,6 +72,13 @@ struct FileCacheStatistics {
bytes_write_into_cache = 0;
num_skip_cache_io_total = 0;
}

std::string debug_string() const {
std::stringstream ss;
ss << "bytes_read_from_local: " << bytes_read_from_local
<< ", bytes_read_from_remote: " << bytes_read_from_remote;
return ss.str();
}
};

struct IOContext {
Expand Down Expand Up @@ -116,6 +125,14 @@ struct IOContext {
file_cache_stats = other.file_cache_stats;
return *this;
}

std::string debug_string() const {
if (file_cache_stats != nullptr) {
return file_cache_stats->debug_string();
} else {
return "no file cache stats";
}
}
};

} // namespace io
Expand Down

0 comments on commit 53d8028

Please sign in to comment.