Skip to content

Commit

Permalink
Merge pull request #76 from kostekIV/fix/rotating-party
Browse files Browse the repository at this point in the history
Fix rotating party.
  • Loading branch information
michalseweryn authored Jul 2, 2021
2 parents aec6cfd + 3ab6b2c commit 4129d6a
Showing 1 changed file with 28 additions and 23 deletions.
51 changes: 28 additions & 23 deletions finality-aleph/src/party.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,6 @@ fn run_justification_handler<B: Block>(
finalization_proposals_rx
}

struct SessionInstance<B>
where
B: Block,
{
pub(crate) session: Session<AuthorityId, NumberFor<B>>,
}

struct ConsensusParty<B, N, C, BE, SC>
where
B: Block,
Expand All @@ -105,7 +98,7 @@ where
NumberFor<B>: From<u32>,
{
network: N,
sessions: HashMap<u32, SessionInstance<B>>,
sessions: HashMap<u32, Session<AuthorityId, NumberFor<B>>>,
spawn_handle: SpawnHandle,
client: Arc<C>,
select_chain: SC,
Expand All @@ -115,6 +108,8 @@ where
finalization_proposals_rx: mpsc::UnboundedReceiver<JustificationNotification<B>>,
}

/// If we are on the authority list for the given session, runs an
/// AlephBFT task and returns `true` upon completion. Otherwise, immediately returns `false`.
async fn maybe_run_session_as_authority<B, C, BE, SC>(
authority: AuthorityId,
auth_keystore: AuthorityKeystore,
Expand Down Expand Up @@ -201,6 +196,10 @@ where
break;
}
},
else => {
debug!(target: "afa", "finished party {:?} with finalized block at {:?}", session_id.0, client.info().finalized_number);
break;
}
}
}
exit_tx.send(()).expect("consensus task should not fail");
Expand Down Expand Up @@ -249,7 +248,6 @@ where
self.sessions
.get(&prev_id)
.expect("The current session should be known already")
.session
.stop_h
}
};
Expand All @@ -258,18 +256,20 @@ where
.runtime_api()
.current_session(&BlockId::Number(prev_block_number))
{
Ok(session) => session,
Ok(session) => {
self.sessions.insert(session_id, session.clone());
session
}
_ => {
error!(target: "afa", "No session found for current block #{}", 0);
return;
}
};

let current_stop_h = session.stop_h;
let client = self.client.clone();
let finalization_proposals_rx = &mut self.finalization_proposals_rx;
let proposals_task = {
let client = self.client.clone();
let current_stop_h = session.stop_h;
let finalization_proposals_rx = &mut self.finalization_proposals_rx;
async move {
while client.info().finalized_number < current_stop_h {
if let Some(proposal) = finalization_proposals_rx.next().await {
Expand All @@ -282,30 +282,35 @@ where
);
} else {
debug!(target: "afa", "the channel of proposed blocks closed unexpectedly");
return true;
break;
}
}
true
}
};

// returns true if we participated in the session
let session_task = maybe_run_session_as_authority(
self.authority.clone(),
self.auth_keystore.clone(),
client.clone(),
self.client.clone(),
session_manger,
session,
self.spawn_handle.clone(),
self.select_chain.clone(),
);

let mut tasks: FuturesUnordered<_> =
vec![proposals_task.left_future(), session_task.right_future()]
.into_iter()
.collect();
// wait for the first task returning true
tasks.next().await;
//tasks.filter(|b| std::future::ready(*b)).next().await;
// We run concurrently `proposal_task` and `session_task` until either
// * `proposal_tasks` terminates, or
// * `session_task` terminates AND returns true.

let tasks: FuturesUnordered<_> = vec![
proposals_task.map(|_| true).left_future(),
session_task.right_future(),
]
.into_iter()
.collect();

tasks.filter(|b| std::future::ready(*b)).next().await;
}

async fn run(mut self) {
Expand Down

0 comments on commit 4129d6a

Please sign in to comment.