Skip to content

Commit

Permalink
safekeeper: send AppendResponse on segment flush
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Nov 9, 2024
1 parent 2fcac0e commit 10223d1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
3 changes: 3 additions & 0 deletions safekeeper/src/receive_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,9 @@ impl WalAcceptor {
// Don't flush the WAL on every append, only periodically via flush_ticker.
// This batches multiple appends per fsync. If the channel is empty after
// sending the reply, we'll schedule an immediate flush.
//
// Note that a flush can still happen on segment bounds, which will result
// in an AppendResponse.
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
dirty = true;
Expand Down
5 changes: 4 additions & 1 deletion safekeeper/src/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,7 @@ where
// while first connection still gets some packets later. It might be
// better to not log this as error! above.
let write_lsn = self.wal_store.write_lsn();
let flush_lsn = self.wal_store.flush_lsn();
if write_lsn > msg.h.begin_lsn {
bail!(
"append request rewrites WAL written before, write_lsn={}, msg lsn={}",
Expand Down Expand Up @@ -1012,7 +1013,9 @@ where
);

// If flush_lsn hasn't updated, AppendResponse is not very useful.
if !require_flush {
// This is the common case for !require_flush, but a flush can still
// happen on segment bounds.
if flush_lsn == self.flush_lsn() {
return Ok(None);
}

Expand Down
14 changes: 14 additions & 0 deletions safekeeper/src/wal_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ pub struct PhysicalStorage {
/// non-aligned chunks of data.
write_record_lsn: Lsn,

/// The last LSN flushed to disk. May be in the middle of a record.
flush_lsn: Lsn,

/// The LSN of the last WAL record flushed to disk.
flush_record_lsn: Lsn,

Expand Down Expand Up @@ -205,6 +208,7 @@ impl PhysicalStorage {
system_id: state.server.system_id,
write_lsn,
write_record_lsn: write_lsn,
flush_lsn,
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
file: None,
Expand Down Expand Up @@ -308,6 +312,7 @@ impl PhysicalStorage {
if xlogoff + buf.len() == self.wal_seg_size {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&file).await?;
self.flush_lsn = Lsn((segno + 1) * self.wal_seg_size as u64);

// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
Expand All @@ -327,6 +332,7 @@ impl PhysicalStorage {
///
/// Updates `write_lsn`.
async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
// TODO: is this ever legal? If so, need to clamp flush_lsn.
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(file) = self.file.take() {
Expand Down Expand Up @@ -364,6 +370,9 @@ impl Storage for PhysicalStorage {
self.write_lsn
}
/// Returns the LSN of the last WAL record durably flushed to disk.
///
/// Note that this reports `flush_record_lsn` (a record position), not the
/// raw `flush_lsn` field, which may be in the middle of a record.
///
/// TODO: consider renaming this to flush_record_lsn().
// The getter name deliberately differs from the field it returns (a field
// named `flush_lsn` also exists), which trips clippy's misnamed_getters lint.
#[allow(clippy::misnamed_getters)]
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn
}
Expand Down Expand Up @@ -427,6 +436,10 @@ impl Storage for PhysicalStorage {
None => break, // no full record yet
Some((lsn, _rec)) => {
    // A complete record was decoded; remember its LSN as the latest
    // fully written record.
    self.write_record_lsn = lsn;
    // `flush_lsn` may point into the middle of a record, so only advance
    // the record-aligned flush position when this record lies entirely
    // within the flushed range.
    if lsn <= self.flush_lsn {
        // The record-aligned flush position must never move backwards.
        // (The previous check, `lsn > self.flush_lsn`, contradicted the
        // enclosing condition and so failed whenever this branch ran in
        // debug builds.)
        debug_assert!(lsn >= self.flush_record_lsn, "flush LSN regressed");
        self.flush_record_lsn = lsn;
    }
}
}
}
Expand Down Expand Up @@ -504,6 +517,7 @@ impl Storage for PhysicalStorage {
file.set_len(xlogoff as u64).await?;
file.set_len(self.wal_seg_size as u64).await?;
self.fsync_file(&file).await?;
self.flush_lsn = end_pos;

if !is_partial {
// Make segment partial once again
Expand Down

0 comments on commit 10223d1

Please sign in to comment.