Skip to content

Commit

Permalink
[Opt](ShortCircuit) add more stats info to trace
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Oct 29, 2024
1 parent c921be8 commit f58b4ec
Show file tree
Hide file tree
Showing 20 changed files with 232 additions and 81 deletions.
55 changes: 36 additions & 19 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "io/cache/cached_remote_file_reader.h"

#include <butil/time.h>
#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
Expand Down Expand Up @@ -174,29 +175,41 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
// read from cache or remote
auto [align_left, align_size] = s_align_size(offset, bytes_req, size());
CacheContext cache_context(io_ctx);
int64_t start = butil::cpuwide_time_ns();
FileBlocksHolder holder =
_cache->get_or_set(_cache_hash, align_left, align_size, cache_context);
stats.get_cache_lock_timer += butil::cpuwide_time_ns() - start;
if (butil::cpuwide_time_ns() - start > 50 * 1000 * 1000) {
LOG(WARNING) << "Get cache lock time too long: " << stats.get_cache_lock_timer;
}
std::vector<FileBlockSPtr> empty_blocks;
for (auto& block : holder.file_blocks) {
switch (block->state()) {
case FileBlock::State::EMPTY:
block->get_or_set_downloader();
if (block->is_downloader()) {
{
int64_t start = butil::cpuwide_time_ns();
for (auto& block : holder.file_blocks) {
switch (block->state()) {
case FileBlock::State::EMPTY:
block->get_or_set_downloader();
if (block->is_downloader()) {
empty_blocks.push_back(block);
TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::EMPTY");
}
stats.hit_cache = false;
break;
case FileBlock::State::SKIP_CACHE:
empty_blocks.push_back(block);
TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::EMPTY");
stats.hit_cache = false;
stats.skip_cache = true;
break;
case FileBlock::State::DOWNLOADING:
stats.hit_cache = false;
break;
case FileBlock::State::DOWNLOADED:
break;
}
stats.hit_cache = false;
break;
case FileBlock::State::SKIP_CACHE:
empty_blocks.push_back(block);
stats.hit_cache = false;
stats.skip_cache = true;
break;
case FileBlock::State::DOWNLOADING:
stats.hit_cache = false;
break;
case FileBlock::State::DOWNLOADED:
break;
}
stats.get_cache_lock_timer += butil::cpuwide_time_ns() - start;
if (butil::cpuwide_time_ns() - start > 50 * 1000 * 1000) {
LOG(WARNING) << "Get cache lock time too long: " << stats.get_cache_lock_timer;
}
}
size_t empty_start = 0;
Expand Down Expand Up @@ -231,6 +244,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
stats.bytes_write_into_file_cache += block_size;
}
// copy from memory directly
int64_t start = butil::cpuwide_time_ns();
size_t right_offset = offset + bytes_req - 1;
if (empty_start <= right_offset && empty_end >= offset) {
size_t copy_left_offset = offset < empty_start ? empty_start : offset;
Expand All @@ -240,6 +254,9 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
size_t copy_size = copy_right_offset - copy_left_offset + 1;
memcpy(dst, src, copy_size);
}
if (butil::cpuwide_time_ns() - start > 50 * 1000 * 1000) {
LOG(WARNING) << "copy from memory directly too long: " << stats.get_cache_lock_timer;
}
}

size_t current_offset = offset;
Expand Down Expand Up @@ -326,9 +343,9 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
statis->num_skip_cache_io_total += read_stats.skip_cache;
statis->bytes_write_into_cache += read_stats.bytes_write_into_file_cache;
statis->write_cache_io_timer += read_stats.local_write_timer;

g_skip_cache_num << read_stats.skip_cache;
g_skip_cache_sum << read_stats.skip_cache;
statis->get_cache_lock_timer += read_stats.get_cache_lock_timer;
}

} // namespace doris::io
1 change: 1 addition & 0 deletions be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class CachedRemoteFileReader final : public FileReader {
int64_t remote_read_timer = 0;
int64_t local_read_timer = 0;
int64_t local_write_timer = 0;
int64_t get_cache_lock_timer = 0;
};
void _update_state(const ReadStatistics& stats, FileCacheStatistics* state) const;
};
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/file_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "common/status.h"
#include "io/cache/file_cache_common.h"
#include "olap/olap_common.h"
#include "util/slice.h"

