Skip to content

Commit

Permalink
commitlog: Introduce epoch (#1851)
Browse files Browse the repository at this point in the history
  • Loading branch information
kim authored Nov 5, 2024
1 parent ac0053c commit f22b163
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 18 deletions.
1 change: 1 addition & 0 deletions crates/commitlog/proptest-regressions/tests/bitflip.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc a224c9559a4f825676852b58397b59027a14561c8bd9439b52691234fab848de # shrinks to inputs = Inputs { byte_pos: 354, bit_mask: 205, segment_offset: 30 }
cc a62542123f6c7a5c747cdf8d64246d93b1ba55e53f207dd0827d3bc65442cb35 # shrinks to inputs = Inputs { byte_pos: 25, bit_mask: 1, segment_offset: 0 }
100 changes: 95 additions & 5 deletions crates/commitlog/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,27 @@ use std::{
use crc32c::{Crc32cReader, Crc32cWriter};
use spacetimedb_sats::buffer::{BufReader, Cursor, DecodeError};

use crate::{error::ChecksumMismatch, payload::Decoder, segment::CHECKSUM_ALGORITHM_CRC32C, Transaction};
use crate::{
error::ChecksumMismatch, payload::Decoder, segment::CHECKSUM_ALGORITHM_CRC32C, Transaction,
DEFAULT_LOG_FORMAT_VERSION,
};

#[derive(Default)]
enum Version {
V0,
#[default]
V1,
}

pub struct Header {
pub min_tx_offset: u64,
pub epoch: u64,
pub n: u16,
pub len: u32,
}

impl Header {
pub const LEN: usize = /* offset */ 8 + /* n */ 2 + /* len */ 4;
pub const LEN: usize = /* offset */ 8 + /* epoch */ 8 + /* n */ 2 + /* len */ 4;

/// Read [`Self::LEN`] bytes from `reader` and interpret them as the
/// "header" of a [`Commit`].
Expand All @@ -30,7 +41,45 @@ impl Header {
///
/// This is to allow preallocation of segments.
///
pub fn decode<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
Self::decode_v1(reader)
}

fn decode_internal<R: Read>(reader: R, v: Version) -> io::Result<Option<Self>> {
use Version::*;
match v {
V0 => Self::decode_v0(reader),
V1 => Self::decode_v1(reader),
}
}

fn decode_v0<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
let mut hdr = [0; Self::LEN - 8];
if let Err(e) = reader.read_exact(&mut hdr) {
if e.kind() == io::ErrorKind::UnexpectedEof {
return Ok(None);
}

return Err(e);
}
match &mut hdr.as_slice() {
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
buf => {
let min_tx_offset = buf.get_u64().map_err(decode_error)?;
let n = buf.get_u16().map_err(decode_error)?;
let len = buf.get_u32().map_err(decode_error)?;

Ok(Some(Self {
min_tx_offset,
epoch: Commit::DEFAULT_EPOCH,
n,
len,
}))
}
}
}

fn decode_v1<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
let mut hdr = [0; Self::LEN];
if let Err(e) = reader.read_exact(&mut hdr) {
if e.kind() == io::ErrorKind::UnexpectedEof {
Expand All @@ -43,10 +92,16 @@ impl Header {
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
buf => {
let min_tx_offset = buf.get_u64().map_err(decode_error)?;
let epoch = buf.get_u64().map_err(decode_error)?;
let n = buf.get_u16().map_err(decode_error)?;
let len = buf.get_u32().map_err(decode_error)?;

Ok(Some(Self { min_tx_offset, n, len }))
Ok(Some(Self {
min_tx_offset,
epoch,
n,
len,
}))
}
}
}
Expand All @@ -60,6 +115,18 @@ pub struct Commit {
/// The offset starts from zero and is counted from the beginning of the
/// entire log.
pub min_tx_offset: u64,
/// The epoch within which the commit was created.
///
/// Indicates the monotonically increasing term number of the leader when
/// the commitlog is being written to in a distributed deployment.
///
/// The default epoch is 0 (zero). It should be used when the log is written
/// to by a single process.
///
/// Note, however, that an existing log may have a non-zero epoch.
/// It is currently unspecified how a commitlog is transitioned between
/// distributed and single-node deployment, wrt the epoch.
pub epoch: u64,
/// The number of records in the commit.
pub n: u16,
/// A buffer of all records in the commit in serialized form.
Expand All @@ -70,6 +137,8 @@ pub struct Commit {
}

impl Commit {
pub const DEFAULT_EPOCH: u64 = 0;

pub const FRAMING_LEN: usize = Header::LEN + /* crc32 */ 4;
pub const CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;

Expand All @@ -90,10 +159,12 @@ impl Commit {
let mut out = Crc32cWriter::new(out);

let min_tx_offset = self.min_tx_offset.to_le_bytes();
let epoch = self.epoch.to_le_bytes();
let n = self.n.to_le_bytes();
let len = (self.records.len() as u32).to_le_bytes();

out.write_all(&min_tx_offset)?;
out.write_all(&epoch)?;
out.write_all(&n)?;
out.write_all(&len)?;
out.write_all(&self.records)?;
Expand Down Expand Up @@ -173,13 +244,15 @@ impl From<StoredCommit> for Commit {
fn from(
StoredCommit {
min_tx_offset,
epoch,
n,
records,
checksum: _,
}: StoredCommit,
) -> Self {
Self {
min_tx_offset,
epoch,
n,
records,
}
Expand All @@ -194,6 +267,8 @@ impl From<StoredCommit> for Commit {
pub struct StoredCommit {
/// See [`Commit::min_tx_offset`].
pub min_tx_offset: u64,
/// See [`Commit::epoch`].
pub epoch: u64,
/// See [`Commit::n`].
pub n: u16,
/// See [`Commit::records`].
Expand All @@ -216,9 +291,18 @@ impl StoredCommit {
/// kind [`io::ErrorKind::InvalidData`] with an inner error downcastable to
/// [`ChecksumMismatch`] is returned.
pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
Self::decode_internal(reader, DEFAULT_LOG_FORMAT_VERSION)
}

pub(crate) fn decode_internal<R: Read>(reader: R, log_format_version: u8) -> io::Result<Option<Self>> {
let mut reader = Crc32cReader::new(reader);

let Some(hdr) = Header::decode(&mut reader)? else {
let v = if log_format_version == 0 {
Version::V0
} else {
Version::V1
};
let Some(hdr) = Header::decode_internal(&mut reader, v)? else {
return Ok(None);
};
let mut records = vec![0; hdr.len as usize];
Expand All @@ -233,6 +317,7 @@ impl StoredCommit {

Ok(Some(Self {
min_tx_offset: hdr.min_tx_offset,
epoch: hdr.epoch,
n: hdr.n,
records,
checksum: crc,
Expand All @@ -258,6 +343,7 @@ impl StoredCommit {
pub struct Metadata {
pub tx_range: Range<u64>,
pub size_in_bytes: u64,
pub epoch: u64,
}

impl Metadata {
Expand All @@ -275,6 +361,7 @@ impl From<Commit> for Metadata {
Self {
tx_range: commit.tx_range(),
size_in_bytes: commit.encoded_len() as u64,
epoch: commit.epoch,
}
}
}
Expand Down Expand Up @@ -312,6 +399,7 @@ mod tests {
min_tx_offset: 0,
n: 3,
records,
epoch: Commit::DEFAULT_EPOCH,
};

let mut buf = Vec::with_capacity(commit.encoded_len());
Expand All @@ -329,6 +417,7 @@ mod tests {
min_tx_offset: 0,
n: 4,
records: vec![0; 128],
epoch: Commit::DEFAULT_EPOCH,
};

let txs = commit
Expand Down Expand Up @@ -358,6 +447,7 @@ mod tests {
min_tx_offset: 42,
n: 10,
records: vec![1; 512],
epoch: Commit::DEFAULT_EPOCH,
};

let mut buf = Vec::with_capacity(commit.encoded_len());
Expand Down
44 changes: 41 additions & 3 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ impl<R: Repo, T> Generic<R, T> {
debug!("resuming last segment: {last}");
repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
tail.push(meta.tx_range.start);
repo::create_segment_writer(&repo, opts, meta.tx_range.end)
repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
})?
} else {
debug!("starting fresh log");
repo::create_segment_writer(&repo, opts, 0)?
repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
};

Ok(Self {
Expand All @@ -72,6 +72,43 @@ impl<R: Repo, T> Generic<R, T> {
})
}

/// Get the current epoch.
///
/// See also: [`Commit::epoch`].
pub fn epoch(&self) -> u64 {
self.head.commit.epoch
}

/// Update the current epoch.
///
/// Calls [`Self::commit`] to flush all data of the previous epoch, and
/// returns the result.
///
/// Does nothing if the given `epoch` is equal to the current epoch.
///
/// # Errors
///
/// If `epoch` is smaller than the current epoch, an error of kind
/// [`io::ErrorKind::InvalidInput`] is returned.
///
/// Also see [`Self::commit`].
pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
use std::cmp::Ordering::*;

match self.head.epoch().cmp(&epoch) {
Less => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"new epoch is smaller than current epoch",
)),
Equal => Ok(None),
Greater => {
let res = self.commit()?;
self.head.set_epoch(epoch);
Ok(res)
}
}
}

/// Write the currently buffered data to storage and rotate segments as
/// necessary.
///
Expand Down Expand Up @@ -254,7 +291,7 @@ impl<R: Repo, T> Generic<R, T> {
self.head.next_tx_offset(),
self.head.min_tx_offset()
);
let new = repo::create_segment_writer(&self.repo, self.opts, self.head.next_tx_offset())?;
let new = repo::create_segment_writer(&self.repo, self.opts, self.head.epoch(), self.head.next_tx_offset())?;
let old = mem::replace(&mut self.head, new);
self.tail.push(old.min_tx_offset());
self.head.commit = old.commit;
Expand Down Expand Up @@ -821,6 +858,7 @@ mod tests {
min_tx_offset: 0,
n: 1,
records: [43; 32].to_vec(),
epoch: 0,
};
log.commit().unwrap();

Expand Down
29 changes: 29 additions & 0 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,35 @@ impl<T> Commitlog<T> {
self.inner.read().unwrap().max_committed_offset()
}

/// Get the current epoch.
///
/// See also: [`Commit::epoch`].
pub fn epoch(&self) -> u64 {
self.inner.read().unwrap().epoch()
}

/// Update the current epoch.
///
/// Does nothing if the given `epoch` is equal to the current epoch.
/// Otherwise flushes outstanding transactions to disk (equivalent to
/// [`Self::flush`]) before updating the epoch.
///
/// Returns the maximum transaction offset written to disk. The offset is
/// `None` if the log is empty and no data was pending to be flushed.
///
/// # Errors
///
/// If `epoch` is smaller than the current epoch, an error of kind
/// [`io::ErrorKind::InvalidInput`] is returned.
///
/// Errors from the implicit flush are propagated.
pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
let mut inner = self.inner.write().unwrap();
inner.set_epoch(epoch)?;

Ok(inner.max_committed_offset())
}

/// Sync all OS-buffered writes to disk.
///
/// Note that this does **not** write outstanding records to disk.
Expand Down
4 changes: 2 additions & 2 deletions crates/commitlog/src/payload/txdata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use thiserror::Error;
use crate::{
error,
varint::{decode_varint, encode_varint},
Encode, Varchar,
Encode, Varchar, DEFAULT_LOG_FORMAT_VERSION,
};

// Re-export so we get a hyperlink in rustdocs by default
Expand Down Expand Up @@ -116,7 +116,7 @@ impl<T> Txdata<T> {
}

impl<T: Encode> Txdata<T> {
pub const VERSION: u8 = 0;
pub const VERSION: u8 = DEFAULT_LOG_FORMAT_VERSION;

pub fn encode(&self, buf: &mut impl BufWriter) {
let mut flags = Flags::empty();
Expand Down
Loading

2 comments on commit f22b163

@github-actions
Copy link

@github-actions github-actions bot commented on f22b163 Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

@github-actions
Copy link

@github-actions github-actions bot commented on f22b163 Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results

Callgrind Benchmark Report

These benchmarks were run using callgrind,
an instruction-level profiler. They allow comparisons between sqlite (sqlite), SpacetimeDB running through a module (stdb_module), and the underlying SpacetimeDB data storage engine (stdb_raw). Callgrind emulates a CPU to collect the below estimates.

Measurement changes larger than five percent are in bold.

In-memory benchmarks

callgrind: empty transaction

db total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw 6364 6364 0.00% 6460 6460 0.00%
sqlite 5579 5579 0.00% 5965 6001 -0.60%

callgrind: filter

db schema indices count preload _column data_type total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str no_index 64 128 1 u64 76558 76559 -0.00% 77054 77059 -0.01%
stdb_raw u32_u64_str no_index 64 128 2 string 119056 120146 -0.91% 119806 120884 -0.89%
stdb_raw u32_u64_str btree_each_column 64 128 2 string 25049 25048 0.00% 25643 25622 0.08%
stdb_raw u32_u64_str btree_each_column 64 128 1 u64 24016 24016 0.00% 24428 24456 -0.11%
sqlite u32_u64_str no_index 64 128 2 string 144695 144695 0.00% 146115 146075 0.03%
sqlite u32_u64_str no_index 64 128 1 u64 124044 124044 0.00% 125266 125270 -0.00%
sqlite u32_u64_str btree_each_column 64 128 1 u64 131361 131394 -0.03% 132803 132780 0.02%
sqlite u32_u64_str btree_each_column 64 128 2 string 134494 134494 0.00% 136178 136050 0.09%

callgrind: insert bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 64 128 878509 876679 0.21% 903703 896611 0.79%
stdb_raw u32_u64_str btree_each_column 64 128 1033745 1026965 0.66% 1099079 1054209 4.26%
sqlite u32_u64_str unique_0 64 128 398320 398326 -0.00% 414686 415084 -0.10%
sqlite u32_u64_str btree_each_column 64 128 983637 983637 0.00% 1016419 1020751 -0.42%

callgrind: iterate

db schema indices count total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 153691 153691 0.00% 153801 153773 0.02%
stdb_raw u32_u64_str unique_0 64 16716 16716 0.00% 16806 16778 0.17%
sqlite u32_u64_str unique_0 1024 1067255 1067255 0.00% 1070719 1070583 0.01%
sqlite u32_u64_str unique_0 64 76201 76201 0.00% 77315 77259 0.07%

callgrind: serialize_product_value

count format total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
64 json 47528 47528 0.00% 50282 50146 0.27%
64 bsatn 25509 25509 0.00% 27753 27719 0.12%
16 bsatn 8200 8200 0.00% 9560 9526 0.36%
16 json 12188 12188 0.00% 14194 14058 0.97%

callgrind: update bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 1024 20516923 20505695 0.05% 21184329 20985129 0.95%
stdb_raw u32_u64_str unique_0 64 128 1287079 1285311 0.14% 1333387 1319221 1.07%
sqlite u32_u64_str unique_0 1024 1024 1802182 1802182 0.00% 1811454 1811320 0.01%
sqlite u32_u64_str unique_0 64 128 128528 128528 0.00% 131498 131398 0.08%
On-disk benchmarks

callgrind: empty transaction

db total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw 6369 6369 0.00% 6473 6473 0.00%
sqlite 5621 5621 0.00% 6059 6099 -0.66%

callgrind: filter

db schema indices count preload _column data_type total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str no_index 64 128 1 u64 76563 76564 -0.00% 77043 77036 0.01%
stdb_raw u32_u64_str no_index 64 128 2 string 119061 119062 -0.00% 119727 119856 -0.11%
stdb_raw u32_u64_str btree_each_column 64 128 2 string 25053 25069 -0.06% 25571 25595 -0.09%
stdb_raw u32_u64_str btree_each_column 64 128 1 u64 24021 24021 0.00% 24401 24433 -0.13%
sqlite u32_u64_str no_index 64 128 1 u64 125965 125965 0.00% 127403 127459 -0.04%
sqlite u32_u64_str no_index 64 128 2 string 146616 146616 0.00% 148284 148312 -0.02%
sqlite u32_u64_str btree_each_column 64 128 2 string 136616 136616 0.00% 138714 138682 0.02%
sqlite u32_u64_str btree_each_column 64 128 1 u64 133457 133457 0.00% 135349 135301 0.04%

callgrind: insert bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 64 128 827017 826719 0.04% 851473 876629 -2.87%
stdb_raw u32_u64_str btree_each_column 64 128 977688 974136 0.36% 1041490 1000692 4.08%
sqlite u32_u64_str unique_0 64 128 415857 415857 0.00% 431649 431839 -0.04%
sqlite u32_u64_str btree_each_column 64 128 1021898 1021898 0.00% 1054012 1057814 -0.36%

callgrind: iterate

db schema indices count total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 153696 153696 0.00% 153782 153754 0.02%
stdb_raw u32_u64_str unique_0 64 16721 16721 0.00% 16807 16779 0.17%
sqlite u32_u64_str unique_0 1024 1070323 1070323 0.00% 1074085 1074081 0.00%
sqlite u32_u64_str unique_0 64 77973 77991 -0.02% 79303 79305 -0.00%

callgrind: serialize_product_value

count format total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
64 json 47528 47528 0.00% 50282 50146 0.27%
64 bsatn 25509 25509 0.00% 27753 27719 0.12%
16 bsatn 8200 8200 0.00% 9560 9526 0.36%
16 json 12188 12188 0.00% 14194 14058 0.97%

callgrind: update bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 1024 19039824 19025133 0.08% 19740930 19532411 1.07%
stdb_raw u32_u64_str unique_0 64 128 1239971 1239359 0.05% 1315503 1302409 1.01%
sqlite u32_u64_str unique_0 1024 1024 1809743 1809743 0.00% 1818367 1818389 -0.00%
sqlite u32_u64_str unique_0 64 128 132654 132654 0.00% 135668 135640 0.02%

Please sign in to comment.