Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for asynchronous flushing in DiskManager::WriteLog() && improve overall structure of DiskManager #580

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
99 changes: 79 additions & 20 deletions src/include/storage/disk/disk_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
#pragma once

#include <atomic>
#include <cassert>
#include <deque>
#include <fstream>
#include <future> // NOLINT
#include <mutex> // NOLINT
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <utility>

#include "common/config.h"

Expand All @@ -37,7 +41,12 @@ class DiskManager {
/** FOR TEST / LEADERBOARD ONLY, used by DiskManagerMemory */
DiskManager() = default;

virtual ~DiskManager() = default;
virtual ~DiskManager() {
// If ShutDown() is not manually called, shut it down when DiskManager goes out of scope
if (!has_shut_down_.load()) {
ShutDown();
}
}

/**
* Shut down the disk manager and close all the file resources.
Expand Down Expand Up @@ -65,6 +74,13 @@ class DiskManager {
*/
void WriteLog(char *log_data, int size);

/**
* The non-blocking, asynchronous version of WriteLog()
* @param log_data
* @param size
*/
void WriteLogAsync(char *log_data, int size);

/**
* Read a log entry from the log file.
* @param[out] log_data output buffer
Expand All @@ -74,38 +90,81 @@ class DiskManager {
*/
auto ReadLog(char *log_data, int size, int offset) -> bool;

/** @return the number of disk flushes */
auto GetNumFlushes() const -> int;

/** @return true iff the in-memory content has not been flushed yet */
auto GetFlushState() const -> bool;
/**
* @brief Returns number of flushes made so far
* @return The number of disk flushes
*/
auto GetNumFlushes() const -> int { return num_flushes_.load(); };

/** @return the number of disk writes */
auto GetNumWrites() const -> int;
/**
* @brief Returns true if the log is currently being flushed
* @return True iff the in-memory content has currently been flushed
*/
auto GetFlushState() const -> bool { return flush_log_.load(); };

/**
* Sets the future which is used to check for non-blocking flushes.
* @param f the non-blocking flush check
* @brief Returns number of Writes made so far
* @return The number of disk writes
*/
inline void SetFlushLogFuture(std::future<void> *f) { flush_log_f_ = f; }
auto GetNumWrites() const -> int { return num_writes_; };

/** Checks if the non-blocking flush future was set. */
inline auto HasFlushLogFuture() -> bool { return flush_log_f_ != nullptr; }
/**
* Sets the future which is used to check for non-blocking writes.
* @param f the non-blocking write check
*/
inline void PushWriteLogFuture(std::unique_ptr<std::future<void>> f) {
std::scoped_lock write_log_f_deque_latch(write_log_f_deque_latch_);
write_log_f_deque_.push_back(std::move(f));
}

inline void PopWriteLogFuture() {
std::scoped_lock write_log_f_deque_latch(write_log_f_deque_latch_);
write_log_f_deque_.pop_front();
}

/** Checks if the non-blocking write future exists. */
inline auto HasWriteLogFuture() -> bool {
std::scoped_lock lock(write_log_f_deque_latch_);
return !write_log_f_deque_.empty();
}

inline void WaitForAsyncToComplete() {
std::scoped_lock lock(write_log_f_deque_latch_);
while (!write_log_f_deque_.empty()) {
if (write_log_f_deque_.front()->valid()) {
// Be consistent with previous solution
assert(write_log_f_deque_.front()->wait_for(std::chrono::seconds(10)) == std::future_status::ready);
}
write_log_f_deque_.pop_front();
}
}

protected:
auto GetFileSize(const std::string &file_name) -> int;
// stream to write log file

/** Stream to write log file */
std::fstream log_io_;
std::string log_name_;
// stream to write db file

/** Stream to write db file */
std::fstream db_io_;
std::string file_name_;
int num_flushes_{0};

std::atomic<int> num_flushes_{0};
int num_writes_{0};
bool flush_log_{false};
std::future<void> *flush_log_f_{nullptr};
// With multiple buffer pool instances, need to protect file access
std::atomic<bool> flush_log_{false};
/** Used for Destructor */
std::atomic<bool> has_shut_down_{false};

/** For asynchronous functions */
std::deque<std::unique_ptr<std::future<void>>> write_log_f_deque_;
std::mutex write_log_f_deque_latch_;

/** With multiple buffer pool instances, need to protect file access */
std::mutex db_io_latch_;

/** Same as above, the log access also needs to be protected, since std::fstream is NOT thread-safe */
std::mutex log_io_latch_;
};

} // namespace bustub
121 changes: 92 additions & 29 deletions src/storage/disk/disk_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@
//===----------------------------------------------------------------------===//

#include <sys/stat.h>
#include <cassert>
#include <cstring>
#include <iostream>
#include <mutex> // NOLINT
#include <string>
#include <thread> // NOLINT