namespace doris {
Expand Down Expand Up @@ -153,6 +154,7 @@ class FileBlock {
std::condition_variable _cv;
FileCacheKey _key;
size_t _downloaded_size {0};
io::FileCacheStatistics* _file_cache_stat;
};

extern std::ostream& operator<<(std::ostream& os, const FileBlock::State& value);
Expand Down
50 changes: 45 additions & 5 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "io/cache/fs_file_cache_storage.h"

#include <bvar/scoped_timer.h>

#include <filesystem>
#include <mutex>
#include <system_error>
Expand Down Expand Up @@ -113,6 +115,7 @@ Status FSFileCacheStorage::append(const FileCacheKey& key, const Slice& value) {
FileWriter* writer = nullptr;
{
std::lock_guard lock(_mtx);
int64_t start = butil::cpuwide_time_ms();
auto file_writer_map_key = std::make_pair(key.hash, key.offset);
if (auto iter = _key_to_writer.find(file_writer_map_key); iter != _key_to_writer.end()) {
writer = iter->second.get();
Expand All @@ -129,20 +132,37 @@ Status FSFileCacheStorage::append(const FileCacheKey& key, const Slice& value) {
writer = file_writer.get();
_key_to_writer.emplace(file_writer_map_key, std::move(file_writer));
}
int cost = butil::cpuwide_time_ms() - start;
if (cost > 50) {
LOG(WARNING) << "FSFileCacheStorage 1 append cost " << cost << "ms";
}
}
DCHECK_NE(writer, nullptr);
return writer->append(value);
{
int64_t start = butil::cpuwide_time_ms();
auto s = writer->append(value);
int cost = butil::cpuwide_time_ms() - start;
if (cost > 50) {
LOG(WARNING) << "FSFileCacheStorage 2 append cost " << cost << "ms";
}
return s;
}
}

Status FSFileCacheStorage::finalize(const FileCacheKey& key) {
FileWriterPtr file_writer;
{
std::lock_guard lock(_mtx);
int64_t start = butil::cpuwide_time_ms();
auto file_writer_map_key = std::make_pair(key.hash, key.offset);
auto iter = _key_to_writer.find(file_writer_map_key);
DCHECK(iter != _key_to_writer.end());
file_writer = std::move(iter->second);
_key_to_writer.erase(iter);
int cost = butil::cpuwide_time_ms() - start;
if (cost > 50) {
LOG(WARNING) << "FSFileCacheStorage finalize cost " << cost << "ms";
}
}
if (file_writer->state() != FileWriter::State::CLOSED) {
RETURN_IF_ERROR(file_writer->close());
Expand All @@ -154,8 +174,17 @@ Status FSFileCacheStorage::finalize(const FileCacheKey& key) {

Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Slice buffer) {
AccessKeyAndOffset fd_key = std::make_pair(key.hash, key.offset);
FileReaderSPtr file_reader = FDCache::instance()->get_file_reader(fd_key);
FileReaderSPtr file_reader;
{
int64_t start = butil::cpuwide_time_ms();
file_reader = FDCache::instance()->get_file_reader(fd_key);
int cost = butil::cpuwide_time_ms() - start;
if (cost > 50) {
LOG(WARNING) << "FSFileCacheStorage get_file_reader 1 cost " << cost << "ms";
}
}
if (!file_reader) {
int64_t start = butil::cpuwide_time_ms();
std::string file =
get_path_in_local_cache(get_path_in_local_cache(key.hash, key.meta.expiration_time),
key.offset, key.meta.type);
Expand All @@ -181,10 +210,21 @@ Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Sl
}
}
FDCache::instance()->insert_file_reader(fd_key, file_reader);
int cost = butil::cpuwide_time_ms() - start;
if (cost > 50) {
LOG(WARNING) << "FSFileCacheStorage get_file_reader 2 cost " << cost << "ms";
}
}
{
int64_t start = butil::cpuwide_time_ms();
size_t bytes_read = 0;
RETURN_IF_ERROR(file_reader->read_at(value_offset, buffer, &bytes_read));
DCHECK(bytes_read == buffer.get_size());
int cost = butil::cpuwide_time_ms() - start;
if (cost > 50) {
LOG(WARNING) << "FSFileCacheStorage get_file_reader 3 cost " << cost << "ms";
}
}
size_t bytes_read = 0;
RETURN_IF_ERROR(file_reader->read_at(value_offset, buffer, &bytes_read));
DCHECK(bytes_read == buffer.get_size());
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct FileCacheStatistics {
int64_t write_cache_io_timer = 0;
int64_t bytes_write_into_cache = 0;
int64_t num_skip_cache_io_total = 0;
int64_t get_cache_lock_timer = 0;
};

struct IOContext {
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ Status _get_segment_column_iterator(const BetaRowsetSharedPtr& rowset, uint32_t
.use_page_cache = !config::disable_storage_page_cache,
.file_reader = segment->file_reader().get(),
.stats = stats,
.io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY},
.io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY,
.file_cache_stats = &stats->file_cache_stats},
};
RETURN_IF_ERROR((*column_iterator)->init(opt));
return Status::OK();
Expand Down Expand Up @@ -443,7 +444,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset, bool with_rowid,
std::string* encoded_seq_value) {
std::string* encoded_seq_value, OlapReaderStatistics* stats) {
SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency);
size_t seq_col_length = 0;
// use the latest tablet schema to decide if the tablet has sequence column currently
Expand Down Expand Up @@ -491,7 +492,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest

for (auto id : picked_segments) {
Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid,
&loc, encoded_seq_value);
&loc, encoded_seq_value, stats);
if (s.is<KEY_NOT_FOUND>()) {
continue;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ class BaseTablet {
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset = nullptr, bool with_rowid = true,
std::string* encoded_seq_value = nullptr);
std::string* encoded_seq_value = nullptr,
OlapReaderStatistics* stats = nullptr);

