From 2303606abb791f01cee5c23ca97f06bbfa5ca430 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Fri, 9 May 2025 13:24:31 +0200 Subject: [PATCH 01/12] cargo: bump rust-driver to 32d179cb2 To get the support for enforcing/getting coordinator. --- scylla-rust-wrapper/Cargo.lock | 9 ++++----- scylla-rust-wrapper/Cargo.toml | 4 ++-- scylla-rust-wrapper/src/query_result.rs | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/scylla-rust-wrapper/Cargo.lock b/scylla-rust-wrapper/Cargo.lock index 278b6ef9..aff76799 100644 --- a/scylla-rust-wrapper/Cargo.lock +++ b/scylla-rust-wrapper/Cargo.lock @@ -1113,7 +1113,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "scylla" version = "1.1.0" -source = "git+https://github.com/scylladb/scylla-rust-driver.git?rev=v1.1.0#ef5b0ada61989cedf9bcf5d715c8b36214f8b36f" +source = "git+https://github.com/scylladb/scylla-rust-driver.git?rev=32d179cb2#32d179cb215fc26e3bd884f355c1d24f713cc3db" dependencies = [ "arc-swap", "async-trait", @@ -1125,7 +1125,6 @@ dependencies = [ "hashbrown 0.14.5", "histogram", "itertools", - "lazy_static", "lz4_flex", "openssl", "rand 0.9.0", @@ -1172,7 +1171,7 @@ dependencies = [ [[package]] name = "scylla-cql" version = "1.1.0" -source = "git+https://github.com/scylladb/scylla-rust-driver.git?rev=v1.1.0#ef5b0ada61989cedf9bcf5d715c8b36214f8b36f" +source = "git+https://github.com/scylladb/scylla-rust-driver.git?rev=32d179cb2#32d179cb215fc26e3bd884f355c1d24f713cc3db" dependencies = [ "async-trait", "byteorder", @@ -1192,7 +1191,7 @@ dependencies = [ [[package]] name = "scylla-macros" version = "1.1.0" -source = "git+https://github.com/scylladb/scylla-rust-driver.git?rev=v1.1.0#ef5b0ada61989cedf9bcf5d715c8b36214f8b36f" +source = "git+https://github.com/scylladb/scylla-rust-driver.git?rev=32d179cb2#32d179cb215fc26e3bd884f355c1d24f713cc3db" dependencies = [ "darling", "proc-macro2", @@ -1203,7 +1202,7 @@ dependencies = [ [[package]] name = "scylla-proxy" version = "0.0.3" -source = "git+https://github.com/scylladb/scylla-rust-driver.git?rev=v1.1.0#ef5b0ada61989cedf9bcf5d715c8b36214f8b36f" +source = "git+https://github.com/scylladb/scylla-rust-driver.git?rev=32d179cb2#32d179cb215fc26e3bd884f355c1d24f713cc3db" dependencies = [ "bigdecimal", "byteorder", diff --git a/scylla-rust-wrapper/Cargo.toml b/scylla-rust-wrapper/Cargo.toml index 11a48b38..02a6de53 100644 --- a/scylla-rust-wrapper/Cargo.toml +++ b/scylla-rust-wrapper/Cargo.toml @@ -10,7 +10,7 @@ categories = ["database"] license = "MIT OR Apache-2.0" [dependencies] -scylla = { git = "https://github.com/scylladb/scylla-rust-driver.git", rev = "v1.1.0", features = [ +scylla = { git = "https://github.com/scylladb/scylla-rust-driver.git", rev = "32d179cb2", features = [ "openssl-010", "metrics", ] } @@ -34,7 +34,7 @@ bindgen = "0.65" chrono = "0.4.20" [dev-dependencies] -scylla-proxy = { git = "https://github.com/scylladb/scylla-rust-driver.git", rev = "v1.1.0" } +scylla-proxy = { git = "https://github.com/scylladb/scylla-rust-driver.git", rev = "32d179cb2" } bytes = "1.10.0" assert_matches = "1.5.0" diff --git a/scylla-rust-wrapper/src/query_result.rs b/scylla-rust-wrapper/src/query_result.rs index a88b86b1..ac6d763e 100644 --- a/scylla-rust-wrapper/src/query_result.rs +++ b/scylla-rust-wrapper/src/query_result.rs @@ -73,7 +73,7 @@ impl CassResult { )) }); - let (raw_rows, tracing_id, _) = rows_result.into_inner(); + let (raw_rows, tracing_id, _, _coordinator) = rows_result.into_inner(); let shared_data = Arc::new(CassRowsResultSharedData { raw_rows, metadata }); let first_row = RowWithSelfBorrowedResultData::first_from_raw_rows_and_metadata( Arc::clone(&shared_data), From 00be833fe79118f2f94150f4294f74ed59f1d4da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Fri, 9 May 2025 14:09:56 +0200 Subject: [PATCH 02/12] statement: implement methods for enforcing host There is also `cass_statement_set_node` but it cooperates with `cass_future_coordinator` which will be implemented later. --- scylla-rust-wrapper/src/statement.rs | 239 +++++++++++++++++++++++++++ 1 file changed, 239 insertions(+) diff --git a/scylla-rust-wrapper/src/statement.rs b/scylla-rust-wrapper/src/statement.rs index 5aec5029..87bef0f4 100644 --- a/scylla-rust-wrapper/src/statement.rs +++ b/scylla-rust-wrapper/src/statement.rs @@ -1,6 +1,7 @@ use crate::cass_error::CassError; use crate::cass_types::CassConsistency; use crate::exec_profile::PerStatementExecProfile; +use crate::inet::CassInet; use crate::prepared::CassPrepared; use crate::query_result::CassResult; use crate::retry_policy::CassRetryPolicy; @@ -8,6 +9,7 @@ use crate::types::*; use crate::value::CassCqlValue; use crate::{argconv::*, value}; use scylla::frame::types::Consistency; +use scylla::policies::load_balancing::{NodeIdentifier, SingleTargetLoadBalancingPolicy}; use scylla::response::{PagingState, PagingStateResponse}; use scylla::serialize::SerializationError; use scylla::serialize::row::{RowSerializationContext, SerializeRow}; @@ -19,8 +21,10 @@ use scylla::value::MaybeUnset; use scylla::value::MaybeUnset::{Set, Unset}; use std::collections::HashMap; use std::convert::TryInto; +use std::net::{IpAddr, SocketAddr}; use std::os::raw::{c_char, c_int}; use std::slice; +use std::str::FromStr; use std::sync::Arc; use thiserror::Error; @@ -440,6 +444,103 @@ pub unsafe extern "C" fn cass_statement_set_tracing( CassError::CASS_OK } +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_statement_set_host( + statement_raw: CassBorrowedExclusivePtr, + host: *const c_char, + port: c_int, +) -> CassError { + unsafe { cass_statement_set_host_n(statement_raw, host, strlen(host), port) } +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_statement_set_host_n( + statement_raw: CassBorrowedExclusivePtr, + host: *const c_char, + host_length: size_t, + port: c_int, +) -> CassError { + let Some(statement) = BoxFFI::as_mut_ref(statement_raw) else { + tracing::error!("Provided null statement pointer to cass_statement_set_host_n!"); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + }; + let host = match unsafe { ptr_to_cstr_n(host, host_length) } { + Some(v) => v, + None => { + tracing::error!("Provided null or non-utf8 host pointer to cass_statement_set_host_n!"); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + } + }; + let Ok(port): Result = port.try_into() else { + tracing::error!("Provided invalid port value to cass_statement_set_host_n: {port}"); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + }; + + let address = match IpAddr::from_str(host) { + Ok(ip_addr) => SocketAddr::new(ip_addr, port), + Err(e) => { + tracing::error!("Failed to parse ip address <{}>: {}", host, e); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + } + }; + let enforce_target_lbp = + SingleTargetLoadBalancingPolicy::new(NodeIdentifier::NodeAddress(address), None); + + match &mut statement.statement { + BoundStatement::Simple(inner) => inner + .query + .set_load_balancing_policy(Some(enforce_target_lbp)), + BoundStatement::Prepared(inner) => Arc::make_mut(&mut inner.statement) + .statement + .set_load_balancing_policy(Some(enforce_target_lbp)), + } + + CassError::CASS_OK +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_statement_set_host_inet( + statement_raw: CassBorrowedExclusivePtr, + host: *const CassInet, + port: c_int, +) -> CassError { + let Some(statement) = BoxFFI::as_mut_ref(statement_raw) else { + tracing::error!("Provided null statement pointer to cass_statement_set_host_inet!"); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + }; + if host.is_null() { + tracing::error!("Provided null host pointer to cass_statement_set_host_inet!"); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + } + // SAFETY: Assuming that user provided valid pointer. + let ip_addr: IpAddr = match unsafe { *host }.try_into() { + Ok(ip_addr) => ip_addr, + Err(_) => { + tracing::error!("Provided invalid CassInet value to cass_statement_set_host_inet!"); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + } + }; + let Ok(port): Result = port.try_into() else { + tracing::error!("Provided invalid port value to cass_statement_set_host_n: {port}"); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + }; + + let address = SocketAddr::new(ip_addr, port); + let enforce_target_lbp = + SingleTargetLoadBalancingPolicy::new(NodeIdentifier::NodeAddress(address), None); + + match &mut statement.statement { + BoundStatement::Simple(inner) => inner + .query + .set_load_balancing_policy(Some(enforce_target_lbp)), + BoundStatement::Prepared(inner) => Arc::make_mut(&mut inner.statement) + .statement + .set_load_balancing_policy(Some(enforce_target_lbp)), + } + + CassError::CASS_OK +} + #[unsafe(no_mangle)] pub unsafe extern "C" fn cass_statement_set_retry_policy( statement: CassBorrowedExclusivePtr, @@ -694,3 +795,141 @@ make_binders!( cass_statement_bind_user_type_by_name, cass_statement_bind_user_type_by_name_n ); + +#[cfg(test)] +mod tests { + use std::net::IpAddr; + use std::ptr::addr_of; + use std::str::FromStr; + + use crate::argconv::BoxFFI; + use crate::cass_error::CassError; + use crate::inet::CassInet; + use crate::statement::{cass_statement_set_host, cass_statement_set_host_inet}; + use crate::testing::assert_cass_error_eq; + + use super::{cass_statement_free, cass_statement_new}; + + #[test] + fn test_statement_set_host() { + unsafe { + let mut statement_raw = cass_statement_new(c"dummy".as_ptr(), 0); + + // cass_statement_set_host + { + // Null statement + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_host(BoxFFI::null_mut(), c"127.0.0.1".as_ptr(), 9042) + ); + + // Null ip address + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_host(statement_raw.borrow_mut(), std::ptr::null(), 9042) + ); + + // Unparsable ip address + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_host(statement_raw.borrow_mut(), c"invalid".as_ptr(), 9042) + ); + + // Negative port + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_host(statement_raw.borrow_mut(), c"127.0.0.1".as_ptr(), -1) + ); + + // Port too big + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_host( + statement_raw.borrow_mut(), + c"127.0.0.1".as_ptr(), + 70000 + ) + ); + + // Valid ip address and port + assert_cass_error_eq!( + CassError::CASS_OK, + cass_statement_set_host( + statement_raw.borrow_mut(), + c"127.0.0.1".as_ptr(), + 9042 + ) + ); + } + + // cass_statement_set_host_inet + { + let valid_inet: CassInet = IpAddr::from_str("127.0.0.1").unwrap().into(); + + let invalid_inet = CassInet { + address: [0; 16], + // invalid length - should be 4 or 16 + address_length: 3, + }; + + // Null statement + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_host_inet(BoxFFI::null_mut(), addr_of!(valid_inet), 9042) + ); + + // Null CassInet + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_host_inet( + statement_raw.borrow_mut(), + std::ptr::null(), + 9042 + ) + ); + + // Invalid CassInet + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_host_inet( + statement_raw.borrow_mut(), + addr_of!(invalid_inet), + 9042 + ) + ); + + // Negative port + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_host_inet( + statement_raw.borrow_mut(), + addr_of!(valid_inet), + -1 + ) + ); + + // Port too big + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_host_inet( + statement_raw.borrow_mut(), + addr_of!(valid_inet), + 70000 + ) + ); + + // Valid ip address and port + assert_cass_error_eq!( + CassError::CASS_OK, + cass_statement_set_host_inet( + statement_raw.borrow_mut(), + addr_of!(valid_inet), + 9042 + ) + ); + } + + cass_statement_free(statement_raw); + } + } +} From 490bfcda47e8e8a338e010ec7d8f1b6bbcb58ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Fri, 9 May 2025 16:03:47 +0200 Subject: [PATCH 03/12] query_result: store coordinator in CassResult I decided to store coordinator as Option. This is because we have to somehow mock it in unit tests - rust-driver does not expose any way to mock the coordinator. --- scylla-rust-wrapper/src/query_result.rs | 11 +++++++++-- scylla-rust-wrapper/src/session.rs | 3 ++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/scylla-rust-wrapper/src/query_result.rs b/scylla-rust-wrapper/src/query_result.rs index ac6d763e..a4744d17 100644 --- a/scylla-rust-wrapper/src/query_result.rs +++ b/scylla-rust-wrapper/src/query_result.rs @@ -17,8 +17,8 @@ use scylla::deserialize::row::{ use scylla::deserialize::value::DeserializeValue; use scylla::errors::{DeserializationError, IntoRowsResultError, TypeCheckError}; use scylla::frame::response::result::{ColumnSpec, DeserializedMetadataAndRawRows}; -use scylla::response::PagingStateResponse; use scylla::response::query_result::{ColumnSpecs, QueryResult}; +use scylla::response::{Coordinator, PagingStateResponse}; use scylla::value::{ Counter, CqlDate, CqlDecimalBorrowed, CqlDuration, CqlTime, CqlTimestamp, CqlTimeuuid, }; @@ -50,6 +50,9 @@ pub struct CassResult { pub tracing_id: Option, pub paging_state_response: PagingStateResponse, pub kind: CassResultKind, + // None only for tests - currently no way to mock coordinator in rust-driver. + // Should be able to do so under "cpp_rust_unstable". + pub(crate) coordinator: Option, } impl CassResult { @@ -73,7 +76,7 @@ impl CassResult { )) }); - let (raw_rows, tracing_id, _, _coordinator) = rows_result.into_inner(); + let (raw_rows, tracing_id, _, coordinator) = rows_result.into_inner(); let shared_data = Arc::new(CassRowsResultSharedData { raw_rows, metadata }); let first_row = RowWithSelfBorrowedResultData::first_from_raw_rows_and_metadata( Arc::clone(&shared_data), @@ -86,6 +89,7 @@ impl CassResult { shared_data, first_row, }), + coordinator, }; Ok(cass_result) @@ -95,6 +99,7 @@ impl CassResult { tracing_id: result.tracing_id(), paging_state_response, kind: CassResultKind::NonRows, + coordinator: Some(result.request_coordinator().clone()), }; Ok(cass_result) @@ -1207,6 +1212,7 @@ mod tests { shared_data, first_row, }), + coordinator: None, } } @@ -1310,6 +1316,7 @@ mod tests { tracing_id: None, paging_state_response: PagingStateResponse::NoMorePages, kind: CassResultKind::NonRows, + coordinator: None, } } diff --git a/scylla-rust-wrapper/src/session.rs b/scylla-rust-wrapper/src/session.rs index 2612c9e2..5fad5414 100644 --- a/scylla-rust-wrapper/src/session.rs +++ b/scylla-rust-wrapper/src/session.rs @@ -239,10 +239,11 @@ pub unsafe extern "C" fn cass_session_execute_batch( let query_res = session.batch(&state.batch, &state.bound_values).await; match query_res { - Ok(_result) => Ok(CassResultValue::QueryResult(Arc::new(CassResult { + Ok(result) => Ok(CassResultValue::QueryResult(Arc::new(CassResult { tracing_id: None, paging_state_response: PagingStateResponse::NoMorePages, kind: CassResultKind::NonRows, + coordinator: Some(result.request_coordinator().clone()), }))), Err(err) => Ok(CassResultValue::QueryError(Arc::new(err.into()))), } From 25cb370a4b755be728656b15dda369df830ca5dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Mon, 12 May 2025 12:24:42 +0200 Subject: [PATCH 04/12] future: implement cass_future_coordinator There is one tricky part that we need to handle: the coordinator is held by CassResult, which is stored under Mutex in CassFuture. In result, the reference we obtain in `CassFuture::with_waited_result` closure has a lifetime of the mutex guard (temporary lifetime during the function call). We need to extend the lifetime of returned coordinator - further reasoning is explained in the comment in code. --- scylla-rust-wrapper/src/future.rs | 35 ++++++++++++++++++++++++- scylla-rust-wrapper/src/query_result.rs | 7 +++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 0b279a3c..192d07bf 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -5,10 +5,11 @@ use crate::cass_error::CassErrorMessage; use crate::cass_error::ToCassError; use crate::execution_error::CassErrorResult; use crate::prepared::CassPrepared; -use crate::query_result::CassResult; +use crate::query_result::{CassNode, CassResult}; use crate::types::*; use crate::uuid::CassUuid; use futures::future; +use scylla::response::Coordinator; use std::future::Future; use std::mem; use std::os::raw::c_void; @@ -476,6 +477,38 @@ pub unsafe extern "C" fn cass_future_tracing_id( }) } +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_future_coordinator( + future_raw: CassBorrowedSharedPtr, +) -> CassBorrowedSharedPtr { + let Some(future) = ArcFFI::as_ref(future_raw) else { + tracing::error!("Provided null future to cass_future_coordinator!"); + return RefFFI::null(); + }; + + future.with_waited_result(|r| match r { + Ok(CassResultValue::QueryResult(result)) => { + // unwrap: Coordinator is `None` only for tests. + let coordinator_ptr = result.coordinator.as_ref().unwrap() as *const Coordinator; + + // We need to 'extend' the lifetime of returned Coordinator so safe FFI api does not complain. + // The lifetime of "result" reference provided to this closure is the lifetime of a mutex guard. + // We are guaranteed, that once the future is resolved (i.e. this closure is called), the result will not + // be modified in any way. Thus, we can guarantee that returned coordinator lives as long as underlying + // CassResult lives (i.e. longer than the lifetime of acquired mutex guard). + // + // SAFETY: Coordinator's lifetime is tied to the lifetime of underlying CassResult, thus: + // 1. Coordinator lives as long as the underlying CassResult lives + // 2. Coordinator will not be moved as long as underlying CassResult is not freed + // 3. Coordinator is immutable once future is resolved (because CassResult is set once) + let coordinator_ref = unsafe { &*coordinator_ptr }; + + RefFFI::as_ptr(coordinator_ref) + } + _ => RefFFI::null(), + }) +} + #[cfg(test)] mod tests { use crate::testing::{assert_cass_error_eq, assert_cass_future_error_message_eq}; diff --git a/scylla-rust-wrapper/src/query_result.rs b/scylla-rust-wrapper/src/query_result.rs index a4744d17..26de9688 100644 --- a/scylla-rust-wrapper/src/query_result.rs +++ b/scylla-rust-wrapper/src/query_result.rs @@ -46,6 +46,13 @@ pub(crate) struct CassRowsResultSharedData { pub(crate) metadata: Arc, } +pub type CassNode = Coordinator; + +// Borrowed from CassResult in cass_future_coordinator. +impl FFI for CassNode { + type Origin = FromRef; +} + pub struct CassResult { pub tracing_id: Option, pub paging_state_response: PagingStateResponse, From 9deb70a111bf0da947010ff49c2f37aab05eb9d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Mon, 12 May 2025 12:36:46 +0200 Subject: [PATCH 05/12] statement: implement cass_future_set_node --- scylla-rust-wrapper/src/statement.rs | 52 ++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/scylla-rust-wrapper/src/statement.rs b/scylla-rust-wrapper/src/statement.rs index 87bef0f4..33eabf2d 100644 --- a/scylla-rust-wrapper/src/statement.rs +++ b/scylla-rust-wrapper/src/statement.rs @@ -3,7 +3,7 @@ use crate::cass_types::CassConsistency; use crate::exec_profile::PerStatementExecProfile; use crate::inet::CassInet; use crate::prepared::CassPrepared; -use crate::query_result::CassResult; +use crate::query_result::{CassNode, CassResult}; use crate::retry_policy::CassRetryPolicy; use crate::types::*; use crate::value::CassCqlValue; @@ -541,6 +541,35 @@ pub unsafe extern "C" fn cass_statement_set_host_inet( CassError::CASS_OK } +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_statement_set_node( + statement_raw: CassBorrowedExclusivePtr, + node_raw: CassBorrowedSharedPtr, +) -> CassError { + let Some(statement) = BoxFFI::as_mut_ref(statement_raw) else { + tracing::error!("Provided null statement pointer to cass_statement_set_node!"); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + }; + let Some(node) = RefFFI::as_ref(node_raw) else { + tracing::error!("Provided null node pointer to cass_statement_set_node!"); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + }; + + let enforce_target_lbp = + SingleTargetLoadBalancingPolicy::new(NodeIdentifier::Node(Arc::clone(node.node())), None); + + match &mut statement.statement { + BoundStatement::Simple(inner) => inner + .query + .set_load_balancing_policy(Some(enforce_target_lbp)), + BoundStatement::Prepared(inner) => Arc::make_mut(&mut inner.statement) + .statement + .set_load_balancing_policy(Some(enforce_target_lbp)), + } + + CassError::CASS_OK +} + #[unsafe(no_mangle)] pub unsafe extern "C" fn cass_statement_set_retry_policy( statement: CassBorrowedExclusivePtr, @@ -802,10 +831,12 @@ mod tests { use std::ptr::addr_of; use std::str::FromStr; - use crate::argconv::BoxFFI; + use crate::argconv::{BoxFFI, RefFFI}; use crate::cass_error::CassError; use crate::inet::CassInet; - use crate::statement::{cass_statement_set_host, cass_statement_set_host_inet}; + use crate::statement::{ + cass_statement_set_host, cass_statement_set_host_inet, cass_statement_set_node, + }; use crate::testing::assert_cass_error_eq; use super::{cass_statement_free, cass_statement_new}; @@ -929,6 +960,21 @@ mod tests { ); } + // cass_statement_set_node + { + // Null statement + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_node(BoxFFI::null_mut(), RefFFI::null()) + ); + + // Null CassNode + assert_cass_error_eq!( + CassError::CASS_ERROR_LIB_BAD_PARAMS, + cass_statement_set_node(statement_raw.borrow_mut(), RefFFI::null()) + ); + } + cass_statement_free(statement_raw); } } From c0c7e670115eb269f6e2f4d9a4779560bc618e53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Mon, 12 May 2025 12:40:24 +0200 Subject: [PATCH 06/12] testing: remove coordinator-related methods from unimplemented --- src/testing_unimplemented.cpp | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/src/testing_unimplemented.cpp b/src/testing_unimplemented.cpp index 960ab3e2..9ba9b257 100644 --- a/src/testing_unimplemented.cpp +++ b/src/testing_unimplemented.cpp @@ -162,9 +162,6 @@ CASS_EXPORT const CassDataType* cass_function_meta_return_type(const CassFunctionMeta* function_meta) { throw std::runtime_error("UNIMPLEMENTED cass_function_meta_return_type\n"); } -CASS_EXPORT const CassNode* cass_future_coordinator(CassFuture* future) { - throw std::runtime_error("UNIMPLEMENTED cass_future_coordinator\n"); -} CASS_EXPORT const CassValue* cass_index_meta_field_by_name(const CassIndexMeta* index_meta, const char* name) { throw std::runtime_error("UNIMPLEMENTED cass_index_meta_field_by_name\n"); @@ -220,20 +217,9 @@ CASS_EXPORT CassError cass_statement_set_custom_payload(CassStatement* statement const CassCustomPayload* payload) { throw std::runtime_error("UNIMPLEMENTED cass_statement_set_custom_payload\n"); } -CASS_EXPORT CassError cass_statement_set_host(CassStatement* statement, const char* host, - int port) { - throw std::runtime_error("UNIMPLEMENTED cass_statement_set_host\n"); -} -CASS_EXPORT CassError cass_statement_set_host_inet(CassStatement* statement, const CassInet* host, - int port) { - throw std::runtime_error("UNIMPLEMENTED cass_statement_set_host_inet\n"); -} CASS_EXPORT CassError cass_statement_set_keyspace(CassStatement* statement, const char* keyspace) { throw std::runtime_error("UNIMPLEMENTED cass_statement_set_keyspace\n"); } -CASS_EXPORT CassError cass_statement_set_node(CassStatement* statement, const CassNode* node) { - throw std::runtime_error("UNIMPLEMENTED cass_statement_set_node\n"); -} CASS_EXPORT CassClusteringOrder cass_table_meta_clustering_key_order(const CassTableMeta* table_meta, size_t index) { throw std::runtime_error("UNIMPLEMENTED cass_table_meta_clustering_key_order\n"); From 681ba5826b375cc2c65fed973000dee3991489b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Mon, 12 May 2025 13:18:38 +0200 Subject: [PATCH 07/12] ci: enable StatementNoClusterTests suite The suite contains 4 tests: - SetHostWithValidHostString -> calls cass_statment_set_host with valid ip addresses and port. Expects CASS_OK to be returned - SetHostWithInvalidHostString -> calls cass_statement_set_host with invalid ip addressed ("inavlid", "", NULL). Expects LIB_BAD_PARAMS to be returned - SetHostWithValidHostInet -> calls cass_statement_set_host_inet with valid ip addresses Expects CASS_OK. - SetHostWithInvalidHostInet -> calls cass_statement_set_host_inet with invalid CassInet struct. Expects LIB_BAD_PARAMS --- Makefile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Makefile b/Makefile index a3f014f3..753d0929 100644 --- a/Makefile +++ b/Makefile @@ -12,6 +12,7 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :SerialConsistencyTests.*\ :HeartbeatTests.*\ :PreparedTests.*\ +:StatementNoClusterTests.*\ :NamedParametersTests.*\ :CassandraTypes/CassandraTypesTests/*.Integration_Cassandra_*\ :ControlConnectionTests.*\ @@ -69,6 +70,7 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :SerialConsistencyTests.*\ :HeartbeatTests.*\ :PreparedTests.*\ +:StatementNoClusterTests.*\ :NamedParametersTests.*\ :CassandraTypes/CassandraTypesTests/*.Integration_Cassandra_*\ :ControlConnectionTests.*\ From 9a619c0814e8610c93c974dd604cdad0d1cdf6bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Mon, 12 May 2025 13:22:25 +0200 Subject: [PATCH 08/12] ci: enable StatementTests suite This suite contains 5 tests: - SetHost -> it enforces "127.0.0.1:9042" host using cass_statement_set_host. Then, it fetches the rpc_address from system.local and checks whether it matches enforced address (twice). - SetHostInet -> the same as above, but using cass_statement_set_host_inet (CassInet instead of String). - SetNode -> executes "SELECT rpc_address from system.local" on random node. Then it gets the coordinator of this request (using cass_future_coordinator) and enforces this coordinator (cass_statement_set_node) on the same statement. Checks whether addresses match. - SetHostWithInvalidPort -> tries to enforce host with unknown port (8888). Expects LIB_NO_HOST_AVAILABLE. - SetHostWhereHostIsDown -> stops the node, and then tried to enforce it as a coordinator for some request. Expects LIB_NO_HOST_AVAILABLE. --- Makefile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Makefile b/Makefile index 753d0929..6991fcd9 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,7 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :HeartbeatTests.*\ :PreparedTests.*\ :StatementNoClusterTests.*\ +:StatementTests.*\ :NamedParametersTests.*\ :CassandraTypes/CassandraTypesTests/*.Integration_Cassandra_*\ :ControlConnectionTests.*\ @@ -71,6 +72,7 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :HeartbeatTests.*\ :PreparedTests.*\ :StatementNoClusterTests.*\ +:StatementTests.*\ :NamedParametersTests.*\ :CassandraTypes/CassandraTypesTests/*.Integration_Cassandra_*\ :ControlConnectionTests.*\ From 18adf5912835fce64895d5d9e7ebe08ae426d3be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Mon, 12 May 2025 13:32:27 +0200 Subject: [PATCH 09/12] ci: enable ServerSideFailureThreeNodeTests suite It contains two tests. Both of the tests use 3-nodes cluster (single dc) and RF=3. Both of them try to enforce the "127.0.0.1" host for some read/write request with cl=LOCAL_QUORUM. - ErrorReadWriteTimeout -> It **pauses** two remaining nodes and expects server-side READ/WRITE_TIMEOUT - ErrorUnavailable -> It **stops** two remaining nodes and expects UNAVAILABLE server-side error. --- Makefile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Makefile b/Makefile index 6991fcd9..f6446285 100644 --- a/Makefile +++ b/Makefile @@ -29,6 +29,7 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :PreparedMetadataTests.*\ :UseKeyspaceCaseSensitiveTests.*\ :ServerSideFailureTests.*\ +:ServerSideFailureThreeNodeTests.*\ :TimestampTests.*\ :MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\ :MetricsTests.Integration_Cassandra_Requests\ @@ -87,6 +88,7 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :PreparedMetadataTests.*\ :UseKeyspaceCaseSensitiveTests.*\ :ServerSideFailureTests.*\ +:ServerSideFailureThreeNodeTests.*\ :TimestampTests.*\ :MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\ :MetricsTests.Integration_Cassandra_Requests\ From f356a4428afb949288744ddc6281a467e7bfa0ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Mon, 12 May 2025 14:29:46 +0200 Subject: [PATCH 10/12] testing: implement get_host_from_future Added rust utilities to obtain a stringified ip address of request coordinator from future. Implemented `get_host_from_future` on top of them. This utility method is used in integration tests. We cannot enable any yet - they require other features (e.g. filtering config methods). --- .../src/integration_testing.rs | 49 ++++++++++++++++++- src/testing.cpp | 17 ++++++- src/testing_rust_impls.h | 9 ++++ 3 files changed, 73 insertions(+), 2 deletions(-) diff --git a/scylla-rust-wrapper/src/integration_testing.rs b/scylla-rust-wrapper/src/integration_testing.rs index bf372eaa..02119d38 100644 --- a/scylla-rust-wrapper/src/integration_testing.rs +++ b/scylla-rust-wrapper/src/integration_testing.rs @@ -7,9 +7,12 @@ use scylla::errors::{RequestAttemptError, RequestError}; use scylla::observability::history::{AttemptId, HistoryListener, RequestId, SpeculativeId}; use scylla::policies::retry::RetryDecision; -use crate::argconv::{BoxFFI, CMut, CassBorrowedExclusivePtr}; +use crate::argconv::{ + ArcFFI, BoxFFI, CConst, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr, +}; use crate::batch::CassBatch; use crate::cluster::CassCluster; +use crate::future::{CassFuture, CassResultValue}; use crate::statement::{BoundStatement, CassStatement}; use crate::types::{cass_int32_t, cass_uint16_t, cass_uint64_t, size_t}; @@ -64,6 +67,50 @@ pub unsafe extern "C" fn testing_free_contact_points(contact_points: *mut c_char let _ = unsafe { CString::from_raw(contact_points) }; } +#[unsafe(no_mangle)] +pub unsafe extern "C" fn testing_future_get_host( + future_raw: CassBorrowedSharedPtr, + host: *mut *mut c_char, + host_length: *mut size_t, +) { + let Some(future) = ArcFFI::as_ref(future_raw) else { + tracing::error!("Provided null future pointer to testing_future_get_host!"); + unsafe { + *host = std::ptr::null_mut(); + *host_length = 0; + }; + return; + }; + + future.with_waited_result(|r| match r { + Ok(CassResultValue::QueryResult(result)) => { + // unwrap: Coordinator is none only for unit tests. + let coordinator = result.coordinator.as_ref().unwrap(); + + let ip_addr_str = coordinator.node().address.ip().to_string(); + let length = ip_addr_str.len(); + + let ip_addr_cstr = CString::new(ip_addr_str).expect( + "String obtained from IpAddr::to_string() should not contain any nul bytes!", + ); + + unsafe { + *host = ip_addr_cstr.into_raw(); + *host_length = length as size_t + }; + } + _ => unsafe { + *host = std::ptr::null_mut(); + *host_length = 0; + }, + }) +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn testing_free_host(host: *mut c_char) { + let _ = unsafe { CString::from_raw(host) }; +} + #[derive(Debug)] struct SleepingHistoryListener(Duration); diff --git a/src/testing.cpp b/src/testing.cpp index 7e397901..5a15764f 100644 --- a/src/testing.cpp +++ b/src/testing.cpp @@ -32,7 +32,22 @@ namespace datastax { namespace internal { namespace testing { using namespace core; String get_host_from_future(CassFuture* future) { - throw std::runtime_error("Unimplemented 'get_host_from_future'!"); + char* host; + size_t host_length; + + testing_future_get_host(future, &host, &host_length); + + if (host == nullptr) { + throw std::runtime_error("CassFuture returned a null host string."); + } + + std::string host_str(host, host_length); + OStringStream ss; + ss << host_str; + + testing_free_host(host); + + return ss.str(); } StringVec get_attempted_hosts_from_future(CassFuture* future) { diff --git a/src/testing_rust_impls.h b/src/testing_rust_impls.h index e6f07800..a26aa68f 100644 --- a/src/testing_rust_impls.h +++ b/src/testing_rust_impls.h @@ -22,6 +22,15 @@ CASS_EXPORT void testing_cluster_get_contact_points(CassCluster* cluster, char** CASS_EXPORT void testing_free_contact_points(char* contact_points); +// Returns an ip address of request coordinator. +// +// This method fails if the future resolved to some error. +// +// On success, it allocates a host string which needs to be then freed wih `testing_free_host`. +CASS_EXPORT void testing_future_get_host(const CassFuture* future, char** host, size_t* host_length); + +CASS_EXPORT void testing_free_host(char* host); + // Sets a sleeping history listener on the statement. // This can be used to enforce a sleep time during statement execution, which increases the latency. CASS_EXPORT void testing_statement_set_sleeping_history_listener(CassStatement *statement, From d4b32e510340f77aac34b30671e1e6b4d42f12e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Mon, 12 May 2025 14:32:12 +0200 Subject: [PATCH 11/12] testing: de-duplicate cstring freeing logic Replaced `testing_free_contact_points` and `testing_free_host` with one common method `testing_free_cstring`. --- scylla-rust-wrapper/src/integration_testing.rs | 9 ++------- src/testing.cpp | 4 ++-- src/testing_rust_impls.h | 8 +++----- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/scylla-rust-wrapper/src/integration_testing.rs b/scylla-rust-wrapper/src/integration_testing.rs index 02119d38..ce97ac48 100644 --- a/scylla-rust-wrapper/src/integration_testing.rs +++ b/scylla-rust-wrapper/src/integration_testing.rs @@ -62,11 +62,6 @@ pub unsafe extern "C" fn testing_cluster_get_contact_points( } } -#[unsafe(no_mangle)] -pub unsafe extern "C" fn testing_free_contact_points(contact_points: *mut c_char) { - let _ = unsafe { CString::from_raw(contact_points) }; -} - #[unsafe(no_mangle)] pub unsafe extern "C" fn testing_future_get_host( future_raw: CassBorrowedSharedPtr, @@ -107,8 +102,8 @@ pub unsafe extern "C" fn testing_future_get_host( } #[unsafe(no_mangle)] -pub unsafe extern "C" fn testing_free_host(host: *mut c_char) { - let _ = unsafe { CString::from_raw(host) }; +pub unsafe extern "C" fn testing_free_cstring(s: *mut c_char) { + let _ = unsafe { CString::from_raw(s) }; } #[derive(Debug)] diff --git a/src/testing.cpp b/src/testing.cpp index 5a15764f..2f7d9c12 100644 --- a/src/testing.cpp +++ b/src/testing.cpp @@ -45,7 +45,7 @@ String get_host_from_future(CassFuture* future) { OStringStream ss; ss << host_str; - testing_free_host(host); + testing_free_cstring(host); return ss.str(); } @@ -74,7 +74,7 @@ String get_contact_points_from_cluster(CassCluster* cluster) { OStringStream ss; ss << contact_points_str; - testing_free_contact_points(contact_points); + testing_free_cstring(contact_points); return ss.str(); } diff --git a/src/testing_rust_impls.h b/src/testing_rust_impls.h index a26aa68f..a5866ff7 100644 --- a/src/testing_rust_impls.h +++ b/src/testing_rust_impls.h @@ -16,20 +16,18 @@ CASS_EXPORT cass_int32_t testing_cluster_get_port(CassCluster* cluster); // Then, the resulting pointer is set to null. // // On success, this function allocates a contact points string, which needs to be then -// freed with `testing_free_contact_points`. +// freed with `testing_free_cstring`. CASS_EXPORT void testing_cluster_get_contact_points(CassCluster* cluster, char** contact_points, size_t* contact_points_length); -CASS_EXPORT void testing_free_contact_points(char* contact_points); - // Returns an ip address of request coordinator. // // This method fails if the future resolved to some error. // -// On success, it allocates a host string which needs to be then freed wih `testing_free_host`. +// On success, it allocates a host string which needs to be then freed wih `testing_free_cstring`. CASS_EXPORT void testing_future_get_host(const CassFuture* future, char** host, size_t* host_length); -CASS_EXPORT void testing_free_host(char* host); +CASS_EXPORT void testing_free_cstring(char *s); // Sets a sleeping history listener on the statement. // This can be used to enforce a sleep time during statement execution, which increases the latency. From 665266ce562af98b0007814a31588f9498d77b32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Mon, 12 May 2025 16:13:32 +0200 Subject: [PATCH 12/12] future: store result in OnceLock The result is going to be initialized only once, thus we do not need to store it behind a mutex. We can use OnceLock instead. Thanks to that, we can remove the unsafe logic which extends the lifetime of `coordinator` reference in cass_future_coordinator. We now guarantee that the result will be immutable once future is resolved - the guarantee is provided on the type-level. --- scylla-rust-wrapper/src/future.rs | 66 +++++++++++-------------- scylla-rust-wrapper/src/query_result.rs | 10 +++- 2 files changed, 37 insertions(+), 39 deletions(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 192d07bf..b20d2cd0 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -9,14 +9,14 @@ use crate::query_result::{CassNode, CassResult}; use crate::types::*; use crate::uuid::CassUuid; use futures::future; -use scylla::response::Coordinator; use std::future::Future; use std::mem; use std::os::raw::c_void; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex, OnceLock}; use tokio::task::JoinHandle; use tokio::time::Duration; +#[derive(Debug)] pub enum CassResultValue { Empty, QueryResult(Arc), @@ -51,7 +51,6 @@ impl BoundCallback { #[derive(Default)] struct CassFutureState { - value: Option, err_string: Option, callback: Option, join_handle: Option>, @@ -59,6 +58,7 @@ struct CassFutureState { pub struct CassFuture { state: Mutex, + result: OnceLock, wait_for_value: Condvar, } @@ -88,6 +88,7 @@ impl CassFuture { ) -> Arc { let cass_fut = Arc::new(CassFuture { state: Mutex::new(Default::default()), + result: OnceLock::new(), wait_for_value: Condvar::new(), }); let cass_fut_clone = Arc::clone(&cass_fut); @@ -95,7 +96,10 @@ impl CassFuture { let r = fut.await; let maybe_cb = { let mut guard = cass_fut_clone.state.lock().unwrap(); - guard.value = Some(r); + cass_fut_clone + .result + .set(r) + .expect("Tried to resolve future result twice!"); // Take the callback and call it after releasing the lock guard.callback.take() }; @@ -116,16 +120,17 @@ impl CassFuture { pub fn new_ready(r: CassFutureResult) -> Arc { Arc::new(CassFuture { - state: Mutex::new(CassFutureState { - value: Some(r), - ..Default::default() - }), + state: Mutex::new(CassFutureState::default()), + result: OnceLock::from(r), wait_for_value: Condvar::new(), }) } - pub fn with_waited_result(&self, f: impl FnOnce(&mut CassFutureResult) -> T) -> T { - self.with_waited_state(|s| f(s.value.as_mut().unwrap())) + pub fn with_waited_result<'s, T>(&'s self, f: impl FnOnce(&'s CassFutureResult) -> T) -> T + where + T: 's, + { + self.with_waited_state(|_| f(self.result.get().unwrap())) } /// Awaits the future until completion. @@ -154,7 +159,7 @@ impl CassFuture { guard = self .wait_for_value .wait_while(guard, |state| { - state.value.is_none() && state.join_handle.is_none() + self.result.get().is_none() && state.join_handle.is_none() }) // unwrap: Error appears only when mutex is poisoned. .unwrap(); @@ -172,10 +177,10 @@ impl CassFuture { fn with_waited_result_timed( &self, - f: impl FnOnce(&mut CassFutureResult) -> T, + f: impl FnOnce(&CassFutureResult) -> T, timeout_duration: Duration, ) -> Result { - self.with_waited_state_timed(|s| f(s.value.as_mut().unwrap()), timeout_duration) + self.with_waited_state_timed(|_| f(self.result.get().unwrap()), timeout_duration) } /// Tries to await the future with a given timeout. @@ -243,7 +248,7 @@ impl CassFuture { let (guard_result, timeout_result) = self .wait_for_value .wait_timeout_while(guard, remaining_timeout, |state| { - state.value.is_none() && state.join_handle.is_none() + self.result.get().is_none() && state.join_handle.is_none() }) // unwrap: Error appears only when mutex is poisoned. .unwrap(); @@ -276,7 +281,7 @@ impl CassFuture { return CassError::CASS_ERROR_LIB_CALLBACK_ALREADY_SET; } let bound_cb = BoundCallback { cb, data }; - if lock.value.is_some() { + if self.result.get().is_some() { // The value is already available, we need to call the callback ourselves mem::drop(lock); bound_cb.invoke(self_ptr); @@ -346,8 +351,7 @@ pub unsafe extern "C" fn cass_future_ready( return cass_false; }; - let state_guard = future.state.lock().unwrap(); - match state_guard.value { + match future.result.get() { None => cass_false, Some(_) => cass_true, } @@ -362,7 +366,7 @@ pub unsafe extern "C" fn cass_future_error_code( return CassError::CASS_ERROR_LIB_BAD_PARAMS; }; - future.with_waited_result(|r: &mut CassFutureResult| match r { + future.with_waited_result(|r: &CassFutureResult| match r { Ok(CassResultValue::QueryError(err)) => err.to_cass_error(), Err((err, _)) => *err, _ => CassError::CASS_OK, @@ -381,7 +385,7 @@ pub unsafe extern "C" fn cass_future_error_message( }; future.with_waited_state(|state: &mut CassFutureState| { - let value = &state.value; + let value = future.result.get(); let msg = state .err_string .get_or_insert_with(|| match value.as_ref().unwrap() { @@ -408,7 +412,7 @@ pub unsafe extern "C" fn cass_future_get_result( }; future - .with_waited_result(|r: &mut CassFutureResult| -> Option> { + .with_waited_result(|r: &CassFutureResult| -> Option> { match r.as_ref().ok()? { CassResultValue::QueryResult(qr) => Some(Arc::clone(qr)), _ => None, @@ -427,7 +431,7 @@ pub unsafe extern "C" fn cass_future_get_error_result( }; future - .with_waited_result(|r: &mut CassFutureResult| -> Option> { + .with_waited_result(|r: &CassFutureResult| -> Option> { match r.as_ref().ok()? { CassResultValue::QueryError(qr) => Some(Arc::clone(qr)), _ => None, @@ -446,7 +450,7 @@ pub unsafe extern "C" fn cass_future_get_prepared( }; future - .with_waited_result(|r: &mut CassFutureResult| -> Option> { + .with_waited_result(|r: &CassFutureResult| -> Option> { match r.as_ref().ok()? { CassResultValue::Prepared(p) => Some(Arc::clone(p)), _ => None, @@ -465,7 +469,7 @@ pub unsafe extern "C" fn cass_future_tracing_id( return CassError::CASS_ERROR_LIB_BAD_PARAMS; }; - future.with_waited_result(|r: &mut CassFutureResult| match r { + future.with_waited_result(|r: &CassFutureResult| match r { Ok(CassResultValue::QueryResult(result)) => match result.tracing_id { Some(id) => { unsafe { *tracing_id = CassUuid::from(id) }; @@ -489,21 +493,7 @@ pub unsafe extern "C" fn cass_future_coordinator( future.with_waited_result(|r| match r { Ok(CassResultValue::QueryResult(result)) => { // unwrap: Coordinator is `None` only for tests. - let coordinator_ptr = result.coordinator.as_ref().unwrap() as *const Coordinator; - - // We need to 'extend' the lifetime of returned Coordinator so safe FFI api does not complain. - // The lifetime of "result" reference provided to this closure is the lifetime of a mutex guard. - // We are guaranteed, that once the future is resolved (i.e. this closure is called), the result will not - // be modified in any way. Thus, we can guarantee that returned coordinator lives as long as underlying - // CassResult lives (i.e. longer than the lifetime of acquired mutex guard). - // - // SAFETY: Coordinator's lifetime is tied to the lifetime of underlying CassResult, thus: - // 1. Coordinator lives as long as the underlying CassResult lives - // 2. Coordinator will not be moved as long as underlying CassResult is not freed - // 3. Coordinator is immutable once future is resolved (because CassResult is set once) - let coordinator_ref = unsafe { &*coordinator_ptr }; - - RefFFI::as_ptr(coordinator_ref) + RefFFI::as_ptr(result.coordinator.as_ref().unwrap()) } _ => RefFFI::null(), }) diff --git a/scylla-rust-wrapper/src/query_result.rs b/scylla-rust-wrapper/src/query_result.rs index 26de9688..738b8473 100644 --- a/scylla-rust-wrapper/src/query_result.rs +++ b/scylla-rust-wrapper/src/query_result.rs @@ -29,17 +29,20 @@ use std::sync::Arc; use thiserror::Error; use uuid::Uuid; +#[derive(Debug)] pub enum CassResultKind { NonRows, Rows(CassRowsResult), } +#[derive(Debug)] pub struct CassRowsResult { // Arc: shared with first_row (yoke). pub(crate) shared_data: Arc, pub(crate) first_row: Option, } +#[derive(Debug)] pub(crate) struct CassRowsResultSharedData { pub(crate) raw_rows: DeserializedMetadataAndRawRows, // Arc: shared with CassPrepared @@ -53,6 +56,7 @@ impl FFI for CassNode { type Origin = FromRef; } +#[derive(Debug)] pub struct CassResult { pub tracing_id: Option, pub paging_state_response: PagingStateResponse, @@ -159,6 +163,7 @@ impl<'frame, 'metadata> DeserializeRow<'frame, 'metadata> for CassRawRow<'frame, /// The lifetime of CassRow is bound to CassResult. /// It will be freed, when CassResult is freed.(see #[cass_result_free]) +#[derive(Debug)] pub struct CassRow<'result> { pub columns: Vec>, pub result_metadata: &'result CassResultMetadata, @@ -230,7 +235,7 @@ mod row_with_self_borrowed_result_data { /// A simple wrapper over CassRow. /// Needed, so we can implement Yokeable for it, instead of implementing it for CassRow. - #[derive(Yokeable)] + #[derive(Yokeable, Debug)] struct CassRowWrapper<'result>(CassRow<'result>); /// A wrapper over struct which self-borrows the metadata allocated using Arc. @@ -243,6 +248,7 @@ mod row_with_self_borrowed_result_data { /// /// This struct is a shared owner of the row bytes and metadata, and self-borrows this data /// to the `CassRow` it contains. + #[derive(Debug)] pub struct RowWithSelfBorrowedResultData( Yoke, Arc>, ); @@ -307,6 +313,7 @@ pub(crate) mod cass_raw_value { use scylla::errors::{DeserializationError, TypeCheckError}; use thiserror::Error; + #[derive(Debug)] pub(crate) struct CassRawValue<'frame, 'metadata> { typ: &'metadata ColumnType<'metadata>, slice: Option>, @@ -428,6 +435,7 @@ pub(crate) mod cass_raw_value { } } +#[derive(Debug)] pub struct CassValue<'result> { pub(crate) value: CassRawValue<'result, 'result>, pub(crate) value_type: &'result Arc,