Skip to content

Commit

Permalink
Merge pull request #533 from paritytech/sandreim/hist_fix
Browse files Browse the repository at this point in the history
Fix historical subscription
  • Loading branch information
sandreim authored Sep 1, 2023
2 parents 0df5012 + ee6b03a commit d116675
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
10 changes: 9 additions & 1 deletion essentials/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ impl Collector {
block_number: u32,
) -> color_eyre::Result<(), CollectorError> {
info!(
"imported new block hash: {:?}, number: {}, previous number: {}, previous hashes: {:?}",
"importing new block hash: {:?}, number: {}, previous number: {}, previous hashes: {:?}",
block_hash,
block_number,
self.state.current_relay_chain_block_number,
Expand Down Expand Up @@ -581,6 +581,14 @@ impl Collector {
}
self.write_parainherent_data(block_hash, block_number, ts).await?;

debug!(
"Success! new block hash: {:?}, number: {}, previous number: {}, previous hashes: {:?}",
block_hash,
block_number,
self.state.current_relay_chain_block_number,
self.state.current_relay_chain_block_hashes
);

Ok(())
}

Expand Down
50 changes: 35 additions & 15 deletions essentials/src/historical_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// along with polkadot-introspector. If not, see <http://www.gnu.org/licenses/>.
//

use tokio::sync::broadcast::error::TryRecvError;

use crate::{
api::subxt_wrapper::RequestExecutor,
chain_subscription::ChainSubscriptionEvent,
Expand Down Expand Up @@ -98,7 +100,7 @@ impl HistoricalSubscription {
) {
let mut shutdown_rx = shutdown_tx.subscribe();
let mut executor = RequestExecutor::new(retry);
const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(200);
const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(1000);
let mut heartbeat_periodic = interval_at(tokio::time::Instant::now() + HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);

let last_block_number = match executor.get_block(&url, None).await {
Expand All @@ -115,20 +117,7 @@ impl HistoricalSubscription {
}

for block_number in from_block_number..=to_block_number {
tokio::select! {
_ = shutdown_rx.recv() => {
info!("Received interrupt signal shutting down subscription");
return;
}
_ = heartbeat_periodic.tick() => {
debug!("sent heartbeat to subscribers");
let res = update_channel.send(ChainSubscriptionEvent::Heartbeat).await;
if let Err(e) = res {
info!("Event consumer has terminated: {:?}, shutting down", e);
return;
}
}
}
debug!("Subscription advacing to block #{}/#{}", block_number, to_block_number);

let message = executor.get_block_hash(&url, Some(block_number)).await;
let block_hash = match message {
Expand All @@ -144,10 +133,41 @@ impl HistoricalSubscription {
};

info!("[{}] Block imported ({:?})", url, block_hash);
if let Err(e) = update_channel.send(ChainSubscriptionEvent::NewBestHead(block_hash)).await {
info!("Event consumer has terminated: {:?}, shutting down", e);
return
}
if let Err(e) = update_channel.send(ChainSubscriptionEvent::NewFinalizedBlock(block_hash)).await {
info!("Event consumer has terminated: {:?}, shutting down", e);
return
}

match shutdown_rx.try_recv() {
Err(TryRecvError::Closed) | Ok(_) => {
info!("Received interrupt signal shutting down subscription");
return
},
_ => {},
}
}

loop {
// We wait here for termination.
tokio::select! {
_ = shutdown_rx.recv() => {
info!("Received interrupt signal shutting down subscription");
return;
}
_ = heartbeat_periodic.tick() => {
debug!("sent heartbeat to subscribers");
let res = update_channel.send(ChainSubscriptionEvent::Heartbeat).await;
if let Err(e) = res {
info!("Event consumer has terminated: {:?}, shutting down", e);
return;
}
heartbeat_periodic = interval_at(tokio::time::Instant::now() + HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);
}
}
}
}

Expand Down

0 comments on commit d116675

Please sign in to comment.