Skip to content

Commit

Permalink
Merge pull request #304 from SorellaLabs/will/back-138-network-gossip…
Browse files Browse the repository at this point in the history
…ing-cancelations

feat: network gossiping cancellations
  • Loading branch information
Will-Smith11 authored Dec 17, 2024
2 parents 0660464 + 05a5bd1 commit 72a53ff
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 84 deletions.
6 changes: 6 additions & 0 deletions crates/angstrom-net/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ impl<DB: Unpin> Future for StromNetworkManager<DB> {
.send(NetworkOrderEvent::IncomingOrders { peer_id, orders: a });
});
}
StromMessage::OrderCancellation(a) => {
self.to_pool_manager.as_ref().inspect(|tx| {
let _ =
tx.send(NetworkOrderEvent::CancelOrder { peer_id, request: a });
});
}
StromMessage::Status(_) => {}
},
SwarmEvent::Disconnected { peer_id } => {
Expand Down
7 changes: 5 additions & 2 deletions crates/angstrom-net/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::{atomic::AtomicUsize, Arc};

use angstrom_types::{primitive::PeerId, sol_bindings::grouped_orders::AllOrders};
use angstrom_types::{
orders::CancelOrderRequest, primitive::PeerId, sol_bindings::grouped_orders::AllOrders
};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network::DisconnectReason;
use tokio::sync::{
Expand Down Expand Up @@ -89,7 +91,8 @@ struct StromNetworkInner {
/// All events related to orders emitted by the network.
#[derive(Debug, Clone, PartialEq)]
pub enum NetworkOrderEvent {
IncomingOrders { peer_id: PeerId, orders: Vec<AllOrders> }
IncomingOrders { peer_id: PeerId, orders: Vec<AllOrders> },
CancelOrder { peer_id: PeerId, request: CancelOrderRequest }
}

#[derive(Debug)]
Expand Down
50 changes: 41 additions & 9 deletions crates/angstrom-net/src/pool_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use alloy::primitives::{Address, FixedBytes, B256};
use angstrom_eth::manager::EthEvent;
use angstrom_types::{
block_sync::BlockSyncConsumer,
orders::{OrderLocation, OrderOrigin, OrderStatus},
orders::{CancelOrderRequest, OrderLocation, OrderOrigin, OrderStatus},
primitive::{NewInitializedPool, PeerId, PoolId},
sol_bindings::grouped_orders::AllOrders
};
Expand Down Expand Up @@ -48,7 +48,7 @@ pub struct PoolHandle {
pub enum OrderCommand {
// new orders
NewOrder(OrderOrigin, AllOrders, tokio::sync::oneshot::Sender<OrderValidationResults>),
CancelOrder(Address, B256, tokio::sync::oneshot::Sender<bool>),
CancelOrder(CancelOrderRequest, tokio::sync::oneshot::Sender<bool>),
PendingOrders(Address, tokio::sync::oneshot::Sender<Vec<AllOrders>>),
OrdersByPool(FixedBytes<32>, OrderLocation, tokio::sync::oneshot::Sender<Vec<AllOrders>>),
OrderStatus(B256, tokio::sync::oneshot::Sender<Option<OrderStatus>>)
Expand Down Expand Up @@ -112,9 +112,9 @@ impl OrderPoolHandle for PoolHandle {
rx.map(|res| res.unwrap_or_default())
}

fn cancel_order(&self, from: Address, order_hash: B256) -> impl Future<Output = bool> + Send {
fn cancel_order(&self, req: CancelOrderRequest) -> impl Future<Output = bool> + Send {
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self.send(OrderCommand::CancelOrder(from, order_hash, tx));
let _ = self.send(OrderCommand::CancelOrder(req, tx));
rx.map(|res| res.unwrap_or(false))
}
}
Expand Down Expand Up @@ -283,8 +283,11 @@ where
OrderCommand::NewOrder(_, order, validation_response) => self
.order_indexer
.new_rpc_order(OrderOrigin::External, order, validation_response),
OrderCommand::CancelOrder(from, order_hash, receiver) => {
let res = self.order_indexer.cancel_order(from, order_hash);
OrderCommand::CancelOrder(req, receiver) => {
let res = self.order_indexer.cancel_order(&req);
if res {
self.broadcast_cancel_to_peers(req);
}
let _ = receiver.send(res);
}
OrderCommand::PendingOrders(from, receiver) => {
Expand Down Expand Up @@ -355,6 +358,12 @@ where
);
});
}
NetworkOrderEvent::CancelOrder { request, .. } => {
let res = self.order_indexer.cancel_order(&request);
if res {
self.broadcast_cancel_to_peers(request);
}
}
}
}

Expand All @@ -365,7 +374,12 @@ where
self.peer_to_info.insert(
peer_id,
StromPeer {
orders: LruCache::new(NonZeroUsize::new(PEER_ORDER_CACHE_LIMIT).unwrap())
orders: LruCache::new(
NonZeroUsize::new(PEER_ORDER_CACHE_LIMIT).unwrap()
),
cancellations: LruCache::new(
NonZeroUsize::new(PEER_ORDER_CACHE_LIMIT).unwrap()
)
}
);
}
Expand All @@ -380,7 +394,12 @@ where
self.peer_to_info.insert(
peer_id,
StromPeer {
orders: LruCache::new(NonZeroUsize::new(PEER_ORDER_CACHE_LIMIT).unwrap())
orders: LruCache::new(
NonZeroUsize::new(PEER_ORDER_CACHE_LIMIT).unwrap()
),
cancellations: LruCache::new(
NonZeroUsize::new(PEER_ORDER_CACHE_LIMIT).unwrap()
)
}
);
}
Expand Down Expand Up @@ -413,6 +432,18 @@ where
self.broadcast_orders_to_peers(valid_orders);
}

