Skip to content

Commit

Permalink
add mvp verification logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ctoyan committed May 10, 2024
1 parent c3bbf6d commit 7a7126c
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 143 deletions.
2 changes: 2 additions & 0 deletions ampd/src/handlers/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ pub enum Error {
Sign,
#[error("failed to get transaction receipts")]
TxReceipts,
#[error("failed to verify message against event")]
Verification,
}
76 changes: 39 additions & 37 deletions ampd/src/handlers/starknet_verify_msg.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::collections::HashSet;
use std::collections::HashMap;
use std::convert::TryInto;

use async_trait::async_trait;
use axelar_wasm_std::voting::{PollId, Vote};
// use connection_router_api::ChainName;
use cosmrs::cosmwasm::MsgExecuteContract;
use error_stack::{FutureExt, ResultExt};
use events::Error::EventTypeMismatch;
use events_derive::try_from;
use futures::future::join_all;
use futures::future::try_join_all;
use serde::Deserialize;
use tokio::sync::watch::Receiver;
use tracing::info;
Expand All @@ -17,7 +17,9 @@ use crate::event_processor::EventHandler;
use crate::handlers::errors::Error;
use crate::handlers::errors::Error::DeserializeEvent;
use crate::queue::queued_broadcaster::BroadcasterClient;
use crate::starknet::verifier::MessageVerifier;
use crate::starknet::events::contract_call::ContractCallEvent;
use crate::starknet::json_rpc::StarknetClient;
use crate::starknet::verifier::verify_msg;
use crate::types::{Hash, TMAddress};

type Result<T> = error_stack::Result<T, Error>;
Expand All @@ -38,42 +40,40 @@ struct PollStartedEvent {
#[serde(rename = "_contract_address")]
contract_address: TMAddress,
poll_id: PollId,
// source_chain: ChainName,
// source_gateway_address: String,
// confirmation_height: u64,
source_gateway_address: String,
expires_at: u64,
messages: Vec<Message>,
participants: Vec<TMAddress>,
}

pub struct Handler<V, B>
pub struct Handler<C, B>
where
V: MessageVerifier,
C: StarknetClient,
B: BroadcasterClient,
{
worker: TMAddress,
voting_verifier: TMAddress,
msg_verifier: V,
rpc_client: C,
broadcast_client: B,
latest_block_height: Receiver<u64>,
}

impl<V, B> Handler<V, B>
impl<C, B> Handler<C, B>
where
V: MessageVerifier + Send + Sync,
C: StarknetClient + Send + Sync,
B: BroadcasterClient,
{
pub fn new(
worker: TMAddress,
voting_verifier: TMAddress,
msg_verifier: V,
rpc_client: C,
broadcast_client: B,
latest_block_height: Receiver<u64>,
) -> Self {
Self {
worker,
voting_verifier,
msg_verifier,
rpc_client,
broadcast_client,
latest_block_height,
}
Expand All @@ -99,21 +99,20 @@ where
#[async_trait]
impl<V, B> EventHandler for Handler<V, B>
where
V: MessageVerifier + Send + Sync,
V: StarknetClient + Send + Sync,
B: BroadcasterClient + Send + Sync,
{
type Err = Error;

async fn handle(&self, event: &events::Event) -> Result<()> {
let PollStartedEvent {
contract_address,
poll_id,
// source_chain,
// source_gateway_address: _,
// confirmation_height: _,
source_gateway_address,
messages,
expires_at,
participants,
expires_at,
contract_address,
..
} = match event.try_into() as error_stack::Result<_, _> {
Err(report) if matches!(report.current_context(), EventTypeMismatch(_)) => {
return Ok(());
Expand All @@ -135,27 +134,30 @@ where
return Ok(());
}

let tx_hashes: HashSet<_> = messages
.iter()
.map(|message| message.tx_id.as_str())
.collect();

let unique_axl_msgs: Vec<&Message> = messages
.iter()
.filter(|m| tx_hashes.get(m.tx_id.as_str()).is_some())
.collect();

let votes: Vec<Vote> = join_all(
unique_axl_msgs
.into_iter()
.map(|msg| self.msg_verifier.verify_msg(msg)),
let events: HashMap<String, ContractCallEvent> = try_join_all(
messages
.iter()
.map(|msg| self.rpc_client.get_event_by_hash(msg.tx_id.as_str())),
)
.await
.change_context(Error::TxReceipts)
.await?
.into_iter()
// TODO: Maybe log the errors (mostly with connection/serialization)?
.filter_map(|v| v.ok())
.flatten()
.collect();

let mut votes = vec![];
for msg in messages {
if !events.contains_key(&msg.tx_id) {
votes.push(Vote::NotFound);
continue;
}
votes.push(verify_msg(
events.get(&msg.tx_id).unwrap(), // safe to unwrap, because of previous check
&msg,
&source_gateway_address,
));
}

println!("VOTES {:?}", votes);

self.broadcast_votes(poll_id, votes).await
Expand Down
32 changes: 21 additions & 11 deletions ampd/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;

use block_height_monitor::BlockHeightMonitor;
Expand All @@ -13,6 +14,7 @@ use events::Event;
use evm::finalizer::{pick, Finalization};
use evm::json_rpc::EthereumClient;
use queue::queued_broadcaster::{QueuedBroadcaster, QueuedBroadcasterDriver};
use starknet_providers::jsonrpc::HttpTransport;
use state::StateUpdater;
use thiserror::Error;
use tofnd::grpc::{MultisigClient, SharableEcdsaClient};
Expand All @@ -22,6 +24,7 @@ use tokio_stream::Stream;
use tokio_util::sync::CancellationToken;
use tracing::info;
use types::TMAddress;
use url::Url;

use crate::asyncutil::task::{CancellableTask, TaskError, TaskGroup};
use crate::config::Config;
Expand Down Expand Up @@ -344,17 +347,24 @@ where
cosmwasm_contract,
rpc_url,
rpc_timeout: _,
} => self.create_handler_task(
"starknet-msg-verifier",
handlers::starknet_verify_msg::Handler::new(
worker.clone(),
cosmwasm_contract,
starknet::verifier::RPCMessageVerifier::new(rpc_url.as_str()),
self.broadcaster.client(),
self.block_height_monitor.latest_block_height(),
),
stream_timeout,
),
} => {
// let starknet_rpc_url = Url::from_str(rpc_url).unwrap();
self.create_handler_task(
"starknet-msg-verifier",
handlers::starknet_verify_msg::Handler::new(
worker.clone(),
cosmwasm_contract,
starknet::json_rpc::Client::new_with_transport(HttpTransport::new(
&rpc_url.into(),
))
.unwrap(),
// starknet::verifier::RPCMessageVerifier::new(rpc_url.as_str()),
self.broadcaster.client(),
self.block_height_monitor.latest_block_height(),
),
stream_timeout,
)
}
};
self.event_processor = self.event_processor.add_task(task);
}
Expand Down
Loading

0 comments on commit 7a7126c

Please sign in to comment.