diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index 4e2827b45e4..3f62fba12b8 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -771,6 +771,12 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
 bool FilePrefetchBuffer::TryReadFromCacheUntracked(
     const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
     size_t n, Slice* result, Status* status, bool for_compaction) {
+  // We disallow async IO for compaction reads since they are performed in
+  // the background anyway and are less latency sensitive compared to
+  // user-initiated reads.
+  (void)for_compaction;
+  assert(!for_compaction || num_buffers_ == 1);
+
   if (track_min_offset_ && offset < min_offset_read_) {
     min_offset_read_ = static_cast<size_t>(offset);
   }
@@ -819,27 +825,22 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
   assert(reader != nullptr);
   assert(max_readahead_size_ >= readahead_size_);
 
-  if (for_compaction) {
-    s = Prefetch(opts, reader, offset, std::max(n, readahead_size_));
-  } else {
-    if (implicit_auto_readahead_) {
-      if (!IsEligibleForPrefetch(offset, n)) {
-        // Ignore status as Prefetch is not called.
-        s.PermitUncheckedError();
-        return false;
-      }
+  if (implicit_auto_readahead_) {
+    if (!IsEligibleForPrefetch(offset, n)) {
+      // Ignore status as Prefetch is not called.
+      s.PermitUncheckedError();
+      return false;
     }
-
-    // Prefetch n + readahead_size_/2 synchronously as remaining
-    // readahead_size_/2 will be prefetched asynchronously if num_buffers_
-    // > 1.
-    s = PrefetchInternal(
-        opts, reader, offset, n,
-        (num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
-        copy_to_overlap_buffer);
-    explicit_prefetch_submitted_ = false;
   }
+  // Prefetch n + readahead_size_/2 synchronously as remaining
+  // readahead_size_/2 will be prefetched asynchronously if num_buffers_
+  // > 1.
+  s = PrefetchInternal(
+      opts, reader, offset, n,
+      (num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
+      copy_to_overlap_buffer);
+  explicit_prefetch_submitted_ = false;
 
   if (!s.ok()) {
     if (status) {
       *status = s;
@@ -853,7 +854,7 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
     } else {
       return false;
     }
-  } else if (!for_compaction) {
+  } else {
     UpdateStats(/*found_in_buffer=*/true, n);
   }
 
@@ -864,6 +865,7 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
     buf = overlap_buf_;
   }
   assert(buf->offset_ <= offset);
+  assert(buf->IsDataBlockInBuffer(offset, n));
   uint64_t offset_in_buffer = offset - buf->offset_;
   *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n);
   if (prefetched) {
diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc
index 43f394394cd..d2eb52f359e 100644
--- a/file/prefetch_test.cc
+++ b/file/prefetch_test.cc
@@ -3290,8 +3290,57 @@ TEST_F(FilePrefetchBufferTest, SyncReadaheadStats) {
       /* 24576(end offset of the buffer) - 16000(requested offset) =*/8576);
 }
 
-class FSBufferPrefetchTest : public testing::Test,
-                             public ::testing::WithParamInterface<bool> {
+TEST_F(FilePrefetchBufferTest, ForCompaction) {
+  // Make sure TryReadFromCache with for_compaction=true works without the
+  // file system buffer reuse optimization
+  std::string fname = "fs-prefetch-buffer-for-compaction";
+  Random rand(0);
+  std::string content = rand.RandomString(64 * 1024);
+  Write(fname, content);
+
+  FileOptions opts;
+  std::unique_ptr<RandomAccessFileReader> r;
+  Read(fname, opts, &r);
+
+  std::shared_ptr<Statistics> stats = CreateDBStatistics();
+  ReadaheadParams readahead_params;
+  readahead_params.initial_readahead_size = 8192;
+  readahead_params.max_readahead_size = 8192;
+  readahead_params.num_buffers = 1;
+
+  FilePrefetchBuffer fpb(readahead_params, true /* enable */,
+                         false /* track_min_offset */, fs(), nullptr,
+                         stats.get());
+
+  Slice result;
+  Status s;
+  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 0 /* offset */,
+                                   3000 /* n */, &result, &s, true));
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_EQ(strncmp(result.data(), content.substr(0, 3000).c_str(), 3000), 0);
+
+  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 3000 /* offset */,
+                                   10000 /* n */, &result, &s, true));
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_EQ(strncmp(result.data(), content.substr(3000, 10000).c_str(), 10000),
+            0);
+
+  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 15000 /* offset */,
+                                   4096 /* n */, &result, &s, true));
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_EQ(strncmp(result.data(), content.substr(15000, 4096).c_str(), 4096),
+            0);
+
+  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 40000 /* offset */,
+                                   20000 /* n */, &result, &s, true));
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_EQ(strncmp(result.data(), content.substr(40000, 20000).c_str(), 20000),
+            0);
+}
+
+class FSBufferPrefetchTest
+    : public testing::Test,
+      public ::testing::WithParamInterface<std::tuple<bool, bool>> {
  public:
   // Mock file system supporting the kFSBuffer buffer reuse operation
   class BufferReuseFS : public FileSystemWrapper {
@@ -3355,7 +3404,7 @@ class FSBufferPrefetchTest : public testing::Test,
   void SetUp() override {
     SetupSyncPointsToMockDirectIO();
     env_ = Env::Default();
-    bool use_async_prefetch = GetParam();
+    bool use_async_prefetch = std::get<0>(GetParam());
     if (use_async_prefetch) {
       fs_ = FileSystem::Default();
     } else {
@@ -3399,8 +3448,13 @@ class FSBufferPrefetchTest : public testing::Test,
   std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
 };
 
+// param 1: whether async IO is enabled (num_buffers_ > 1)
+// param 2: whether for_compaction is set to true for TryReadFromCache requests
+// 3 out of these 4 combinations are tested (async IO is not allowed for
+// compaction reads)
 INSTANTIATE_TEST_CASE_P(FSBufferPrefetchTest, FSBufferPrefetchTest,
-                        ::testing::Bool());
+                        ::testing::Combine(::testing::Bool(),
+                                           ::testing::Bool()));
 
 TEST_P(FSBufferPrefetchTest, FSBufferPrefetchStatsInternals) {
   // Check that the main buffer, the overlap_buf_, and the secondary buffer (in
@@ -3419,11 +3473,18 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchStatsInternals) {
   ReadaheadParams readahead_params;
   readahead_params.initial_readahead_size = 8192;
   readahead_params.max_readahead_size = 8192;
-  bool use_async_prefetch = GetParam();
+  bool use_async_prefetch = std::get<0>(GetParam());
+  bool for_compaction = std::get<1>(GetParam());
+  // We disallow async IO for compaction reads since they are background
+  // operations anyway and not as latency sensitive as user-initiated reads
+  if (use_async_prefetch && for_compaction) {
+    return;
+  }
   size_t num_buffers = use_async_prefetch ? 2 : 1;
   readahead_params.num_buffers = num_buffers;
-  FilePrefetchBuffer fpb(readahead_params, true, false, fs(), clock(),
+  FilePrefetchBuffer fpb(readahead_params, true /* enable */,
+                         false /* track_min_offset */, fs(), clock(),
                          stats.get());
 
   int overlap_buffer_write_ct = 0;
@@ -3438,15 +3499,18 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchStatsInternals) {
   std::vector<std::pair<uint64_t, size_t>> buffer_info(num_buffers);
   std::pair<uint64_t, size_t> overlap_buffer_info;
   bool could_read_from_cache =
-      fpb.TryReadFromCache(IOOptions(), r.get(), 0, 4096, &result, &s);
+      fpb.TryReadFromCache(IOOptions(), r.get(), 0 /* offset */, 4096 /* n */,
+                           &result, &s, for_compaction);
   // Platforms that don't have IO uring may not support async IO.
   if (use_async_prefetch && s.IsNotSupported()) {
     return;
   }
   ASSERT_TRUE(could_read_from_cache);
   ASSERT_EQ(s, Status::OK());
+  ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0);
   ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), 0);
+  ASSERT_EQ(strncmp(result.data(), content.substr(0, 4096).c_str(), 4096), 0);
 
   fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
   fpb.TEST_GetBufferOffsetandSize(buffer_info);
@@ -3472,12 +3536,13 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchStatsInternals) {
 
   // Simulate a block cache hit
   fpb.UpdateReadPattern(4096, 4096, false);
-  ASSERT_TRUE(
-      fpb.TryReadFromCache(IOOptions(), r.get(), 8192, 8192, &result, &s));
+  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 8192 /* offset */,
+                                   8192 /* n */, &result, &s, for_compaction));
   ASSERT_EQ(s, Status::OK());
   ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0);
   ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL),
             4096);  // 8192-12288
+  ASSERT_EQ(strncmp(result.data(), content.substr(8192, 8192).c_str(), 8192),
+            0);
 
   fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
@@ -3508,12 +3573,14 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchStatsInternals) {
     ASSERT_EQ(std::get<1>(buffer_info[0]), 12288);
   }
 
-  ASSERT_TRUE(
-      fpb.TryReadFromCache(IOOptions(), r.get(), 12288, 4096, &result, &s));
+  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 12288 /* offset */,
+                                   4096 /* n */, &result, &s, for_compaction));
   ASSERT_EQ(s, Status::OK());
+  ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 1);
   ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL),
             4096);  // 12288-16384
+  ASSERT_EQ(strncmp(result.data(), content.substr(12288, 4096).c_str(), 4096),
+            0);
 
   fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
@@ -3540,13 +3607,15 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchStatsInternals) {
   }
 
   // Read from 16000-26000 (start and end do not meet normal alignment)
-  ASSERT_TRUE(
-      fpb.TryReadFromCache(IOOptions(), r.get(), 16000, 10000, &result, &s));
+  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 16000 /* offset */,
+                                   10000 /* n */, &result, &s, for_compaction));
   ASSERT_EQ(s, Status::OK());
+  ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0);
   ASSERT_EQ(
       stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL),
       /* 24576(end offset of the buffer) - 16000(requested offset) =*/8576);
+  ASSERT_EQ(strncmp(result.data(), content.substr(16000, 10000).c_str(), 10000),
+            0);
 
   fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
@@ -3595,10 +3664,17 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchUnalignedReads) {
   // Readahead size will double each time
   readahead_params.initial_readahead_size = 5;
   readahead_params.max_readahead_size = 100;
-  bool use_async_prefetch = GetParam();
+  bool use_async_prefetch = std::get<0>(GetParam());
+  bool for_compaction = std::get<1>(GetParam());
+  // We disallow async IO for compaction reads since they are background
+  // operations anyway and their latencies are not visible to the end user
+  if (use_async_prefetch && for_compaction) {
+    return;
+  }
   size_t num_buffers = use_async_prefetch ? 2 : 1;
   readahead_params.num_buffers = num_buffers;
-  FilePrefetchBuffer fpb(readahead_params, true, false, fs(), clock(),
+  FilePrefetchBuffer fpb(readahead_params, true /* enable */,
+                         false /* track_min_offset */, fs(), clock(),
                          stats.get());
 
   int overlap_buffer_write_ct = 0;
@@ -3613,7 +3689,8 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchUnalignedReads) {
   std::vector<std::pair<uint64_t, size_t>> buffer_info(num_buffers);
   std::pair<uint64_t, size_t> overlap_buffer_info;
   bool could_read_from_cache =
-      fpb.TryReadFromCache(IOOptions(), r.get(), 5, 3, &result, &s);
+      fpb.TryReadFromCache(IOOptions(), r.get(), 5 /* offset */, 3 /* n */,
+                           &result, &s, for_compaction);
   // Platforms that don't have IO uring may not support async IO.
   if (use_async_prefetch && s.IsNotSupported()) {
     return;
@@ -3645,7 +3722,8 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchUnalignedReads) {
     ASSERT_EQ(std::get<1>(buffer_info[0]), 3 + 5);
   }
 
-  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 16, 7, &result, &s));
+  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 16 /* offset */,
+                                   7 /* n */, &result, &s, for_compaction));
   ASSERT_EQ(s, Status::OK());
   ASSERT_EQ(strncmp(result.data(), content.substr(16, 7).c_str(), 7), 0);
   fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
@@ -3669,16 +3747,18 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchUnalignedReads) {
 
   // Go backwards
   if (use_async_prefetch) {
-    ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 10, 8, &result, &s));
+    ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 10 /* offset */,
+                                     8 /* n */, &result, &s, for_compaction));
   } else {
     // TryReadFromCacheUntracked returns false since the offset
     // requested is less than the start of our buffer
-    ASSERT_FALSE(
-        fpb.TryReadFromCache(IOOptions(), r.get(), 10, 8, &result, &s));
+    ASSERT_FALSE(fpb.TryReadFromCache(IOOptions(), r.get(), 10 /* offset */,
+                                      8 /* n */, &result, &s, for_compaction));
   }
   ASSERT_EQ(s, Status::OK());
 
-  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 27, 6, &result, &s));
+  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 27 /* offset */,
+                                   6 /* n */, &result, &s, for_compaction));
   ASSERT_EQ(s, Status::OK());
   ASSERT_EQ(strncmp(result.data(), content.substr(27, 6).c_str(), 6), 0);
   fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
@@ -3699,7 +3779,8 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchUnalignedReads) {
     ASSERT_EQ(std::get<1>(buffer_info[0]), 7 + 10);
   }
 
-  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 30, 20, &result, &s));
+  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 30 /* offset */,
+                                   20 /* n */, &result, &s, for_compaction));
   ASSERT_EQ(s, Status::OK());
   ASSERT_EQ(strncmp(result.data(), content.substr(30, 20).c_str(), 20), 0);
   fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
@@ -3722,13 +3803,13 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchUnalignedReads) {
   }
 }
 
-TEST_P(FSBufferPrefetchTest, FSBufferPrefetchForCompaction) {
-  // Quick test to make sure file system buffer reuse is disabled for compaction
-  // reads. Will update once it is re-enabled
-  // Primarily making sure we do not hit unsigned integer overflow issues
-  std::string fname = "fs-buffer-prefetch-for-compaction";
+TEST_P(FSBufferPrefetchTest, FSBufferPrefetchRandomized) {
+  // This test is meant to find untested code paths. It does very simple
+  // verifications and relies on debug assertions to catch invariant
+  // violations. We scan through a file, reading between 0 and 16 KiB at a
+  // time.
+  std::string fname = "fs-buffer-prefetch-randomized";
   Random rand(0);
-  std::string content = rand.RandomString(32768);
+  std::string content = rand.RandomString(16 * 1024 * 1024);
   Write(fname, content);
 
   FileOptions opts;
@@ -3737,45 +3818,57 @@
   std::shared_ptr<Statistics> stats = CreateDBStatistics();
   ReadaheadParams readahead_params;
-  readahead_params.initial_readahead_size = 8192;
-  readahead_params.max_readahead_size = 8192;
-  bool use_async_prefetch = GetParam();
+  readahead_params.initial_readahead_size = 512;
+  readahead_params.max_readahead_size = 2048;
+  bool use_async_prefetch = std::get<0>(GetParam());
+  bool for_compaction = std::get<1>(GetParam());
   // Async IO is not enabled for compaction prefetching
-  if (use_async_prefetch) {
+  if (use_async_prefetch && for_compaction) {
     return;
   }
-  readahead_params.num_buffers = 1;
+  size_t num_buffers = use_async_prefetch ? 2 : 1;
+  readahead_params.num_buffers = num_buffers;
 
-  FilePrefetchBuffer fpb(readahead_params, true, false, fs(), clock(),
+  FilePrefetchBuffer fpb(readahead_params, true /* enable */,
+                         false /* track_min_offset */, fs(), clock(),
                          stats.get());
 
   Slice result;
   Status s;
-  ASSERT_TRUE(
-      fpb.TryReadFromCache(IOOptions(), r.get(), 0, 4096, &result, &s, true));
-  ASSERT_EQ(s, Status::OK());
-  ASSERT_EQ(strncmp(result.data(), content.substr(0, 4096).c_str(), 4096), 0);
-
-  fpb.UpdateReadPattern(4096, 4096, false);
-
-  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 8192, 8192, &result,
-                                   &s, true));
-  ASSERT_EQ(s, Status::OK());
-  ASSERT_EQ(strncmp(result.data(), content.substr(8192, 8192).c_str(), 8192),
-            0);
-  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 12288, 4096, &result,
-                                   &s, true));
-  ASSERT_EQ(s, Status::OK());
-  ASSERT_EQ(strncmp(result.data(), content.substr(12288, 4096).c_str(), 4096),
-            0);
-
-  // Read from 16000-26000 (start and end do not meet normal alignment)
-  ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 16000, 10000, &result,
-                                   &s, true));
-  ASSERT_EQ(s, Status::OK());
-  ASSERT_EQ(strncmp(result.data(), content.substr(16000, 10000).c_str(), 10000),
-            0);
+  uint64_t offset = 0;
+  Random rnd(987654);
+  for (int i = 0; i < 1000; i++) {
+    size_t len = rnd.Uniform(16 * 1024);
+    if (offset >= content.size()) {
+      std::cout << "Stopped early after " << i << " iterations" << std::endl;
+      break;
+    }
+    bool could_read_from_cache = fpb.TryReadFromCache(
+        IOOptions(), r.get(), offset, len, &result, &s, for_compaction);
+    // Platforms that don't have IO uring may not support async IO.
+    if (use_async_prefetch && s.IsNotSupported()) {
+      return;
+    }
+    ASSERT_TRUE(could_read_from_cache);
+    ASSERT_EQ(s, Status::OK());
+    ASSERT_EQ(strncmp(result.data(), content.substr(offset, len).c_str(), len),
+              0);
+    if (i % 4 == 0) {
+      // Test reads where we "skip forward" in the file more than we could
+      // read ahead
+      offset += len + 2 * readahead_params.max_readahead_size;
+    } else if (i % 4 == 1) {
+      // Test reads where we "skip forward" in the file but should have some
+      // overlap with the read ahead data
+      offset += len + readahead_params.max_readahead_size / 2;
+    } else {
+      // Test "back to back" reads (next read starts right at end of previous
+      // one)
+      offset += len;
+    }
+  }
 }
 
 }  // namespace ROCKSDB_NAMESPACE
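
Reviewer note (not part of the patch): the sketch below shows, in isolation, the contract this change establishes: compaction reads go through the same TryReadFromCache path as user reads, but must be configured with num_buffers = 1, since async prefetching is disallowed for them and the new assert in TryReadFromCacheUntracked enforces that. It mirrors the test code above; identifiers such as fs(), stats, and the reader r are assumed to come from a fixture like FilePrefetchBufferTest, so treat this as an illustration rather than library documentation.

    // Minimal caller-side sketch (assumed fixture helpers: fs(), stats, r).
    ReadaheadParams params;
    params.initial_readahead_size = 8192;
    params.max_readahead_size = 8192;
    // for_compaction=true now requires a single buffer; see
    // assert(!for_compaction || num_buffers_ == 1) above.
    params.num_buffers = 1;

    FilePrefetchBuffer fpb(params, true /* enable */,
                           false /* track_min_offset */, fs(),
                           nullptr /* clock */, stats.get());

    Slice result;
    Status s;
    // Compaction readers pass for_compaction=true and take the same
    // PrefetchInternal path as user reads; with one buffer the full
    // readahead_size_ is prefetched synchronously.
    bool hit = fpb.TryReadFromCache(IOOptions(), r.get(), 0 /* offset */,
                                    4096 /* n */, &result, &s,
                                    true /* for_compaction */);

With num_buffers = 1, the ternary in the unified code path selects readahead_size_ rather than readahead_size_ / 2, so compaction reads keep their synchronous behavior while sharing the eligibility checks and stats updates with user-initiated reads.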