Skip to content

Commit

Permalink
fix: concurrent writes on native SQLite (#4907)
Browse files Browse the repository at this point in the history
* feat(user-facing-errors): add missing branch for SQLite

* feat(quaint): use explicit connection flags for SQLite

* feat(quaint): use BEGIN IMMEDIATE to avoid busy errors on concurrent SQLite writes

* test(connector-test-kit-rs): add Rust regression test for prisma/prisma#11789

* chore(quaint): adjust sqlite comment

* chore(user-facing-errors): simplify busy error message for SQLite
  • Loading branch information
jkomyno committed Jun 17, 2024
1 parent 27c0eb3 commit 4c3db41
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 2 deletions.
11 changes: 10 additions & 1 deletion libs/user-facing-errors/src/quaint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub fn render_quaint_error(kind: &ErrorKind, connection_info: &ConnectionInfo) -
},

(ErrorKind::SocketTimeout { .. }, ConnectionInfo::External(_)) => default_value,
#[cfg(any(feature = "mssql-native", feature = "mysql-native", feature = "postgresql-native"))]
#[cfg(any(feature = "mssql-native", feature = "mysql-native", feature = "postgresql-native", feature = "sqlite-native"))]
(ErrorKind::SocketTimeout, _) => match connection_info {
ConnectionInfo::Native(NativeConnectionInfo::Postgres(url)) => {
let time = match url.socket_timeout() {
Expand Down Expand Up @@ -181,6 +181,15 @@ pub fn render_quaint_error(kind: &ErrorKind, connection_info: &ConnectionInfo) -
.into(),
}))
}
ConnectionInfo::Native(NativeConnectionInfo::Sqlite { file_path, db_name: _ }) => {
Some(KnownError::new(common::DatabaseOperationTimeout {
time: "N/A".into(),
context: format!(
"The database failed to respond to a query within the configured timeout — see https://pris.ly/d/sqlite-connector for more details. Database: {}",
file_path
),
}))
}
_ => unreachable!(),
},

