Skip to content

Commit

Permalink
Merge pull request #309 from SorellaLabs/feat/rpc-return-type
Browse files Browse the repository at this point in the history
Feat/rpc return type
  • Loading branch information
jnoorchashm37 authored Dec 23, 2024
2 parents ae56afe + 17905db commit 534762b
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 57 deletions.
12 changes: 4 additions & 8 deletions crates/angstrom-net/src/pool_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use tokio::sync::{
};
use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream};
use validation::order::{
state::pools::AngstromPoolsTracker, OrderValidationResults, OrderValidatorHandle
state::pools::AngstromPoolsTracker, OrderPoolNewOrderResult, OrderValidationResults,
OrderValidatorHandle
};

use crate::{LruCache, NetworkOrderEvent, StromMessage, StromNetworkEvent, StromNetworkHandle};
Expand Down Expand Up @@ -65,15 +66,10 @@ impl OrderPoolHandle for PoolHandle {
&self,
origin: OrderOrigin,
order: AllOrders
) -> impl Future<Output = bool> + Send {
) -> impl Future<Output = OrderPoolNewOrderResult> + Send {
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self.send(OrderCommand::NewOrder(origin, order, tx));
rx.map(|result| match result {
Ok(OrderValidationResults::Valid(_)) => true,
Ok(OrderValidationResults::Invalid(_)) => false,
Ok(OrderValidationResults::TransitionedToBlock) => false,
Err(_) => false
})
rx.map(Into::into)
}