// calc delete bitmap when flush memtable, use a fake version to calc
// For example, cur max version is 5, and we use version 6 to calc but
Expand Down
12 changes: 10 additions & 2 deletions be/src/olap/primary_key_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "olap/primary_key_index.h"

#include <butil/time.h>
#include <gen_cpp/segment_v2.pb.h>

#include <utility>
Expand Down Expand Up @@ -95,7 +96,8 @@ Status PrimaryKeyIndexReader::parse_index(io::FileReaderSPtr file_reader,
// parse primary key index
_index_reader.reset(new segment_v2::IndexedColumnReader(file_reader, meta.primary_key_index()));
_index_reader->set_is_pk_index(true);
RETURN_IF_ERROR(_index_reader->load(!config::disable_pk_storage_page_cache, false));
RETURN_IF_ERROR(_index_reader->load(!config::disable_pk_storage_page_cache, false,
_pk_index_load_stats));

_index_parsed = true;
return Status::OK();
Expand All @@ -107,7 +109,9 @@ Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr file_reader,
segment_v2::ColumnIndexMetaPB column_index_meta = meta.bloom_filter_index();
segment_v2::BloomFilterIndexReader bf_index_reader(std::move(file_reader),
column_index_meta.bloom_filter_index());
RETURN_IF_ERROR(bf_index_reader.load(!config::disable_pk_storage_page_cache, false));
RETURN_IF_ERROR(bf_index_reader.load(!config::disable_pk_storage_page_cache, false,
_pk_index_load_stats));
int64_t start = butil::cpuwide_time_ms();
std::unique_ptr<segment_v2::BloomFilterIndexIterator> bf_iter;
RETURN_IF_ERROR(bf_index_reader.new_iterator(&bf_iter));
RETURN_IF_ERROR(bf_iter->read_bloom_filter(0, &_bf));
Expand All @@ -119,6 +123,10 @@ Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr file_reader,
_bf_bytes += _bf->size();

_bf_parsed = true;
if (butil::cpuwide_time_ms() - start > 50) {
LOG(WARNING) << "read bloom filter index too slow, cost "
<< butil::cpuwide_time_ms() - start << "ms";
}

return Status::OK();
}
Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/primary_key_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/rowset/segment_v2/bloom_filter_index_writer.h"
#include "olap/rowset/segment_v2/indexed_column_reader.h"
Expand Down Expand Up @@ -97,7 +98,8 @@ class PrimaryKeyIndexBuilder {

class PrimaryKeyIndexReader {
public:
PrimaryKeyIndexReader() : _index_parsed(false), _bf_parsed(false) {}
PrimaryKeyIndexReader(OlapReaderStatistics* pk_index_load_stats = nullptr)
: _index_parsed(false), _bf_parsed(false), _pk_index_load_stats(pk_index_load_stats) {}

~PrimaryKeyIndexReader() {
segment_v2::g_pk_total_bloom_filter_num << -static_cast<int64_t>(_bf_num);
Expand All @@ -111,9 +113,10 @@ class PrimaryKeyIndexReader {

Status parse_bf(io::FileReaderSPtr file_reader, const segment_v2::PrimaryKeyIndexMetaPB& meta);

Status new_iterator(std::unique_ptr<segment_v2::IndexedColumnIterator>* index_iterator) const {
Status new_iterator(std::unique_ptr<segment_v2::IndexedColumnIterator>* index_iterator,
OlapReaderStatistics* stats = nullptr) const {
DCHECK(_index_parsed);
index_iterator->reset(new segment_v2::IndexedColumnIterator(_index_reader.get()));
index_iterator->reset(new segment_v2::IndexedColumnIterator(_index_reader.get(), stats));
return Status::OK();
}

Expand Down Expand Up @@ -152,6 +155,7 @@ class PrimaryKeyIndexReader {
std::unique_ptr<segment_v2::BloomFilter> _bf;
size_t _bf_num = 0;
uint64 _bf_bytes = 0;
OlapReaderStatistics* _pk_index_load_stats = nullptr;
};

} // namespace doris
6 changes: 4 additions & 2 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
namespace doris {
namespace segment_v2 {

Status BloomFilterIndexReader::load(bool use_page_cache, bool kept_in_memory) {
Status BloomFilterIndexReader::load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* index_load_stats) {
// TODO yyq: implement a new once flag to avoid status construct.
_index_load_stats = index_load_stats;
return _load_once.call([this, use_page_cache, kept_in_memory] {
return _load(use_page_cache, kept_in_memory);
});
Expand All @@ -47,7 +49,7 @@ Status BloomFilterIndexReader::_load(bool use_page_cache, bool kept_in_memory) {
const IndexedColumnMetaPB& bf_index_meta = _bloom_filter_index_meta->bloom_filter();

_bloom_filter_reader.reset(new IndexedColumnReader(_file_reader, bf_index_meta));
RETURN_IF_ERROR(_bloom_filter_reader->load(use_page_cache, kept_in_memory));
RETURN_IF_ERROR(_bloom_filter_reader->load(use_page_cache, kept_in_memory, _index_load_stats));
update_metadata_size();
return Status::OK();
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class BloomFilterIndexReader : public MetadataAdder<BloomFilterIndexReader> {
_bloom_filter_index_meta.reset(new BloomFilterIndexPB(bloom_filter_index_meta));
}

Status load(bool use_page_cache, bool kept_in_memory);
Status load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* _bf_index_load_stats = nullptr);

BloomFilterAlgorithmPB algorithm() { return _bloom_filter_index_meta->algorithm(); }

Expand All @@ -69,6 +70,7 @@ class BloomFilterIndexReader : public MetadataAdder<BloomFilterIndexReader> {
const TypeInfo* _type_info = nullptr;
std::unique_ptr<BloomFilterIndexPB> _bloom_filter_index_meta = nullptr;
std::unique_ptr<IndexedColumnReader> _bloom_filter_reader;
OlapReaderStatistics* _index_load_stats = nullptr;
};

class BloomFilterIndexIterator {
Expand Down
Loading

0 comments on commit f58b4ec

Please sign in to comment.