Expand Down
30 changes: 29 additions & 1 deletion quaint/src/connector/sqlite/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,27 @@ impl TryFrom<&str> for Sqlite {
let params = SqliteParams::try_from(path)?;
let file_path = params.file_path;

let conn = rusqlite::Connection::open(file_path.as_str())?;
// Read about SQLite threading modes here: https://www.sqlite.org/threadsafe.html.
// - "single-thread". In this mode, all mutexes are disabled and SQLite is unsafe to use in more than a single thread at once.
// - "multi-thread". In this mode, SQLite can be safely used by multiple threads provided that no single database connection nor any
// object derived from database connection, such as a prepared statement, is used in two or more threads at the same time.
// - "serialized". In serialized mode, API calls to affect or use any SQLite database connection or any object derived from such a
// database connection can be made safely from multiple threads. The effect on an individual object is the same as if the API calls
// had all been made in the same order from a single thread.
//
// `rusqlite` uses `SQLITE_OPEN_NO_MUTEX` by default, which means that the connection uses the "multi-thread" threading mode.

let conn = rusqlite::Connection::open_with_flags(
file_path.as_str(),
// The database is opened for reading and writing if possible, or reading only if the file is write protected by the operating system.
// The database is created if it does not already exist.
rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
| rusqlite::OpenFlags::SQLITE_OPEN_CREATE
// The new database connection will use the "multi-thread" threading mode.
| rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
// The filename can be interpreted as a URI if this flag is set.
| rusqlite::OpenFlags::SQLITE_OPEN_URI,
)?;

if let Some(timeout) = params.socket_timeout {
conn.busy_timeout(timeout)?;
Expand Down Expand Up @@ -154,6 +174,14 @@ impl Queryable for Sqlite {
fn requires_isolation_first(&self) -> bool {
false
}

fn begin_statement(&self) -> &'static str {
// From https://sqlite.org/isolation.html:
// `BEGIN IMMEDIATE` avoids possible `SQLITE_BUSY_SNAPSHOT` that arise when another connection jumps ahead in line.
// The BEGIN IMMEDIATE command goes ahead and starts a write transaction, and thus blocks all other writers.
// If the BEGIN IMMEDIATE operation succeeds, then no subsequent operations in that transaction will ever fail with an SQLITE_BUSY error.
"BEGIN IMMEDIATE"
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod max_integer;
mod prisma_10098;
mod prisma_10935;
mod prisma_11789;
mod prisma_12572;
mod prisma_12929;
mod prisma_13089;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use indoc::formatdoc;
use query_engine_tests::*;
use std::sync::Arc;

#[test_suite(schema(schema), only(Sqlite))]
mod prisma_concurrent_write {
fn schema() -> String {
let schema = indoc! {
r#"
model User {
id String @id
email String @unique
profile Profile?
}
model Profile {
id String @id @default(uuid())
user User @relation(fields: [userId], references: [id])
userId String @unique
}
"#
};

schema.to_owned()
}

#[connector_test]
// Runs 100 `run_create_user` queries in parallel, followed by 100 `run_create_profile` queries in parallel.
async fn concurrent_creates_should_succeed(runner: Runner) -> TestResult<()> {
let n = 100;
let ids: Vec<String> = (1..=n).map(|i| format!("{:05}", i)).collect();

let runner_arc = Arc::new(runner);

let create_user_tasks: Vec<_> = ids
.iter()
.map(|id| {
let runner = runner_arc.clone();
let id = id.clone();
tokio::spawn(async move { run_create_user(runner, &id).await })
})
.collect();

let created_users: Vec<TestResult<String>> = futures::future::join_all(create_user_tasks)
.await
.into_iter()
.map(|res| res.expect("Task panicked"))
.collect();

assert_eq!(created_users.len(), n);

let create_profile_tasks: Vec<_> = ids
.iter()
.map(|id| {
let runner = runner_arc.clone();
let id = id.clone();
tokio::spawn(async move { run_create_profile(runner, &id).await })
})
.collect();

let queries: Vec<TestResult<String>> = futures::future::join_all(create_profile_tasks)
.await
.into_iter()
.map(|res| res.expect("Task panicked"))
.collect();

assert_eq!(queries.len(), n);

Ok(())
}

#[connector_test]
// Runs 2 `run_create_user` queries in parallel, followed by 2 `run_upsert_profile` queries in parallel.
async fn concurrent_upserts_should_succeed(runner: Runner) -> TestResult<()> {
let n = 2;
let ids: Vec<String> = (1..=n).map(|i| format!("{:05}", i)).collect();

let runner_arc = Arc::new(runner);

let create_user_tasks: Vec<_> = ids
.iter()
.map(|id| {
let runner = runner_arc.clone();
let id = id.clone();
tokio::spawn(async move { run_create_user(runner, &id).await })
})
.collect();

// Collect the results from the spawned tasks
let created_users: Vec<TestResult<String>> = futures::future::join_all(create_user_tasks)
.await
.into_iter()
.map(|res| res.expect("Task panicked"))
.collect();

assert_eq!(created_users.len(), n);

let upsert_profile_tasks: Vec<_> = ids
.iter()
.map(|id| {
let runner = runner_arc.clone();
let id = id.clone();
tokio::spawn(async move { run_upsert_profile(runner, &id).await })
})
.collect();

// Collect the results from the spawned tasks
let queries: Vec<TestResult<String>> = futures::future::join_all(upsert_profile_tasks)
.await
.into_iter()
.map(|res| res.expect("Task panicked"))
.collect();

assert_eq!(queries.len(), n);

Ok(())
}

async fn run_create_user(runner: Arc<Runner>, id: &str) -> TestResult<String> {
Ok(run_query!(
runner,
formatdoc! { r#"
mutation {{
createOneUser(data: {{ id: "{id}", email: "{id}@test.com" }}) {{
id
email
}}
}}
"#
}
))
}

async fn run_create_profile(runner: Arc<Runner>, id: &str) -> TestResult<String> {
Ok(run_query!(
runner,
formatdoc! { r#"
mutation {{
createOneProfile(
data: {{
user: {{
connect: {{ id: "{id}" }}
}}
}}
) {{
id
}}
}}
"# }
))
}

async fn run_upsert_profile(runner: Arc<Runner>, id: &str) -> TestResult<String> {
Ok(run_query!(
runner,
formatdoc! { r#"
mutation {{
upsertOneProfile(where: {{
id: "{id}"
}}, create: {{
user: {{
connect: {{ id: "{id}" }}
}}
}}, update: {{
}}) {{
id
}}
}}
"# }
))
}
}

0 comments on commit 4c3db41

Please sign in to comment.