From 3ab6b2c7a54901583bf74fb4d0e006cc6abe6b4e Mon Sep 17 00:00:00 2001 From: KostekIV Date: Wed, 30 Jun 2021 20:35:55 +0200 Subject: [PATCH] Fix rotating party. --- finality-aleph/src/party.rs | 51 ++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/finality-aleph/src/party.rs b/finality-aleph/src/party.rs index d58c8904f7..a74e9e251b 100644 --- a/finality-aleph/src/party.rs +++ b/finality-aleph/src/party.rs @@ -87,13 +87,6 @@ fn run_justification_handler( finalization_proposals_rx } -struct SessionInstance -where - B: Block, -{ - pub(crate) session: Session>, -} - struct ConsensusParty where B: Block, @@ -105,7 +98,7 @@ where NumberFor: From, { network: N, - sessions: HashMap>, + sessions: HashMap>>, spawn_handle: SpawnHandle, client: Arc, select_chain: SC, @@ -115,6 +108,8 @@ where finalization_proposals_rx: mpsc::UnboundedReceiver>, } +/// If we are on the authority list for the given session, runs an +/// AlephBFT task and returns `true` upon completion. Otherwise, immediately returns `false`. async fn maybe_run_session_as_authority( authority: AuthorityId, auth_keystore: AuthorityKeystore, @@ -201,6 +196,10 @@ where break; } }, + else => { + debug!(target: "afa", "finished party {:?} with finalized block at {:?}", session_id.0, client.info().finalized_number); + break; + } } } exit_tx.send(()).expect("consensus task should not fail"); @@ -249,7 +248,6 @@ where self.sessions .get(&prev_id) .expect("The current session should be known already") - .session .stop_h } }; @@ -258,18 +256,20 @@ where .runtime_api() .current_session(&BlockId::Number(prev_block_number)) { - Ok(session) => session, + Ok(session) => { + self.sessions.insert(session_id, session.clone()); + session + } _ => { error!(target: "afa", "No session found for current block #{}", 0); return; } }; - let current_stop_h = session.stop_h; - let client = self.client.clone(); - let finalization_proposals_rx = &mut self.finalization_proposals_rx; let proposals_task = { let client = self.client.clone(); + let current_stop_h = session.stop_h; + let finalization_proposals_rx = &mut self.finalization_proposals_rx; async move { while client.info().finalized_number < current_stop_h { if let Some(proposal) = finalization_proposals_rx.next().await { @@ -282,30 +282,35 @@ where ); } else { debug!(target: "afa", "the channel of proposed blocks closed unexpectedly"); - return true; + break; } } - true } }; + // returns true if we participated in the session let session_task = maybe_run_session_as_authority( self.authority.clone(), self.auth_keystore.clone(), - client.clone(), + self.client.clone(), session_manger, session, self.spawn_handle.clone(), self.select_chain.clone(), ); - let mut tasks: FuturesUnordered<_> = - vec![proposals_task.left_future(), session_task.right_future()] - .into_iter() - .collect(); - // wait for the first task returning true - tasks.next().await; - //tasks.filter(|b| std::future::ready(*b)).next().await; + // We run concurrently `proposal_task` and `session_task` until either + // * `proposal_tasks` terminates, or + // * `session_task` terminates AND returns true. + + let tasks: FuturesUnordered<_> = vec![ + proposals_task.map(|_| true).left_future(), + session_task.right_future(), + ] + .into_iter() + .collect(); + + tasks.filter(|b| std::future::ready(*b)).next().await; } async fn run(mut self) {