Skip to content

Commit ef1919a

Browse files
timorltimorl
and
timorl
authored
A0-2517: Other major sync solution (#1200)
* Add fallback block import notifications * Eepy * Less panicky time computation * Less error spamming * This also shouldn't be logged as an error --------- Co-authored-by: timorl <[email protected]>
1 parent d34ff6e commit ef1919a

File tree

5 files changed

+139
-47
lines changed

5 files changed

+139
-47
lines changed

finality-aleph/src/import.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
use std::{collections::HashMap, fmt::Debug, time::Instant};
1+
use std::{
2+
collections::HashMap,
3+
fmt::Debug,
4+
time::{Duration, Instant},
5+
};
26

37
use aleph_primitives::{BlockNumber, ALEPH_ENGINE_ID};
48
use futures::channel::mpsc::{TrySendError, UnboundedSender};
5-
use log::{debug, trace, warn};
9+
use log::{debug, warn};
610
use sc_consensus::{
711
BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
812
};
@@ -11,6 +15,7 @@ use sp_runtime::{
1115
traits::{Block as BlockT, Header},
1216
Justification as SubstrateJustification,
1317
};
18+
use tokio::time::sleep;
1419

1520
use crate::{
1621
justification::{backwards_compatible_decode, DecodeError},
@@ -77,7 +82,8 @@ where
7782

7883
/// A wrapper around a block import that also extracts any present jsutifications and send them to
7984
/// our components which will process them further and possibly finalize the block. It also makes
80-
/// blocks from major sync import as if they came from normal sync.
85+
/// blocks from major sync import slightly slower than they normally would, to avoid breaking the
86+
/// new justificaiton sync. The last part will be removed once we finish rewriting the block sync.
8187
#[derive(Clone)]
8288
pub struct AlephBlockImport<B, I, JT>
8389
where
@@ -177,11 +183,10 @@ where
177183
let post_hash = block.post_hash();
178184

179185
let justifications = block.justifications.take();
186+
180187
if matches!(block.origin, BlockOrigin::NetworkInitialSync) {
181-
trace!(target: "aleph-justification", "Treating block {:?} {:?} from major sync as from a normal sync.", number, block.header.hash());
182-
block.origin = BlockOrigin::NetworkBroadcast;
188+
sleep(Duration::from_millis(2)).await;
183189
}
184-
185190
debug!(target: "aleph-justification", "Importing block {:?} {:?} {:?}", number, block.header.hash(), block.post_hash());
186191
let result = self.inner.import_block(block, cache).await;
187192

finality-aleph/src/nodes.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ use crate::{
2727
session::SessionBoundaryInfo,
2828
session_map::{AuthorityProviderImpl, FinalityNotifierImpl, SessionMapUpdater},
2929
sync::{
30-
ChainStatus, Justification, JustificationTranslator, Service as SyncService,
31-
SubstrateChainStatusNotifier, SubstrateFinalizationInfo, SubstrateJustification,
32-
VerifierCache,
30+
ChainStatus, Justification, Service as SyncService, SubstrateChainStatus,
31+
SubstrateChainStatusNotifier, SubstrateFinalizationInfo, VerifierCache,
3332
},
3433
AlephConfig,
3534
};
@@ -47,15 +46,15 @@ pub async fn new_pen(mnemonic: &str, keystore: Arc<dyn CryptoStore>) -> Authorit
4746
.expect("we just generated this key so everything should work")
4847
}
4948

50-
pub async fn run_validator_node<B, H, C, CS, BE, SC>(aleph_config: AlephConfig<B, H, C, SC, CS>)
51-
where
49+
pub async fn run_validator_node<B, H, C, BE, SC>(
50+
aleph_config: AlephConfig<B, H, C, SC, SubstrateChainStatus<B>>,
51+
) where
5252
B: Block,
5353
B::Header: Header<Number = BlockNumber>,
5454
H: ExHashT,
5555
C: crate::ClientForAleph<B, BE> + Send + Sync + 'static,
5656
C::Api: aleph_primitives::AlephSessionApi<B>,
5757
BE: Backend<B> + 'static,
58-
CS: ChainStatus<SubstrateJustification<B::Header>> + JustificationTranslator<B::Header>,
5958
SC: SelectChain<B> + 'static,
6059
{
6160
let AlephConfig {
@@ -135,7 +134,9 @@ where
135134
let chain_events = SubstrateChainStatusNotifier::new(
136135
client.finality_notification_stream(),
137136
client.import_notification_stream(),
138-
);
137+
chain_status.clone(),
138+
)
139+
.expect("chain status notifier should set up correctly");
139140

140141
let genesis_header = match chain_status.finalized_at(0) {
141142
Ok(Some(justification)) => justification.header().clone(),

finality-aleph/src/sync/service.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -191,15 +191,26 @@ impl<
191191
.handle_justification(justification, peer.clone())
192192
{
193193
Ok(maybe_id) => maybe_id,
194-
Err(e) => {
195-
warn!(
196-
target: LOG_TARGET,
197-
"Error while handling justification from {:?}: {}.",
198-
peer.map_or("user".to_string(), |id| format!("{:?}", id)),
199-
e
200-
);
201-
return;
202-
}
194+
Err(e) => match e {
195+
HandlerError::Verifier(e) => {
196+
debug!(
197+
target: LOG_TARGET,
198+
"Could not verify justification from {:?}: {}.",
199+
peer.map_or("user".to_string(), |id| format!("{:?}", id)),
200+
e
201+
);
202+
return;
203+
}
204+
e => {
205+
warn!(
206+
target: LOG_TARGET,
207+
"Error while handling justification from {:?}: {}.",
208+
peer.map_or("user".to_string(), |id| format!("{:?}", id)),
209+
e
210+
);
211+
return;
212+
}
213+
},
203214
};
204215
if let Some(block_id) = maybe_block_id {
205216
if let Some(previous_block_id) = previous_block_id {
@@ -359,9 +370,9 @@ impl<
359370
}
360371
},
361372
Err(e) => error!(
362-
target: LOG_TARGET,
363-
"Error when retrieving Handler state: {}.", e
364-
),
373+
target: LOG_TARGET,
374+
"Error when retrieving Handler state: {}.", e
375+
),
365376
}
366377
}
367378
}

