Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk)!: retry broadcast operations #2337

Merged
merged 19 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions packages/rs-dapi-client/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ where
/// Result of request execution
pub type ExecutionResult<R, E> = Result<ExecutionResponse<R>, ExecutionError<E>>;

impl<R, E> From<ExecutionResponse<R>> for ExecutionResult<R, E> {
fn from(response: ExecutionResponse<R>) -> Self {
ExecutionResult::<R, E>::Ok(response)
}
}

impl<R, E> From<ExecutionError<E>> for ExecutionResult<R, E> {
fn from(e: ExecutionError<E>) -> Self {
ExecutionResult::<R, E>::Err(e)
}
}

impl<R, E> IntoInner<Result<R, E>> for ExecutionResult<R, E> {
fn into_inner(self) -> Result<R, E> {
match self {
Expand All @@ -145,3 +157,56 @@ where
}
}
}

/// Convert Result<T,E> to ExecutionResult<>, taking context from ExecutionResponse.
pub trait WrapWithExecutionResult<R, RE, W>: Sized {
/// Convert Result<T,E> to ExecutionResult<>, taking context from ExecutionResponse.
///
/// This function simplifies processing of results by wrapping them into ExecutionResult.
/// It is useful when you have execution result retrieved in previous step and you want to
/// add it to the result of the current step.
///
/// ## Example
///
/// ```rust
/// use rs_dapi_client::{ExecutionResponse, ExecutionResult, WrapWithExecutionResult};
///
/// let response: ExecutionResponse<i32> = ExecutionResponse {
/// inner: 42,
/// retries: 123,
/// address: "http://127.0.0.1".parse().expect("create mock address"),
/// };
///
/// let result: Result<i32, String> = Err("next error".to_string());
/// let wrapped_result: ExecutionResult<i32, String> = result.wrap(&response);
///
/// if let ExecutionResult::Err(error) = wrapped_result {
/// assert_eq!(error.inner, "next error");
/// assert_eq!(error.retries, 123);
/// } else {
/// panic!("Expected error");
/// }
/// ```
fn wrap(self, result: &W) -> ExecutionResult<R, RE>;
shumkov marked this conversation as resolved.
Show resolved Hide resolved
}

impl<R, RE, TR, IR, IRE> WrapWithExecutionResult<R, RE, ExecutionResponse<TR>> for Result<IR, IRE>
where
R: From<IR>,
RE: From<IRE>,
{
fn wrap(self, result: &ExecutionResponse<TR>) -> ExecutionResult<R, RE> {
match self {
Ok(r) => ExecutionResult::Ok(ExecutionResponse {
inner: r.into(),
retries: result.retries,
address: result.address.clone(),
}),
Err(e) => ExecutionResult::Err(ExecutionError {
inner: e.into(),
retries: result.retries,
address: Some(result.address.clone()),
}),
}
}
}
lklimek marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions packages/rs-dapi-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use dapi_client::{DapiClient, DapiClientError};
pub use dump::DumpData;
pub use executor::{
DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult, InnerInto, IntoInner,
WrapWithExecutionResult,
};
use futures::{future::BoxFuture, FutureExt};
pub use request_settings::RequestSettings;
Expand Down
116 changes: 80 additions & 36 deletions packages/rs-sdk/src/platform/transition/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use super::broadcast_request::BroadcastRequestForStateTransition;
use crate::platform::block_info_from_metadata::block_info_from_metadata;
use crate::sync::retry;
use crate::{Error, Sdk};
use dapi_grpc::platform::v0::Proof;
use dapi_grpc::platform::VersionedGrpcResponse;
use dpp::state_transition::proof_result::StateTransitionProofResult;
use dpp::state_transition::StateTransition;
use drive::drive::Drive;
use drive_proof_verifier::error::ContextProviderError;
use drive_proof_verifier::DataContractProvider;
use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings};

