diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index e977f610ebe3..e7a388962d41 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -81,14 +81,6 @@ void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector& iovecs) { }; } } - -// Returns the number of entries in a cache 'entry'. -int32_t numIoVectorsFromEntry(AsyncDataCacheEntry& entry) { - if (entry.tinyData() != nullptr) { - return 1; - } - return entry.data().numRuns(); -} } // namespace SsdPin::SsdPin(SsdFile& file, SsdRun run) : file_(&file), run_(run) { @@ -384,56 +376,46 @@ void SsdFile::write(std::vector& pins) { VELOX_CHECK_NULL(entry->ssdFile()); } - int32_t writeIndex = 0; - while (writeIndex < pins.size()) { - const auto space = getSpace(pins, writeIndex); + int32_t storeIndex = 0; + while (storeIndex < pins.size()) { + auto space = getSpace(pins, storeIndex); if (!space.has_value()) { // No space can be reclaimed. The pins are freed when the caller is freed. return; } auto [offset, available] = space.value(); - int32_t numWrittenEntries = 0; - uint64_t writeOffset = offset; - int32_t writeLength = 0; - std::vector writeIovecs; - for (auto i = writeIndex; i < pins.size(); ++i) { + int32_t numWritten = 0; + int32_t bytes = 0; + std::vector iovecs; + for (auto i = storeIndex; i < pins.size(); ++i) { auto* entry = pins[i].checkedEntry(); const auto entrySize = entry->size(); - const auto numIovecs = numIoVectorsFromEntry(*entry); - VELOX_CHECK_LE(numIovecs, IOV_MAX); - if (writeIovecs.size() + numIovecs > IOV_MAX) { - // Writes out the accumulated iovecs if it exceeds IOV_MAX limit. - if (!write(writeOffset, writeLength, writeIovecs)) { - // If write fails, we return without adding the pins to the cache. The - // entries are unchanged. - return; - } - writeIovecs.clear(); - writeOffset += writeLength; - writeLength = 0; - } - if (writeLength + entrySize > available) { + if (bytes + entrySize > available) { break; } - addEntryToIovecs(*entry, writeIovecs); - writeLength += entrySize; - ++numWrittenEntries; - } - if (writeLength > 0) { - VELOX_CHECK(!writeIovecs.empty()); - if (!write(writeOffset, writeLength, writeIovecs)) { - return; - } - writeIovecs.clear(); - writeOffset += writeLength; - writeLength = 0; + addEntryToIovecs(*entry, iovecs); + bytes += entrySize; + ++numWritten; + } + VELOX_CHECK_GE(fileSize_, offset + bytes); + + const auto rc = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset); + if (rc != bytes) { + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to write to SSD, file name: " << fileName_ + << ", fd: " << fd_ << ", size: " << iovecs.size() + << ", offset: " << offset << ", error code: " << errno + << ", error string: " << folly::errnoStr(errno); + ++stats_.writeSsdErrors; + // If write fails, we return without adding the pins to the cache. The + // entries are unchanged. + return; } - VELOX_CHECK_GE(fileSize_, writeOffset); { std::lock_guard l(mutex_); - for (auto i = writeIndex; i < writeIndex + numWrittenEntries; ++i) { + for (auto i = storeIndex; i < storeIndex + numWritten; ++i) { auto* entry = pins[i].checkedEntry(); entry->setSsdFile(this, offset); const auto size = entry->size(); @@ -449,7 +431,7 @@ void SsdFile::write(std::vector& pins) { bytesAfterCheckpoint_ += size; } } - writeIndex += numWrittenEntries; + storeIndex += numWritten; } if ((checkpointIntervalBytes_ > 0) && @@ -458,23 +440,6 @@ void SsdFile::write(std::vector& pins) { } } -bool SsdFile::write( - uint64_t offset, - uint64_t length, - const std::vector& iovecs) { - const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset); - if (ret == length) { - return true; - } - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_ - << ", size: " << iovecs.size() << ", offset: " << offset - << ", error code: " << errno - << ", error string: " << folly::errnoStr(errno); - ++stats_.writeSsdErrors; - return false; -} - namespace { int32_t indexOfFirstMismatch(char* x, char* y, int n) { for (auto i = 0; i < n; ++i) { diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index 9b027cf9cb60..857abad13957 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -339,11 +339,6 @@ class SsdFile { // the files for making new checkpoints. void initializeCheckpoint(); - // Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write - // succeeds; otherwise, log the error and return false. - bool - write(uint64_t offset, uint64_t length, const std::vector& iovecs); - // Synchronously logs that 'regions' are no longer valid in a possibly xisting // checkpoint. void logEviction(const std::vector& regions); diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index cded09951472..b91ea782c8c7 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -118,12 +118,12 @@ class SsdFileTest : public testing::Test { } } - // Gets consecutive entries from file 'fileId' starting at 'startOffset' with + // Gets consecutive entries from file 'fileId' starting at 'startOffset' with // sizes between 'minSize' and 'maxSize'. Sizes start at 'minSize' and double // each time and go back to 'minSize' after exceeding 'maxSize'. This stops // after the total size has exceeded 'totalSize'. The entries are returned as // pins. The pins are exclusive for newly created entries and shared for - // existing ones. New entries are deterministically initialized from 'fileId' + // existing ones. New entries are deterministically initialized from 'fileId' // and the entry's offset. std::vector makePins( uint64_t fileId, @@ -280,7 +280,7 @@ TEST_F(SsdFileTest, writeAndRead) { } } - // We check how many entries are found. The earliest writes will have been + // We check howmany entries are found. The earliest writes will have been // evicted. We read back the found entries and check their contents. int32_t numFound = 0; for (auto& entry : allEntries) { @@ -302,16 +302,6 @@ TEST_F(SsdFileTest, writeAndRead) { } } } - - // Test cache writes with different iobufs sizes. - for (int numPins : {0, 1, IOV_MAX - 1, IOV_MAX, IOV_MAX + 1}) { - SCOPED_TRACE(fmt::format("numPins: {}", numPins)); - auto pins = makePins(fileName_.id(), 0, 4096, 4096, 4096 * numPins); - EXPECT_EQ(pins.size(), numPins); - ssdFile_->write(pins); - readAndCheckPins(pins); - pins.clear(); - } } #ifdef VELOX_SSD_FILE_TEST_SET_NO_COW_FLAG