fn subscribe_orders(&self) -> BroadcastStream<PoolManagerUpdate> {
Expand Down
8 changes: 6 additions & 2 deletions crates/order-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub use angstrom_utils::*;
pub use config::PoolConfig;
pub use order_indexer::*;
use tokio_stream::wrappers::BroadcastStream;
use validation::order::OrderPoolNewOrderResult;

#[derive(Debug, Clone)]
pub enum PoolManagerUpdate {
Expand All @@ -32,8 +33,11 @@ pub enum PoolManagerUpdate {
/// asyncly. This allows for requesting data and providing data from different
/// threads efficiently.
pub trait OrderPoolHandle: Send + Sync + Clone + Unpin + 'static {
fn new_order(&self, origin: OrderOrigin, order: AllOrders)
-> impl Future<Output = bool> + Send;
fn new_order(
&self,
origin: OrderOrigin,
order: AllOrders
) -> impl Future<Output = OrderPoolNewOrderResult> + Send;

fn subscribe_orders(&self) -> BroadcastStream<PoolManagerUpdate>;

Expand Down
24 changes: 6 additions & 18 deletions crates/order-pool/src/order_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,8 @@ impl<V: OrderValidatorHandle<Order = AllOrders>> OrderIndexer<V> {
})
// remove from all underlying pools
.filter_map(|id| match id.location {
angstrom_types::orders::OrderLocation::Searcher => {
self.order_storage.remove_searcher_order(&id)
}
angstrom_types::orders::OrderLocation::Limit => {
self.order_storage.remove_limit_order(&id)
}
OrderLocation::Searcher => self.order_storage.remove_searcher_order(&id),
OrderLocation::Limit => self.order_storage.remove_limit_order(&id)
})
.collect::<Vec<_>>();

Expand All @@ -363,12 +359,8 @@ impl<V: OrderValidatorHandle<Order = AllOrders>> OrderIndexer<V> {
.for_each(|order_ids| {
order_ids.into_iter().for_each(|id| {
let Some(order) = (match id.location {
angstrom_types::orders::OrderLocation::Limit => {
self.order_storage.remove_limit_order(&id)
}
angstrom_types::orders::OrderLocation::Searcher => {
self.order_storage.remove_searcher_order(&id)
}
OrderLocation::Limit => self.order_storage.remove_limit_order(&id),
OrderLocation::Searcher => self.order_storage.remove_searcher_order(&id)
}) else {
return
};
Expand Down Expand Up @@ -404,12 +396,8 @@ impl<V: OrderValidatorHandle<Order = AllOrders>> OrderIndexer<V> {
.iter()
.filter_map(|hash| self.order_hash_to_order_id.remove(hash))
.filter_map(|order_id| match order_id.location {
angstrom_types::orders::OrderLocation::Limit => {
self.order_storage.remove_limit_order(&order_id)
}
angstrom_types::orders::OrderLocation::Searcher => {
self.order_storage.remove_searcher_order(&order_id)
}
OrderLocation::Limit => self.order_storage.remove_limit_order(&order_id),
OrderLocation::Searcher => self.order_storage.remove_searcher_order(&order_id)
})
.collect::<Vec<OrderWithStorageData<AllOrders>>>();

Expand Down
20 changes: 11 additions & 9 deletions crates/rpc/src/api/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashSet;
use alloy_primitives::{Address, FixedBytes, B256, U256};
use angstrom_types::{
orders::{CancelOrderRequest, OrderLocation, OrderStatus},
primitive::PoolId,
sol_bindings::grouped_orders::AllOrders
};
use futures::StreamExt;
Expand All @@ -11,10 +12,11 @@ use jsonrpsee::{
proc_macros::rpc
};
use serde::Deserialize;
use validation::order::OrderPoolNewOrderResult;

use crate::types::{OrderSubscriptionFilter, OrderSubscriptionKind};

#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct GasEstimateResponse {
pub gas_units: u64,
pub gas: U256
Expand All @@ -26,7 +28,7 @@ pub struct GasEstimateResponse {
pub trait OrderApi {
/// Submit any type of order
#[method(name = "sendOrder")]
async fn send_order(&self, order: AllOrders) -> RpcResult<bool>;
async fn send_order(&self, order: AllOrders) -> RpcResult<OrderPoolNewOrderResult>;

#[method(name = "pendingOrder")]
async fn pending_order(&self, from: Address) -> RpcResult<Vec<AllOrders>>;
Expand All @@ -41,9 +43,9 @@ pub trait OrderApi {
async fn order_status(&self, order_hash: B256) -> RpcResult<Option<OrderStatus>>;

#[method(name = "ordersByPair")]
async fn orders_by_pair(
async fn orders_by_pool_id(
&self,
pair: FixedBytes<32>,
pool_id: PoolId,
location: OrderLocation
) -> RpcResult<Vec<AllOrders>>;

Expand All @@ -60,7 +62,7 @@ pub trait OrderApi {

// MULTI CALL
#[method(name = "sendOrders")]
async fn send_orders(&self, orders: Vec<AllOrders>) -> RpcResult<Vec<bool>> {
async fn send_orders(&self, orders: Vec<AllOrders>) -> RpcResult<Vec<OrderPoolNewOrderResult>> {
futures::stream::iter(orders.into_iter())
.map(|order| async { self.send_order(order).await })
.buffered(3)
Expand Down Expand Up @@ -124,12 +126,12 @@ pub trait OrderApi {
}

#[method(name = "ordersByPairs")]
async fn orders_by_pairs(
async fn orders_by_pool_ids(
&self,
pair_with_location: Vec<(FixedBytes<32>, OrderLocation)>
pool_ids_with_location: Vec<(PoolId, OrderLocation)>
) -> RpcResult<Vec<AllOrders>> {
Ok(futures::stream::iter(pair_with_location.into_iter())
.map(|(pair, location)| async move { self.orders_by_pair(pair, location).await })
Ok(futures::stream::iter(pool_ids_with_location.into_iter())
.map(|(pair, location)| async move { self.orders_by_pool_id(pair, location).await })
.buffered(3)
.collect::<Vec<_>>()
.await
Expand Down
31 changes: 19 additions & 12 deletions crates/rpc/src/impls/orders.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::collections::HashSet;

use alloy_primitives::{Address, FixedBytes, B256};
use alloy_primitives::{Address, B256};
use angstrom_types::{
orders::{CancelOrderRequest, OrderLocation, OrderOrigin, OrderStatus},
primitive::PoolId,
sol_bindings::grouped_orders::AllOrders
};
use futures::StreamExt;
use jsonrpsee::{core::RpcResult, PendingSubscriptionSink, SubscriptionMessage};
use order_pool::{OrderPoolHandle, PoolManagerUpdate};
use reth_tasks::TaskSpawner;
use validation::order::OrderValidatorHandle;
use validation::order::{OrderPoolNewOrderResult, OrderValidatorHandle};

use crate::{
api::{GasEstimateResponse, OrderApiServer},
Expand All @@ -36,7 +37,7 @@ where
Spawner: TaskSpawner + 'static,
Validator: OrderValidatorHandle
{
async fn send_order(&self, order: AllOrders) -> RpcResult<bool> {
async fn send_order(&self, order: AllOrders) -> RpcResult<OrderPoolNewOrderResult> {
Ok(self.pool.new_order(OrderOrigin::External, order).await)
}

Expand All @@ -61,12 +62,12 @@ where
Ok(self.pool.fetch_order_status(order_hash).await)
}

async fn orders_by_pair(
async fn orders_by_pool_id(
&self,
pair: FixedBytes<32>,
pool_id: PoolId,
location: OrderLocation
) -> RpcResult<Vec<AllOrders>> {
Ok(self.pool.fetch_orders_from_pool(pair, location).await)
Ok(self.pool.fetch_orders_from_pool(pool_id, location).await)
}

async fn subscribe_orders(
Expand Down Expand Up @@ -238,18 +239,24 @@ mod tests {
assert!(api
.send_order(standing_order)
.await
.expect("to not throw error"));
.expect("to not throw error")
.is_valid());

// Test flash order
let flash_order = create_flash_order();
assert!(api
.send_order(flash_order)
.await
.expect("to not throw error"));
.expect("to not throw error")
.is_valid());

// Test TOB order
let tob_order = create_tob_order();
assert!(api.send_order(tob_order).await.expect("to not throw error"));
assert!(api
.send_order(tob_order)
.await
.expect("to not throw error")
.is_valid());
}

fn setup_order_api(
Expand Down Expand Up @@ -280,7 +287,7 @@ mod tests {
impl OrderPoolHandle for MockOrderPoolHandle {
fn fetch_orders_from_pool(
&self,
_: FixedBytes<32>,
_: PoolId,
_: OrderLocation
) -> impl Future<Output = Vec<AllOrders>> + Send {
future::ready(vec![])
Expand All @@ -290,13 +297,13 @@ mod tests {
&self,
origin: OrderOrigin,
order: AllOrders
) -> impl Future<Output = bool> + Send {
) -> impl Future<Output = OrderPoolNewOrderResult> + Send {
let (tx, _) = tokio::sync::oneshot::channel();
let _ = self
.sender
.send(OrderCommand::NewOrder(origin, order, tx))
.is_ok();
future::ready(true)
future::ready(OrderPoolNewOrderResult::Valid)
}

fn subscribe_orders(&self) -> BroadcastStream<PoolManagerUpdate> {
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/orders/orderpool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
sol_bindings::{ext::RespendAvoidanceMethod, RawPoolOrder}
};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum OrderStatus {
Filled,
Pending,
Expand Down
Loading

0 comments on commit 534762b

Please sign in to comment.