Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk)!: retry broadcast operations #2337

Merged
merged 19 commits into from
Nov 26, 2024
Merged
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions packages/rs-dapi-client/src/dapi_client.rs
Original file line number Diff line number Diff line change
@@ -227,7 +227,11 @@ impl DapiRequestExecutor for DapiClient {
.address_list
.write()
.expect("can't get address list for write");

tracing::warn!(
?address,
?error,
"received server error, banning address"
);
address_list.ban_address(&address).map_err(|error| {
ExecutionError {
inner: DapiClientError::AddressList(error),
@@ -236,9 +240,18 @@ impl DapiRequestExecutor for DapiClient {
address: Some(address.clone()),
}
})?;
} else {
tracing::debug!(
?address,
?error,
"received server error, we should ban the node but banning is disabled"
);
}
} else {
tracing::trace!(?error, "received error");
tracing::debug!(
?error,
"received server error, most likely the request is invalid"
);
}
}
};
73 changes: 73 additions & 0 deletions packages/rs-dapi-client/src/executor.rs
Original file line number Diff line number Diff line change
@@ -124,6 +124,18 @@ where
/// Result of request execution
pub type ExecutionResult<R, E> = Result<ExecutionResponse<R>, ExecutionError<E>>;

impl<R, E> From<ExecutionResponse<R>> for ExecutionResult<R, E> {
fn from(response: ExecutionResponse<R>) -> Self {
ExecutionResult::<R, E>::Ok(response)
}
}

impl<R, E> From<ExecutionError<E>> for ExecutionResult<R, E> {
fn from(e: ExecutionError<E>) -> Self {
ExecutionResult::<R, E>::Err(e)
}
}

impl<R, E> IntoInner<Result<R, E>> for ExecutionResult<R, E> {
fn into_inner(self) -> Result<R, E> {
match self {
@@ -145,3 +157,64 @@ where
}
}
}

/// Convert Result<T,TE> to ExecutionResult<R,E>, taking context from ExecutionResponse.
pub trait Wrap<R, RE, W>: Sized {
/// Convert self (eg. some [Result]) to [ExecutionResult], taking context information from `W` (eg. ExecutionResponse).
///
/// This function simplifies processing of results by wrapping them into ExecutionResult.
/// It is useful when you have execution result retrieved in previous step and you want to
/// add it to the result of the current step.
///
/// Useful when chaining multiple commands and you want to keep track of retries and address.
///
/// ## Example
///
/// ```rust
/// use rs_dapi_client::{ExecutionResponse, ExecutionResult, Wrap};
///
/// fn some_request() -> ExecutionResult<i8, String> {
/// Ok(ExecutionResponse {
/// inner: 42,
/// retries: 123,
/// address: "http://127.0.0.1".parse().expect("create mock address"),
/// })
/// }
///
/// fn next_step() -> Result<i32, String> {
/// Err("next error".to_string())
/// }
///
/// let response = some_request().expect("request should succeed");
/// let result: ExecutionResult<i32, String> = next_step().wrap(&response);
///
/// if let ExecutionResult::Err(error) = result {
/// assert_eq!(error.inner, "next error");
/// assert_eq!(error.retries, 123);
/// } else {
/// panic!("Expected error");
/// }
/// ```
fn wrap(self, result: &W) -> ExecutionResult<R, RE>;
}

impl<R, RE, TR, IR, IRE> Wrap<R, RE, ExecutionResponse<TR>> for Result<IR, IRE>
where
R: From<IR>,
RE: From<IRE>,
{
fn wrap(self, result: &ExecutionResponse<TR>) -> ExecutionResult<R, RE> {
match self {
Ok(r) => ExecutionResult::Ok(ExecutionResponse {
inner: r.into(),
retries: result.retries,
address: result.address.clone(),
}),
Err(e) => ExecutionResult::Err(ExecutionError {
inner: e.into(),
retries: result.retries,
address: Some(result.address.clone()),
}),
}
}
}
1 change: 1 addition & 0 deletions packages/rs-dapi-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ pub use dapi_client::{DapiClient, DapiClientError};
pub use dump::DumpData;
pub use executor::{
DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult, InnerInto, IntoInner,
Wrap,
};
use futures::{future::BoxFuture, FutureExt};
pub use request_settings::RequestSettings;
5 changes: 4 additions & 1 deletion packages/rs-dapi-client/src/request_settings.rs
Original file line number Diff line number Diff line change
@@ -19,7 +19,10 @@ const DEFAULT_BAN_FAILED_ADDRESS: bool = true;
pub struct RequestSettings {
/// Timeout for establishing a connection.
pub connect_timeout: Option<Duration>,
/// Timeout for a request.
/// Timeout for single request (soft limit).
///
/// Note that the total maximum time of execution can exceed `(timeout + connect_timeout) * retries`
/// as it accounts for internal processing time between retries.
pub timeout: Option<Duration>,
/// Number of retries in case of failed requests. If max retries reached, the last error is returned.
/// 1 means one request and one retry in case of error, etc.
34 changes: 32 additions & 2 deletions packages/rs-dapi-client/src/transport/grpc.rs
Original file line number Diff line number Diff line change
@@ -132,8 +132,38 @@ impl CanRetry for dapi_grpc::tonic::Status {
}
}

