diff --git a/src/include/storage/disk/disk_manager.h b/src/include/storage/disk/disk_manager.h index 04e0ee6d6..7711b7b82 100644 --- a/src/include/storage/disk/disk_manager.h +++ b/src/include/storage/disk/disk_manager.h @@ -13,10 +13,14 @@ #pragma once #include +#include +#include #include #include // NOLINT -#include // NOLINT +#include +#include // NOLINT #include +#include #include "common/config.h" @@ -37,7 +41,12 @@ class DiskManager { /** FOR TEST / LEADERBOARD ONLY, used by DiskManagerMemory */ DiskManager() = default; - virtual ~DiskManager() = default; + virtual ~DiskManager() { + // If ShutDown() is not manually called, shut it down when DiskManager goes out of scope + if (!has_shut_down_.load()) { + ShutDown(); + } + } /** * Shut down the disk manager and close all the file resources. @@ -65,6 +74,13 @@ class DiskManager { */ void WriteLog(char *log_data, int size); + /** + * The non-blocking, asynchronous version of WriteLog() + * @param log_data + * @param size + */ + void WriteLogAsync(char *log_data, int size); + /** * Read a log entry from the log file. * @param[out] log_data output buffer @@ -74,38 +90,81 @@ class DiskManager { */ auto ReadLog(char *log_data, int size, int offset) -> bool; - /** @return the number of disk flushes */ - auto GetNumFlushes() const -> int; - - /** @return true iff the in-memory content has not been flushed yet */ - auto GetFlushState() const -> bool; + /** + * @brief Returns number of flushes made so far + * @return The number of disk flushes + */ + auto GetNumFlushes() const -> int { return num_flushes_.load(); }; - /** @return the number of disk writes */ - auto GetNumWrites() const -> int; + /** + * @brief Returns true if the log is currently being flushed + * @return True iff the in-memory content has currently been flushed + */ + auto GetFlushState() const -> bool { return flush_log_.load(); }; /** - * Sets the future which is used to check for non-blocking flushes. - * @param f the non-blocking flush check + * @brief Returns number of Writes made so far + * @return The number of disk writes */ - inline void SetFlushLogFuture(std::future *f) { flush_log_f_ = f; } + auto GetNumWrites() const -> int { return num_writes_; }; - /** Checks if the non-blocking flush future was set. */ - inline auto HasFlushLogFuture() -> bool { return flush_log_f_ != nullptr; } + /** + * Sets the future which is used to check for non-blocking writes. + * @param f the non-blocking write check + */ + inline void PushWriteLogFuture(std::unique_ptr> f) { + std::scoped_lock write_log_f_deque_latch(write_log_f_deque_latch_); + write_log_f_deque_.push_back(std::move(f)); + } + + inline void PopWriteLogFuture() { + std::scoped_lock write_log_f_deque_latch(write_log_f_deque_latch_); + write_log_f_deque_.pop_front(); + } + + /** Checks if the non-blocking write future exists. */ + inline auto HasWriteLogFuture() -> bool { + std::scoped_lock lock(write_log_f_deque_latch_); + return !write_log_f_deque_.empty(); + } + + inline void WaitForAsyncToComplete() { + std::scoped_lock lock(write_log_f_deque_latch_); + while (!write_log_f_deque_.empty()) { + if (write_log_f_deque_.front()->valid()) { + // Be consistent with previous solution + assert(write_log_f_deque_.front()->wait_for(std::chrono::seconds(10)) == std::future_status::ready); + } + write_log_f_deque_.pop_front(); + } + } protected: auto GetFileSize(const std::string &file_name) -> int; - // stream to write log file + + /** Stream to write log file */ std::fstream log_io_; std::string log_name_; - // stream to write db file + + /** Stream to write db file */ std::fstream db_io_; std::string file_name_; - int num_flushes_{0}; + + std::atomic num_flushes_{0}; int num_writes_{0}; - bool flush_log_{false}; - std::future *flush_log_f_{nullptr}; - // With multiple buffer pool instances, need to protect file access + std::atomic flush_log_{false}; + /** Used for Destructor */ + std::atomic has_shut_down_{false}; + + /** For asynchronous functions */ + std::deque>> write_log_f_deque_; + std::mutex write_log_f_deque_latch_; + + /** With multiple buffer pool instances, need to protect file access */ std::mutex db_io_latch_; + + /** Same as above, the log access also needs to be protected, since std::fstream is NOT thread-safe */ + std::mutex log_io_latch_; }; } // namespace bustub diff --git a/src/storage/disk/disk_manager.cpp b/src/storage/disk/disk_manager.cpp index 8faaa1ad4..6a6b33789 100644 --- a/src/storage/disk/disk_manager.cpp +++ b/src/storage/disk/disk_manager.cpp @@ -11,11 +11,8 @@ //===----------------------------------------------------------------------===// #include -#include #include #include -#include // NOLINT -#include #include // NOLINT #include "common/exception.h" @@ -65,13 +62,27 @@ DiskManager::DiskManager(const std::string &db_file) : file_name_(db_file) { /** * Close all file streams + * Note: ShutDown should now be aware of none-terminated future flush */ void DiskManager::ShutDown() { + has_shut_down_ = true; + // Close the fstream, to prevent memory leak { std::scoped_lock scoped_db_io_latch(db_io_latch_); + std::scoped_lock scoped_log_io_latch(log_io_latch_); db_io_.close(); + log_io_.close(); } - log_io_.close(); + // Not for production code, only to check the state under DEBUG mode + assert(!db_io_.is_open() && !log_io_.is_open()); + + // Wait for the ever existing flushing operation to finish + if (HasWriteLogFuture()) { + // Same problem as in DiskManager::WriteLog(), may crash the program + WaitForAsyncToComplete(); + } + // Sanity Check + assert(!HasWriteLogFuture()); } /** @@ -125,9 +136,11 @@ void DiskManager::ReadPage(page_id_t page_id, char *page_data) { /** * Write the contents of the log into disk file * Only return when sync is done, and only perform sequence write + * Note: This version of WriteLog() will block until all asynchronous tasks are finished */ void DiskManager::WriteLog(char *log_data, int size) { - // enforce swap log buffer + std::scoped_lock scoped_log_io_latch(log_io_latch_); + // Enforce swap log buffer assert(log_data != buffer_used); buffer_used = log_data; @@ -135,38 +148,102 @@ void DiskManager::WriteLog(char *log_data, int size) { return; } - flush_log_ = true; - - if (flush_log_f_ != nullptr) { - // used for checking non-blocking flushing - assert(flush_log_f_->wait_for(std::chrono::seconds(10)) == std::future_status::ready); + if (HasWriteLogFuture()) { + // Used for checking non-blocking flushing, the default (and expected) time for a logging thread is 10s + // TODO(Zihao): Now the program will crash if the flush is still persisting after 10s, + // Should it be handled more softly (e.g, Print a warning and terminate just that thread by resetting the ptr?) + WaitForAsyncToComplete(); } - num_flushes_ += 1; - // sequence write + // Write the log log_io_.write(log_data, size); - // check for I/O error + // Check for I/O error if (log_io_.bad()) { LOG_DEBUG("I/O error while writing log"); return; } - // needs to flush to keep disk file in sync + + // Flush the log log_io_.flush(); + + // Update flush status + flush_log_ = true; + + // Increase number of flushing by one + num_flushes_ += 1; +} + +/** + * @brief A non-blocking, asynchronous version of WriteLog + * + * @param log_data + * @param size + */ +void DiskManager::WriteLogAsync(char *log_data, int size) { + auto curr_write_f = std::make_unique>(std::async(std::launch::async, [&] { + // First acquire the lock, the underlying threads will sequentially acquire the log_io_latch_ + std::scoped_lock scoped_log_io_latch(log_io_latch_); + + // Ensure swap log buffer + if (log_data != buffer_used) { + // Directly return + // TODO(Zihao): Warning the user? + return; + } + // Otherwise set the buffer_used to the incoming new log_data + buffer_used = log_data; + + // No effect on num_flushes_ if log buffer is empty + if (size == 0) { + return; + } + + // Write the log + log_io_.write(log_data, size); + + // Check for I/O error + if (log_io_.bad()) { + // Directly return, may be changed in the future + return; + } + + // Flush the log to disk + log_io_.flush(); + // The current flushing is done + flush_log_ = true; + // Increase the num of flush by one + num_flushes_ += 1; + })); + + // Set flush status to false, since the execution of this task is unknown flush_log_ = false; + + // Push the new future handle in to the deque + PushWriteLogFuture(std::move(curr_write_f)); } /** * Read the contents of the log into the given memory area * Always read from the beginning and perform sequence read * @return: false means already reach the end + * Note: This function will read the log AFTER all write operations (including asynchronous) are finished */ auto DiskManager::ReadLog(char *log_data, int size, int offset) -> bool { + std::scoped_lock scoped_log_io_latch(log_io_latch_); + + // Wait for potential async write operations + if (HasWriteLogFuture()) { + WaitForAsyncToComplete(); + } + + // Perform sanity check here if (offset >= GetFileSize(log_name_)) { // LOG_DEBUG("end of log file"); // LOG_DEBUG("file size is %d", GetFileSize(log_name_)); return false; } + log_io_.seekp(offset); log_io_.read(log_data, size); @@ -174,6 +251,7 @@ auto DiskManager::ReadLog(char *log_data, int size, int offset) -> bool { LOG_DEBUG("I/O error while reading log"); return false; } + // if log file ends before reading "size" int read_count = log_io_.gcount(); if (read_count < size) { @@ -184,21 +262,6 @@ auto DiskManager::ReadLog(char *log_data, int size, int offset) -> bool { return true; } -/** - * Returns number of flushes made so far - */ -auto DiskManager::GetNumFlushes() const -> int { return num_flushes_; } - -/** - * Returns number of Writes made so far - */ -auto DiskManager::GetNumWrites() const -> int { return num_writes_; } - -/** - * Returns true if the log is currently being flushed - */ -auto DiskManager::GetFlushState() const -> bool { return flush_log_; } - /** * Private helper function to get disk file size */