Skip to content

Commit

Permalink
feat(storage): Support composite primary keys
Browse files Browse the repository at this point in the history
Signed-off-by: Dani Pozo <[email protected]>
  • Loading branch information
danipozo committed Dec 9, 2023
1 parent a85b544 commit 29a0147
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 31 deletions.
11 changes: 4 additions & 7 deletions src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,14 @@ impl ColumnCatalog {
}

/// Find the id of the sort key among column catalogs
pub fn find_sort_key_id(column_infos: &[ColumnCatalog]) -> Option<usize> {
let mut key = None;
pub fn find_sort_key_id(column_infos: &[ColumnCatalog]) -> Vec<usize> {
let mut keys = vec![];
for (id, column_info) in column_infos.iter().enumerate() {
if column_info.is_primary() {
if key.is_some() {
panic!("only one primary key is supported");
}
key = Some(id);
keys.push(id);
}
}
key
keys
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions src/storage/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct InMemoryTable {
pub(super) table_ref_id: TableRefId,
pub(super) columns: Arc<[ColumnCatalog]>,
pub(super) inner: InMemoryTableInnerRef,
pub(super) ordered_pk_ids: Vec<ColumnId>,
}

pub(super) struct InMemoryTableInner {
Expand Down Expand Up @@ -58,6 +59,7 @@ impl InMemoryTable {
table_ref_id,
columns: columns.into(),
inner: Arc::new(RwLock::new(InMemoryTableInner::new())),
ordered_pk_ids: Vec::new(),
}
}
}
Expand All @@ -84,4 +86,8 @@ impl Table for InMemoryTable {
async fn update(&self) -> StorageResult<InMemoryTransaction> {
InMemoryTransaction::start(self)
}

/// Returns the ordered list of primary-key column ids for this in-memory
/// table (a clone of the stored `ordered_pk_ids` field).
fn ordered_pk_ids(&self) -> Vec<ColumnId> {
self.ordered_pk_ids.clone()
}
}
38 changes: 28 additions & 10 deletions src/storage/memory/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use itertools::Itertools;

use super::table::InMemoryTableInnerRef;
use super::{InMemoryRowHandler, InMemoryTable, InMemoryTxnIterator};
use crate::array::{ArrayBuilderImpl, ArrayImplBuilderPickExt, ArrayImplSortExt, DataChunk};
use crate::catalog::{find_sort_key_id, ColumnCatalog};
use crate::storage::{ScanOptions, StorageColumnRef, StorageResult, Transaction};
use crate::array::{ArrayBuilderImpl, ArrayImplBuilderPickExt, DataChunk};
use crate::storage::{ScanOptions, StorageColumnRef, StorageResult, Table, Transaction};

/// A transaction running on `InMemoryStorage`.
pub struct InMemoryTransaction {
Expand All @@ -34,31 +33,42 @@ pub struct InMemoryTransaction {
/// Snapshot of all deleted rows
deleted_rows: Arc<HashSet<usize>>,

/// All information about columns
column_infos: Arc<[ColumnCatalog]>,
/// Ordered primary key indexes in `column_infos`
ordered_pk_idx: Vec<usize>,
}

impl InMemoryTransaction {
/// Begin a transaction on `table`.
///
/// Captures a snapshot of the table's current chunks and deleted rows, and
/// resolves each primary-key column id into its positional index within
/// `table.columns` so later sorted scans can locate the key columns.
///
/// Panics with "Malformed table object" if a primary-key id has no matching
/// column in the table's column list.
pub(super) fn start(table: &InMemoryTable) -> StorageResult<Self> {
let guard = table.inner.read().unwrap();

// Map each primary-key column id to its position in `table.columns`.
let mut ordered_pk_idx = Vec::new();
for pk_id in table.ordered_pk_ids() {
let idx = table
.columns
.iter()
.position(|col| col.id() == pk_id)
.expect("Malformed table object");
ordered_pk_idx.push(idx);
}

Ok(Self {
finished: false,
buffer: vec![],
delete_buffer: vec![],
table: table.inner.clone(),
snapshot: Arc::new(guard.get_all_chunks()),
deleted_rows: Arc::new(guard.get_all_deleted_rows()),
ordered_pk_idx,
})
}
}

/// If primary key is found in [`ColumnCatalog`], sort all in-memory data using that key.
fn sort_datachunk_by_pk(
chunks: &Arc<Vec<DataChunk>>,
column_infos: &[ColumnCatalog],
ordered_pk_idx: &[usize],
) -> Arc<Vec<DataChunk>> {
if let Some(sort_key_id) = find_sort_key_id(column_infos) {
if !ordered_pk_idx.is_empty() {
if chunks.is_empty() {
return chunks.clone();
}
Expand All @@ -78,7 +88,15 @@ fn sort_datachunk_by_pk(
.into_iter()
.map(|builder| builder.finish())
.collect_vec();
let sorted_index = arrays[sort_key_id].get_sorted_indices();

let pk_arrays = Vec::from(ordered_pk_idx)
.iter()
.map(|idx| &arrays[*idx])
.collect_vec();
let pk_array = itertools::izip!(pk_arrays).collect_vec();
let sorted_index = (0..pk_array.len())
.sorted_by_key(|idx| pk_array[*idx])
.collect_vec();

let chunk = arrays
.into_iter()
Expand Down Expand Up @@ -109,7 +127,7 @@ impl Transaction for InMemoryTransaction {
assert!(!opts.reversed, "reverse iterator is not supported for now");

let snapshot = if opts.is_sorted {
sort_datachunk_by_pk(&self.snapshot, &self.column_infos)
sort_datachunk_by_pk(&self.snapshot, &self.ordered_pk_idx)
} else {
self.snapshot.clone()
};
Expand Down
3 changes: 3 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ pub trait Table: Sync + Send + Clone + 'static {

/// Get table id
fn table_id(&self) -> TableRefId;

/// Get primary key
fn ordered_pk_ids(&self) -> Vec<ColumnId>;
}

/// Reference to a column.
Expand Down
6 changes: 3 additions & 3 deletions src/storage/secondary/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ impl Compactor {
);
}

let sort_key = find_sort_key_id(&table.columns);
let mut iter: SecondaryIterator = if let Some(sort_key) = sort_key {
let sort_keys = find_sort_key_id(&table.columns);
let mut iter: SecondaryIterator = if !sort_keys.is_empty() {
MergeIterator::new(
iters.into_iter().map(|iter| iter.into()).collect_vec(),
vec![sort_key],
sort_keys,
)
.into()
} else {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/secondary/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl SecondaryStorage {
table_name.clone(),
column_descs.to_vec(),
false,
ordered_pk_ids,
ordered_pk_ids.clone(),
)
.map_err(|_| TracedStorageError::duplicated("table", table_name))?;

Expand All @@ -222,6 +222,7 @@ impl SecondaryStorage {
self.version.clone(),
self.block_cache.clone(),
self.txn_mgr.clone(),
ordered_pk_ids,
);
self.tables.write().insert(id, table);

Expand Down
5 changes: 3 additions & 2 deletions src/storage/secondary/rowset/mem_rowset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ impl SecondaryMemRowsetImpl {
column_options: ColumnBuilderOptions,
rowset_id: u32,
) -> Self {
if let Some(sort_key_idx) = find_sort_key_id(&columns) {
let sort_keys = find_sort_key_id(&columns);
if !sort_keys.is_empty() {
Self::BTree(SecondaryMemRowset::<BTreeMapMemTable> {
mem_table: BTreeMapMemTable::new(columns.clone(), sort_key_idx),
mem_table: BTreeMapMemTable::new(columns.clone(), sort_keys[0]),
rowset_builder: RowsetBuilder::new(columns, column_options),
rowset_id,
})
Expand Down
11 changes: 10 additions & 1 deletion src/storage/secondary/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::storage::Table;
/// A table in Secondary engine.
///
/// As `SecondaryStorage` holds the reference to `SecondaryTable`, we cannot store
/// `Arc<SecondaryStorage>` inside `SecondaryTable`. This sturct only contains necessary information
/// `Arc<SecondaryStorage>` inside `SecondaryTable`. This struct only contains necessary information
/// to decode the columns of the table.
#[derive(Clone)]
pub struct SecondaryTable {
Expand All @@ -27,6 +27,9 @@ pub struct SecondaryTable {
/// Mapping from [`ColumnId`] to column index in `columns`.
pub column_map: HashMap<ColumnId, usize>,

/// Ordered list of primary key columns.
pub ordered_pk_ids: Vec<ColumnId>,

/// Root directory of the storage
pub storage_options: Arc<StorageOptions>,

Expand Down Expand Up @@ -54,6 +57,7 @@ impl SecondaryTable {
version: Arc<VersionManager>,
block_cache: Cache<BlockCacheKey, Block>,
txn_mgr: Arc<TransactionManager>,
ordered_pk_ids: Vec<ColumnId>,
) -> Self {
Self {
columns: columns.into(),
Expand All @@ -68,6 +72,7 @@ impl SecondaryTable {
version,
block_cache,
txn_mgr,
ordered_pk_ids,
}
}

Expand Down Expand Up @@ -126,4 +131,8 @@ impl Table for SecondaryTable {
async fn update(&self) -> StorageResult<SecondaryTransaction> {
SecondaryTransaction::start(self, false, true).await
}

/// Returns the ordered list of primary-key column ids for this secondary
/// table (a clone of the stored `ordered_pk_ids` field).
fn ordered_pk_ids(&self) -> Vec<ColumnId> {
self.ordered_pk_ids.clone()
}
}
22 changes: 15 additions & 7 deletions src/storage/secondary/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,23 @@ impl SecondaryTransaction {
let final_iter = if iters.len() == 1 {
iters.pop().unwrap().into()
} else if opts.is_sorted {
let sort_key = find_sort_key_id(&self.table.columns);
if let Some(sort_key) = sort_key {
let real_col_idx = col_idx.iter().position(|x| match x {
StorageColumnRef::Idx(y) => *y as usize == sort_key,
_ => false,
});
let sort_keys = find_sort_key_id(&self.table.columns);
if !sort_keys.is_empty() {
let real_col_idx = sort_keys
.iter()
.map(|id| {
col_idx
.iter()
.position(|x| match x {
StorageColumnRef::Idx(y) => *y as usize == *id,
_ => false,
})
.expect("sorting key not in column list")
})
.collect_vec();
MergeIterator::new(
iters.into_iter().map(|iter| iter.into()).collect_vec(),
vec![real_col_idx.expect("sort key not in column list")],
real_col_idx,
)
.into()
} else {
Expand Down

0 comments on commit 29a0147

Please sign in to comment.