Skip to content

Commit

Permalink
chore(hubble): raceclient - provide winning client in result (#3156)
Browse files Browse the repository at this point in the history
qlp authored Nov 4, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 18c86b4 + 3ca6735 commit a9767d8
Showing 6 changed files with 188 additions and 410 deletions.
137 changes: 40 additions & 97 deletions hubble/src/indexer/aptos/provider.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,10 @@ use aptos_rest_client::{
};
use url::Url;

use crate::{indexer::api::BlockHeight, race_client::RaceClient};
use crate::{
indexer::api::BlockHeight,
race_client::{RaceClient, RaceClientId, RaceClientResponse},
};

#[derive(Clone, Debug)]
pub struct Provider {
@@ -16,7 +19,13 @@ pub struct Provider {

#[derive(Clone, Debug, Copy)]
pub struct RpcProviderId {
index: usize,
race_client_id: RaceClientId,
}

impl From<RpcProviderId> for RaceClientId {
fn from(value: RpcProviderId) -> Self {
value.race_client_id
}
}

#[derive(Debug)]
@@ -26,16 +35,20 @@ pub struct RpcResult<T> {
}

impl<T> RpcResult<T> {
fn new(provider_index: usize, result: T) -> Self {
fn new(race_client_id: RaceClientId, result: T) -> Self {
Self {
provider_id: RpcProviderId {
index: provider_index,
},
provider_id: RpcProviderId { race_client_id },
response: result,
}
}
}

impl<T> From<RaceClientResponse<T>> for RpcResult<T> {
fn from(value: RaceClientResponse<T>) -> Self {
RpcResult::new(value.race_client_id, value.response)
}
}

impl Provider {
pub fn new(rpc_urls: Vec<Url>) -> Self {
Self {
@@ -48,55 +61,28 @@ impl Provider {
}
}

fn rpc_client(&self, provider_id: Option<RpcProviderId>) -> RaceClient<Client> {
Self::select_client(self.rpc_client.clone(), provider_id.map(|id| id.index))
}

fn select_client<T: Clone>(
client: RaceClient<T>,
provider_index: Option<usize>,
) -> RaceClient<T> {
match provider_index {
Some(provider_index) => RaceClient::new(vec![client.clients[provider_index].clone()]),
None => client,
}
}

// RPC
pub async fn get_index(
&self,
provider_id: Option<RpcProviderId>,
) -> Result<RpcResult<Response<IndexResponse>>, RestError> {
let result = self.rpc_client(provider_id).get_index().await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| c.get_index())
.await
.map(Into::into)
}

pub async fn get_block_by_height(
&self,
height: BlockHeight,
provider_id: Option<RpcProviderId>,
) -> Result<RpcResult<Response<Block>>, RestError> {
let result = self
.rpc_client(provider_id)
.get_block_by_height(height)
.await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| {
c.get_block_by_height(height, false)
})
.await
.map(Into::into)
}

pub async fn get_transactions(
@@ -105,67 +91,24 @@ impl Provider {
limit: u16,
provider_id: Option<RpcProviderId>,
) -> Result<RpcResult<Response<Vec<Transaction>>>, RestError> {
let result = self
.rpc_client(provider_id)
.get_transactions(start, limit)
.await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| {
c.get_transactions(Some(start), Some(limit))
})
.await
.map(Into::into)
}

pub async fn get_transaction_by_version(
&self,
version: u64,
provider_id: Option<RpcProviderId>,
) -> Result<RpcResult<Response<Transaction>>, RestError> {
let result = self
.rpc_client(provider_id)
.get_transaction_by_version(version)
.await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
}
}

impl RaceClient<Client> {
pub async fn get_index(&self) -> Result<Response<IndexResponse>, RestError> {
self.race(|c| c.get_index()).await
}

pub async fn get_block_by_height(
&self,
height: BlockHeight,
) -> Result<Response<Block>, RestError> {
self.race(|c| c.get_block_by_height(height, false)).await
}

pub async fn get_transactions(
&self,
start: BlockHeight,
limit: u16,
) -> Result<Response<Vec<Transaction>>, RestError> {
self.race(|c| c.get_transactions(Some(start), Some(limit)))
self.rpc_client
.race(provider_id.map(Into::into), |c| {
c.get_transaction_by_version(version)
})
.await
}

pub async fn get_transaction_by_version(
&self,
version: u64,
) -> Result<Response<Transaction>, RestError> {
self.race(|c| c.get_transaction_by_version(version)).await
.map(Into::into)
}
}
4 changes: 2 additions & 2 deletions hubble/src/indexer/eth/create_client_tracker.rs
Original file line number Diff line number Diff line change
@@ -52,8 +52,8 @@ pub fn schedule_create_client_checker(
let tx = provider
.get_transaction_by_hash(FixedBytes::from_str(&transaction_hash).expect("valid transaction hash"), None)
.await?
.response
.expect("transaction");
.expect("transaction")
.response;

let msg = match <IbcHandler::CreateClientCall as alloy::sol_types::SolCall>::abi_decode(&tx.input,true) {
Ok(msg) => msg,
11 changes: 6 additions & 5 deletions hubble/src/indexer/eth/fetcher_client.rs
Original file line number Diff line number Diff line change
@@ -109,23 +109,24 @@ impl EthFetcherClient {
.await;

match block {
Ok(rpc_result) => match rpc_result.response {
Some(block) => {
Ok(rpc_result) => match rpc_result {
Some(result) => {
let block = result.response;
debug!(
"{}: fetched (provider index: {:?})",
selection, rpc_result.provider_id
selection, result.provider_id
);

Ok(EthBlockHandle {
reference: block.block_reference()?,
details: match mode {
FetchMode::Lazy => BlockDetails::Lazy(block),
FetchMode::Eager => BlockDetails::Eager(
self.fetch_details(&block, rpc_result.provider_id).await?,
self.fetch_details(&block, result.provider_id).await?,
),
},
eth_client: self.clone(),
provider_id: rpc_result.provider_id,
provider_id: result.provider_id,
})
}
None => {
133 changes: 36 additions & 97 deletions hubble/src/indexer/eth/provider.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use alloy::{
};
use url::Url;

use crate::race_client::RaceClient;
use crate::race_client::{RaceClient, RaceClientId, RaceClientResponse};

#[derive(Clone, Debug)]
pub struct Provider {
@@ -20,7 +20,13 @@ pub struct Provider {

#[derive(Clone, Debug, Copy)]
pub struct RpcProviderId {
index: usize,
race_client_id: RaceClientId,
}

impl From<RpcProviderId> for RaceClientId {
fn from(value: RpcProviderId) -> Self {
value.race_client_id
}
}

#[derive(Debug)]
@@ -30,16 +36,20 @@ pub struct RpcResult<T> {
}

impl<T> RpcResult<T> {
fn new(provider_index: usize, result: T) -> Self {
fn new(race_client_id: RaceClientId, result: T) -> Self {
Self {
provider_id: RpcProviderId {
index: provider_index,
},
provider_id: RpcProviderId { race_client_id },
response: result,
}
}
}

impl<T> From<RaceClientResponse<T>> for RpcResult<T> {
fn from(value: RaceClientResponse<T>) -> Self {
RpcResult::new(value.race_client_id, value.response)
}
}

impl Provider {
pub fn new(rpc_urls: Vec<Url>) -> Self {
Self {
@@ -52,123 +62,52 @@ impl Provider {
}
}

fn rpc_client(
&self,
provider_id: Option<RpcProviderId>,
) -> RaceClient<RootProvider<Http<Client>>> {
Self::select_client(self.rpc_client.clone(), provider_id.map(|id| id.index))
}

fn select_client<T: Clone>(
client: RaceClient<T>,
provider_index: Option<usize>,
) -> RaceClient<T> {
match provider_index {
Some(provider_index) => RaceClient::new(vec![client.clients[provider_index].clone()]),
None => client,
}
}

pub async fn get_chain_id(
&self,
provider_id: Option<RpcProviderId>,
) -> Result<RpcResult<u64>, RpcError<TransportErrorKind>> {
let result = self.rpc_client(provider_id).get_chain_id().await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| c.get_chain_id())
.await
.map(Into::into)
}

pub async fn get_block(
&self,
id: BlockId,
kind: BlockTransactionsKind,
provider_id: Option<RpcProviderId>,
) -> Result<RpcResult<Option<Block>>, RpcError<TransportErrorKind>> {
let result = self.rpc_client(provider_id).get_block(id, kind).await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
) -> Result<Option<RpcResult<Block>>, RpcError<TransportErrorKind>> {
self.rpc_client
.race_some(provider_id.map(Into::into), |c| c.get_block(id, kind))
.await
.map(|op| op.map(Into::into))
}

pub async fn get_logs(
&self,
filter: &Filter,
provider_id: Option<RpcProviderId>,
) -> Result<RpcResult<Vec<Log>>, RpcError<TransportErrorKind>> {
let result = self.rpc_client(provider_id).get_logs(filter).await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| c.get_logs(filter))
.await
.map(Into::into)
}

pub async fn get_transaction_by_hash(
&self,
tx_hash: TxHash,
provider_id: Option<RpcProviderId>,
) -> Result<
RpcResult<Option<<Ethereum as Network>::TransactionResponse>>,
Option<RpcResult<<Ethereum as Network>::TransactionResponse>>,
RpcError<TransportErrorKind>,
> {
let result = self
.rpc_client(provider_id)
.get_transaction_by_hash(tx_hash)
.await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
}
}

impl RaceClient<RootProvider<Http<Client>>> {
pub async fn get_chain_id(&self) -> Result<u64, RpcError<TransportErrorKind>> {
self.race(|c| c.get_chain_id()).await
}

pub async fn get_block(
&self,
id: BlockId,
kind: BlockTransactionsKind,
) -> Result<Option<Block>, RpcError<TransportErrorKind>> {
self.race_some(|c| c.get_block(id, kind)).await
}

pub async fn get_logs(
&self,
filter: &Filter,
) -> Result<Vec<Log>, RpcError<TransportErrorKind>> {
self.race(|c| c.get_logs(filter)).await
}

pub async fn get_transaction_by_hash(
&self,
tx_hash: TxHash,
) -> Result<Option<<Ethereum as Network>::TransactionResponse>, RpcError<TransportErrorKind>>
{
self.race_some(|c| c.get_transaction_by_hash(tx_hash)).await
self.rpc_client
.race_some(provider_id.map(Into::into), |c| {
c.get_transaction_by_hash(tx_hash)
})
.await
.map(|op| op.map(Into::into))
}
}
263 changes: 75 additions & 188 deletions hubble/src/indexer/tm/provider.rs
Original file line number Diff line number Diff line change
@@ -5,13 +5,15 @@ use protos::ibc::{
core::client::v1::{QueryClientStateRequest, QueryClientStateResponse},
lightclients::wasm::v1::{QueryCodeRequest, QueryCodeResponse},
};
use tendermint::block::Height;
use tendermint_rpc::{query::Query, Client, Error, HttpClient, Order};
use tendermint_rpc::{query::Query, Client, HttpClient, Order};
use tonic::Response;
use unionlabs::aptos::block_info::BlockHeight;
use url::Url;

use crate::{indexer::api::IndexerError, race_client::RaceClient};
use crate::{
indexer::api::IndexerError,
race_client::{RaceClient, RaceClientId, RaceClientResponse},
};

#[derive(Clone, Debug)]
pub struct Provider {
@@ -21,12 +23,24 @@ pub struct Provider {

#[derive(Clone, Debug, Copy)]
pub struct RpcProviderId {
index: usize,
race_client_id: RaceClientId,
}

impl From<RpcProviderId> for RaceClientId {
fn from(value: RpcProviderId) -> Self {
value.race_client_id
}
}

#[derive(Clone, Debug, Copy)]
pub struct GrpcProviderId {
index: usize,
race_client_id: RaceClientId,
}

impl From<GrpcProviderId> for RaceClientId {
fn from(value: GrpcProviderId) -> Self {
value.race_client_id
}
}

#[derive(Debug)]
@@ -36,33 +50,41 @@ pub struct RpcResult<T> {
}

impl<T> RpcResult<T> {
fn new(provider_index: usize, result: T) -> Self {
fn new(race_client_id: RaceClientId, result: T) -> Self {
Self {
provider_id: RpcProviderId {
index: provider_index,
},
provider_id: RpcProviderId { race_client_id },
response: result,
}
}
}

impl<T> From<RaceClientResponse<T>> for RpcResult<T> {
fn from(value: RaceClientResponse<T>) -> Self {
RpcResult::new(value.race_client_id, value.response)
}
}

#[derive(Debug)]
pub struct GrpcResult<T> {
pub provider_id: GrpcProviderId,
pub response: T,
}

impl<T> GrpcResult<T> {
fn new(provider_index: usize, result: T) -> Self {
fn new(race_client_id: RaceClientId, result: T) -> Self {
Self {
provider_id: GrpcProviderId {
index: provider_index,
},
provider_id: GrpcProviderId { race_client_id },
response: result,
}
}
}

impl<T> From<RaceClientResponse<T>> for GrpcResult<T> {
fn from(value: RaceClientResponse<T>) -> Self {
GrpcResult::new(value.race_client_id, value.response)
}
}

impl Provider {
pub fn new(rpc_urls: Vec<Url>, grpc_urls: Vec<Url>) -> Self {
Self {
@@ -84,16 +106,10 @@ impl Provider {
provider_id: Option<RpcProviderId>,
) -> Result<RpcResult<tendermint_rpc::endpoint::status::Response>, tendermint_rpc::error::Error>
{
let result = self.rpc_client(provider_id).status().await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| c.status())
.await
.map(Into::into)
}

pub async fn blockchain(
@@ -105,36 +121,23 @@ impl Provider {
RpcResult<tendermint_rpc::endpoint::blockchain::Response>,
tendermint_rpc::error::Error,
> {
let result = self
.rpc_client(provider_id)
.blockchain(min_inclusive as u32, max_inclusive as u32)
.await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| {
c.blockchain(min_inclusive as u32, max_inclusive as u32)
})
.await
.map(Into::into)
}

pub async fn latest_block(
&self,
provider_id: Option<RpcProviderId>,
) -> Result<RpcResult<tendermint_rpc::endpoint::block::Response>, tendermint_rpc::error::Error>
{
let result = self.rpc_client(provider_id).latest_block().await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| c.latest_block())
.await
.map(Into::into)
}

pub async fn commit(
@@ -143,16 +146,10 @@ impl Provider {
provider_id: Option<RpcProviderId>,
) -> Result<RpcResult<tendermint_rpc::endpoint::commit::Response>, tendermint_rpc::error::Error>
{
let result = self.rpc_client(provider_id).commit(height as u32).await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| c.commit(height as u32))
.await
.map(Into::into)
}

pub async fn block_results(
@@ -163,19 +160,12 @@ impl Provider {
RpcResult<tendermint_rpc::endpoint::block_results::Response>,
tendermint_rpc::error::Error,
> {
let result = self
.rpc_client(provider_id)
.block_results(height as u32)
.await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| {
c.block_results(height as u32)
})
.await
.map(Into::into)
}

pub async fn tx_search(
@@ -190,19 +180,12 @@ impl Provider {
RpcResult<tendermint_rpc::endpoint::tx_search::Response>,
tendermint_rpc::error::Error,
> {
let result = self
.rpc_client(provider_id)
.tx_search(query, prove, page, per_page, order)
.await?;

// TODO: improve race client to return index with result
Ok(RpcResult::new(
provider_id.map_or_else(
|| self.rpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.rpc_client
.race(provider_id.map(Into::into), |c| {
c.tx_search(query.clone(), prove, page, per_page, order.clone())
})
.await
.map(Into::into)
}

// GRPC
@@ -211,51 +194,23 @@ impl Provider {
request: QueryClientStateRequest,
provider_id: Option<GrpcProviderId>,
) -> Result<GrpcResult<Response<QueryClientStateResponse>>, IndexerError> {
let result = self.grpc_client(provider_id).client_state(request).await?;

// TODO: improve race client to return index with result
Ok(GrpcResult::new(
provider_id.map_or_else(
|| self.grpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
self.grpc_client
.race(provider_id.map(Into::into), |c| {
c.client_state(request.clone())
})
.await
.map(Into::into)
}

pub async fn code(
&self,
request: QueryCodeRequest,
provider_id: Option<GrpcProviderId>,
) -> Result<GrpcResult<Response<QueryCodeResponse>>, IndexerError> {
let result = self.grpc_client(provider_id).code(request).await?;

// TODO: improve race client to return index with result
Ok(GrpcResult::new(
provider_id.map_or_else(
|| self.grpc_client.fastest_index(),
|provider_id| provider_id.index,
),
result,
))
}

fn rpc_client(&self, provider_id: Option<RpcProviderId>) -> RaceClient<HttpClient> {
Self::select_client(self.rpc_client.clone(), provider_id.map(|id| id.index))
}

fn grpc_client(&self, provider_id: Option<GrpcProviderId>) -> RaceClient<GrpcClient> {
Self::select_client(self.grpc_client.clone(), provider_id.map(|id| id.index))
}

fn select_client<T: Clone>(
client: RaceClient<T>,
provider_index: Option<usize>,
) -> RaceClient<T> {
match provider_index {
Some(provider_index) => RaceClient::new(vec![client.clients[provider_index].clone()]),
None => client,
}
self.grpc_client
.race(provider_id.map(Into::into), |c| c.code(request.clone()))
.await
.map(Into::into)
}
}

@@ -301,74 +256,6 @@ impl GrpcClient {
}
}

impl RaceClient<GrpcClient> {
pub async fn client_state(
&self,
request: QueryClientStateRequest,
) -> Result<Response<QueryClientStateResponse>, IndexerError> {
self.race(|client| client.client_state(request.clone()))
.await
}

pub async fn code(
&self,
request: QueryCodeRequest,
) -> Result<Response<QueryCodeResponse>, IndexerError> {
self.race(|client| client.code(request.clone())).await
}
}

impl<C: Client + std::marker::Sync + Clone> RaceClient<C> {
pub async fn status(&self) -> Result<tendermint_rpc::endpoint::status::Response, Error> {
self.race(|c| c.status()).await
}

pub async fn blockchain<H: Into<Height>>(
&self,
min: H,
max: H,
) -> Result<tendermint_rpc::endpoint::blockchain::Response, Error> {
let min = min.into();
let max = max.into();

self.race(|c| c.blockchain(min, max)).await
}

#[allow(dead_code)]
pub async fn tx_search(
&self,
query: Query,
prove: bool,
page: u32,
per_page: u8,
order: Order,
) -> Result<tendermint_rpc::endpoint::tx_search::Response, Error> {
self.race(|c| c.tx_search(query.clone(), prove, page, per_page, order.clone()))
.await
}

pub async fn latest_block(&self) -> Result<tendermint_rpc::endpoint::block::Response, Error> {
self.race(|c| c.latest_block()).await
}

pub async fn commit<H: Into<Height>>(
&self,
height: H,
) -> Result<tendermint_rpc::endpoint::commit::Response, Error> {
let height = height.into();
self.race(|c| c.commit(height)).await
}

#[allow(dead_code)]
pub async fn block_results<H: Into<Height>>(
&self,
height: H,
) -> Result<tendermint_rpc::endpoint::block_results::Response, Error> {
let height = height.into();
self.race(|c| c.block_results(height)).await
}
}

impl From<tonic::Status> for IndexerError {
fn from(error: tonic::Status) -> Self {
Self::ProviderError(Report::from(error))
50 changes: 29 additions & 21 deletions hubble/src/race_client.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
use core::{fmt::Debug, future::Future};
use std::sync::atomic::{AtomicUsize, Ordering};

use futures::{stream::FuturesUnordered, StreamExt};
use tracing::debug;

#[derive(Debug)]
pub struct RaceClient<C> {
pub clients: Vec<C>,
fastest: AtomicUsize,
}

#[derive(Clone, Debug, Copy)]
pub struct RaceClientId {
index: usize,
}

pub struct RaceClientResponse<T> {
pub race_client_id: RaceClientId,
pub response: T,
}

impl<T> RaceClientResponse<T> {
fn new(index: usize, response: T) -> Self {
RaceClientResponse {
race_client_id: RaceClientId { index },
response,
}
}
}

impl<C: Clone> Clone for RaceClient<C> {
fn clone(&self) -> Self {
let clients = self.clients.clone();
let fastest = self.fastest.load(Ordering::Relaxed);
Self {
clients,
fastest: fastest.into(),
}
Self { clients }
}
}

impl<C> RaceClient<C> {
pub fn new(clients: Vec<C>) -> Self {
Self {
clients,
fastest: AtomicUsize::new(0),
}
}

pub fn fastest_index(&self) -> usize {
self.fastest.load(Ordering::Relaxed)
Self { clients }
}

/// Run the provided closure over the clients, returning the first encountered Ok, or if all error, the first
@@ -43,12 +49,14 @@ impl<C> RaceClient<C> {
F: Fn(&'a C) -> FUT,
>(
&'a self,
race_client_id: Option<RaceClientId>,
f: F,
) -> Result<T, E> {
) -> Result<RaceClientResponse<T>, E> {
let mut futures: FuturesUnordered<_> = self
.clients
.iter()
.enumerate()
.filter(|(i, _)| race_client_id.as_ref().is_none_or(|id| &id.index == i))
.map(|(i, c)| {
let f = f(c);
async move {
@@ -62,8 +70,7 @@ impl<C> RaceClient<C> {
loop {
match futures.next().await {
Some((i, Ok(res))) => {
self.fastest.store(i, Ordering::Relaxed);
return Ok(res);
return Ok(RaceClientResponse::new(i, res));
}
Some((_, Err(err))) => {
debug!("error racing client requests: {:?}", err);
@@ -88,12 +95,14 @@ impl<C> RaceClient<C> {
F: Fn(&'a C) -> FUT,
>(
&'a self,
race_client_id: Option<RaceClientId>,
f: F,
) -> Result<Option<T>, E> {
) -> Result<Option<RaceClientResponse<T>>, E> {
let mut futures: FuturesUnordered<_> = self
.clients
.iter()
.enumerate()
.filter(|(i, _)| race_client_id.as_ref().is_none_or(|id| &id.index == i))
.map(|(i, c)| {
let f = f(c);
async move {
@@ -107,8 +116,7 @@ impl<C> RaceClient<C> {
loop {
match futures.next().await {
Some((i, Ok(Some(res)))) => {
self.fastest.store(i, Ordering::Relaxed);
return Ok(Some(res));
return Ok(Some(RaceClientResponse::new(i, res)));
}
Some((_, Ok(None))) => continue,
Some((_, Err(err))) => {

0 comments on commit a9767d8

Please sign in to comment.