#include "common/exception.h"
Expand Down Expand Up @@ -65,13 +62,27 @@ DiskManager::DiskManager(const std::string &db_file) : file_name_(db_file) {

/**
* Close all file streams
* Note: ShutDown should now be aware of none-terminated future flush
*/
void DiskManager::ShutDown() {
has_shut_down_ = true;
// Close the fstream, to prevent memory leak
{
std::scoped_lock scoped_db_io_latch(db_io_latch_);
std::scoped_lock scoped_log_io_latch(log_io_latch_);
db_io_.close();
log_io_.close();
}
log_io_.close();
// Not for production code, only to check the state under DEBUG mode
assert(!db_io_.is_open() && !log_io_.is_open());

// Wait for the ever existing flushing operation to finish
if (HasWriteLogFuture()) {
// Same problem as in DiskManager::WriteLog(), may crash the program
WaitForAsyncToComplete();
}
// Sanity Check
assert(!HasWriteLogFuture());
}

/**
Expand Down Expand Up @@ -125,55 +136,122 @@ void DiskManager::ReadPage(page_id_t page_id, char *page_data) {
/**
* Write the contents of the log into disk file
* Only return when sync is done, and only perform sequence write
* Note: This version of WriteLog() will block until all asynchronous tasks are finished
*/
void DiskManager::WriteLog(char *log_data, int size) {
// enforce swap log buffer
std::scoped_lock scoped_log_io_latch(log_io_latch_);
// Enforce swap log buffer
assert(log_data != buffer_used);
buffer_used = log_data;

if (size == 0) { // no effect on num_flushes_ if log buffer is empty
return;
}

flush_log_ = true;

if (flush_log_f_ != nullptr) {
// used for checking non-blocking flushing
assert(flush_log_f_->wait_for(std::chrono::seconds(10)) == std::future_status::ready);
if (HasWriteLogFuture()) {
// Used for checking non-blocking flushing, the default (and expected) time for a logging thread is 10s
// TODO(Zihao): Now the program will crash if the flush is still persisting after 10s,
// Should it be handled more softly (e.g, Print a warning and terminate just that thread by resetting the ptr?)
WaitForAsyncToComplete();
}

num_flushes_ += 1;
// sequence write
// Write the log
log_io_.write(log_data, size);

// check for I/O error
// Check for I/O error
if (log_io_.bad()) {
LOG_DEBUG("I/O error while writing log");
return;
}
// needs to flush to keep disk file in sync

// Flush the log
log_io_.flush();

// Update flush status
flush_log_ = true;

// Increase number of flushing by one
num_flushes_ += 1;
}

/**
* @brief A non-blocking, asynchronous version of WriteLog
*
* @param log_data
* @param size
*/
void DiskManager::WriteLogAsync(char *log_data, int size) {
auto curr_write_f = std::make_unique<std::future<void>>(std::async(std::launch::async, [&] {
// First acquire the lock, the underlying threads will sequentially acquire the log_io_latch_
std::scoped_lock scoped_log_io_latch(log_io_latch_);

// Ensure swap log buffer
if (log_data != buffer_used) {
// Directly return
// TODO(Zihao): Warning the user?
return;
}
// Otherwise set the buffer_used to the incoming new log_data
buffer_used = log_data;

// No effect on num_flushes_ if log buffer is empty
if (size == 0) {
return;
}

// Write the log
log_io_.write(log_data, size);

// Check for I/O error
if (log_io_.bad()) {
// Directly return, may be changed in the future
return;
}

// Flush the log to disk
log_io_.flush();
// The current flushing is done
flush_log_ = true;
// Increase the num of flush by one
num_flushes_ += 1;
}));

// Set flush status to false, since the execution of this task is unknown
flush_log_ = false;

// Push the new future handle in to the deque
PushWriteLogFuture(std::move(curr_write_f));
}

/**
* Read the contents of the log into the given memory area
* Always read from the beginning and perform sequence read
* @return: false means already reach the end
* Note: This function will read the log AFTER all write operations (including asynchronous) are finished
*/
auto DiskManager::ReadLog(char *log_data, int size, int offset) -> bool {
std::scoped_lock scoped_log_io_latch(log_io_latch_);

// Wait for potential async write operations
if (HasWriteLogFuture()) {
WaitForAsyncToComplete();
}

// Perform sanity check here
if (offset >= GetFileSize(log_name_)) {
// LOG_DEBUG("end of log file");
// LOG_DEBUG("file size is %d", GetFileSize(log_name_));
return false;
}

log_io_.seekp(offset);
log_io_.read(log_data, size);

if (log_io_.bad()) {
LOG_DEBUG("I/O error while reading log");
return false;
}

// if log file ends before reading "size"
int read_count = log_io_.gcount();
if (read_count < size) {
Expand All @@ -184,21 +262,6 @@ auto DiskManager::ReadLog(char *log_data, int size, int offset) -> bool {
return true;
}

/**
* Returns number of flushes made so far
*/
auto DiskManager::GetNumFlushes() const -> int { return num_flushes_; }

/**
* Returns number of Writes made so far
*/
auto DiskManager::GetNumWrites() const -> int { return num_writes_; }

/**
* Returns true if the log is currently being flushed
*/
auto DiskManager::GetFlushState() const -> bool { return flush_log_; }

/**
* Private helper function to get disk file size
*/
Expand Down