Skip to content

Commit

Permalink
feat: adding the status check
Browse files Browse the repository at this point in the history
  • Loading branch information
geekbrother committed Oct 29, 2024
1 parent b944401 commit df4df80
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 4 deletions.
15 changes: 15 additions & 0 deletions integration/chain_orchestrator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ describe('Chain abstraction orchestrator', () => {
const chain_id_base = "eip155:8453";
const usdc_contract_optimism = "0x0b2c639c533813f4aa9d7837caf62653d097ff85";

let orchestration_id = "";

it('bridging available', async () => {
// Sending USDC to Optimism, but having the USDC balance on Base chain
const amount_to_send_in_decimals = usdc_funds_on_address - 1_000_000
Expand Down Expand Up @@ -191,5 +193,18 @@ describe('Chain abstraction orchestrator', () => {

// Last transaction expected to be the initial one
expect(data.transactions[2]).toStrictEqual(transactionObj.transaction)

// Set the Orchestration ID for the next test
orchestration_id = data.orchestrationId;
})

it('bridging status', async () => {
let resp: any = await httpClient.get(
`${baseUrl}/v1/ca/orchestrator/status?projectId=${projectId}&orchestrationId=${orchestration_id}`,
)
expect(resp.status).toBe(200)
const data = resp.data
expect(typeof data.status).toBe('string')
expect(data.status).toBe('pending')
})
})
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ pub enum RpcError {

#[error("No routes available for the bridging")]
NoBridgingRoutesAvailable,

#[error("Orchestration ID is not found: {0}")]
OrchestrationIdNotFound(String),
}

impl IntoResponse for RpcError {
Expand Down Expand Up @@ -655,6 +658,14 @@ impl IntoResponse for RpcError {
)),
)
.into_response(),
Self::OrchestrationIdNotFound(id) => (
StatusCode::BAD_REQUEST,
Json(new_error_response(
"orchestrationId".to_string(),
format!("Orchestration ID is not found: {}", id),
)),
)
.into_response(),
// Any other errors considering as 500
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
Expand Down
22 changes: 22 additions & 0 deletions src/handlers/chain_agnostic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use {
alloy::primitives::{Address, U256},
ethers::types::H160 as EthersH160,
phf::phf_map,
serde::{Deserialize, Serialize},
std::{collections::HashMap, str::FromStr},
};

pub mod check;
pub mod route;
pub mod status;

/// Available assets for Bridging
pub static BRIDGING_AVAILABLE_ASSETS: phf::Map<&'static str, phf::Map<&'static str, &'static str>> = phf_map! {
Expand All @@ -21,6 +23,26 @@ pub static BRIDGING_AVAILABLE_ASSETS: phf::Map<&'static str, phf::Map<&'static s
},
};

/// Serialized bridging request item schema to store it in the IRN database
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageBridgingItem {
created_at: usize,
chain_id: String,
wallet: Address,
amount_expected: U256,
status: BridgingStatus,
}

/// Bridging status
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum BridgingStatus {
Pending,
Completed,
Error,
}