fn broadcast_cancel_to_peers(&mut self, cancel: CancelOrderRequest) {
for (peer_id, info) in self.peer_to_info.iter_mut() {
let order_hash = cancel.order_id;
if !info.cancellations.contains(&order_hash) {
self.network
.send_message(*peer_id, StromMessage::OrderCancellation(cancel.clone()));

info.cancellations.insert(order_hash);
}
}
}

fn broadcast_orders_to_peers(&mut self, valid_orders: Vec<AllOrders>) {
for order in valid_orders.iter() {
for (peer_id, info) in self.peer_to_info.iter_mut() {
Expand Down Expand Up @@ -496,5 +527,6 @@ pub enum NetworkTransactionEvent {
#[derive(Debug)]
struct StromPeer {
/// Keeps track of transactions that we know the peer has seen.
orders: LruCache<B256>
orders: LruCache<B256>,
cancellations: LruCache<B256>
}
32 changes: 21 additions & 11 deletions crates/angstrom-net/src/types/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{fmt::Debug, sync::Arc};
use alloy::rlp::{Buf, BufMut, Decodable, Encodable};
use angstrom_types::{
consensus::{PreProposal, PreProposalAggregation, Proposal},
orders::CancelOrderRequest,
sol_bindings::grouped_orders::AllOrders
};
use reth_eth_wire::{protocol::Protocol, Capability};
Expand All @@ -25,13 +26,14 @@ const STROM_PROTOCOL: Protocol = Protocol::new(STROM_CAPABILITY, 5);
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum StromMessageID {
Status = 0,
Status = 0,
/// Consensus
PrePropose = 1,
PreProposeAgg = 2,
Propose = 3,
PrePropose = 1,
PreProposeAgg = 2,
Propose = 3,
/// Propagation messages that broadcast new orders to all peers
PropagatePooledOrders = 4
PropagatePooledOrders = 4,
OrderCancellation = 5
}

impl Encodable for StromMessageID {
Expand All @@ -50,8 +52,10 @@ impl Decodable for StromMessageID {
let id = match id {
0 => StromMessageID::Status,
1 => StromMessageID::PrePropose,
2 => StromMessageID::Propose,
3 => StromMessageID::PropagatePooledOrders,
2 => StromMessageID::PreProposeAgg,
3 => StromMessageID::PrePropose,
4 => StromMessageID::PropagatePooledOrders,
5 => StromMessageID::OrderCancellation,
_ => return Err(alloy::rlp::Error::Custom("Invalid message ID"))
};
buf.advance(1);
Expand Down Expand Up @@ -116,7 +120,8 @@ pub enum StromMessage {
Propose(Proposal),

/// Propagation messages that broadcast new orders to all peers
PropagatePooledOrders(Vec<AllOrders>)
PropagatePooledOrders(Vec<AllOrders>),
OrderCancellation(CancelOrderRequest)
}
impl StromMessage {
/// Returns the message's ID.
Expand All @@ -126,7 +131,8 @@ impl StromMessage {
StromMessage::PrePropose(_) => StromMessageID::PrePropose,
StromMessage::PreProposeAgg(_) => StromMessageID::PreProposeAgg,
StromMessage::Propose(_) => StromMessageID::Propose,
StromMessage::PropagatePooledOrders(_) => StromMessageID::PropagatePooledOrders
StromMessage::PropagatePooledOrders(_) => StromMessageID::PropagatePooledOrders,
StromMessage::OrderCancellation(_) => StromMessageID::OrderCancellation
}
}
}
Expand All @@ -146,7 +152,8 @@ pub enum StromBroadcastMessage {
Propose(Arc<Proposal>),
PreProposeAgg(Arc<PreProposalAggregation>),
// Order Broadcast
PropagatePooledOrders(Arc<Vec<AllOrders>>)
PropagatePooledOrders(Arc<Vec<AllOrders>>),
OrderCancellation(Arc<CancelOrderRequest>)
}

impl StromBroadcastMessage {
Expand All @@ -156,7 +163,10 @@ impl StromBroadcastMessage {
StromBroadcastMessage::PrePropose(_) => StromMessageID::PrePropose,
StromBroadcastMessage::PreProposeAgg(_) => StromMessageID::PreProposeAgg,
StromBroadcastMessage::Propose(_) => StromMessageID::Propose,
StromBroadcastMessage::PropagatePooledOrders(_) => StromMessageID::PropagatePooledOrders
StromBroadcastMessage::PropagatePooledOrders(_) => {
StromMessageID::PropagatePooledOrders
}
StromBroadcastMessage::OrderCancellation(_) => StromMessageID::OrderCancellation
}
}
}
4 changes: 2 additions & 2 deletions crates/order-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::future::Future;

use alloy::primitives::{Address, FixedBytes, B256};
use angstrom_types::{
orders::{OrderLocation, OrderOrigin, OrderStatus},
orders::{CancelOrderRequest, OrderLocation, OrderOrigin, OrderStatus},
sol_bindings::grouped_orders::{AllOrders, OrderWithStorageData}
};
pub use angstrom_utils::*;
Expand All @@ -39,7 +39,7 @@ pub trait OrderPoolHandle: Send + Sync + Clone + Unpin + 'static {

fn pending_orders(&self, sender: Address) -> impl Future<Output = Vec<AllOrders>> + Send;

fn cancel_order(&self, sender: Address, order_hash: B256) -> impl Future<Output = bool> + Send;
fn cancel_order(&self, req: CancelOrderRequest) -> impl Future<Output = bool> + Send;

fn fetch_orders_from_pool(
&self,
Expand Down
42 changes: 24 additions & 18 deletions crates/order-pool/src/order_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,46 +192,52 @@ impl<V: OrderValidatorHandle<Order = AllOrders>> OrderIndexer<V> {
self.new_order(Some(peer_id), origin, order, None)
}

pub fn cancel_order(&mut self, from: Address, order_hash: B256) -> bool {
if self.is_seen_invalid(&order_hash) || self.is_cancelled(&order_hash) {
pub fn cancel_order(&mut self, request: &angstrom_types::orders::CancelOrderRequest) -> bool {
// ensure validity
if !request.is_valid() {
return false;
}

if self.is_seen_invalid(&request.order_id) || self.is_cancelled(&request.order_id) {
return true
}

// the cancel arrived before the new order request
// nothing more needs to be done, since new_order() will return early
if self.is_missing(&order_hash) {
if self.is_missing(&request.order_id) {
// optimistically assuming that orders won't take longer than a day to propagate
let deadline = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
+ MAX_NEW_ORDER_DELAY_PROPAGATION * ETH_BLOCK_TIME.as_secs();
self.insert_cancel_request_with_deadline(from, &order_hash, Some(U256::from(deadline)));
self.insert_cancel_request_with_deadline(
request.user_address,
&request.order_id,
Some(U256::from(deadline))
);

return true
}

let order_id = self.order_hash_to_order_id.get(&order_hash).unwrap();
if order_id.address != from {
return false
}

let removed = self.order_storage.cancel_order(order_id);
let removed_from_storage = removed.is_some();
if removed_from_storage {
let order = removed.unwrap();
self.order_hash_to_order_id.remove(&order_hash);
self.order_hash_to_peer_id.remove(&order_hash);
self.insert_cancel_request_with_deadline(from, &order_hash, order.deadline());
let id = self.order_hash_to_order_id.remove(&request.order_id);
if let Some(order) = id.and_then(|v| self.order_storage.cancel_order(&v)) {
self.order_hash_to_order_id.remove(&order.order_hash());
self.order_hash_to_peer_id.remove(&order.order_hash());
self.insert_cancel_request_with_deadline(
request.user_address,
&request.order_id,
order.deadline()
);

self.notify_order_subscribers(PoolManagerUpdate::CancelledOrder {
order_hash: order.order_hash(),
user: order.from(),
pool_id: order.pool_id
});
return true
}

removed_from_storage
false
}

fn insert_cancel_request_with_deadline(
Expand Down
20 changes: 2 additions & 18 deletions crates/rpc/src/api/orders.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::collections::HashSet;

use alloy_primitives::{keccak256, Address, FixedBytes, Signature, B256, U256};
use alloy_sol_types::SolValue;
use alloy_primitives::{Address, FixedBytes, B256, U256};
use angstrom_types::{
orders::{OrderLocation, OrderStatus},
orders::{CancelOrderRequest, OrderLocation, OrderStatus},
sol_bindings::grouped_orders::AllOrders
};
use futures::StreamExt;
Expand All @@ -15,21 +14,6 @@ use serde::Deserialize;

use crate::types::{OrderSubscriptionFilter, OrderSubscriptionKind};

#[derive(Serialize, Deserialize, Debug)]
pub struct CancelOrderRequest {
pub signature: Signature,
// if there's no salt to make this a unique signing hash. One can just
// copy the signature of the order and id and it will verify
pub user_address: Address,
pub order_id: B256
}

impl CancelOrderRequest {
pub fn signing_payload(&self) -> FixedBytes<32> {
keccak256((self.user_address, self.order_id).abi_encode())
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GasEstimateResponse {
pub gas_units: u64,
Expand Down
Loading

0 comments on commit 72a53ff

Please sign in to comment.