Skip to content

Commit

Permalink
linera-client: structured chain_listener
Browse files Browse the repository at this point in the history
  • Loading branch information
Twey committed Jul 15, 2024
1 parent a07f795 commit 9fd1872
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 161 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async-graphql = "=7.0.2"
async-graphql-axum = "=7.0.2"
async-graphql-derive = "=7.0.2"
async-lock = "3.3.0"
async-stream = "0.3.5"
async-trait = "0.1.77"
async-tungstenite = { version = "0.22", features = ["tokio-runtime"] }
aws-config = "1.1.7"
Expand Down
1 change: 1 addition & 0 deletions linera-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ kubernetes = []
[dependencies]
anyhow.workspace = true
async-graphql.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bcs.workspace = true
chrono = { workspace = true, features = ["clock"] }
Expand Down
276 changes: 122 additions & 154 deletions linera-client/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use std::{collections::HashSet, sync::Arc, time::Duration};

use async_trait::async_trait;
use futures::{
future::{self, Either, FutureExt as _},
future::{self, Either},
lock::Mutex,
StreamExt,
stream::{self, Stream, StreamExt as _},
};
use linera_base::{
crypto::KeyPair,
Expand All @@ -18,12 +18,11 @@ use linera_chain::data_types::OutgoingMessage;
use linera_core::{
client::{ChainClient, Client},
node::{ValidatorNode, ValidatorNodeProvider},
worker::Reason,
worker::{Notification, Reason},
};
use linera_execution::{Message, SystemMessage};
use linera_storage::Storage;
use linera_views::views::ViewError;
use tracing::{error, info, warn};

use crate::wallet::Wallet;

Expand Down Expand Up @@ -76,154 +75,123 @@ pub trait ClientContext: Send {
ViewError: From<<Self::Storage as Storage>::StoreError>;
}

/// A `ChainListener` is a process that listens to notifications from validators and reacts
/// appropriately.
pub async fn chain_listener<C>(
/// Return a stream of notifications from a chain, handling round timeouts as appropriate.
async fn listener<P, S>(
config: ChainListenerConfig,
context: Arc<Mutex<C>>,
)
chain_client: ChainClient<P, S>,
) -> anyhow::Result<impl Stream<Item = Notification>>
where
C: ClientContext + Send + 'static,
ViewError: From<<C::Storage as linera_storage::Storage>::StoreError>,
P: ValidatorNodeProvider + Send + Sync + 'static,
S: Storage + Clone + Send + Sync + 'static,
ViewError: From<S::StoreError>,
{
let (chain_ids, storage) = {
let context_guard = context.lock().await;
(
context_guard.wallet().chain_ids(),
context_guard.client().storage_client().clone(),
)
};
let running: Arc<Mutex<HashSet<ChainId>>> = Arc::default();
for chain_id in chain_ids {
run_with_chain_id(
chain_id,
context.clone(),
storage.clone(),
config.clone(),
running.clone(),
);
}
}
let storage = chain_client.storage_client();
let (listener, _aborter, mut notifications) = chain_client.listen().await?;
let mut listener = Box::pin(listener);
let mut timeout = storage.clock().current_time();

fn run_with_chain_id<C>(
chain_id: ChainId,
context: Arc<Mutex<C>>,
storage: C::Storage,
config: ChainListenerConfig,
running: Arc<Mutex<HashSet<ChainId>>>,
) where
C: ClientContext + Send + 'static,
ViewError: From<<C::Storage as linera_storage::Storage>::StoreError>,
{
let _handle = tokio::task::spawn(async move {
if let Err(err) =
run_client_stream(chain_id, context, storage, config, running).await
{
error!("Stream for chain {} failed: {}", chain_id, err);
}
});
}

async fn run_client_stream<C>(
chain_id: ChainId,
context: Arc<Mutex<C>>,
storage: C::Storage,
config: ChainListenerConfig,
running: Arc<Mutex<HashSet<ChainId>>>,
) -> anyhow::Result<()>
where
C: ClientContext + Send + 'static,
ViewError: From<<C::Storage as linera_storage::Storage>::StoreError>,
{
let chain_client = if running.lock().await.contains(&chain_id) {
return Ok(());
} else {
context.lock().await.make_chain_client(chain_id)
};
let (listener, listen_handle, local_stream) = chain_client.listen().await?;
let ((), ()) = futures::try_join!(
listener.map(Ok),
process_notifications(
local_stream,
listen_handle,
chain_client,
context,
storage,
config,
running,
)
)?;
Ok(())
}

async fn process_notifications<C>(
mut local_stream: impl futures::Stream<Item = linera_core::worker::Notification> + Unpin,
_listen_handle: linera_core::client::AbortOnDrop,
chain_client: ChainClient<C::ValidatorNodeProvider, C::Storage>,
context: Arc<Mutex<C>>,
storage: C::Storage,
config: ChainListenerConfig,
running: Arc<Mutex<HashSet<ChainId>>>,
) -> anyhow::Result<()>
where
C: ClientContext + Send + 'static,
ViewError: From<<C::Storage as linera_storage::Storage>::StoreError>,
{
let mut timeout = storage.clock().current_time();
Ok(async_stream::stream! {
loop {
let sleep = Box::pin(storage.clock().sleep_until(timeout));
let notification = match future::select(local_stream.next(), sleep).await {
Either::Left((Some(notification), _)) => notification,
Either::Left((None, _)) => return Ok(()),
Either::Right(((), _)) => {
match chain_client.process_inbox_if_owned().await {
Err(error) => {
warn!(%error, "Failed to process inbox.");
timeout = Timestamp::from(u64::MAX);
}
Ok((_, None)) => timeout = Timestamp::from(u64::MAX),
Ok((_, Some(new_timeout))) => timeout = new_timeout.timestamp,
}
context.lock().await.update_wallet(&chain_client).await;
continue;
}
let next = future::select(
notifications.next(),
future::select(
Box::pin(storage.clock().sleep_until(timeout)),
&mut listener,
),
);

let Either::Left((item, _)) = next.await else {
timeout = next_inbox_timeout(&chain_client).await;
continue;
};
info!("Received new notification: {:?}", notification);

let Some(notification) = item else { break };

maybe_sleep(config.delay_before_ms).await;
match &notification.reason {
match notification.reason {
Reason::NewIncomingMessage { .. } => timeout = storage.clock().current_time(),
Reason::NewBlock { .. } | Reason::NewRound { .. } => {
if let Err(error) = chain_client.update_validators().await {
warn!(
tracing::warn!(
"Failed to update validators about the local chain after \
receiving notification {:?} with error: {:?}",
notification, error
receiving notification {notification:?} with error: {error:?}",
);
}
}
}
maybe_sleep(config.delay_after_ms).await;
let Reason::NewBlock { hash, .. } = notification.reason else {
continue;
};
{
context.lock().await.update_wallet(&chain_client).await;
}
let value = storage.read_hashed_certificate_value(hash).await?;
let Some(executed_block) = value.inner().executed_block() else {
error!("NewBlock notification about value without a block: {hash}");
continue;
};
let new_chains = executed_block
.messages()
.iter()
.flatten()
.filter_map(|outgoing_message| {
if let OutgoingMessage {
destination: Destination::Recipient(new_id),
message: Message::System(SystemMessage::OpenChain(open_chain_config)),
..
} = outgoing_message

yield notification;
}
})
}

/// Process the chain client's inbox to get the current round timeout.
async fn next_inbox_timeout<P, S>(chain_client: &ChainClient<P, S>) -> Timestamp
where
P: ValidatorNodeProvider + Send + Sync + 'static,
S: Storage + Clone + Send + Sync + 'static,
ViewError: From<S::StoreError>,
{
match chain_client.process_inbox_if_owned().await {
Err(error) => {
tracing::warn!(%error, "Failed to process inbox.");
Timestamp::from(u64::MAX)
}
Ok((_, None)) => Timestamp::from(u64::MAX),
Ok((_, Some(timeout))) => timeout.timestamp,
}
}

/// A ‘chain listener’ is a process that listens to notifications from validators and
/// reacts appropriately.
pub async fn chain_listener<C>(
config: ChainListenerConfig,
context: Arc<Mutex<C>>,
) -> anyhow::Result<()>
where
C: ClientContext + Send + 'static,
ViewError: From<<C::Storage as linera_storage::Storage>::StoreError>,
{
let client = context.lock().await.client();
let storage = client.storage_client().clone();
let chain_ids = context.lock().await.wallet().chain_ids();
let mut running: HashSet<ChainId> = HashSet::new();
let mut notifications = stream::SelectAll::new();

for chain_id in chain_ids {
if !running.contains(&chain_id) {
running.insert(chain_id);
notifications.push(Box::pin(listener(
config.clone(),
client.chain(chain_id)
.expect("ClientContext should have chain state for all its chains"),
).await?));
}
}

while let Some(notification) = notifications.next().await {
let chain_client = client.chain(notification.chain_id)
.expect("notifications should come from a known chain");
let Reason::NewBlock { hash, .. } = notification.reason else {
continue;
};
context.lock().await.update_wallet(&chain_client).await;
let value = storage.read_hashed_certificate_value(hash).await?;
let Some(executed_block) = value.inner().executed_block() else {
tracing::error!("NewBlock notification about value without a block: {hash}");
continue;
};
let new_chains = executed_block
.messages()
.iter()
.flatten()
.filter_map(|outgoing_message| {
if let OutgoingMessage {
destination: Destination::Recipient(new_id),
message: Message::System(SystemMessage::OpenChain(open_chain_config)),
..
} = outgoing_message
{
let keys = open_chain_config
.ownership
Expand All @@ -235,28 +203,28 @@ async fn process_notifications<C>(
} else {
None
}
})
.collect::<Vec<_>>();
if new_chains.is_empty() {
continue;
}
let mut context_guard = context.lock().await;
for (new_id, owners, timestamp) in new_chains {
let key_pair = owners
.iter()
.find_map(|public_key| context_guard.wallet().key_pair_for_pk(public_key));
context_guard.update_wallet_for_new_chain(new_id, key_pair, timestamp);
run_with_chain_id(
new_id,
context.clone(),
storage.clone(),
config.clone(),
running.clone(),
);
})
.collect::<Vec<_>>();
if new_chains.is_empty() {
continue;
}
let mut context_guard = context.lock().await;
for (new_id, owners, timestamp) in new_chains {
let key_pair = owners
.iter()
.find_map(|public_key| context_guard.wallet().key_pair_for_pk(public_key));
context_guard.update_wallet_for_new_chain(new_id, key_pair, timestamp);
let new_chain_client = client.chain(new_id).expect("notifications should come from a known chain");
if !running.contains(&new_id) {
running.insert(new_id);
notifications.push(Box::pin(listener(config.clone(), new_chain_client).await?));
}
}
}

Ok(())
}

async fn maybe_sleep(delay_ms: u64) {
if delay_ms > 0 {
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
Expand Down
2 changes: 1 addition & 1 deletion linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ async fn test_chain_listener() -> anyhow::Result<()> {
let public_key = key_pair.public();
context.update_wallet_for_new_chain(chain_id0, Some(key_pair), clock.current_time());
let context = Arc::new(Mutex::new(context));
let listener = chain_listener(config, listener.run(context)).await;
let listener = tokio::spawn(chain_listener(config, context));

Check failure on line 163 in linera-client/src/unit_tests/chain_listener.rs

View workflow job for this annotation

GitHub Actions / test

expected function, found module `chain_listener`

Check failure on line 163 in linera-client/src/unit_tests/chain_listener.rs

View workflow job for this annotation

GitHub Actions / test

expected function, found module `chain_listener`

Check failure on line 163 in linera-client/src/unit_tests/chain_listener.rs

View workflow job for this annotation

GitHub Actions / test

expected function, found module `chain_listener`

// Transfer ownership of chain 0 to the chain listener and some other key. The listener will
// be leader in ~10% of the rounds.
Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ where

/// Gets a `ChainClient` for the chain with the given `chain_id` if it is currently
/// known to the [`Client`].
pub fn chain(self: Arc<Self>, chain_id: ChainId) -> Option<ChainClient<P, S>> {
pub fn chain(self: &Arc<Self>, chain_id: ChainId) -> Option<ChainClient<P, S>> {
if self.chains.contains_key(&chain_id) {
Some(ChainClient {
client: self.clone(),
Expand Down
8 changes: 3 additions & 5 deletions linera-service/src/node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use async_graphql::{
use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
use futures::{
future::{self},
future,
lock::Mutex,
Future,
};
Expand Down Expand Up @@ -1031,13 +1031,11 @@ where

info!("GraphiQL IDE: http://localhost:{}", port);

chain_listener(self.config, self.context.clone()).await;
tokio::spawn(chain_listener(self.config, self.context.clone()));
axum::serve(
tokio::net::TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], port))).await?,
app,
)
.await?;

).await?;
Ok(())
}

Expand Down

0 comments on commit 9fd1872

Please sign in to comment.