/// Checking ERC20 balances for given address for provided ERC20 contracts
pub async fn check_erc20_balances(
project_id: String,
Expand Down
35 changes: 31 additions & 4 deletions src/handlers/chain_agnostic/route.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use {
super::{super::HANDLER_TASK_METRICS, check_bridging_for_erc20_transfer},
super::{
super::HANDLER_TASK_METRICS, check_bridging_for_erc20_transfer, BridgingStatus,
StorageBridgingItem,
},
crate::{
analytics::MessageSource,
error::RpcError,
state::AppState,
storage::irn::OperationType,
utils::crypto::{
convert_alloy_address_to_h160, decode_erc20_function_type, decode_erc20_transfer_data,
get_balance, Erc20FunctionType,
Expand All @@ -16,7 +20,7 @@ use {
Json,
},
serde::{Deserialize, Serialize},
std::{str::FromStr, sync::Arc},
std::{str::FromStr, sync::Arc, time::SystemTime, time::UNIX_EPOCH},
tracing::{debug, error},
uuid::Uuid,
wc::future::FutureExt,
Expand Down Expand Up @@ -196,7 +200,7 @@ async fn handler_internal(
.transaction
.max_priority_fee_per_gas
.clone(),
chain_id: bridge_chain_id,
chain_id: bridge_chain_id.clone(),
});
}
}
Expand All @@ -217,8 +221,31 @@ async fn handler_internal(

// Push initial transaction last after all bridging transactions
routes.push(request_payload.transaction);

let orchestration_id = Uuid::new_v4().to_string();

// Save the bridging transaction to the IRN
let bridging_status_item = StorageBridgingItem {
created_at: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as usize,
chain_id: bridge_chain_id,
wallet: from_address,
amount_expected: erc20_transfer_value,
status: BridgingStatus::Pending,
};
let irn_client = state.irn.as_ref().ok_or(RpcError::IrnNotConfigured)?;
let irn_call_start = SystemTime::now();
irn_client
.set(
orchestration_id.clone(),
serde_json::to_string(&bridging_status_item)?.into(),
)
.await?;
state
.metrics
.add_irn_latency(irn_call_start, OperationType::Set);

return Ok(Json(RouteResponse {
orchestration_id,
transactions: routes,
Expand Down
112 changes: 112 additions & 0 deletions src/handlers/chain_agnostic/status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use {
super::{super::HANDLER_TASK_METRICS, BridgingStatus, StorageBridgingItem},
crate::{
analytics::MessageSource, error::RpcError, state::AppState, storage::irn::OperationType,
utils::crypto::get_balance,
},
alloy::primitives::U256,
axum::{
extract::{Query, State},
response::{IntoResponse, Response},
Json,
},
ethers::types::H160 as EthersH160,
serde::{Deserialize, Serialize},
std::{sync::Arc, time::SystemTime},
wc::future::FutureExt,
};

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct QueryParams {
pub project_id: String,
pub orchestration_id: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StatusResponse {
status: BridgingStatus,
created_at: usize,
}

pub async fn handler(
state: State<Arc<AppState>>,
query_params: Query<QueryParams>,
) -> Result<Response, RpcError> {
handler_internal(state, query_params)
.with_metrics(HANDLER_TASK_METRICS.with_name("ca_status"))
.await
}

#[tracing::instrument(skip(state), level = "debug")]
async fn handler_internal(
state: State<Arc<AppState>>,
Query(query_params): Query<QueryParams>,
) -> Result<Response, RpcError> {
state
.validate_project_access_and_quota(&query_params.project_id.clone())
.await?;

let irn_client = state.irn.as_ref().ok_or(RpcError::IrnNotConfigured)?;

// Get the bridging request status from the IRN
let irn_call_start = SystemTime::now();
let irn_result = irn_client
.get(query_params.orchestration_id.clone())
.await?
.ok_or(RpcError::OrchestrationIdNotFound(
query_params.orchestration_id.clone(),
))?;
state
.metrics
.add_irn_latency(irn_call_start, OperationType::Get);
let mut bridging_status_item = serde_json::from_str::<StorageBridgingItem>(&irn_result)?;

// Return without checking the balance if the status is completed or errored
if bridging_status_item.status == BridgingStatus::Completed
|| bridging_status_item.status == BridgingStatus::Error
{
return Ok(Json(StatusResponse {
status: bridging_status_item.status,
created_at: bridging_status_item.created_at,
})
.into_response());
}

// Check the balance of the wallet and the amount expected
let wallet_balance = get_balance(
&bridging_status_item.chain_id,
EthersH160::from(<[u8; 20]>::from(bridging_status_item.wallet)),
&query_params.project_id,
MessageSource::ChainAgnosticCheck,
)
.await?;
if U256::from_be_bytes(wallet_balance.into()) < bridging_status_item.amount_expected {
// The balance was not fullfilled return the same pending status
return Ok(Json(StatusResponse {
status: bridging_status_item.status,
created_at: bridging_status_item.created_at,
})
.into_response());
} else {
// The balance was fullfilled, update the status to completed
bridging_status_item.status = BridgingStatus::Completed;
let irn_call_start = SystemTime::now();
irn_client
.set(
query_params.orchestration_id,
serde_json::to_string(&bridging_status_item)?.into(),
)
.await?;
state
.metrics
.add_irn_latency(irn_call_start, OperationType::Set);
}

return Ok(Json(StatusResponse {
status: bridging_status_item.status,
created_at: bridging_status_item.created_at,
})
.into_response());
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ pub async fn bootstrap(config: Config) -> RpcResult<()> {
// Chain agnostic orchestration
.route("/v1/ca/orchestrator/check", post(handlers::chain_agnostic::check::handler))
.route("/v1/ca/orchestrator/route", post(handlers::chain_agnostic::route::handler))
.route("/v1/ca/orchestrator/status", get(handlers::chain_agnostic::status::handler))
// Health
.route("/health", get(handlers::health::handler))
.route_layer(tracing_and_metrics_layer)
Expand Down
4 changes: 4 additions & 0 deletions src/storage/irn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub enum OperationType {
Hget,
Hfields,
Hdel,
Set,
Get,
}

impl OperationType {
Expand All @@ -31,6 +33,8 @@ impl OperationType {
OperationType::Hget => "hget",
OperationType::Hfields => "hfields",
OperationType::Hdel => "hdel",
OperationType::Set => "set",
OperationType::Get => "get",
}
}
}
Expand Down

0 comments on commit df4df80

Please sign in to comment.