Skip to content

Commit

Permalink
update all paths to join in lookaside
Browse files Browse the repository at this point in the history
  • Loading branch information
tantaman committed Sep 28, 2023
1 parent dd0106c commit 5baace6
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 174 deletions.
2 changes: 1 addition & 1 deletion core/rs/core/src/automigrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ fn migrate_to(
AND name NOT LIKE 'sqlite_%'
AND name NOT LIKE 'crsql_%'
AND name NOT LIKE '__crsql_%'
AND name NOT LIKE '%__crsql_clock'";
AND name NOT LIKE '%__crsql_%'";
let fetch_mem_tables = mem_db.prepare_v2(sql)?;
let fetch_local_tables = local_db.prepare_v2(sql)?;

Expand Down
5 changes: 3 additions & 2 deletions core/rs/core/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub fn backfill_table(
let sql = format!(
"SELECT {pk_cols} FROM \"{table}\" AS t1
WHERE NOT EXISTS
(SELECT 1 FROM \"{table}__crsql_clock\" AS t2 WHERE {pk_where_conditions})",
(SELECT 1 FROM \"{table}__crsql_pks\" AS t2 WHERE {pk_where_conditions})",
table = crate::util::escape_ident(table),
pk_cols = pk_cols
.iter()
Expand Down Expand Up @@ -180,7 +180,8 @@ fn fill_column(
let dflt_value = get_dflt_value(db, table, &non_pk_col.name)?;
let sql = format!(
"SELECT {pk_cols} FROM {table} as t1
LEFT JOIN \"{table}__crsql_clock\" as t2 ON {pk_on_conditions} AND t2.col_name = ?
LEFT JOIN \"{table}__crsql_pks\" as t2 ON {pk_on_conditions}
JOIN \"{table}__crsql_clock\" as t3 ON t3.key = t2.__crsql_key AND t3.col_name = ?
WHERE t2.\"{first_pk}\" IS NULL {dflt_value_condition}",
table = crate::util::escape_ident(table),
pk_cols = pk_cols
Expand Down
11 changes: 6 additions & 5 deletions core/rs/core/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn insert_site_id(db: *mut sqlite3) -> Result<[u8; 16], ResultCode> {

fn create_site_id_and_site_id_table(db: *mut sqlite3) -> Result<[u8; 16], ResultCode> {
db.exec_safe(&format!(
"CREATE TABLE \"{tbl}\" (site_id BLOB NOT NULL, ordinal INTEGER PRIMARY KEY AUTOINCREMENT);
"CREATE TABLE \"{tbl}\" (site_id BLOB NOT NULL, ordinal INTEGER PRIMARY KEY);
CREATE UNIQUE INDEX {tbl}_site_id ON \"{tbl}\" (site_id);",
tbl = consts::TBL_SITE_ID
))?;
Expand Down Expand Up @@ -202,15 +202,14 @@ pub fn create_clock_table(

db.exec_safe(&format!(
"CREATE TABLE IF NOT EXISTS \"{table_name}__crsql_clock\" (
{pk_list},
key,
col_name TEXT NOT NULL,
col_version INT NOT NULL,
db_version INT NOT NULL,
site_id INT,
seq INT NOT NULL,
PRIMARY KEY ({pk_list}, col_name)
PRIMARY KEY (key, col_name)
)",
pk_list = pk_list,
table_name = crate::util::escape_ident(table_name),
))?;

Expand All @@ -228,7 +227,9 @@ pub fn create_clock_table(
)?;
db.exec_safe(
&format!(
"CREATE UNIQUE INDEX IF NOT EXISTS \"{table_name}__crsql_pks_pks\" ON \"{table_name}__crsql_pks\" ({pk_list})", table_name = table_name, pk_list = pk_list
"CREATE UNIQUE INDEX IF NOT EXISTS \"{table_name}__crsql_pks_pks\" ON \"{table_name}__crsql_pks\" ({pk_list})",
table_name = table_name,
pk_list = pk_list
)
)
}
20 changes: 9 additions & 11 deletions core/rs/core/src/changes_vtab_read.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
extern crate alloc;
use crate::{tableinfo::TableInfo, util};
use crate::tableinfo::TableInfo;
use alloc::format;
use alloc::string::String;
use alloc::vec;
Expand All @@ -15,12 +15,8 @@ fn crsql_changes_query_for_table(table_info: &TableInfo) -> Result<String, Resul
return Err(ResultCode::ABORT);
}

let pk_list = crate::util::as_identifier_list(&table_info.pks, Some("t1."))?;
let pk_list = crate::util::as_identifier_list(&table_info.pks, Some("pk_tbl."))?;
// TODO: we can remove the self join if we put causal length in the primary key table
let self_join = util::map_columns(&table_info.pks, |c| {
format!("t1.\"{c}\" = t2.\"{c}\"", c = crate::util::escape_ident(c))
})?
.join(" AND ");

// We LEFT JOIN and COALESCE the causal length
// since we incorporated an optimization to not store causal length records
Expand All @@ -34,17 +30,19 @@ fn crsql_changes_query_for_table(table_info: &TableInfo) -> Result<String, Resul
t1.col_name as cid,
t1.col_version as col_vrsn,
t1.db_version as db_vrsn,
t3.site_id as site_id,
site_tbl.site_id as site_id,
t1._rowid_,
t1.seq as seq,
COALESCE(t2.col_version, 1) as cl
FROM \"{table_name_ident}__crsql_clock\" AS t1 LEFT JOIN \"{table_name_ident}__crsql_clock\" AS t2 ON
{self_join} AND t2.col_name = '{sentinel}' LEFT JOIN crsql_site_id as t3 ON t1.site_id = t3.ordinal",
FROM \"{table_name_ident}__crsql_clock\" AS t1
JOIN \"{table_name_ident}__crsql_pks\" AS pk_tbl ON t1.key = pk_tbl.__crsql_key
LEFT JOIN crsql_site_id AS site_tbl ON t1.site_id = site_tbl.ordinal
LEFT JOIN \"{table_name_ident}__crsql_clock\" AS t2 ON
t1.key = t2.key AND t2.col_name = '{sentinel}'",
table_name_val = crate::util::escape_ident_as_value(&table_info.tbl_name),
pk_list = pk_list,
table_name_ident = crate::util::escape_ident(&table_info.tbl_name),
sentinel = crate::c::INSERT_SENTINEL,
self_join = self_join
sentinel = crate::c::INSERT_SENTINEL
))
}

Expand Down
93 changes: 37 additions & 56 deletions core/rs/core/src/changes_vtab_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ fn did_cid_win(
insert_tbl: &str,
tbl_info: &TableInfo,
unpacked_pks: &Vec<ColumnValue>,
key: sqlite::int64,
col_name: &str,
insert_val: *mut sqlite::value,
col_version: sqlite::int64,
Expand All @@ -39,16 +40,12 @@ fn did_cid_win(
let col_vrsn_stmt_ref = tbl_info.get_col_version_stmt(db)?;
let col_vrsn_stmt = col_vrsn_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;

let bind_result = bind_package_to_stmt(col_vrsn_stmt.stmt, &unpacked_pks, 0);
let bind_result = col_vrsn_stmt.bind_int64(1, key);
if let Err(rc) = bind_result {
reset_cached_stmt(col_vrsn_stmt.stmt)?;
return Err(rc);
}
if let Err(rc) = col_vrsn_stmt.bind_text(
unpacked_pks.len() as i32 + 1,
col_name,
sqlite::Destructor::STATIC,
) {
if let Err(rc) = col_vrsn_stmt.bind_text(2, col_name, sqlite::Destructor::STATIC) {
reset_cached_stmt(col_vrsn_stmt.stmt)?;
return Err(rc);
}
Expand Down Expand Up @@ -117,7 +114,7 @@ fn set_winner_clock(
db: *mut sqlite3,
ext_data: *mut crsql_ExtData,
tbl_info: &TableInfo,
unpacked_pks: &Vec<ColumnValue>,
key: sqlite::int64,
insert_col_name: &str,
insert_col_vrsn: sqlite::int64,
insert_db_vrsn: sqlite::int64,
Expand Down Expand Up @@ -172,23 +169,19 @@ fn set_winner_clock(
let set_stmt_ref = tbl_info.get_set_winner_clock_stmt(db)?;
let set_stmt = set_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;

let bind_result = bind_package_to_stmt(set_stmt.stmt, unpacked_pks, 0);
let bind_result = set_stmt.bind_int64(1, key);
if let Err(rc) = bind_result {
reset_cached_stmt(set_stmt.stmt)?;
return Err(rc);
}
let bind_result = set_stmt
.bind_text(
unpacked_pks.len() as i32 + 1,
insert_col_name,
sqlite::Destructor::STATIC,
)
.and_then(|_| set_stmt.bind_int64(unpacked_pks.len() as i32 + 2, insert_col_vrsn))
.and_then(|_| set_stmt.bind_int64(unpacked_pks.len() as i32 + 3, insert_db_vrsn))
.and_then(|_| set_stmt.bind_int64(unpacked_pks.len() as i32 + 4, insert_seq))
.bind_text(2, insert_col_name, sqlite::Destructor::STATIC)
.and_then(|_| set_stmt.bind_int64(3, insert_col_vrsn))
.and_then(|_| set_stmt.bind_int64(4, insert_db_vrsn))
.and_then(|_| set_stmt.bind_int64(5, insert_seq))
.and_then(|_| match ordinal {
Some(ordinal) => set_stmt.bind_int64(unpacked_pks.len() as i32 + 5, ordinal),
None => set_stmt.bind_null(unpacked_pks.len() as i32 + 5),
Some(ordinal) => set_stmt.bind_int64(6, ordinal),
None => set_stmt.bind_null(6),
});

if let Err(rc) = bind_result {
Expand All @@ -214,6 +207,7 @@ fn merge_sentinel_only_insert(
ext_data: *mut crsql_ExtData,
tbl_info: &TableInfo,
unpacked_pks: &Vec<ColumnValue>,
key: sqlite::int64,
remote_col_vrsn: sqlite::int64,
remote_db_vsn: sqlite::int64,
remote_site_id: &[u8],
Expand Down Expand Up @@ -253,12 +247,12 @@ fn merge_sentinel_only_insert(
}

if let Ok(_) = rc {
zero_clocks_on_resurrect(db, tbl_info, unpacked_pks, remote_db_vsn)?;
zero_clocks_on_resurrect(db, tbl_info, key, remote_db_vsn)?;
return set_winner_clock(
db,
ext_data,
tbl_info,
unpacked_pks,
key,
crate::c::INSERT_SENTINEL,
remote_col_vrsn,
remote_db_vsn,
Expand All @@ -273,35 +267,25 @@ fn merge_sentinel_only_insert(
fn zero_clocks_on_resurrect(
db: *mut sqlite3,
tbl_info: &TableInfo,
unpacked_pks: &Vec<ColumnValue>,
key: sqlite::int64,
insert_db_vrsn: sqlite::int64,
) -> Result<ResultCode, ResultCode> {
let zero_stmt_ref = tbl_info.get_zero_clocks_on_resurrect_stmt(db)?;
let zero_stmt = zero_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;

if let Err(rc) = zero_stmt.bind_int64(1, insert_db_vrsn) {
reset_cached_stmt(zero_stmt.stmt)?;
return Err(rc);
}

if let Err(rc) = bind_package_to_stmt(zero_stmt.stmt, unpacked_pks, 1) {
reset_cached_stmt(zero_stmt.stmt)?;
return Err(rc);
}

if let Err(rc) = zero_stmt.step() {
reset_cached_stmt(zero_stmt.stmt)?;
return Err(rc);
}

reset_cached_stmt(zero_stmt.stmt)
let ret = zero_stmt
.bind_int64(1, insert_db_vrsn)
.and_then(|_| zero_stmt.bind_int64(2, key))
.and_then(|_| zero_stmt.step());
reset_cached_stmt(zero_stmt.stmt)?;
return ret;
}

unsafe fn merge_delete(
db: *mut sqlite3,
ext_data: *mut crsql_ExtData,
tbl_info: &TableInfo,
unpacked_pks: &Vec<ColumnValue>,
key: sqlite::int64,
remote_col_vrsn: sqlite::int64,
remote_db_vrsn: sqlite::int64,
remote_site_id: &[u8],
Expand All @@ -310,7 +294,7 @@ unsafe fn merge_delete(
let delete_stmt_ref = tbl_info.get_merge_delete_stmt(db)?;
let delete_stmt = delete_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;

if let Err(rc) = bind_package_to_stmt(delete_stmt.stmt, unpacked_pks, 0) {
if let Err(rc) = delete_stmt.bind_int64(1, key) {
reset_cached_stmt(delete_stmt.stmt)?;
return Err(rc);
}
Expand Down Expand Up @@ -338,7 +322,7 @@ unsafe fn merge_delete(
db,
ext_data,
tbl_info,
unpacked_pks,
key,
crate::c::DELETE_SENTINEL,
remote_col_vrsn,
remote_db_vrsn,
Expand All @@ -351,17 +335,11 @@ unsafe fn merge_delete(
let drop_clocks_stmt_ref = tbl_info.get_merge_delete_drop_clocks_stmt(db)?;
let drop_clocks_stmt = drop_clocks_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;

if let Err(rc) = bind_package_to_stmt(drop_clocks_stmt.stmt, unpacked_pks, 0) {
reset_cached_stmt(drop_clocks_stmt.stmt)?;
return Err(rc);
}

if let Err(rc) = drop_clocks_stmt.step() {
reset_cached_stmt(drop_clocks_stmt.stmt)?;
return Err(rc);
}

let rc = drop_clocks_stmt
.bind_int64(1, key)
.and_then(|_| drop_clocks_stmt.step());
reset_cached_stmt(drop_clocks_stmt.stmt)?;
rc?;

return Ok(ret);
}
Expand All @@ -382,17 +360,17 @@ pub unsafe extern "C" fn crsql_merge_insert(
fn get_local_cl(
db: *mut sqlite::sqlite3,
tbl_info: &TableInfo,
unpacked_pks: &Vec<ColumnValue>,
key: sqlite::int64,
) -> Result<sqlite::int64, ResultCode> {
let local_cl_stmt_ref = tbl_info.get_local_cl_stmt(db)?;
let local_cl_stmt = local_cl_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;

let rc = bind_package_to_stmt(local_cl_stmt.stmt, unpacked_pks, 0);
let rc = local_cl_stmt.bind_int64(1, key);
if let Err(rc) = rc {
reset_cached_stmt(local_cl_stmt.stmt)?;
return Err(rc);
}
let rc = bind_package_to_stmt(local_cl_stmt.stmt, unpacked_pks, unpacked_pks.len());
let rc = local_cl_stmt.bind_int64(2, key);
if let Err(rc) = rc {
reset_cached_stmt(local_cl_stmt.stmt)?;
return Err(rc);
Expand Down Expand Up @@ -489,7 +467,7 @@ unsafe fn merge_insert(
// We'll need the key for all later operations.
let key = tbl_info.get_or_create_key(db, &unpacked_pks)?;

let local_cl = get_local_cl(db, &tbl_info, &unpacked_pks)?;
let local_cl = get_local_cl(db, &tbl_info, key)?;

// We can ignore all updates from older causal lengths.
// They won't win at anything.
Expand Down Expand Up @@ -517,7 +495,7 @@ unsafe fn merge_insert(
db,
(*tab).pExtData,
&tbl_info,
&unpacked_pks,
key,
insert_col_vrsn,
insert_db_vrsn,
insert_site_id,
Expand Down Expand Up @@ -554,6 +532,7 @@ unsafe fn merge_insert(
(*tab).pExtData,
&tbl_info,
&unpacked_pks,
key,
insert_col_vrsn,
insert_db_vrsn,
insert_site_id,
Expand Down Expand Up @@ -588,6 +567,7 @@ unsafe fn merge_insert(
(*tab).pExtData,
&tbl_info,
&unpacked_pks,
key,
insert_cl,
insert_db_vrsn,
insert_site_id,
Expand All @@ -606,6 +586,7 @@ unsafe fn merge_insert(
insert_tbl,
&tbl_info,
&unpacked_pks,
key,
insert_col,
insert_val,
insert_col_vrsn,
Expand Down Expand Up @@ -654,7 +635,7 @@ unsafe fn merge_insert(
db,
(*tab).pExtData,
&tbl_info,
&unpacked_pks,
key,
insert_col,
insert_col_vrsn,
insert_db_vrsn,
Expand Down
14 changes: 5 additions & 9 deletions core/rs/core/src/local_writes/after_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,12 @@ fn after_delete(
let mark_locally_deleted_stmt = mark_locally_deleted_stmt_ref
.as_ref()
.ok_or("Failed to deref sentinel stmt")?;
for (i, pk) in pks_old.iter().enumerate() {
mark_locally_deleted_stmt
.bind_value(i as i32 + 1, *pk)
.or_else(|_e| Err("failed to bind pks to mark_locally_deleted_stmt"))?;
}
mark_locally_deleted_stmt
.bind_int64(pks_old.len() as i32 + 1, db_version)
.and_then(|_| mark_locally_deleted_stmt.bind_int(pks_old.len() as i32 + 2, seq))
.and_then(|_| mark_locally_deleted_stmt.bind_int64(pks_old.len() as i32 + 3, db_version))
.and_then(|_| mark_locally_deleted_stmt.bind_int(pks_old.len() as i32 + 4, seq))
.bind_int64(1, key)
.and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version))
.and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq))
.and_then(|_| mark_locally_deleted_stmt.bind_int64(4, db_version))
.and_then(|_| mark_locally_deleted_stmt.bind_int(5, seq))
.or_else(|_| Err("failed binding to mark locally deleted stmt"))?;
super::step_trigger_stmt(mark_locally_deleted_stmt)?;

Expand Down
Loading

0 comments on commit 5baace6

Please sign in to comment.