finality-aleph/src/sync/substrate/chain_status.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55

66
use aleph_primitives::{BlockNumber, ALEPH_ENGINE_ID};
77
use log::warn;
8-
use sc_client_api::{blockchain::HeaderBackend, Backend as _};
8+
use sc_client_api::{Backend as _, HeaderBackend};
99
use sc_service::TFullBackend;
1010
use sp_blockchain::{Backend as _, Error as BackendError, Info};
1111
use sp_runtime::traits::{Block as BlockT, Header as SubstrateHeader};
@@ -93,11 +93,11 @@ where
9393
self.backend.blockchain().info()
9494
}
9595

96-
fn hash_for_number(&self, number: BlockNumber) -> Result<Option<B::Hash>, BackendError> {
96+
pub fn hash_for_number(&self, number: BlockNumber) -> Result<Option<B::Hash>, BackendError> {
9797
self.backend.blockchain().hash(number)
9898
}
9999

100-
fn header_for_hash(&self, hash: B::Hash) -> Result<Option<B::Header>, BackendError> {
100+
pub fn header_for_hash(&self, hash: B::Hash) -> Result<Option<B::Header>, BackendError> {
101101
self.backend.blockchain().header(hash)
102102
}
103103

finality-aleph/src/sync/substrate/status_notifier.rs

Lines changed: 94 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,37 @@
1-
use std::fmt::{Display, Error as FmtError, Formatter};
1+
use std::{
2+
fmt::{Display, Error as FmtError, Formatter},
3+
time::{Duration, Instant},
4+
};
25

36
use aleph_primitives::BlockNumber;
47
use futures::StreamExt;
8+
use log::debug;
59
use sc_client_api::client::{FinalityNotifications, ImportNotifications};
610
use sp_runtime::traits::{Block as BlockT, Header as SubstrateHeader};
7-
use tokio::select;
11+
use tokio::{select, time::sleep};
812

9-
use crate::sync::{ChainStatusNotification, ChainStatusNotifier};
13+
use crate::sync::{
14+
substrate::chain_status::Error as ChainStatusError, BlockIdentifier, ChainStatus,
15+
ChainStatusNotification, ChainStatusNotifier, Header, SubstrateChainStatus, LOG_TARGET,
16+
};
1017

1118
/// What can go wrong when waiting for next chain status notification.
1219
#[derive(Debug)]
13-
pub enum Error {
20+
pub enum Error<B>
21+
where
22+
B: BlockT,
23+
B::Header: SubstrateHeader<Number = BlockNumber>,
24+
{
1425
JustificationStreamClosed,
1526
ImportStreamClosed,
27+
ChainStatus(ChainStatusError<B>),
1628
}
1729

18-
impl Display for Error {
30+
impl<B> Display for Error<B>
31+
where
32+
B: BlockT,
33+
B::Header: SubstrateHeader<Number = BlockNumber>,
34+
{
1935
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
2036
use Error::*;
2137
match self {
@@ -25,30 +41,56 @@ impl Display for Error {
2541
ImportStreamClosed => {
2642
write!(f, "import notification stream has ended")
2743
}
44+
ChainStatus(e) => {
45+
write!(f, "chain status error: {}", e)
46+
}
2847
}
2948
}
3049
}
3150

32-
/// Substrate specific implementation of `ChainStatusNotifier`.
51+
/// Substrate specific implementation of `ChainStatusNotifier`. If no blocks are reported through
52+
/// the usual channels for some time it falls back to reading the DB directly and produces
53+
/// notifications that way.
3354
pub struct SubstrateChainStatusNotifier<B>
3455
where
3556
B: BlockT,
57+
B::Header: SubstrateHeader<Number = BlockNumber>,
3658
{
3759
finality_notifications: FinalityNotifications<B>,
3860
import_notifications: ImportNotifications<B>,
61+
// The things below here are a hack to ensure all blocks get to the user, even during a major
62+
// sync. They should almost surely be removed after A0-1760.
63+
backend: SubstrateChainStatus<B>,
64+
last_reported: BlockNumber,
65+
trying_since: Instant,
66+
catching_up: bool,
3967
}
4068

4169
impl<B> SubstrateChainStatusNotifier<B>
4270
where
4371
B: BlockT,
72+
B::Header: SubstrateHeader<Number = BlockNumber>,
4473
{
4574
pub fn new(
4675
finality_notifications: FinalityNotifications<B>,
4776
import_notifications: ImportNotifications<B>,
48-
) -> Self {
49-
Self {
77+
backend: SubstrateChainStatus<B>,
78+
) -> Result<Self, ChainStatusError<B>> {
79+
let last_reported = backend.best_block()?.id().number();
80+
Ok(Self {
5081
finality_notifications,
5182
import_notifications,
83+
backend,
84+
last_reported,
85+
trying_since: Instant::now(),
86+
catching_up: false,
87+
})
88+
}
89+
90+
fn header_at(&self, number: BlockNumber) -> Result<Option<B::Header>, ChainStatusError<B>> {
91+
match self.backend.hash_for_number(number)? {
92+
Some(hash) => Ok(self.backend.header_for_hash(hash)?),
93+
None => Ok(None),
5294
}
5395
}
5496
}
@@ -59,19 +101,52 @@ where
59101
B: BlockT,
60102
B::Header: SubstrateHeader<Number = BlockNumber>,
61103
{
62-
type Error = Error;
104+
type Error = Error<B>;
63105

64106
async fn next(&mut self) -> Result<ChainStatusNotification<B::Header>, Self::Error> {
65-
select! {
66-
maybe_block = self.finality_notifications.next() => {
67-
maybe_block
68-
.map(|block| ChainStatusNotification::BlockFinalized(block.header))
69-
.ok_or(Error::JustificationStreamClosed)
70-
},
71-
maybe_block = self.import_notifications.next() => {
72-
maybe_block
73-
.map(|block| ChainStatusNotification::BlockImported(block.header))
74-
.ok_or(Error::ImportStreamClosed)
107+
loop {
108+
if self.catching_up {
109+
match self
110+
.header_at(self.last_reported + 1)
111+
.map_err(Error::ChainStatus)?
112+
{
113+
Some(header) => {
114+
self.last_reported += 1;
115+
return Ok(ChainStatusNotification::BlockImported(header));
116+
}
117+
None => {
118+
self.catching_up = false;
119+
self.trying_since = Instant::now();
120+
debug!(
121+
target: LOG_TARGET,
122+
"Manual reporting caught up, back to normal waiting for imports."
123+
);
124+
}
125+
}
126+
}
127+
select! {
128+
maybe_block = self.finality_notifications.next() => {
129+
self.trying_since = Instant::now();
130+
return maybe_block
131+
.map(|block| ChainStatusNotification::BlockFinalized(block.header))
132+
.ok_or(Error::JustificationStreamClosed)
133+
},
134+
maybe_block = self.import_notifications.next() => {
135+
if let Some(block) = &maybe_block {
136+
let number = block.header.id().number();
137+
if number > self.last_reported {
138+
self.last_reported = number;
139+
}
140+
}
141+
self.trying_since = Instant::now();
142+
return maybe_block
143+
.map(|block| ChainStatusNotification::BlockImported(block.header))
144+
.ok_or(Error::ImportStreamClosed)
145+
},
146+
_ = sleep((self.trying_since + Duration::from_secs(2)).saturating_duration_since(Instant::now())) => {
147+
self.catching_up = true;
148+
debug!(target: LOG_TARGET, "No new blocks for 2 seconds, falling back to manual reporting.");
149+
}
75150
}
76151
}
77152
}

0 commit comments

Comments
 (0)