Skip to content

Commit

Permalink
cr-sqlite 0.16 (#128)
Browse files Browse the repository at this point in the history
* initial attempt at upgrading to 0.16

* forgot to save corro-pg file

* upgrade cr-sqlite to 0.16.0-next.2

* upgrade crsqlite, migrate properly and set config 'merge-equal-values'

* update crsqlite to a working version from our fork

* actually rename the tables

* rename migrations to be more representative

* upgrade crsqlite, fix load issue with versions

* don't ensure indexes on clock tables as part of boot, add to migration instead. Log a bit more when migrating

* wrong table name
  • Loading branch information
jeromegn authored Dec 30, 2023
1 parent d7ea4b2 commit da71b51
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 84 deletions.
26 changes: 7 additions & 19 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,10 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
let schema = {
let mut conn = pool.write_priority().await?;
migrate(&mut conn)?;

let mut schema = init_schema(&conn)?;
schema.constrain()?;

info!("Ensuring clock table indexes for fast compaction");
let start = Instant::now();
for table in schema.tables.keys() {
conn.execute_batch(&format!("CREATE INDEX IF NOT EXISTS corro_{table}__crsql_clock_site_id_dbv ON {table}__crsql_clock (site_id, db_version);"))?;
}
info!("Ensured indexes in {:?}", start.elapsed());

schema
};

Expand Down Expand Up @@ -1259,7 +1253,6 @@ fn find_cleared_db_versions(
.query_row([actor_id], |row| row.get(0))
.optional()?
{
Some(0) => None, // this is the current crsql_site_id which is going to be NULL in clock tables
Some(ordinal) => Some(ordinal),
None => {
warn!(actor_id = %actor_id, "could not find crsql ordinal for actor");
Expand Down Expand Up @@ -1287,7 +1280,7 @@ fn find_cleared_db_versions(
.iter()
.map(|table| {
params.push(&clock_site_id);
format!("SELECT DISTINCT db_version FROM {table} WHERE site_id IS ?")
format!("SELECT DISTINCT db_version FROM {table} WHERE site_id = ?")
})
.collect::<Vec<_>>()
.join(" UNION ")
Expand Down Expand Up @@ -3208,14 +3201,9 @@ pub mod tests {
let conn = ta.agent.pool().read().await?;
let counts: HashMap<ActorId, i64> = conn
.prepare_cached(
"SELECT COALESCE(site_id, crsql_site_id()), count(*) FROM crsql_changes GROUP BY site_id;",
"SELECT site_id, count(*) FROM crsql_changes GROUP BY site_id;",
)?
.query_map([], |row| {
Ok((
row.get(0)?,
row.get(1)?,
))
})?
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<rusqlite::Result<_>>()?;

debug!("versions count: {counts:?}");
Expand Down Expand Up @@ -3336,8 +3324,8 @@ pub mod tests {
}

{
let mut prepped = conn.prepare("EXPLAIN QUERY PLAN SELECT DISTINCT db_version FROM foo2__crsql_clock WHERE site_id IS ? UNION SELECT DISTINCT db_version FROM foo__crsql_clock WHERE site_id IS ?;")?;
let mut rows = prepped.query([rusqlite::types::Null, rusqlite::types::Null])?;
let mut prepped = conn.prepare("EXPLAIN QUERY PLAN SELECT DISTINCT db_version FROM foo2__crsql_clock WHERE site_id = ? UNION SELECT DISTINCT db_version FROM foo__crsql_clock WHERE site_id = ?;")?;
let mut rows = prepped.query([0, 0])?;

println!("matching clock rows:");
while let Ok(Some(row)) = rows.next() {
Expand Down Expand Up @@ -3656,7 +3644,7 @@ pub mod tests {

#[test]
fn test_store_empty_changeset() -> eyre::Result<()> {
let mut conn = Connection::open_in_memory()?;
let mut conn = CrConn::init(Connection::open_in_memory()?)?;

corro_types::sqlite::setup_conn(&mut conn)?;
migrate(&mut conn)?;
Expand Down
38 changes: 10 additions & 28 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ const ADAPT_CHUNK_SIZE_THRESHOLD: Duration = Duration::from_millis(500);
fn handle_known_version(
conn: &mut Connection,
actor_id: ActorId,
is_local: bool,
version: Version,
init_known: KnownDbVersion,
booked: &Booked,
Expand Down Expand Up @@ -393,23 +392,22 @@ fn handle_known_version(
// this is a read transaction!
let tx = conn.transaction()?;

let mut prepped = tx.prepare_cached(r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl
let mut prepped = tx.prepare_cached(
r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl
FROM crsql_changes
WHERE site_id IS ?
WHERE site_id = ?
AND db_version = ?
AND seq >= ? AND seq <= ?
ORDER BY seq ASC
"#)?;
let site_id: Option<[u8; 16]> = (!is_local)
.then_some(actor_id)
.map(|actor_id| actor_id.to_bytes());
"#,
)?;

let start_seq = range_needed.start();
let end_seq = range_needed.end();

let rows = prepped.query_map(
params![site_id, db_version, start_seq, end_seq],
params![actor_id, db_version, start_seq, end_seq],
row_to_change,
)?;

Expand Down Expand Up @@ -499,7 +497,6 @@ fn handle_known_version(
return handle_known_version(
conn,
actor_id,
is_local,
version,
current,
booked,
Expand Down Expand Up @@ -573,7 +570,6 @@ fn handle_known_version(
async fn process_version(
pool: &SplitPool,
actor_id: ActorId,
is_local: bool,
version: Version,
known_version: KnownDbVersion,
booked: &Booked,
Expand All @@ -599,7 +595,6 @@ async fn process_version(
handle_known_version(
&mut conn,
actor_id,
is_local,
version,
known_version,
booked,
Expand Down Expand Up @@ -674,7 +669,6 @@ fn send_change_chunks<I: Iterator<Item = rusqlite::Result<Change>>>(
}

async fn process_sync(
local_actor_id: ActorId,
pool: SplitPool,
bookie: Bookie,
sender: Sender<SyncMessage>,
Expand Down Expand Up @@ -725,8 +719,6 @@ async fn process_sync(
None => continue,
};

let is_local = actor_id == local_actor_id;

let mut cleared: RangeInclusiveSet<Version> = RangeInclusiveSet::new();

{
Expand Down Expand Up @@ -787,7 +779,6 @@ async fn process_sync(
process_version(
&pool,
actor_id,
is_local,
version,
known_version,
&booked,
Expand All @@ -811,7 +802,6 @@ async fn process_sync(
process_version(
&pool,
actor_id,
is_local,
version,
known_version,
&booked,
Expand Down Expand Up @@ -1373,15 +1363,9 @@ pub async fn serve_sync(
let (tx, mut rx) = mpsc::channel::<SyncMessage>(256);

tokio::spawn(
process_sync(
agent.actor_id(),
agent.pool().clone(),
agent.bookie().clone(),
tx,
rx_need,
)
.instrument(info_span!("process_sync"))
.inspect_err(|e| error!("could not process sync request: {e}")),
process_sync(agent.pool().clone(), agent.bookie().clone(), tx, rx_need)
.instrument(info_span!("process_sync"))
.inspect_err(|e| error!("could not process sync request: {e}")),
);

let (send_res, recv_res) = tokio::join!(
Expand Down Expand Up @@ -1666,7 +1650,6 @@ mod tests {
handle_known_version(
&mut conn,
actor_id,
false,
Version(1),
known1,
&booked,
Expand Down Expand Up @@ -1696,7 +1679,6 @@ mod tests {
handle_known_version(
&mut conn,
actor_id,
false,
Version(2),
known2,
&booked,
Expand Down
31 changes: 19 additions & 12 deletions crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,18 @@ where
})?;

let has_changes: bool = tx
.prepare_cached(
"SELECT EXISTS(SELECT 1 FROM crsql_changes WHERE site_id IS NULL AND db_version = ?);",
).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: None})?
.query_row([db_version], |row| row.get(0)).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: None})?;
.prepare_cached("SELECT EXISTS(SELECT 1 FROM crsql_changes WHERE db_version = ?);")
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: None,
})?
.query_row([db_version], |row| row.get(0))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: None,
})?;

if !has_changes {
tx.commit().map_err(|source| ChangeError::Rusqlite {
Expand All @@ -105,9 +113,7 @@ where
trace!("version: {version}");

let last_seq: CrsqlSeq = tx
.prepare_cached(
"SELECT MAX(seq) FROM crsql_changes WHERE site_id IS NULL AND db_version = ?",
)
.prepare_cached("SELECT MAX(seq) FROM crsql_changes WHERE db_version = ?")
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
Expand Down Expand Up @@ -174,13 +180,14 @@ where

block_in_place(|| {
// TODO: make this more generic so both sync and local changes can use it.
let mut prepped = conn.prepare_cached(r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl
let mut prepped = conn.prepare_cached(
r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl
FROM crsql_changes
WHERE site_id IS NULL
AND db_version = ?
WHERE db_version = ?
ORDER BY seq ASC
"#)?;
"#,
)?;
let rows = prepped.query_map([db_version], row_to_change)?;
let chunked =
ChunkedChanges::new(rows, CrsqlSeq(0), last_seq, MAX_CHANGES_BYTE_SIZE);
Expand Down
19 changes: 8 additions & 11 deletions crates/corro-pg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2288,9 +2288,7 @@ fn handle_commit(agent: &Agent, conn: &Connection) -> rusqlite::Result<()> {
.query_row((), |row| row.get(0))?;

let has_changes: bool = conn
.prepare_cached(
"SELECT EXISTS(SELECT 1 FROM crsql_changes WHERE site_id IS NULL AND db_version = ?);",
)?
.prepare_cached("SELECT EXISTS(SELECT 1 FROM crsql_changes WHERE db_version = ?);")?
.query_row([db_version], |row| row.get(0))?;

if !has_changes {
Expand All @@ -2306,9 +2304,7 @@ fn handle_commit(agent: &Agent, conn: &Connection) -> rusqlite::Result<()> {
};

let last_seq: CrsqlSeq = conn
.prepare_cached(
"SELECT MAX(seq) FROM crsql_changes WHERE site_id IS NULL AND db_version = ?",
)?
.prepare_cached("SELECT MAX(seq) FROM crsql_changes WHERE db_version = ?")?
.query_row([db_version], |row| row.get(0))?;

let mut book_writer = booked.blocking_write("handle_write_tx(book_writer)");
Expand Down Expand Up @@ -2356,13 +2352,14 @@ fn handle_commit(agent: &Agent, conn: &Connection) -> rusqlite::Result<()> {

block_in_place(|| {
// TODO: make this more generic so both sync and local changes can use it.
let mut prepped = conn.prepare_cached(r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl
let mut prepped = conn.prepare_cached(
r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl
FROM crsql_changes
WHERE site_id IS NULL
AND db_version = ?
WHERE db_version = ?
ORDER BY seq ASC
"#)?;
"#,
)?;
let rows = prepped.query_map([db_version], row_to_change)?;
let chunked =
ChunkedChanges::new(rows, CrsqlSeq(0), last_seq, MAX_CHANGES_BYTE_SIZE);
Expand Down
Binary file modified crates/corro-types/crsqlite-darwin-aarch64.dylib
100644 → 100755
Binary file not shown.
Binary file modified crates/corro-types/crsqlite-darwin-x86_64.dylib
100644 → 100755
Binary file not shown.
Binary file modified crates/corro-types/crsqlite-linux-aarch64.so
100644 → 100755
Binary file not shown.
Binary file modified crates/corro-types/crsqlite-linux-x86_64.so
100644 → 100755
Binary file not shown.
65 changes: 59 additions & 6 deletions crates/corro-types/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,10 @@ impl Agent {
pub fn migrate(conn: &mut Connection) -> rusqlite::Result<()> {
let migrations: Vec<Box<dyn Migration>> = vec![
Box::new(init_migration as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(v0_2_0_migration as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(v0_2_0_1_migration as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(v0_2_0_2_migration as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(bookkeeping_db_version_index as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(create_corro_subs as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(refactor_corro_members as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(crsqlite_v0_16_migration as fn(&Transaction) -> rusqlite::Result<()>),
];

crate::sqlite::migrate(conn, migrations)
Expand Down Expand Up @@ -321,15 +322,15 @@ fn init_migration(tx: &Transaction) -> rusqlite::Result<()> {
Ok(())
}

fn v0_2_0_migration(tx: &Transaction) -> rusqlite::Result<()> {
fn bookkeeping_db_version_index(tx: &Transaction) -> rusqlite::Result<()> {
tx.execute_batch(
"
CREATE INDEX __corro_bookkeeping_db_version ON __corro_bookkeeping (db_version);
",
)
}

fn v0_2_0_1_migration(tx: &Transaction) -> rusqlite::Result<()> {
fn create_corro_subs(tx: &Transaction) -> rusqlite::Result<()> {
tx.execute_batch(
r#"
-- where subscriptions are stored
Expand All @@ -342,7 +343,7 @@ fn v0_2_0_1_migration(tx: &Transaction) -> rusqlite::Result<()> {
)
}

fn v0_2_0_2_migration(tx: &Transaction) -> rusqlite::Result<()> {
fn refactor_corro_members(tx: &Transaction) -> rusqlite::Result<()> {
tx.execute_batch(
r#"
-- remove state
Expand All @@ -357,6 +358,58 @@ fn v0_2_0_2_migration(tx: &Transaction) -> rusqlite::Result<()> {
)
}

// since crsqlite 0.16, site_id is NOT NULL in clock tables
// also sets the new 'merge-equal-values' config to true.
fn crsqlite_v0_16_migration(tx: &Transaction) -> rusqlite::Result<()> {
let tables: Vec<String> = tx.prepare("SELECT tbl_name FROM sqlite_master WHERE type='table' AND tbl_name LIKE '%__crsql_clock'")?.query_map([], |row| row.get(0))?.collect::<rusqlite::Result<Vec<_>>>()?;

for table in tables {
let indexes: Vec<String> = tx
.prepare(&format!(
"SELECT sql FROM sqlite_master WHERE type='index' AND name LIKE '{table}%'"
))?
.query_map([], |row| row.get(0))?
.collect::<rusqlite::Result<Vec<_>>>()?;

tx.execute_batch(
&format!(r#"
CREATE TABLE {table}_new (
key INTEGER NOT NULL,
col_name TEXT NOT NULL,
col_version INTEGER NOT NULL,
db_version INTEGER NOT NULL,
site_id INTEGER NOT NULL DEFAULT 0,
seq INTEGER NOT NULL,
PRIMARY KEY (key, col_name)
) WITHOUT ROWID, STRICT;
INSERT INTO {table}_new SELECT key, col_name, col_version, db_version, COALESCE(site_id, 0), seq FROM {table};
ALTER TABLE {table} RENAME TO {table}_old;
ALTER TABLE {table}_new RENAME TO {table};
DROP TABLE {table}_old;
CREATE INDEX IF NOT EXISTS corro_{table}_site_id_dbv ON {table} (site_id, db_version);
"#),
)?;

// recreate the indexes
for sql in indexes {
tx.execute_batch(&sql)?;
}
}

// we want this to be true or else we'll assuredly make our DB inconsistent.
let _value: i64 = tx.query_row(
"SELECT crsql_config_set('merge-equal-values', 1);",
[],
|row| row.get(0),
)?;

Ok(())
}

#[derive(Debug, Clone)]
pub struct SplitPool(Arc<SplitPoolInner>);

Expand Down
Loading

0 comments on commit da71b51

Please sign in to comment.