Skip to content

Commit

Permalink
Use CachingSession on scyllaDB.
Browse files Browse the repository at this point in the history
  • Loading branch information
zensh committed Aug 21, 2023
1 parent d23d55a commit ddacf82
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 82 deletions.
53 changes: 31 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "jarvis"
version = "0.7.11"
version = "0.7.12"
edition = "2021"
rust-version = "1.64"
description = ""
Expand Down Expand Up @@ -38,7 +38,7 @@ isolang = { git = "https://github.com/yiwen-ai/isolang-rs.git", branch = "master
libflate = "1"
log = "0.4"
mime = "0.3"
scylla = "0.8"
scylla = "0.9"
serde = "1"
serde_json = { version = "1.0", features = ["preserve_order"] }
structured-logger = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion src/api/translating.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub async fn get(
})))
}

const IGNORE_LANGGUAGES: [&'static str; 5] = ["abk", "ava", "bak", "lim", "nya"];
const IGNORE_LANGGUAGES: [&str; 5] = ["abk", "ava", "bak", "lim", "nya"];

pub async fn list_languages(
to: PackObject<()>,
Expand Down
5 changes: 2 additions & 3 deletions src/db/model_embedding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,10 @@ impl Embedding {
) -> anyhow::Result<Vec<Embedding>> {
let fields = Self::select_fields(select_fields, true)?;

let query = scylladb::Query::new(format!(
let query = format!(
"SELECT {} FROM embedding WHERE cid=? AND language=? AND version=? AND gid=? LIMIT 1000 ALLOW FILTERING BYPASS CACHE USING TIMEOUT 10s",
fields.clone().join(",")
))
.with_page_size(1000i32);
);
let params = (cid.to_cql(), lang.to_cql(), version, gid.to_cql());
let rows = db.execute_iter(query, params).await?;

Expand Down
75 changes: 21 additions & 54 deletions src/db/scylladb.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures::{stream::StreamExt, Stream};
use scylla::{
frame::value::{BatchValues, ValueList},
statement::{prepared_statement::PreparedStatement, Consistency, SerialConsistency},
statement::{Consistency, SerialConsistency},
transport::{query_result::QueryResult, Compression, ExecutionProfile},
Metrics, Session, SessionBuilder,
CachingSession, Metrics, Session, SessionBuilder,
};
use std::{sync::Arc, time::Duration};

Expand All @@ -17,7 +17,7 @@ pub use scylla::{
use crate::conf;

pub struct ScyllaDB {
session: Session,
session: CachingSession,
}

impl ScyllaDB {
Expand All @@ -43,41 +43,21 @@ impl ScyllaDB {
session.use_keyspace(keyspace, false).await?;
}

Ok(Self { session })
Ok(Self {
session: CachingSession::from(session, 10000),
})
}

pub fn metrics(&self) -> Arc<Metrics> {
self.session.get_metrics()
}

#[cfg(test)]
pub async fn init_tables_for_test(&self) -> anyhow::Result<()> {
let schema = std::include_str!("../../cql/schema_keyspace_test.cql");
exec_cqls(self, schema).await?;

let schema = std::include_str!("../../cql/schema_table.cql");
exec_cqls(self, schema).await?;
Ok(())
}

pub async fn query(
&self,
query: impl Into<Query>,
params: impl ValueList,
) -> anyhow::Result<QueryResult> {
let res = self.session.query(query, params).await?;
Ok(res)
self.session.get_session().get_metrics()
}

pub async fn execute(
&self,
query: impl Into<Query>,
params: impl ValueList,
) -> anyhow::Result<QueryResult> {
let mut prepared: PreparedStatement = self.session.prepare(query).await?;

prepared.set_consistency(Consistency::Quorum);
let res = self.session.execute(&prepared, params).await?;
let res = self.session.execute(query, params).await?;
Ok(res)
}

Expand All @@ -86,10 +66,7 @@ impl ScyllaDB {
query: impl Into<Query>,
params: impl ValueList,
) -> anyhow::Result<Vec<Row>> {
let mut prepared: PreparedStatement = self.session.prepare(query).await?;

prepared.set_consistency(Consistency::Quorum);
let mut rows_stream = self.session.execute_iter(prepared, params).await?;
let mut rows_stream = self.session.execute_iter(query, params).await?;

let (capacity, _) = rows_stream.size_hint();
let mut rows: Vec<Row> = Vec::with_capacity(capacity);
Expand All @@ -99,23 +76,9 @@ impl ScyllaDB {
Ok(rows)
}

pub async fn execute_paged(
&self,
query: impl Into<Query>,
params: impl ValueList,
paging_state: Option<Bytes>,
) -> anyhow::Result<Vec<Row>> {
let mut prepared: PreparedStatement = self.session.prepare(query).await?;

prepared.set_consistency(Consistency::Quorum);
let res = self
.session
.execute_paged(&prepared, params, paging_state)
.await?;

Ok(res.rows.unwrap_or_default())
}

// https://opensource.docs.scylladb.com/master/cql/dml.html#batch-statement
// BATCH operations are only isolated within a single partition.
// BATCH with conditions cannot span multiple tables
pub async fn batch(
&self,
statements: Vec<&str>,
Expand All @@ -125,9 +88,7 @@ impl ScyllaDB {
for statement in statements {
batch.append_statement(statement);
}
let mut prepared_batch: Batch = self.session.prepare_batch(&batch).await?;
prepared_batch.set_consistency(Consistency::Quorum);
let res = self.session.batch(&prepared_batch, values).await?;
let res = self.session.batch(&batch, values).await?;
Ok(res)
}
}
Expand Down Expand Up @@ -159,7 +120,7 @@ pub async fn exec_cqls(db: &ScyllaDB, cqls: &str) -> anyhow::Result<()> {

for cql in cqls {
let res = db
.query(cql.clone(), &[])
.execute(cql.clone(), &[])
.await
.map_err(|err| anyhow::anyhow!("\ncql: {}\nerror: {}", &cql, &err));
if res.is_err() {
Expand All @@ -177,6 +138,7 @@ pub async fn exec_cqls(db: &ScyllaDB, cqls: &str) -> anyhow::Result<()> {

#[cfg(test)]
mod tests {
use super::*;
use crate::conf;
use crate::db;
use tokio::sync::OnceCell;
Expand All @@ -195,6 +157,11 @@ mod tests {
#[tokio::test(flavor = "current_thread")]
async fn exec_cqls_works() {
let db = get_db().await;
db.init_tables_for_test().await.unwrap();

let schema = std::include_str!("../../cql/schema_keyspace_test.cql");
exec_cqls(db, schema).await.unwrap();

let schema = std::include_str!("../../cql/schema_table.cql");
exec_cqls(db, schema).await.unwrap();
}
}

0 comments on commit ddacf82

Please sign in to comment.