Skip to content

Commit

Permalink
feat: Use SQLite WAL transaction mode
Browse files Browse the repository at this point in the history
  • Loading branch information
peasee committed Aug 1, 2024
1 parent 4356be6 commit 36103d6
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/sql/db_connection_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub trait DbConnectionPool<T, P: 'static> {
fn join_push_down(&self) -> JoinPushDown;
}

#[derive(Default)]
#[derive(Default, Clone)]
pub enum Mode {
#[default]
Memory,
Expand Down
12 changes: 11 additions & 1 deletion src/sql/db_connection_pool/sqlitepool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,17 @@ impl DbConnectionPool<Connection, &'static (dyn ToSql + Sync)> for SqliteConnect
async fn connect(
&self,
) -> Result<Box<dyn DbConnection<Connection, &'static (dyn ToSql + Sync)>>> {
Ok(Box::new(SqliteConnection::new(self.conn.clone())))
let conn = self.conn.clone();

// change transaction mode to Write-Ahead log instead of default atomic rollback journal: https://www.sqlite.org/wal.html
conn.call(|conn| {
conn.execute_batch("PRAGMA journal_mode = WAL;")?;
Ok(())
})
.await
.context(ConnectionPoolSnafu)?;

Ok(Box::new(SqliteConnection::new(conn)))
}

fn join_push_down(&self) -> JoinPushDown {
Expand Down
12 changes: 11 additions & 1 deletion src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ impl TableProviderFactory for SqliteTableProviderFactory {

let db_path = self.sqlite_file_path(&name, &cmd.options);

// use a separate pool instance from writing to allow for concurrent reads+writes
// even though we setup SQLite to use WAL mode, the pool isn't really a pool so shares the same connection
// and we can't have concurrent writes when sharing the same connection
let read_pool: Arc<SqliteConnectionPool> = Arc::new(
SqliteConnectionPool::new(&db_path, mode.clone())
.await
.context(DbConnectionPoolSnafu)
.map_err(to_datafusion_error)?,
);

let pool: Arc<SqliteConnectionPool> = Arc::new(
SqliteConnectionPool::new(&db_path, mode)
.await
Expand Down Expand Up @@ -200,7 +210,7 @@ impl TableProviderFactory for SqliteTableProviderFactory {
.map_err(to_datafusion_error)?;
}

let dyn_pool: Arc<DynSqliteConnectionPool> = pool;
let dyn_pool: Arc<DynSqliteConnectionPool> = read_pool;

let read_provider = Arc::new(SqlTable::new_with_schema(
"sqlite",
Expand Down

0 comments on commit 36103d6

Please sign in to comment.