Skip to content

Commit

Permalink
Introduce pagination for Scylla (linera-io#1394)
Browse files Browse the repository at this point in the history
* Upgrade of the version of ScyllaDb.
* Change the "query" to "query_paged".
* Remove several unwrap statements.
  • Loading branch information
MathieuDutSik authored Dec 19, 2023
1 parent 54a4e38 commit 0d3ff14
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 74 deletions.
43 changes: 26 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ aws-sdk-s3 = "1.4.0"
aws-smithy-http = "0.60.0"
aws-types = "1.0.1"
aws-smithy-types = "1.0.2"
scylla = "0.8.2"
scylla = "0.10.1"
axum = "0.6.20"
bcs = "0.1.6"
bincode = "1.3.3"
Expand Down
128 changes: 72 additions & 56 deletions linera-views/src/scylla_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ pub enum ScyllaDbContextError {
#[error("The key must have at most 1M")]
KeyTooLong,

/// A row error in ScyllaDB
#[error(transparent)]
ScyllaDbRowError(#[from] scylla::cql_to_rust::FromRowError),

/// A query error in ScyllaDB
#[error(transparent)]
ScyllaDbQueryError(#[from] scylla::transport::errors::QueryError),
Expand Down Expand Up @@ -205,10 +209,10 @@ impl ScyllaDbStoreInternal {
"SELECT v FROM kv.{} WHERE dummy = 0 AND k = ? ALLOW FILTERING",
table_name
);
let rows = session.query(query, values).await?;
if let Some(rows) = rows.rows {
let result = session.query(query, values).await?;
if let Some(rows) = result.rows {
if let Some(row) = rows.into_typed::<(Vec<u8>,)>().next() {
let value = row.unwrap();
let value = row?;
return Ok(Some(value.0));
}
}
Expand All @@ -228,8 +232,8 @@ impl ScyllaDbStoreInternal {
"SELECT dummy FROM kv.{} WHERE dummy = 0 AND k = ? ALLOW FILTERING",
table_name
);
let rows = session.query(query, values).await?;
if let Some(rows) = rows.rows {
let result = session.query(query, values).await?;
if let Some(rows) = result.rows {
if let Some(_row) = rows.into_typed::<(Vec<u8>,)>().next() {
return Ok(true);
}
Expand Down Expand Up @@ -308,33 +312,39 @@ impl ScyllaDbStoreInternal {
let table_name = &store.1;
// Read the value of a key
let len = key_prefix.len();
let rows = match get_upper_bound_option(&key_prefix) {
None => {
let values = (key_prefix,);
let query = format!(
"SELECT k FROM kv.{} WHERE dummy = 0 AND k >= ? ALLOW FILTERING",
table_name
);
session.query(query, values).await?
}
Some(upper_bound) => {
let values = (key_prefix, upper_bound);
let query = format!(
"SELECT k FROM kv.{} WHERE dummy = 0 AND k >= ? AND k < ? ALLOW FILTERING",
table_name
);
session.query(query, values).await?
}
};
let mut keys = Vec::new();
if let Some(rows) = rows.rows {
for row in rows.into_typed::<(Vec<u8>,)>() {
let key = row.unwrap();
let short_key = key.0[len..].to_vec();
keys.push(short_key);
let mut paging_state = None;
loop {
let result = match get_upper_bound_option(&key_prefix) {
None => {
let values = (key_prefix.clone(),);
let query = format!(
"SELECT k FROM kv.{} WHERE dummy = 0 AND k >= ? ALLOW FILTERING",
table_name
);
session.query_paged(query, values, paging_state).await?
}
Some(upper_bound) => {
let values = (key_prefix.clone(), upper_bound);
let query = format!(
"SELECT k FROM kv.{} WHERE dummy = 0 AND k >= ? AND k < ? ALLOW FILTERING",
table_name
);
session.query_paged(query, values, paging_state).await?
}
};
if let Some(rows) = result.rows {
for row in rows.into_typed::<(Vec<u8>,)>() {
let key = row?;
let short_key = key.0[len..].to_vec();
keys.push(short_key);
}
}
if result.paging_state.is_none() {
return Ok(keys);
}
paging_state = result.paging_state;
}
Ok(keys)
}

async fn find_key_values_by_prefix_internal(
Expand All @@ -349,33 +359,39 @@ impl ScyllaDbStoreInternal {
let table_name = &store.1;
// Read the value of a key
let len = key_prefix.len();
let rows = match get_upper_bound_option(&key_prefix) {
None => {
let values = (key_prefix,);
let query = format!(
"SELECT k,v FROM kv.{} WHERE dummy = 0 AND k >= ? ALLOW FILTERING",
table_name
);
session.query(query, values).await?
}
Some(upper_bound) => {
let values = (key_prefix, upper_bound);
let query = format!(
"SELECT k,v FROM kv.{} WHERE dummy = 0 AND k >= ? AND k < ? ALLOW FILTERING",
table_name
);
session.query(query, values).await?
}
};
let mut key_values = Vec::new();
if let Some(rows) = rows.rows {
for row in rows.into_typed::<(Vec<u8>, Vec<u8>)>() {
let key = row.unwrap();
let short_key = key.0[len..].to_vec();
key_values.push((short_key, key.1));
let mut paging_state = None;
loop {
let result = match get_upper_bound_option(&key_prefix) {
None => {
let values = (key_prefix.clone(),);
let query = format!(
"SELECT k,v FROM kv.{} WHERE dummy = 0 AND k >= ? ALLOW FILTERING",
table_name
);
session.query_paged(query, values, paging_state).await?
}
Some(upper_bound) => {
let values = (key_prefix.clone(), upper_bound);
let query = format!(
"SELECT k,v FROM kv.{} WHERE dummy = 0 AND k >= ? AND k < ? ALLOW FILTERING",
table_name
);
session.query_paged(query, values, paging_state).await?
}
};
if let Some(rows) = result.rows {
for row in rows.into_typed::<(Vec<u8>, Vec<u8>)>() {
let key = row?;
let short_key = key.0[len..].to_vec();
key_values.push((short_key, key.1));
}
}
if result.paging_state.is_none() {
return Ok(key_values);
}
paging_state = result.paging_state;
}
Ok(key_values)
}

/// Retrieves the table_name from the store
Expand Down Expand Up @@ -544,11 +560,11 @@ impl ScyllaDbStoreInternal {
.known_node(store_config.uri.as_str())
.build()
.await?;
let rows = session.query("DESCRIBE KEYSPACE kv", &[]).await?;
let result = session.query("DESCRIBE KEYSPACE kv", &[]).await?;
let mut tables = Vec::new();
if let Some(rows) = rows.rows {
if let Some(rows) = result.rows {
for row in rows.into_typed::<(String, String, String, String)>() {
let value = row.unwrap();
let value = row?;
if value.1 == "table" {
tables.push(value.2);
}
Expand Down

0 comments on commit 0d3ff14

Please sign in to comment.