Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk)!: retry broadcast operations #2337

Merged
merged 19 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions packages/rs-dapi-client/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ where
/// Result of request execution
pub type ExecutionResult<R, E> = Result<ExecutionResponse<R>, ExecutionError<E>>;

impl<R, E> From<ExecutionResponse<R>> for ExecutionResult<R, E> {
fn from(response: ExecutionResponse<R>) -> Self {
ExecutionResult::<R, E>::Ok(response)
}
}

impl<R, E> From<ExecutionError<E>> for ExecutionResult<R, E> {
fn from(e: ExecutionError<E>) -> Self {
ExecutionResult::<R, E>::Err(e)
}
}

impl<R, E> IntoInner<Result<R, E>> for ExecutionResult<R, E> {
fn into_inner(self) -> Result<R, E> {
match self {
Expand All @@ -145,3 +157,56 @@ where
}
}
}

/// Convert Result<T,E> to ExecutionResult<>, taking context from ExecutionResponse.
pub trait WrapWithExecutionResult<R, RE, W>: Sized {
/// Convert Result<T,E> to ExecutionResult<>, taking context from ExecutionResponse.
///
/// This function simplifies processing of results by wrapping them into ExecutionResult.
/// It is useful when you have execution result retrieved in previous step and you want to
/// add it to the result of the current step.
///
/// ## Example
///
/// ```rust
/// use rs_dapi_client::{ExecutionResponse, ExecutionResult, WrapWithExecutionResult};
///
/// let response: ExecutionResponse<i32> = ExecutionResponse {
/// inner: 42,
/// retries: 123,
/// address: "http://127.0.0.1".parse().expect("create mock address"),
/// };
///
/// let result: Result<i32, String> = Err("next error".to_string());
/// let wrapped_result: ExecutionResult<i32, String> = result.wrap(&response);
///
/// if let ExecutionResult::Err(error) = wrapped_result {
/// assert_eq!(error.inner, "next error");
/// assert_eq!(error.retries, 123);
/// } else {
/// panic!("Expected error");
/// }
/// ```
fn wrap(self, result: &W) -> ExecutionResult<R, RE>;
shumkov marked this conversation as resolved.
Show resolved Hide resolved
}

