Skip to content

Commit

Permalink
feat(services/sqlite): Support blocking_get/set/delete in sqlite serv…
Browse files Browse the repository at this point in the history
…ice (#3218)
  • Loading branch information
Zheaoli authored Sep 30, 2023
1 parent e759da4 commit b6b1214
Showing 1 changed file with 53 additions and 57 deletions.
110 changes: 53 additions & 57 deletions core/src/services/sqlite/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,76 +204,72 @@ impl kv::Adapter for Adapter {
}

async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
let connection_string = self.connection_string.clone();
let value_field = self.value_field.clone();
let table = self.table.clone();
let key_field = self.key_field.clone();
let cloned_path = path.to_string();
let cloned_self = self.clone();

task::spawn_blocking(move || {
let query = format!(
"SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1",
value_field, table, key_field
);
let conn = Connection::open(connection_string).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(Error::from)?;
let result = statement.query_row([cloned_path.as_str()], |row| row.get(0));
match result {
Ok(v) => Ok(Some(v)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(err) => Err(Error::from(err)),
}
})
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
}

fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
let query = format!(
"SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1",
self.value_field, self.table, self.key_field
);
let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(Error::from)?;
let result = statement.query_row([path], |row| row.get(0));
match result {
Ok(v) => Ok(Some(v)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(err) => Err(Error::from(err)),
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
let connection_string = self.connection_string.clone();
let table = self.table.clone();
let key_field = self.key_field.clone();
let value_field = self.value_field.clone();
let cloned_path = path.to_string();
let cloned_value = value.to_vec();
let cloned_self = self.clone();

task::spawn_blocking(move || {
let query = format!(
"INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
table, key_field, value_field
);
let conn = Connection::open(connection_string).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(Error::from)?;
statement
.execute(params![cloned_path, cloned_value])
.map_err(Error::from)?;
Ok(())
})
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
let query = format!(
"INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
self.table, self.key_field, self.value_field
);
let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(Error::from)?;
statement
.execute(params![path, value])
.map_err(Error::from)?;
Ok(())
}

async fn delete(&self, path: &str) -> Result<()> {
let connection_string = self.connection_string.clone();
let table = self.table.clone();
let key_field = self.key_field.clone();
let cloned_path = path.to_string();
let cloned_self = self.clone();

task::spawn_blocking(move || {
let conn = Connection::open(connection_string).map_err(|err| {
Error::new(ErrorKind::Unexpected, "Sqlite open error").set_source(err)
})?;
let query = format!("DELETE FROM {} WHERE `{}` = $1", table, key_field);
let mut statement = conn.prepare(&query).map_err(Error::from)?;
statement
.execute([cloned_path.as_str()])
.map_err(Error::from)?;
Ok(())
})
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
}

fn blocking_delete(&self, path: &str) -> Result<()> {
let conn = Connection::open(self.connection_string.clone()).map_err(|err| {
Error::new(ErrorKind::Unexpected, "Sqlite open error").set_source(err)
})?;
let query = format!("DELETE FROM {} WHERE `{}` = $1", self.table, self.key_field);
let mut statement = conn.prepare(&query).map_err(Error::from)?;
statement.execute([path]).map_err(Error::from)?;
Ok(())
}
}

Expand Down

0 comments on commit b6b1214

Please sign in to comment.