diff --git a/velox/common/base/Counters.cpp b/velox/common/base/Counters.cpp index 1ff64da4232c..5d14528f9c34 100644 --- a/velox/common/base/Counters.cpp +++ b/velox/common/base/Counters.cpp @@ -265,6 +265,13 @@ void registerVeloxMetrics() { DEFINE_METRIC( kMetricSsdCacheRecoveredEntries, facebook::velox::StatType::SUM); + // Total number of local file space allocation failures. + // + // NOTE: space allocation is attempted by fallocate wherever it is supported. + DEFINE_METRIC( + kMetricLocalFileSpaceAllocationFailuresCount, + facebook::velox::StatType::COUNT); + /// ================== Memory Arbitration Counters ================= // The number of arbitration requests. diff --git a/velox/common/base/Counters.h b/velox/common/base/Counters.h index 4ee6da39d9c1..b72060c6c99b 100644 --- a/velox/common/base/Counters.h +++ b/velox/common/base/Counters.h @@ -328,6 +328,9 @@ constexpr folly::StringPiece kMetricSsdCacheRegionsEvicted{ constexpr folly::StringPiece kMetricSsdCacheRecoveredEntries{ "velox.ssd_cache_recovered_entries"}; +constexpr folly::StringPiece kMetricLocalFileSpaceAllocationFailuresCount{ + "velox.local_file_space_allocation_failures_count"}; + constexpr folly::StringPiece kMetricExchangeDataTimeMs{ "velox.exchange_data_time_ms"}; diff --git a/velox/common/caching/SsdCache.h b/velox/common/caching/SsdCache.h index 386e79ed8eec..73f405306dee 100644 --- a/velox/common/caching/SsdCache.h +++ b/velox/common/caching/SsdCache.h @@ -24,6 +24,10 @@ namespace facebook::velox::cache { #define VELOX_SSD_CACHE_LOG(severity) \ LOG(severity) << VELOX_SSD_CACHE_LOG_PREFIX +namespace test { +class SsdCacheTestHelper; +} + class SsdCache { public: struct Config { @@ -191,6 +195,8 @@ class SsdCache { // Count of shards with unfinished writes. std::atomic_int32_t writesInProgress_{0}; bool shutdown_{false}; + + friend class test::SsdCacheTestHelper; }; } // namespace facebook::velox::cache diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index bbf8e72c805f..846048077407 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -150,11 +150,15 @@ SsdFile::SsdFile(const Config& config) writeFile_ = fs_->openFileForWrite(fileName_, fileOptions); readFile_ = fs_->openFileForRead(fileName_); - const uint64_t size = writeFile_->size(); - numRegions_ = std::min(size / kRegionSize, maxRegions_); - fileSize_ = numRegions_ * kRegionSize; - if ((size % kRegionSize > 0) || (size > numRegions_ * kRegionSize)) { - writeFile_->truncate(fileSize_); + // NOTE: checkpoint recovery will set 'numRegions_' and 'dataSize_' + // accordingly. + numRegions_ = 0; + dataSize_ = 0; + + const auto maxFileSize = kRegionSize * maxRegions_; + if (writeFile_->size() != maxFileSize) { + // Initialize and pre-allocate (if possible) the data file with fixed space. + writeFile_->truncate(static_cast(maxFileSize)); } // The existing regions in the file are writable. writableRegions_.resize(numRegions_); @@ -315,10 +319,8 @@ std::optional> SsdFile::getSpace( bool SsdFile::growOrEvictLocked() { process::TraceContext trace("SsdFile::growOrEvictLocked"); if (numRegions_ < maxRegions_) { - const auto newSize = (numRegions_ + 1) * kRegionSize; try { - writeFile_->truncate(newSize); - fileSize_ = newSize; + dataSize_ = (numRegions_ + 1) * kRegionSize; writableRegions_.push_back(numRegions_); regionSizes_[numRegions_] = 0; erasedRegionSizes_[numRegions_] = 0; @@ -429,7 +431,7 @@ void SsdFile::write(std::vector& pins) { writeOffset += writeLength; writeLength = 0; } - VELOX_CHECK_GE(fileSize_, writeOffset); + VELOX_CHECK_GE(dataSize_, writeOffset); { std::lock_guard l(mutex_); @@ -993,6 +995,7 @@ void SsdFile::readCheckpoint(std::ifstream& state) { maxRegions_, "Trying to start from checkpoint with a different capacity"); numRegions_ = readNumber(state); + dataSize_ = numRegions_ * kRegionSize; std::vector scores(maxRegions); state.read(asChar(scores.data()), maxRegions_ * sizeof(double)); std::unordered_map idMap; diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index e166fda1542b..acffd6b660cc 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -29,6 +29,11 @@ DECLARE_bool(ssd_verify_write); namespace facebook::velox::cache { +namespace test { +class SsdFileTestHelper; +class SsdCacheTestHelper; +} + /// A 64 bit word describing a SSD cache entry in an SsdFile. The low 23 bits /// are the size, for a maximum entry size of 8MB. The high bits are the offset. class SsdRun { @@ -569,8 +574,8 @@ class SsdFile { // File system. std::shared_ptr fs_; - // Size of the backing file in bytes. Must be multiple of kRegionSize. - uint64_t fileSize_{0}; + // The size of actual cached data in bytes. Must be multiple of kRegionSize. + uint64_t dataSize_{0}; // ReadFile for cache data file. std::unique_ptr readFile_; @@ -596,6 +601,9 @@ class SsdFile { // True if there was an error with checkpoint and the checkpoint was deleted. bool checkpointDeleted_{false}; + + friend class test::SsdFileTestHelper; + friend class test::SsdCacheTestHelper; }; } // namespace facebook::velox::cache diff --git a/velox/common/caching/tests/AsyncDataCacheTest.cpp b/velox/common/caching/tests/AsyncDataCacheTest.cpp index d0c163d64a85..30cacb848a43 100644 --- a/velox/common/caching/tests/AsyncDataCacheTest.cpp +++ b/velox/common/caching/tests/AsyncDataCacheTest.cpp @@ -20,6 +20,7 @@ #include "velox/common/caching/CacheTTLController.h" #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" +#include "velox/common/caching/tests/CacheTestUtil.cpp" #include "velox/common/file/FileSystems.h" #include "velox/common/memory/Memory.h" #include "velox/common/memory/MmapAllocator.h" @@ -141,6 +142,17 @@ class AsyncDataCacheTest : public ::testing::TestWithParam { GetParam().checksumEnabled, GetParam().checksumVerificationEnabled); ssdCache = std::make_unique(config); + if (ssdCache != nullptr) { + ASSERT_EQ(test::SsdCacheTestHelper::numShards(ssdCache), kNumSsdShards); + const auto sizeQuantum = kNumSsdShards * SsdFile::kRegionSize; + const auto maxNumRegions = static_cast( + bits::roundUp(config.maxBytes, sizeQuantum) / sizeQuantum); + for (int32_t i = 0; i < kNumSsdShards; ++i) { + ASSERT_EQ( + test::SsdCacheTestHelper::writeFileSize(ssdCache, i), + maxNumRegions * SsdFile::kRegionSize); + } + } } memory::MemoryManagerOptions options; diff --git a/velox/common/caching/tests/CMakeLists.txt b/velox/common/caching/tests/CMakeLists.txt index 0bec2e101fd5..8ece44fc19ca 100644 --- a/velox/common/caching/tests/CMakeLists.txt +++ b/velox/common/caching/tests/CMakeLists.txt @@ -27,6 +27,7 @@ add_executable( velox_cache_test AsyncDataCacheTest.cpp CacheTTLControllerTest.cpp + CacheTestUtil.cpp SsdFileTest.cpp SsdFileTrackerTest.cpp StringIdMapTest.cpp) diff --git a/velox/common/caching/tests/CacheTestUtil.cpp b/velox/common/caching/tests/CacheTestUtil.cpp new file mode 100644 index 000000000000..4400834cd127 --- /dev/null +++ b/velox/common/caching/tests/CacheTestUtil.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "velox/common/caching/SsdCache.h" +#include "velox/common/caching/SsdFile.h" + +namespace facebook::velox::cache::test { + +class SsdFileTestHelper { + public: + explicit SsdFileTestHelper(std::shared_ptr ssdFile) + : ssdFile_(std::move(ssdFile)) {} + + uint64_t writeFileSize() { + return ssdFile_->writeFile_->size(); + } + + private: + std::shared_ptr const ssdFile_; +}; + +class SsdCacheTestHelper { + public: + explicit SsdCacheTestHelper() = default; + + static int32_t numShards(std::unique_ptr& ssdCache) { + return ssdCache->numShards_; + } + + static uint64_t writeFileSize( + std::unique_ptr& ssdCache, + uint64_t fileId) { + return ssdCache->file(fileId).writeFile_->size(); + } +}; +} // namespace facebook::velox::cache::test diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index 771420243f6c..c18f54b26183 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -17,6 +17,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" +#include "velox/common/caching/tests/CacheTestUtil.cpp" #include "velox/common/file/FileSystems.h" #include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/common/memory/Memory.h" @@ -91,15 +92,22 @@ class SsdFileTest : public testing::Test { bool checksumEnabled = false, bool checksumReadVerificationEnabled = false, bool disableFileCow = false) { + const auto maxNumRegions = static_cast( + bits::roundUp(ssdBytes, SsdFile::kRegionSize) / SsdFile::kRegionSize); SsdFile::Config config( fmt::format("{}/ssdtest", tempDirectory_->getPath()), 0, // shardId - bits::roundUp(ssdBytes, SsdFile::kRegionSize) / SsdFile::kRegionSize, + maxNumRegions, checkpointIntervalBytes, disableFileCow, checksumEnabled, checksumReadVerificationEnabled); - ssdFile_ = std::make_unique(config); + ssdFile_ = std::make_shared(config); + if (ssdFile_ != nullptr) { + test::SsdFileTestHelper ssdFileHelper(ssdFile_); + ASSERT_EQ( + ssdFileHelper.writeFileSize(), maxNumRegions * ssdFile_->kRegionSize); + } } // Corrupts the file by invalidate the last 1/10th of its content. @@ -304,7 +312,7 @@ class SsdFileTest : public testing::Test { std::shared_ptr cache_; StringIdLease fileName_; - std::unique_ptr ssdFile_; + std::shared_ptr ssdFile_; }; TEST_F(SsdFileTest, writeAndRead) { @@ -653,7 +661,7 @@ TEST_F(SsdFileTest, recoverFromCheckpointWithChecksum) { ASSERT_EQ(statsAfterRecover.entriesCached, stats.entriesCached); } else { ASSERT_EQ(statsAfterRecover.bytesCached, 0); - ASSERT_EQ(statsAfterRecover.regionsCached, stats.regionsCached); + ASSERT_EQ(statsAfterRecover.regionsCached, 0); ASSERT_EQ(statsAfterRecover.entriesCached, 0); } diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index 6a30f0a26159..b187d004cb2e 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -15,7 +15,9 @@ */ #include "velox/common/file/File.h" +#include "velox/common/base/Counters.h" #include "velox/common/base/Fs.h" +#include "velox/common/base/StatsReporter.h" #include #include @@ -377,6 +379,25 @@ void LocalWriteFile::write( void LocalWriteFile::truncate(int64_t newSize) { checkNotClosed(closed_); VELOX_CHECK_GE(newSize, 0, "New size cannot be negative."); +#ifdef linux + if (newSize > size_) { + // Use fallocate to extend the file. + const auto ret = ::fallocate(fd_, 0, 0, newSize); + try { + VELOX_CHECK_EQ( + ret, + 0, + "fallocate failed in LocalWriteFile::truncate: {}.", + folly::errnoStr(errno)); + size_ = newSize; + return; + } catch (const std::exception& e) { + RECORD_METRIC_VALUE(kMetricLocalFileSpaceAllocationFailuresCount); + } + } +#endif // linux + + // Fallback to ftruncate. const auto ret = ::ftruncate(fd_, newSize); VELOX_CHECK_EQ( ret,