impl<R, RE, TR, IR, IRE> WrapWithExecutionResult<R, RE, ExecutionResponse<TR>> for Result<IR, IRE>
where
R: From<IR>,
RE: From<IRE>,
{
fn wrap(self, result: &ExecutionResponse<TR>) -> ExecutionResult<R, RE> {
match self {
Ok(r) => ExecutionResult::Ok(ExecutionResponse {
inner: r.into(),
retries: result.retries,
address: result.address.clone(),
}),
Err(e) => ExecutionResult::Err(ExecutionError {
inner: e.into(),
retries: result.retries,
address: Some(result.address.clone()),
}),
}
}
}
lklimek marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions packages/rs-dapi-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use dapi_client::{DapiClient, DapiClientError};
pub use dump::DumpData;
pub use executor::{
DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult, InnerInto, IntoInner,
WrapWithExecutionResult,
};
use futures::{future::BoxFuture, FutureExt};
pub use request_settings::RequestSettings;
Expand Down
3 changes: 2 additions & 1 deletion packages/rs-dapi-client/src/request_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ const DEFAULT_BAN_FAILED_ADDRESS: bool = true;
pub struct RequestSettings {
/// Timeout for establishing a connection.
pub connect_timeout: Option<Duration>,
/// Timeout for a request.
/// Timeout for single request (soft limit).
/// Note that the total maximum time of execution can exceed `(timeout + connect_timeout) * retries`.
pub timeout: Option<Duration>,
/// Number of retries in case of failed requests. If max retries reached, the last error is returned.
/// 1 means one request and one retry in case of error, etc.
Expand Down
4 changes: 2 additions & 2 deletions packages/rs-dpp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dashcore = { git = "https://github.com/dashpay/rust-dashcore", features = [
"signer",
"serde",
"bls",
"eddsa"
"eddsa",
], default-features = false, tag = "0.32.0" }
env_logger = { version = "0.11" }
getrandom = { version = "0.2", features = ["js"] }
Expand All @@ -56,7 +56,7 @@ platform-version = { path = "../rs-platform-version" }
platform-versioning = { path = "../rs-platform-versioning" }
platform-serialization = { path = "../rs-platform-serialization" }
platform-serialization-derive = { path = "../rs-platform-serialization-derive" }
derive_more = { version = "1.0", features = ["from", "display"] }
derive_more = { version = "1.0", features = ["from", "display", "try_into"] }
nohash-hasher = "0.2.0"
rust_decimal = "1.29.1"
rust_decimal_macros = "1.29.1"
Expand Down
2 changes: 1 addition & 1 deletion packages/rs-dpp/src/state_transition/proof_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::voting::votes::Vote;
use platform_value::Identifier;
use std::collections::BTreeMap;

#[derive(Debug)]
#[derive(Debug, strum::Display, derive_more::TryInto)]
shumkov marked this conversation as resolved.
Show resolved Hide resolved
pub enum StateTransitionProofResult {
VerifiedDataContract(DataContract),
VerifiedIdentity(Identity),
Expand Down
2 changes: 1 addition & 1 deletion packages/rs-sdk/src/core/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Sdk {
self.execute(core_transactions_stream, RequestSettings::default())
.await
.into_inner()
.map_err(|e| Error::DapiClientError(e.to_string()))
.map_err(|e| e.into())
}

/// Waits for a response for the asset lock proof
Expand Down
12 changes: 11 additions & 1 deletion packages/rs-sdk/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Definitions of errors
use dapi_grpc::tonic::Code;
use dpp::consensus::ConsensusError;
use dpp::serialization::PlatformDeserializable;
use dpp::version::PlatformVersionError;
Expand Down Expand Up @@ -56,6 +57,10 @@ pub enum Error {
/// SDK operation timeout reached error
#[error("SDK operation timeout {} secs reached: {1}", .0.as_secs())]
TimeoutReached(Duration, String),

/// Object already exists
#[error("Object already exists: {0}")]
AlreadyExists(String),
shumkov marked this conversation as resolved.
Show resolved Hide resolved
/// Generic error
// TODO: Use domain specific errors instead of generic ones
#[error("SDK error: {0}")]
Expand All @@ -78,6 +83,7 @@ pub enum Error {
impl From<DapiClientError> for Error {
fn from(value: DapiClientError) -> Self {
if let DapiClientError::Transport(TransportError::Grpc(status)) = &value {
// If we have some consensus error metadata, we deserialize it and return as ConsensusError
if let Some(consensus_error_value) = status
.metadata()
.get_bin("dash-serialized-consensus-error-bin")
Expand All @@ -90,9 +96,13 @@ impl From<DapiClientError> for Error {
})
.unwrap_or_else(Self::Protocol);
}
// Otherwise we parse the error code and act accordingly
if status.code() == Code::AlreadyExists {
return Self::AlreadyExists(status.message().to_string());
}
}

Self::DapiClientError(format!("{:?}", value))
Self::DapiClientError(value.to_string())
}
}

Expand Down
3 changes: 0 additions & 3 deletions packages/rs-sdk/src/platform/transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
pub mod broadcast;
pub(crate) mod broadcast_identity;
pub mod broadcast_request;
pub(crate) mod context;
pub mod purchase_document;
pub mod put_contract;
pub mod put_document;
Expand All @@ -16,6 +15,4 @@ pub mod update_price_of_document;
pub mod vote;
pub mod withdraw_from_identity;

pub use context::*;

pub use txid::TxId;
155 changes: 112 additions & 43 deletions packages/rs-sdk/src/platform/transition/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,144 @@
use super::broadcast_request::BroadcastRequestForStateTransition;
use super::put_settings::PutSettings;
use crate::platform::block_info_from_metadata::block_info_from_metadata;
use crate::sync::retry;
use crate::{Error, Sdk};
use dapi_grpc::platform::v0::{Proof, WaitForStateTransitionResultResponse};
use dapi_grpc::platform::VersionedGrpcResponse;
use dpp::state_transition::proof_result::StateTransitionProofResult;
use dpp::state_transition::StateTransition;
use drive::drive::Drive;
use drive_proof_verifier::error::ContextProviderError;
use drive_proof_verifier::DataContractProvider;
use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings};
use rs_dapi_client::WrapWithExecutionResult;
use rs_dapi_client::{DapiRequest, ExecutionError, InnerInto, IntoInner, RequestSettings};

#[async_trait::async_trait]
pub trait BroadcastStateTransition {
async fn broadcast(&self, sdk: &Sdk) -> Result<(), Error>;
async fn broadcast_and_wait(
async fn broadcast(&self, sdk: &Sdk, settings: Option<PutSettings>) -> Result<(), Error>;
async fn wait_for_response<T: TryFrom<StateTransitionProofResult>>(
&self,
sdk: &Sdk,
time_out_ms: Option<u64>,
) -> Result<StateTransitionProofResult, Error>;
settings: Option<PutSettings>,
) -> Result<T, Error>;
async fn broadcast_and_wait<T: TryFrom<StateTransitionProofResult>>(
&self,
sdk: &Sdk,
settings: Option<PutSettings>,
) -> Result<T, Error>;
}

#[async_trait::async_trait]
impl BroadcastStateTransition for StateTransition {
async fn broadcast(&self, sdk: &Sdk) -> Result<(), Error> {
let request = self.broadcast_request_for_state_transition()?;
async fn broadcast(&self, sdk: &Sdk, settings: Option<PutSettings>) -> Result<(), Error> {
let retry_settings = match settings {
Some(s) => sdk.dapi_client_settings.override_by(s.request_settings),
None => sdk.dapi_client_settings,
};

request
.execute(sdk, RequestSettings::default())
.await // TODO: We need better way to handle execution errors
.into_inner()?;
// async fn retry_test_function(settings: RequestSettings) -> ExecutionResult<(), dash_sdk::Error>
let factory = |request_settings: RequestSettings| async move {
let request =
self.broadcast_request_for_state_transition()
.map_err(|e| ExecutionError {
inner: e,
address: None,
retries: 0,
})?;
request
.execute(sdk, request_settings)
.await
.map_err(|e| e.inner_into())
};

// response is empty for a broadcast, result comes from the stream wait for state transition result

Ok(())
retry(retry_settings, factory)
.await
.into_inner()
.map(|_| ())
}

async fn broadcast_and_wait(
async fn wait_for_response<T: TryFrom<StateTransitionProofResult>>(
&self,
sdk: &Sdk,
_time_out_ms: Option<u64>,
) -> Result<StateTransitionProofResult, Error> {
let request = self.broadcast_request_for_state_transition()?;
// TODO: Implement retry logic
request
.clone()
.execute(sdk, RequestSettings::default())
.await
.into_inner()?;
settings: Option<PutSettings>,
) -> Result<T, Error> {
let retry_settings = match settings {
Some(s) => sdk.dapi_client_settings.override_by(s.request_settings),
None => sdk.dapi_client_settings,
};

let request = self.wait_for_state_transition_result_request()?;
let factory = |request_settings: RequestSettings| async move {
let request = self
.wait_for_state_transition_result_request()
.map_err(|e| ExecutionError {
inner: e,
address: None,
retries: 0,
})?;

let response = request
.execute(sdk, RequestSettings::default())
.await
.into_inner()?;
let response = request.execute(sdk, request_settings).await.inner_into()?;

let block_info = block_info_from_metadata(response.metadata()?)?;
let proof = response.proof_owned()?;
let context_provider =
sdk.context_provider()
.ok_or(Error::from(ContextProviderError::Config(
let grpc_response: &WaitForStateTransitionResultResponse = &response.inner;
let metadata = grpc_response.metadata().wrap(&response)?.inner;
let block_info = block_info_from_metadata(metadata).wrap(&response)?.inner;
let proof: &Proof = (*grpc_response).proof().wrap(&response)?.inner;

let context_provider = sdk.context_provider().ok_or(ExecutionError {
inner: Error::from(ContextProviderError::Config(
"Context provider not initialized".to_string(),
)))?;
)),
address: Some(response.address.clone()),
retries: response.retries,
})?;

let (_, result) = Drive::verify_state_transition_was_executed_with_proof(
self,
&block_info,
proof.grovedb_proof.as_slice(),
&context_provider.as_contract_lookup_fn(),
sdk.version(),
)
.wrap(&response)?
.inner;

let variant_name = result.to_string();
T::try_from(result)
.map_err(|_| {
Error::InvalidProvedResponse(format!(
"invalid proved response: cannot convert from {} to {}",
variant_name,
std::any::type_name::<T>(),
))
})
.wrap(&response)
};

let (_, result) = Drive::verify_state_transition_was_executed_with_proof(
self,
&block_info,
proof.grovedb_proof.as_slice(),
&context_provider.as_contract_lookup_fn(),
sdk.version(),
)?;
let future = retry(retry_settings, factory);
let wait_timeout = settings.and_then(|s| s.wait_timeout);
match wait_timeout {
Some(timeout) => tokio::time::timeout(timeout, future)
.await
.map_err(|e| {
Error::TimeoutReached(
timeout,
format!("Timeout waiting for result of {} (tx id: {}) affecting object {}: {:?}",
self.name(),
self.transaction_id().map(hex::encode).unwrap_or("UNKNOWN".to_string()),
self.unique_identifiers().join(","),
e),
)
})?
.into_inner(),
None => future.await.into_inner(),
}
}

Ok(result)
async fn broadcast_and_wait<T: TryFrom<StateTransitionProofResult>>(
&self,
sdk: &Sdk,
settings: Option<PutSettings>,
) -> Result<T, Error> {
self.broadcast(sdk, settings).await?;
self.wait_for_response::<T>(sdk, settings).await
}
}
5 changes: 0 additions & 5 deletions packages/rs-sdk/src/platform/transition/context.rs

This file was deleted.

Loading
Loading