Skip to content

Commit

Permalink
Fixed client used in headers maintain (#487)
Browse files Browse the repository at this point in the history
* fixed client used in headers maintain

* fmt
  • Loading branch information
svyatonik authored and bkchr committed Apr 10, 2024
1 parent 2a6af69 commit 439c3e9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
45 changes: 26 additions & 19 deletions bridges/relays/substrate/src/headers_maintain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ use relay_substrate_client::{Chain, Client, Error as SubstrateError, Justificati
use relay_utils::HeaderId;
use sp_core::Bytes;
use sp_runtime::{traits::Header as HeaderT, Justification};
use std::{collections::VecDeque, task::Poll};
use std::{collections::VecDeque, marker::PhantomData, task::Poll};

/// Substrate-to-Substrate headers synchronization maintain procedure.
pub struct SubstrateHeadersToSubstrateMaintain<P: SubstrateHeadersSyncPipeline, C: Chain> {
pub struct SubstrateHeadersToSubstrateMaintain<P: SubstrateHeadersSyncPipeline, SourceChain, TargetChain: Chain> {
pipeline: P,
target_client: Client<C>,
target_client: Client<TargetChain>,
justifications: Arc<Mutex<Justifications<P>>>,
_marker: PhantomData<SourceChain>,
}

/// Future and already received justifications from the source chain.
Expand All @@ -62,26 +63,32 @@ struct Justifications<P: SubstrateHeadersSyncPipeline> {
queue: VecDeque<(HeaderIdOf<P>, Justification)>,
}

impl<P: SubstrateHeadersSyncPipeline, C: Chain> SubstrateHeadersToSubstrateMaintain<P, C> {
impl<P: SubstrateHeadersSyncPipeline, SourceChain, TargetChain: Chain>
SubstrateHeadersToSubstrateMaintain<P, SourceChain, TargetChain>
{
/// Create new maintain procedure.
pub fn new(pipeline: P, target_client: Client<C>, justifications: JustificationsSubscription) -> Self {
pub fn new(pipeline: P, target_client: Client<TargetChain>, justifications: JustificationsSubscription) -> Self {
SubstrateHeadersToSubstrateMaintain {
pipeline,
target_client,
justifications: Arc::new(Mutex::new(Justifications {
stream: justifications,
queue: VecDeque::new(),
})),
_marker: Default::default(),
}
}
}

#[async_trait]
impl<P, C> SyncMaintain<P> for SubstrateHeadersToSubstrateMaintain<P, C>
impl<P, SourceChain, TargetChain> SyncMaintain<P> for SubstrateHeadersToSubstrateMaintain<P, SourceChain, TargetChain>
where
C: Chain,
P::Number: Decode + From<C::BlockNumber>,
P::Hash: Decode + From<C::Hash>,
SourceChain: Chain,
<SourceChain::Header as HeaderT>::Number: Into<P::Number>,
<SourceChain::Header as HeaderT>::Hash: Into<P::Hash>,
TargetChain: Chain,
P::Number: Decode,
P::Hash: Decode,
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
{
async fn maintain(&self, sync: &mut HeadersSync<P>) {
Expand Down Expand Up @@ -122,7 +129,7 @@ where
// on every maintain call. So maintain rate directly affects finalization rate.
let justification_to_submit = poll_fn(|context| {
// read justifications from the stream and push to the queue
justifications.read_from_stream::<C::Header>(context);
justifications.read_from_stream::<SourceChain::Header>(context);

// remove all obsolete justifications from the queue
remove_obsolete::<P>(&mut justifications.queue, best_finalized);
Expand Down Expand Up @@ -166,11 +173,11 @@ where
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
{
/// Read justifications from the subscription stream without blocking.
fn read_from_stream<'a, Header>(&mut self, context: &mut std::task::Context<'a>)
fn read_from_stream<'a, SourceHeader>(&mut self, context: &mut std::task::Context<'a>)
where
Header: HeaderT,
Header::Number: Into<P::Number>,
Header::Hash: Into<P::Hash>,
SourceHeader: HeaderT,
SourceHeader::Number: Into<P::Number>,
SourceHeader::Hash: Into<P::Hash>,
{
loop {
let maybe_next_justification = self.stream.next();
Expand All @@ -183,7 +190,7 @@ where
};

// decode justification target
let target = pallet_substrate_bridge::decode_justification_target::<Header>(&justification);
let target = pallet_substrate_bridge::decode_justification_target::<SourceHeader>(&justification);
let target = match target {
Ok((target_hash, target_number)) => HeaderId(target_number.into(), target_hash.into()),
Err(error) => {
Expand Down Expand Up @@ -266,18 +273,18 @@ where
async fn best_finalized_header_id<P, C>(client: &Client<C>) -> Result<HeaderIdOf<P>, SubstrateError>
where
P: SubstrateHeadersSyncPipeline,
P::Number: Decode + From<C::BlockNumber>,
P::Hash: Decode + From<C::Hash>,
P::Number: Decode,
P::Hash: Decode,
C: Chain,
{
let call = P::FINALIZED_BLOCK_METHOD.into();
let data = Bytes(Vec::new());

let encoded_response = client.state_call(call, data, None).await?;
let decoded_response: (C::BlockNumber, C::Hash) =
let decoded_response: (P::Number, P::Hash) =
Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?;

let best_header_id = HeaderId(decoded_response.0.into(), decoded_response.1.into());
let best_header_id = HeaderId(decoded_response.0, decoded_response.1);
Ok(best_header_id)
}

Expand Down
7 changes: 5 additions & 2 deletions bridges/relays/substrate/src/headers_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ pub async fn run<SourceChain, TargetChain, P>(
}
};

let sync_maintain =
SubstrateHeadersToSubstrateMaintain::new(pipeline.clone(), source_client.clone(), source_justifications);
let sync_maintain = SubstrateHeadersToSubstrateMaintain::<_, SourceChain, _>::new(
pipeline.clone(),
target_client.clone(),
source_justifications,
);

headers_relay::sync_loop::run(
HeadersSource::new(source_client),
Expand Down

0 comments on commit 439c3e9

Please sign in to comment.