From 439c3e9a70f061832cefc8299b28c480ea58e815 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 6 Nov 2020 12:19:39 +0300 Subject: [PATCH] Fixed client used in headers maintain (#487) * fixed client used in headers maintain * fmt --- .../relays/substrate/src/headers_maintain.rs | 45 +++++++++++-------- .../relays/substrate/src/headers_pipeline.rs | 7 ++- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/bridges/relays/substrate/src/headers_maintain.rs b/bridges/relays/substrate/src/headers_maintain.rs index af3196b375b2..fc91494c7b72 100644 --- a/bridges/relays/substrate/src/headers_maintain.rs +++ b/bridges/relays/substrate/src/headers_maintain.rs @@ -44,13 +44,14 @@ use relay_substrate_client::{Chain, Client, Error as SubstrateError, Justificati use relay_utils::HeaderId; use sp_core::Bytes; use sp_runtime::{traits::Header as HeaderT, Justification}; -use std::{collections::VecDeque, task::Poll}; +use std::{collections::VecDeque, marker::PhantomData, task::Poll}; /// Substrate-to-Substrate headers synchronization maintain procedure. -pub struct SubstrateHeadersToSubstrateMaintain { +pub struct SubstrateHeadersToSubstrateMaintain { pipeline: P, - target_client: Client, + target_client: Client, justifications: Arc>>, + _marker: PhantomData, } /// Future and already received justifications from the source chain. @@ -62,9 +63,11 @@ struct Justifications { queue: VecDeque<(HeaderIdOf

, Justification)>, } -impl SubstrateHeadersToSubstrateMaintain { +impl + SubstrateHeadersToSubstrateMaintain +{ /// Create new maintain procedure. - pub fn new(pipeline: P, target_client: Client, justifications: JustificationsSubscription) -> Self { + pub fn new(pipeline: P, target_client: Client, justifications: JustificationsSubscription) -> Self { SubstrateHeadersToSubstrateMaintain { pipeline, target_client, @@ -72,16 +75,20 @@ impl SubstrateHeadersToSubstrateMaint stream: justifications, queue: VecDeque::new(), })), + _marker: Default::default(), } } } #[async_trait] -impl SyncMaintain

for SubstrateHeadersToSubstrateMaintain +impl SyncMaintain

for SubstrateHeadersToSubstrateMaintain where - C: Chain, - P::Number: Decode + From, - P::Hash: Decode + From, + SourceChain: Chain, + ::Number: Into, + ::Hash: Into, + TargetChain: Chain, + P::Number: Decode, + P::Hash: Decode, P: SubstrateHeadersSyncPipeline, { async fn maintain(&self, sync: &mut HeadersSync

) { @@ -122,7 +129,7 @@ where // on every maintain call. So maintain rate directly affects finalization rate. let justification_to_submit = poll_fn(|context| { // read justifications from the stream and push to the queue - justifications.read_from_stream::(context); + justifications.read_from_stream::(context); // remove all obsolete justifications from the queue remove_obsolete::

(&mut justifications.queue, best_finalized); @@ -166,11 +173,11 @@ where P: SubstrateHeadersSyncPipeline, { /// Read justifications from the subscription stream without blocking. - fn read_from_stream<'a, Header>(&mut self, context: &mut std::task::Context<'a>) + fn read_from_stream<'a, SourceHeader>(&mut self, context: &mut std::task::Context<'a>) where - Header: HeaderT, - Header::Number: Into, - Header::Hash: Into, + SourceHeader: HeaderT, + SourceHeader::Number: Into, + SourceHeader::Hash: Into, { loop { let maybe_next_justification = self.stream.next(); @@ -183,7 +190,7 @@ where }; // decode justification target - let target = pallet_substrate_bridge::decode_justification_target::

(&justification); + let target = pallet_substrate_bridge::decode_justification_target::(&justification); let target = match target { Ok((target_hash, target_number)) => HeaderId(target_number.into(), target_hash.into()), Err(error) => { @@ -266,18 +273,18 @@ where async fn best_finalized_header_id(client: &Client) -> Result, SubstrateError> where P: SubstrateHeadersSyncPipeline, - P::Number: Decode + From, - P::Hash: Decode + From, + P::Number: Decode, + P::Hash: Decode, C: Chain, { let call = P::FINALIZED_BLOCK_METHOD.into(); let data = Bytes(Vec::new()); let encoded_response = client.state_call(call, data, None).await?; - let decoded_response: (C::BlockNumber, C::Hash) = + let decoded_response: (P::Number, P::Hash) = Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; - let best_header_id = HeaderId(decoded_response.0.into(), decoded_response.1.into()); + let best_header_id = HeaderId(decoded_response.0, decoded_response.1); Ok(best_header_id) } diff --git a/bridges/relays/substrate/src/headers_pipeline.rs b/bridges/relays/substrate/src/headers_pipeline.rs index 3a6348961fb7..b00417da39d0 100644 --- a/bridges/relays/substrate/src/headers_pipeline.rs +++ b/bridges/relays/substrate/src/headers_pipeline.rs @@ -124,8 +124,11 @@ pub async fn run( } }; - let sync_maintain = - SubstrateHeadersToSubstrateMaintain::new(pipeline.clone(), source_client.clone(), source_justifications); + let sync_maintain = SubstrateHeadersToSubstrateMaintain::<_, SourceChain, _>::new( + pipeline.clone(), + target_client.clone(), + source_justifications, + ); headers_relay::sync_loop::run( HeadersSource::new(source_client),