Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc: improve documentation #300

Merged
merged 3 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,32 @@ where
}

/// open an optimistic ACID transaction
///
/// ## Examples
/// ```ignore
/// #[derive(Record, Debug)]
/// pub struct User {
/// #[record(primary_key)]
/// name: String,
/// age: u8,
/// }
/// ```
/// ```ignore
/// let mut txn = db.transaction().await;
/// txn.insert(User {
///     name: "Alice".into(),
///     age: 20,
/// });
/// txn.scan((Bound::Included("Alice"), Bound::Excluded("Bob")))
/// .projection(&["age"])
/// .limit(10)
/// .take()
/// .await
/// .unwrap();
///
/// txn.commit().await.unwrap();
/// ```
pub async fn transaction(&self) -> Transaction<'_, R> {
Transaction::new(self.snapshot().await, self.lock_map.clone())
}
Expand Down Expand Up @@ -337,6 +363,7 @@ where
.await?)
}

/// manually trigger a compaction. This will flush the WAL before the compaction runs
pub async fn flush(&self) -> Result<(), CommitError<R>> {
let (tx, rx) = oneshot::channel();
let compaction_tx = { self.schema.read().await.compaction_tx.clone() };
Expand Down Expand Up @@ -443,11 +470,19 @@ where
Ok(())
}

/// flush WAL to the stable storage. If WAL is disabled, this method will do nothing.
///
/// Because of the write buffer, there is no guarantee that data has already been flushed to
/// the WAL. It is therefore necessary to call this method before exiting if data loss is not
/// acceptable. See also [`DbOption::disable_wal`] and [`DbOption::wal_buffer_size`].
pub async fn flush_wal(&self) -> Result<(), DbError<R>> {
self.schema.write().await.flush_wal().await?;
Ok(())
}

/// destroy [`DB`].
///
/// **Note:** This will remove all wal and manifest file in the directory.
pub async fn destroy(self) -> Result<(), DbError<R>> {
self.schema.write().await.destroy(&self.ctx.manager).await?;
if let Some(ctx) = Arc::into_inner(self.ctx) {
Expand Down
8 changes: 7 additions & 1 deletion src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ impl DbOption {
}

/// Maximum size of WAL buffer, default value is 4KB
///
/// Set to 0 to disable WAL buffer
pub fn wal_buffer_size(self, wal_buffer_size: usize) -> Self {
DbOption {
wal_buffer_size,
Expand All @@ -165,7 +167,7 @@ impl DbOption {
..self
}
}

/// set the path where files will be stored in the level.
pub fn level_path(
mut self,
level: usize,
Expand All @@ -179,6 +181,10 @@ impl DbOption {
Ok(self)
}

/// set the base path option.
///
/// This will be the default option for all WAL, manifest and SSTable files. Use
/// [`DbOption::level_path`] to override the option for SSTables at a specific level.
pub fn base_fs(mut self, base_fs: FsOptions) -> Self {
self.base_fs = base_fs;
self
Expand Down
19 changes: 19 additions & 0 deletions src/record/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ pub trait Schema: Debug + Send + Sync {

type Key: Key;

/// Returns the [`arrow::datatypes::Schema`] of the record.
///
/// **Note**: The first column should be `_null`, and the second column should be `_ts`.
fn arrow_schema(&self) -> &Arc<ArrowSchema>;

/// Returns the index of the primary key column.
fn primary_key_index(&self) -> usize;

/// Returns the ([`ColumnPath`], [`Vec<SortingColumn>`]) of the primary key column, representing
/// the location of the primary key column in the parquet schema and its sort order within a
/// RowGroup.
fn primary_key_path(&self) -> (ColumnPath, Vec<SortingColumn>);
}

Expand All @@ -37,22 +44,34 @@ pub trait Record: 'static + Sized + Decode + Debug + Send + Sync {
where
Self: 'r;

/// Returns the primary key of the record. This should be the type defined in the
/// [`Schema`].
fn key(&self) -> <<<Self as Record>::Schema as Schema>::Key as Key>::Ref<'_> {
self.as_record_ref().key()
}

/// Returns a reference to the record.
fn as_record_ref(&self) -> Self::Ref<'_>;

/// Returns the size of the record in bytes.
fn size(&self) -> usize;
}

pub trait RecordRef<'r>: Clone + Sized + Encode + Send + Sync {
type Record: Record;

    /// Returns the primary key of the record. This should be the type defined in the
/// [`Schema`].
fn key(self) -> <<<Self::Record as Record>::Schema as Schema>::Key as Key>::Ref<'r>;

/// Do projection on the record. Only keep the columns specified in the projection mask.
///
    /// **Note**: The primary key column is always kept.
fn projection(&mut self, projection_mask: &ProjectionMask);

/// Get the [`RecordRef`] from the [`RecordBatch`] at the given offset.
///
/// `full_schema` is the combination of `_null`, `_ts` and all fields defined in the [`Schema`].
fn from_record_batch(
record_batch: &'r RecordBatch,
offset: usize,
Expand Down
31 changes: 30 additions & 1 deletion src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ where
}
/// optimistic ACID transaction, open with
/// [`DB::transaction`](crate::DB::transaction) method
///
/// A transaction stores all mutations in a local [`BTreeMap`] and only writes them to the
/// memtable when committed successfully. Otherwise, all mutations will be rolled back.
pub struct Transaction<'txn, R>
where
R: Record,
Expand Down Expand Up @@ -110,7 +113,28 @@ where
})
}

/// scan records with primary keys in the `range`
    /// scan records with primary keys in the `range`, returning a [`Scan`] that can be converted
    /// to a [`futures_core::Stream`] by using [`Scan::take`].
///
/// [`Scan::projection`] and [`Scan::limit`] can be used to push down projection and limit.
///
/// # Example
///
    /// ```ignore
    /// let mut txn = db.transaction().await;
    /// let mut scan_stream = txn
    ///     .scan((Bound::Included("Alice"), Bound::Excluded("Bob")))
    ///     // only read the primary key and `age`
    ///     .projection(&["age"])
    ///     // read at most 10 records
    ///     .limit(10)
    ///     .take()
    ///     .await
    ///     .unwrap();
    ///
    /// while let Some(entry) = scan_stream.next().await.transpose().unwrap() {
    ///     println!("{:#?}", entry.value())
    /// }
    /// ```
pub fn scan<'scan, 'range>(
&'scan self,
range: (
Expand Down Expand Up @@ -153,6 +177,10 @@ where

/// commit the data in the [`Transaction`] to the corresponding
/// [`DB`](crate::DB)
///
/// # Error
    /// This function will return an error if the mutations in the transaction conflict with
    /// other committed transactions
pub async fn commit(mut self) -> Result<(), CommitError<R>> {
let mut _key_guards = Vec::new();

Expand Down Expand Up @@ -235,6 +263,7 @@ impl<'entry, R> TransactionEntry<'entry, R>
where
R: Record,
{
/// get the [`RecordRef`] inside the entry.
pub fn get(&self) -> R::Ref<'_> {
match self {
TransactionEntry::Stream(entry) => entry.value().unwrap(),
Expand Down
Loading