use rs_dapi_client::WrapWithExecutionResult;
use rs_dapi_client::{DapiRequest, ExecutionError, InnerInto, IntoInner, RequestSettings};
#[async_trait::async_trait]
pub trait BroadcastStateTransition {
async fn broadcast(&self, sdk: &Sdk) -> Result<(), Error>;
Expand All @@ -22,54 +24,96 @@ pub trait BroadcastStateTransition {
#[async_trait::async_trait]
impl BroadcastStateTransition for StateTransition {
async fn broadcast(&self, sdk: &Sdk) -> Result<(), Error> {
let request = self.broadcast_request_for_state_transition()?;
let retry_settings = sdk.dapi_client_settings;

request
.execute(sdk, RequestSettings::default())
.await // TODO: We need better way to handle execution errors
.into_inner()?;
// async fn retry_test_function(settings: RequestSettings) -> ExecutionResult<(), dash_sdk::Error>
let factory = |request_settings: RequestSettings| async move {
let request =
self.broadcast_request_for_state_transition()
.map_err(|e| ExecutionError {
inner: e,
address: None,
retries: 0,
})?;
request
.execute(sdk, request_settings)
.await
.map_err(|e| e.inner_into())
};

// response is empty for a broadcast, result comes from the stream wait for state transition result

Ok(())
retry(retry_settings, factory)
.await
.into_inner()
.map(|_| ())
}

async fn broadcast_and_wait(
&self,
sdk: &Sdk,
_time_out_ms: Option<u64>,
time_out_ms: Option<u64>,
) -> Result<StateTransitionProofResult, Error> {
let request = self.broadcast_request_for_state_transition()?;
// TODO: Implement retry logic
request
.clone()
.execute(sdk, RequestSettings::default())
.await
.into_inner()?;
self.broadcast(sdk).await?;

let request = self.wait_for_state_transition_result_request()?;
let retry_settings = sdk.dapi_client_settings;

let response = request
.execute(sdk, RequestSettings::default())
.await
.into_inner()?;
let factory = |request_settings: RequestSettings| async move {
let request = self
.wait_for_state_transition_result_request()
.map_err(|e| ExecutionError {
inner: e,
address: None,
retries: 0,
})?;

let response = request
.execute(sdk, request_settings)
.await
.map_err(|e| e.inner_into())?;

let grpc_response = &response.inner;
let metadata = grpc_response.metadata().wrap(&response)?.inner;
let block_info = block_info_from_metadata(metadata).wrap(&response)?.inner;
let proof: &Proof = (*grpc_response).proof().wrap(&response)?.inner;

let block_info = block_info_from_metadata(response.metadata()?)?;
let proof = response.proof_owned()?;
let context_provider =
sdk.context_provider()
.ok_or(Error::from(ContextProviderError::Config(
let context_provider = sdk.context_provider().ok_or(ExecutionError {
inner: Error::from(ContextProviderError::Config(
"Context provider not initialized".to_string(),
)))?;
)),
address: Some(response.address.clone()),
retries: response.retries,
})?;

let (_, result) = Drive::verify_state_transition_was_executed_with_proof(
self,
&block_info,
proof.grovedb_proof.as_slice(),
&context_provider.as_contract_lookup_fn(),
sdk.version(),
)
.wrap(&response)?
.inner;

Ok::<_, Error>(result).wrap(&response)
};

let (_, result) = Drive::verify_state_transition_was_executed_with_proof(
self,
&block_info,
proof.grovedb_proof.as_slice(),
&context_provider.as_contract_lookup_fn(),
sdk.version(),
)?;
let future = retry(retry_settings, factory);
match time_out_ms {
Some(time_out_ms) => {
let timeout = tokio::time::Duration::from_millis(time_out_ms);
tokio::time::timeout(timeout, future)
.await
.map_err(|e| {
Error::TimeoutReached(
timeout,
format!("Timeout waiting for state transition result: {:?}", e),
)
})?
.into_inner()
}
None => future.await.into_inner(),
}

Ok(result)
//Result<StateTransitionProofResult, Error>
}
}