/// A shortcut to link between gRPC request type, response type, client and its
/// method in order to represent it in a form of types and data.
/// Macro to implement the `TransportRequest` trait for a given request type, response type, client type, and settings.
///
/// # Parameters
///
/// - `$request:ty`: The request type for which the `TransportRequest` trait will be implemented.
/// - `$response:ty`: The response type returned by the transport request.
/// - `$client:ty`: The client type used to execute the transport request (eg. generated by `tonic` crate).
/// - `$settings:expr`: The settings to be used for the transport request; these settings will override client's
/// default settings, but can still be overriden by arguments to
/// the [`DapiRequestExecutor::execute`](crate::DapiRequestExecutor::execute) method.
/// - `$($method:tt)+`: The method of `$client` to be called to execute the request.
///
/// # Example
///
/// ```compile_fail
/// impl_transport_request_grpc!(
/// MyRequestType,
/// MyResponseType,
/// MyClientType,
/// my_settings,
/// my_method
/// );
/// ```
///
/// This will generate an implementation of the `TransportRequest` trait for `MyRequestType`
/// that uses `MyClientType` to execute the `my_method` method, with the specified `my_settings`.
///
/// The generated implementation will:
/// - Define the associated types `Client` and `Response`.
/// - Set the `SETTINGS_OVERRIDES` constant to the provided settings.
/// - Implement the `method_name` function to return the name of the method as a string.
/// - Implement the `execute_transport` function to execute the transport request using the provided client and settings.
macro_rules! impl_transport_request_grpc {
($request:ty, $response:ty, $client:ty, $settings:expr, $($method:tt)+) => {
impl TransportRequest for $request {
4 changes: 2 additions & 2 deletions packages/rs-dpp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ dashcore = { git = "https://github.com/dashpay/rust-dashcore", features = [
"signer",
"serde",
"bls",
"eddsa"
"eddsa",
], default-features = false, tag = "0.32.0" }
env_logger = { version = "0.11" }
getrandom = { version = "0.2", features = ["js"] }
@@ -56,7 +56,7 @@ platform-version = { path = "../rs-platform-version" }
platform-versioning = { path = "../rs-platform-versioning" }
platform-serialization = { path = "../rs-platform-serialization" }
platform-serialization-derive = { path = "../rs-platform-serialization-derive" }
derive_more = { version = "1.0", features = ["from", "display"] }
derive_more = { version = "1.0", features = ["from", "display", "try_into"] }
nohash-hasher = "0.2.0"
rust_decimal = "1.29.1"
rust_decimal_macros = "1.29.1"
2 changes: 1 addition & 1 deletion packages/rs-dpp/src/state_transition/proof_result.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ use crate::voting::votes::Vote;
use platform_value::Identifier;
use std::collections::BTreeMap;

#[derive(Debug)]
#[derive(Debug, strum::Display, derive_more::TryInto)]
pub enum StateTransitionProofResult {
VerifiedDataContract(DataContract),
VerifiedIdentity(Identity),
2 changes: 1 addition & 1 deletion packages/rs-sdk/src/core/transaction.rs
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ impl Sdk {
self.execute(core_transactions_stream, RequestSettings::default())
.await
.into_inner()
.map_err(|e| Error::DapiClientError(e.to_string()))
.map_err(|e| e.into())
}

/// Waits for a response for the asset lock proof
17 changes: 15 additions & 2 deletions packages/rs-sdk/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Definitions of errors
use dapi_grpc::tonic::Code;
use dpp::consensus::ConsensusError;
use dpp::serialization::PlatformDeserializable;
use dpp::version::PlatformVersionError;
@@ -56,6 +57,10 @@ pub enum Error {
/// SDK operation timeout reached error
#[error("SDK operation timeout {} secs reached: {1}", .0.as_secs())]
TimeoutReached(Duration, String),

/// Returned when an attempt is made to create an object that already exists in the system
#[error("Object already exists: {0}")]
AlreadyExists(String),
/// Generic error
// TODO: Use domain specific errors instead of generic ones
#[error("SDK error: {0}")]
@@ -78,6 +83,7 @@ pub enum Error {
impl From<DapiClientError> for Error {
fn from(value: DapiClientError) -> Self {
if let DapiClientError::Transport(TransportError::Grpc(status)) = &value {
// If we have some consensus error metadata, we deserialize it and return as ConsensusError
if let Some(consensus_error_value) = status
.metadata()
.get_bin("dash-serialized-consensus-error-bin")
@@ -88,11 +94,18 @@ impl From<DapiClientError> for Error {
.map(|consensus_error| {
Self::Protocol(ProtocolError::ConsensusError(Box::new(consensus_error)))
})
.unwrap_or_else(Self::Protocol);
.unwrap_or_else(|e| {
tracing::debug!("Failed to deserialize consensus error: {}", e);
Self::Protocol(e)
});
}
// Otherwise we parse the error code and act accordingly
if status.code() == Code::AlreadyExists {
return Self::AlreadyExists(status.message().to_string());
}
}

Self::DapiClientError(format!("{:?}", value))
Self::DapiClientError(value.to_string())
}
}

3 changes: 0 additions & 3 deletions packages/rs-sdk/src/platform/transition.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@
pub mod broadcast;
pub(crate) mod broadcast_identity;
pub mod broadcast_request;
pub(crate) mod context;
pub mod purchase_document;
pub mod put_contract;
pub mod put_document;
@@ -16,6 +15,4 @@ pub mod update_price_of_document;
pub mod vote;
pub mod withdraw_from_identity;

pub use context::*;

pub use txid::TxId;
157 changes: 114 additions & 43 deletions packages/rs-sdk/src/platform/transition/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,146 @@
use super::broadcast_request::BroadcastRequestForStateTransition;
use super::put_settings::PutSettings;
use crate::platform::block_info_from_metadata::block_info_from_metadata;
use crate::sync::retry;
use crate::{Error, Sdk};
use dapi_grpc::platform::v0::{Proof, WaitForStateTransitionResultResponse};
use dapi_grpc::platform::VersionedGrpcResponse;
use dpp::state_transition::proof_result::StateTransitionProofResult;
use dpp::state_transition::StateTransition;
use drive::drive::Drive;
use drive_proof_verifier::error::ContextProviderError;
use drive_proof_verifier::DataContractProvider;
use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings};
use rs_dapi_client::Wrap;
use rs_dapi_client::{DapiRequest, ExecutionError, InnerInto, IntoInner, RequestSettings};

#[async_trait::async_trait]
pub trait BroadcastStateTransition {
async fn broadcast(&self, sdk: &Sdk) -> Result<(), Error>;
async fn broadcast_and_wait(
async fn broadcast(&self, sdk: &Sdk, settings: Option<PutSettings>) -> Result<(), Error>;
async fn wait_for_response<T: TryFrom<StateTransitionProofResult>>(
&self,
sdk: &Sdk,
time_out_ms: Option<u64>,
) -> Result<StateTransitionProofResult, Error>;
settings: Option<PutSettings>,
) -> Result<T, Error>;
async fn broadcast_and_wait<T: TryFrom<StateTransitionProofResult>>(
&self,
sdk: &Sdk,
settings: Option<PutSettings>,
) -> Result<T, Error>;
}

#[async_trait::async_trait]
impl BroadcastStateTransition for StateTransition {
async fn broadcast(&self, sdk: &Sdk) -> Result<(), Error> {
let request = self.broadcast_request_for_state_transition()?;
async fn broadcast(&self, sdk: &Sdk, settings: Option<PutSettings>) -> Result<(), Error> {
let retry_settings = match settings {
Some(s) => sdk.dapi_client_settings.override_by(s.request_settings),
None => sdk.dapi_client_settings,
};

request
.execute(sdk, RequestSettings::default())
.await // TODO: We need better way to handle execution errors
.into_inner()?;
// async fn retry_test_function(settings: RequestSettings) -> ExecutionResult<(), dash_sdk::Error>
let factory = |request_settings: RequestSettings| async move {
let request =
self.broadcast_request_for_state_transition()
.map_err(|e| ExecutionError {
inner: e,
address: None,
retries: 0,
})?;
request
.execute(sdk, request_settings)
.await
.map_err(|e| e.inner_into())
};

// response is empty for a broadcast, result comes from the stream wait for state transition result

Ok(())
retry(retry_settings, factory)
.await
.into_inner()
.map(|_| ())
}

async fn broadcast_and_wait(
async fn wait_for_response<T: TryFrom<StateTransitionProofResult>>(
&self,
sdk: &Sdk,
_time_out_ms: Option<u64>,
) -> Result<StateTransitionProofResult, Error> {
let request = self.broadcast_request_for_state_transition()?;
// TODO: Implement retry logic
request
.clone()
.execute(sdk, RequestSettings::default())
.await
.into_inner()?;
settings: Option<PutSettings>,
) -> Result<T, Error> {
let retry_settings = match settings {
Some(s) => sdk.dapi_client_settings.override_by(s.request_settings),
None => sdk.dapi_client_settings,
};

let request = self.wait_for_state_transition_result_request()?;
// prepare a factory that will generate closure which executes actual code
let factory = |request_settings: RequestSettings| async move {
let request = self
.wait_for_state_transition_result_request()
.map_err(|e| ExecutionError {
inner: e,
address: None,
retries: 0,
})?;

let response = request
.execute(sdk, RequestSettings::default())
.await
.into_inner()?;
let response = request.execute(sdk, request_settings).await.inner_into()?;

let block_info = block_info_from_metadata(response.metadata()?)?;
let proof = response.proof_owned()?;
let context_provider =
sdk.context_provider()
.ok_or(Error::from(ContextProviderError::Config(
let grpc_response: &WaitForStateTransitionResultResponse = &response.inner;
let metadata = grpc_response.metadata().wrap(&response)?.inner;
let block_info = block_info_from_metadata(metadata).wrap(&response)?.inner;
let proof: &Proof = (*grpc_response).proof().wrap(&response)?.inner;

let context_provider = sdk.context_provider().ok_or(ExecutionError {
inner: Error::from(ContextProviderError::Config(
"Context provider not initialized".to_string(),
)))?;
)),
address: Some(response.address.clone()),
retries: response.retries,
})?;

let (_, result) = Drive::verify_state_transition_was_executed_with_proof(
self,
&block_info,
proof.grovedb_proof.as_slice(),
&context_provider.as_contract_lookup_fn(),
sdk.version(),
)
.wrap(&response)?
.inner;

let variant_name = result.to_string();
T::try_from(result)
.map_err(|_| {
Error::InvalidProvedResponse(format!(
"invalid proved response: cannot convert from {} to {}",
variant_name,
std::any::type_name::<T>(),
))
})
.wrap(&response)
};

let (_, result) = Drive::verify_state_transition_was_executed_with_proof(
self,
&block_info,
proof.grovedb_proof.as_slice(),
&context_provider.as_contract_lookup_fn(),
sdk.version(),
)?;
let future = retry(retry_settings, factory);
// run the future with or without timeout, depending on the settings
let wait_timeout = settings.and_then(|s| s.wait_timeout);
match wait_timeout {
Some(timeout) => tokio::time::timeout(timeout, future)
.await
.map_err(|e| {
Error::TimeoutReached(
timeout,
format!("Timeout waiting for result of {} (tx id: {}) affecting object {}: {:?}",
self.name(),
self.transaction_id().map(hex::encode).unwrap_or("UNKNOWN".to_string()),
self.unique_identifiers().join(","),
e),
)
})?
.into_inner(),
None => future.await.into_inner(),
}
}

Ok(result)
async fn broadcast_and_wait<T: TryFrom<StateTransitionProofResult>>(
&self,
sdk: &Sdk,
settings: Option<PutSettings>,
) -> Result<T, Error> {
self.broadcast(sdk, settings).await?;
self.wait_for_response::<T>(sdk, settings).await
}
}
5 changes: 0 additions & 5 deletions packages/rs-sdk/src/platform/transition/context.rs

This file was deleted.

38 changes: 5 additions & 33 deletions packages/rs-sdk/src/platform/transition/purchase_document.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use crate::platform::transition::broadcast_request::BroadcastRequestForStateTransition;
use std::sync::Arc;

use crate::{Error, Sdk};

use crate::platform::block_info_from_metadata::block_info_from_metadata;
use crate::platform::transition::put_settings::PutSettings;
use dapi_grpc::platform::VersionedGrpcResponse;
use dpp::data_contract::document_type::accessors::DocumentTypeV0Getters;
use dpp::data_contract::document_type::DocumentType;
use dpp::data_contract::DataContract;
@@ -18,8 +15,8 @@ use dpp::state_transition::documents_batch_transition::methods::v0::DocumentsBat
use dpp::state_transition::documents_batch_transition::DocumentsBatchTransition;
use dpp::state_transition::proof_result::StateTransitionProofResult;
use dpp::state_transition::StateTransition;
use drive::drive::Drive;
use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings};

use super::broadcast::BroadcastStateTransition;

#[async_trait::async_trait]
/// A trait for purchasing a document on Platform
@@ -96,43 +93,18 @@ impl<S: Signer> PurchaseDocument<S> for Document {
None,
)?;

let request = transition.broadcast_request_for_state_transition()?;

request
.clone()
.execute(sdk, settings.request_settings)
.await // TODO: We need better way to handle execution errors
.into_inner()?;

transition.broadcast(sdk, Some(settings)).await?;
// response is empty for a broadcast, result comes from the stream wait for state transition result

Ok(transition)
}

async fn wait_for_response(
&self,
sdk: &Sdk,
state_transition: StateTransition,
data_contract: Arc<DataContract>,
_data_contract: Arc<DataContract>,
) -> Result<Document, Error> {
let request = state_transition.wait_for_state_transition_result_request()?;
// TODO: Implement retry logic
let response = request
.execute(sdk, RequestSettings::default())
.await
.into_inner()?;

let block_info = block_info_from_metadata(response.metadata()?)?;

let proof = response.proof_owned()?;

let (_, result) = Drive::verify_state_transition_was_executed_with_proof(
&state_transition,
&block_info,
proof.grovedb_proof.as_slice(),
&|_| Ok(Some(data_contract.clone())),
sdk.version(),
)?;
let result = state_transition.wait_for_response(sdk, None).await?;

match result {
StateTransitionProofResult::VerifiedDocuments(mut documents) => {
42 changes: 4 additions & 38 deletions packages/rs-sdk/src/platform/transition/put_contract.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use crate::platform::transition::broadcast_request::BroadcastRequestForStateTransition;
use std::collections::BTreeMap;

use crate::{Error, Sdk};

use crate::platform::block_info_from_metadata::block_info_from_metadata;
use crate::platform::transition::put_settings::PutSettings;
use dapi_grpc::platform::VersionedGrpcResponse;
use dpp::data_contract::accessors::v0::DataContractV0Getters;
use dpp::data_contract::DataContract;
use dpp::identity::identity_public_key::accessors::v0::IdentityPublicKeyGettersV0;
@@ -15,10 +12,8 @@ use dpp::state_transition::data_contract_create_transition::methods::DataContrac
use dpp::state_transition::data_contract_create_transition::DataContractCreateTransition;
use dpp::state_transition::proof_result::StateTransitionProofResult;
use dpp::state_transition::StateTransition;
use drive::drive::Drive;
use drive_proof_verifier::error::ContextProviderError;
use drive_proof_verifier::DataContractProvider;
use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings};

use super::broadcast::BroadcastStateTransition;

#[async_trait::async_trait]
/// A trait for putting a contract to platform
@@ -81,14 +76,7 @@ impl<S: Signer> PutContract<S> for DataContract {
None,
)?;

let request = transition.broadcast_request_for_state_transition()?;

request
.clone()
.execute(sdk, settings.unwrap_or_default().request_settings)
.await // TODO: We need better way to handle execution errors
.into_inner()?;

transition.broadcast(sdk, settings).await?;
// response is empty for a broadcast, result comes from the stream wait for state transition result

Ok(transition)
@@ -99,29 +87,7 @@ impl<S: Signer> PutContract<S> for DataContract {
sdk: &Sdk,
state_transition: StateTransition,
) -> Result<DataContract, Error> {
let request = state_transition.wait_for_state_transition_result_request()?;

let response = request
.execute(sdk, RequestSettings::default())
.await
.into_inner()?;

let block_info = block_info_from_metadata(response.metadata()?)?;

let proof = response.proof_owned()?;
let context_provider =
sdk.context_provider()
.ok_or(Error::from(ContextProviderError::Config(
"Context provider not initialized".to_string(),
)))?;

let (_, result) = Drive::verify_state_transition_was_executed_with_proof(
&state_transition,
&block_info,
proof.grovedb_proof.as_slice(),
&context_provider.as_contract_lookup_fn(),
sdk.version(),
)?;
let result = state_transition.wait_for_response(sdk, None).await?;

//todo verify

68 changes: 22 additions & 46 deletions packages/rs-sdk/src/platform/transition/put_document.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
use crate::platform::transition::broadcast_request::BroadcastRequestForStateTransition;
use std::sync::Arc;

use crate::{Error, Sdk};

use crate::platform::block_info_from_metadata::block_info_from_metadata;
use super::broadcast::BroadcastStateTransition;
use crate::platform::transition::put_settings::PutSettings;
use dapi_grpc::platform::VersionedGrpcResponse;
use crate::{Error, Sdk};
use dpp::data_contract::document_type::accessors::DocumentTypeV0Getters;
use dpp::data_contract::document_type::DocumentType;
use dpp::data_contract::DataContract;
@@ -16,8 +11,7 @@ use dpp::state_transition::documents_batch_transition::methods::v0::DocumentsBat
use dpp::state_transition::documents_batch_transition::DocumentsBatchTransition;
use dpp::state_transition::proof_result::StateTransitionProofResult;
use dpp::state_transition::StateTransition;
use drive::drive::Drive;
use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings};
use std::sync::Arc;

#[async_trait::async_trait]
/// A trait for putting a document to platform
@@ -90,46 +84,19 @@ impl<S: Signer> PutDocument<S> for Document {
None,
)?;

let request = transition.broadcast_request_for_state_transition()?;

request
.clone()
.execute(sdk, settings.request_settings)
.await // TODO: We need better way to handle execution errors
.into_inner()?;

// response is empty for a broadcast, result comes from the stream wait for state transition result

transition.broadcast(sdk, Some(settings)).await?;
Ok(transition)
}

async fn wait_for_response(
&self,
sdk: &Sdk,
state_transition: StateTransition,
data_contract: Arc<DataContract>,
_data_contract: Arc<DataContract>,
) -> Result<Document, Error> {
let request = state_transition.wait_for_state_transition_result_request()?;
// TODO: Implement retry logic
let response = request
.execute(sdk, RequestSettings::default())
.await
.into_inner()?;

let block_info = block_info_from_metadata(response.metadata()?)?;

let proof = response.proof_owned()?;

let (_, result) = Drive::verify_state_transition_was_executed_with_proof(
&state_transition,
&block_info,
proof.grovedb_proof.as_slice(),
&|_| Ok(Some(data_contract.clone())),
sdk.version(),
)?;

let result = state_transition.wait_for_response(sdk, None).await?;
//todo verify

match result {
StateTransitionProofResult::VerifiedDocuments(mut documents) => {
let document = documents
@@ -152,7 +119,7 @@ impl<S: Signer> PutDocument<S> for Document {
document_type: DocumentType,
document_state_transition_entropy: [u8; 32],
identity_public_key: IdentityPublicKey,
data_contract: Arc<DataContract>,
_data_contract: Arc<DataContract>,
signer: &S,
) -> Result<Document, Error> {
let state_transition = self
@@ -166,11 +133,20 @@ impl<S: Signer> PutDocument<S> for Document {
)
.await?;

// TODO: Why do we need full type annotation?
let document =
<Self as PutDocument<S>>::wait_for_response(self, sdk, state_transition, data_contract)
.await?;

Ok(document)
let result = state_transition.broadcast_and_wait(sdk, None).await?;
match result {
StateTransitionProofResult::VerifiedDocuments(mut documents) => {
let document = documents
.remove(self.id_ref())
.ok_or(Error::InvalidProvedResponse(
"did not prove the sent document".to_string(),
))?
.ok_or(Error::InvalidProvedResponse(
"expected there to actually be a document".to_string(),
))?;
Ok(document)
}
_ => Err(Error::DapiClientError("proved a non document".to_string())),
}
}
}
17 changes: 17 additions & 0 deletions packages/rs-sdk/src/platform/transition/put_settings.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use dpp::prelude::UserFeeIncrease;
use rs_dapi_client::RequestSettings;

@@ -7,4 +9,19 @@ pub struct PutSettings {
pub request_settings: RequestSettings,
pub identity_nonce_stale_time_s: Option<u64>,
pub user_fee_increase: Option<UserFeeIncrease>,
/// Soft limit of total time to wait for state transition to be executed (included in a block).
///
/// This is an upper limit, and other settings may affect the actual wait time
/// (like DAPI timeouts, [RequestSettings::timeout], [RequestSettings::retries], etc.).
/// If you want to use `wait_timeout`, tune `retries` accordingly.
///
/// It can be exceeded due to execution of non-cancellable parts of the Sdk.
// TODO: Simplify timeout logic when waiting for response in Sdk, as having 3 different timeouts is confusing.
pub wait_timeout: Option<Duration>,
}

impl From<PutSettings> for RequestSettings {
fn from(settings: PutSettings) -> Self {
settings.request_settings
}
}
8 changes: 7 additions & 1 deletion packages/rs-sdk/src/platform/transition/transfer.rs
Original file line number Diff line number Diff line change
@@ -17,6 +17,12 @@ pub trait TransferToIdentity {
///
/// If signing_transfer_key_to_use is not set, we will try to use one in the signer that is
/// available for the transfer.
///
/// This method will resolve once the state transition is executed.
///
/// ## Returns
///
/// Final balance of the identity after the transfer.
async fn transfer_credits<S: Signer + Send>(
&self,
sdk: &Sdk,
@@ -53,7 +59,7 @@ impl TransferToIdentity for Identity {
None,
)?;

let result = state_transition.broadcast_and_wait(sdk, None).await?;
let result = state_transition.broadcast_and_wait(sdk, settings).await?;

match result {
StateTransitionProofResult::VerifiedPartialIdentity(identity) => {
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ impl WithdrawFromIdentity for Identity {
None,
)?;

let result = state_transition.broadcast_and_wait(sdk, None).await?;
let result = state_transition.broadcast_and_wait(sdk, settings).await?;

match result {
StateTransitionProofResult::VerifiedPartialIdentity(identity) => {
12 changes: 6 additions & 6 deletions packages/rs-sdk/tests/fetch/contested_resource.rs
Original file line number Diff line number Diff line change
@@ -232,11 +232,11 @@ async fn contested_resources_limit_PLAN_656() {
/// None
#[test_case::test_case(|_q| {}, Ok("ContestedResources([ContestedResource(Text(".into()); "unmodified base query is Ok")]
#[test_case::test_case(|q| q.start_index_values = vec![Value::Text("".to_string())], Ok("".into()); "index value empty string is Ok")]
#[test_case::test_case(|q| q.document_type_name = "some random non-existing name".to_string(), Err(r#"code: InvalidArgument, message: "document type some random non-existing name not found"#); "non existing document type returns InvalidArgument")]
#[test_case::test_case(|q| q.index_name = "nx index".to_string(), Err(r#"code: InvalidArgument, message: "index with name nx index is not the contested index"#); "non existing index returns InvalidArgument")]
#[test_case::test_case(|q| q.index_name = "dashIdentityId".to_string(), Err(r#"code: InvalidArgument, message: "index with name dashIdentityId is not the contested index"#); "existing non-contested index returns InvalidArgument")]
#[test_case::test_case(|q| q.document_type_name = "some random non-existing name".to_string(), Err(r#"status: InvalidArgument, message: "document type some random non-existing name not found"#); "non existing document type returns InvalidArgument")]
#[test_case::test_case(|q| q.index_name = "nx index".to_string(), Err(r#"status: InvalidArgument, message: "index with name nx index is not the contested index"#); "non existing index returns InvalidArgument")]
#[test_case::test_case(|q| q.index_name = "dashIdentityId".to_string(), Err(r#"status: InvalidArgument, message: "index with name dashIdentityId is not the contested index"#); "existing non-contested index returns InvalidArgument")]
// Disabled due to bug PLAN-653
// #[test_case::test_case(|q| q.start_at_value = Some((Value::Array(vec![]), true)), Err(r#"code: InvalidArgument"#); "start_at_value wrong index type returns InvalidArgument PLAN-653")]
// #[test_case::test_case(|q| q.start_at_value = Some((Value::Array(vec![]), true)), Err(r#"status: InvalidArgument"#); "start_at_value wrong index type returns InvalidArgument PLAN-653")]
#[test_case::test_case(|q| q.start_index_values = vec![], Ok(r#"ContestedResources([ContestedResource(Text("dash"))])"#.into()); "start_index_values empty vec returns top-level keys")]
#[test_case::test_case(|q| q.start_index_values = vec![Value::Text("".to_string())], Ok(r#"ContestedResources([])"#.into()); "start_index_values empty string returns zero results")]
#[test_case::test_case(|q| {
@@ -276,8 +276,8 @@ async fn contested_resources_limit_PLAN_656() {
q.end_index_values = vec![Value::Text("zzz non existing".to_string())]
}, Ok("ContestedResources([])".into()); "Non-existing end_index_values returns error")]
#[test_case::test_case(|q| q.end_index_values = vec![Value::Array(vec![0.into(), 1.into()])], Err("incorrect index values error: too many end index values were provided"); "wrong type of end_index_values should return InvalidArgument")]
#[test_case::test_case(|q| q.limit = Some(0), Err(r#"code: InvalidArgument"#); "limit 0 returns InvalidArgument")]
#[test_case::test_case(|q| q.limit = Some(u16::MAX), Err(r#"code: InvalidArgument"#); "limit u16::MAX returns InvalidArgument")]
#[test_case::test_case(|q| q.limit = Some(0), Err(r#"status: InvalidArgument"#); "limit 0 returns InvalidArgument")]
#[test_case::test_case(|q| q.limit = Some(u16::MAX), Err(r#"status: InvalidArgument"#); "limit u16::MAX returns InvalidArgument")]
// Disabled due to bug PLAN-656
// #[test_case::test_case(|q| {
// q.start_index_values = vec![Value::Text("dash".to_string())];
8 changes: 4 additions & 4 deletions packages/rs-sdk/tests/fetch/contested_resource_vote_state.rs
Original file line number Diff line number Diff line change
@@ -107,7 +107,7 @@ async fn contested_resource_vote_states_nx_contract() {
if let dash_sdk::error::Error::DapiClientError(e) = result {
assert!(
e.contains(
"Transport(Grpc(Status { code: InvalidArgument, message: \"contract not found error"
"Transport(Grpc(Status { status: InvalidArgument, message: \"contract not found error"
),
"we should get contract not found error, got: {:?}",
e,
@@ -280,9 +280,9 @@ type MutFn = fn(&mut ContestedDocumentVotePollDriveQuery);
#[test_case(|q| q.limit = Some(u16::MAX), Err("limit 65535 out of bounds of [1, 100]"); "limit u16::MAX")]
#[test_case(|q| q.start_at = Some(([0x11; 32], true)), Ok("Contenders { winner: None, contenders: {Identifier("); "start_at does not exist should return next contenders")]
#[test_case(|q| q.start_at = Some(([0xff; 32], true)), Ok("Contenders { winner: None, contenders: {}, abstain_vote_tally: None, lock_vote_tally: None }"); "start_at 0xff;32 should return zero contenders")]
#[test_case(|q| q.vote_poll.document_type_name = "nx doctype".to_string(), Err(r#"code: InvalidArgument, message: "document type nx doctype not found"#); "non existing document type returns InvalidArgument")]
#[test_case(|q| q.vote_poll.index_name = "nx index".to_string(), Err(r#"code: InvalidArgument, message: "index with name nx index is not the contested index"#); "non existing index returns InvalidArgument")]
#[test_case(|q| q.vote_poll.index_name = "dashIdentityId".to_string(), Err(r#"code: InvalidArgument, message: "index with name dashIdentityId is not the contested index"#); "existing non-contested index returns InvalidArgument")]
#[test_case(|q| q.vote_poll.document_type_name = "nx doctype".to_string(), Err(r#"status: InvalidArgument, message: "document type nx doctype not found"#); "non existing document type returns InvalidArgument")]
#[test_case(|q| q.vote_poll.index_name = "nx index".to_string(), Err(r#"status: InvalidArgument, message: "index with name nx index is not the contested index"#); "non existing index returns InvalidArgument")]
#[test_case(|q| q.vote_poll.index_name = "dashIdentityId".to_string(), Err(r#"status: InvalidArgument, message: "index with name dashIdentityId is not the contested index"#); "existing non-contested index returns InvalidArgument")]
#[test_case(|q| q.vote_poll.index_values = vec![], Err("query uses index parentNameAndLabel, this index has 2 properties, but the query provided 0 index values instead"); "index_values empty vec returns error")]
#[test_case(|q| q.vote_poll.index_values = vec![Value::Text("".to_string())], Err("query uses index parentNameAndLabel, this index has 2 properties, but the query provided 1 index values instead"); "index_values empty string returns error")]
#[test_case(|q| q.vote_poll.index_values = vec![Value::Text("dash".to_string())], Err("query uses index parentNameAndLabel, this index has 2 properties, but the query provided 1 index values instead"); "index_values with one value returns error")]