Skip to content

Commit

Permalink
Fetch locked blobs individually, not in ChainManagerInfo. (#3121)
Browse files Browse the repository at this point in the history
## Motivation

Ultimately we want to transfer all blobs separately, rather than in a
single message.
(#3048)

This PR is one step towards that goal: When the client fetches the
locked block from a validator, it now requests the corresponding blobs
one by one, rather than all at once.

## Proposal

Add a `DownloadPendingBlob` endpoint; remove the locked blobs from the
`ChainManagerInfo`.

## Test Plan

Existing tests exercise this scenario, e.g.
`test_finalize_locked_block_with_blobs`.

## Release Plan

- Nothing to do / These changes follow the usual release cycle.

## Links

- Part of #3048.
- [reviewer
checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
  • Loading branch information
afck authored Jan 13, 2025
1 parent 930557b commit 3a61111
Show file tree
Hide file tree
Showing 21 changed files with 384 additions and 54 deletions.
2 changes: 1 addition & 1 deletion linera-base/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ impl Blob {
self.content.into_bytes()
}

/// Loads data blob content from a file.
/// Loads data blob from a file.
pub async fn load_data_blob_from_file(path: impl AsRef<Path>) -> io::Result<Self> {
Ok(Self::new_data(fs::read(path)?))
}
Expand Down
5 changes: 0 additions & 5 deletions linera-chain/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,9 +624,6 @@ pub struct ChainManagerInfo {
/// The timestamp when the current round times out.
#[debug(skip_if = Option::is_none)]
pub round_timeout: Option<Timestamp>,
/// These are blobs belonging to the locked block.
#[debug(skip_if = Vec::is_empty)]
pub locked_blobs: Vec<Blob>,
}

impl From<&ChainManager> for ChainManagerInfo {
Expand All @@ -650,7 +647,6 @@ impl From<&ChainManager> for ChainManagerInfo {
current_round,
leader: manager.round_leader(current_round).cloned(),
round_timeout: manager.round_timeout,
locked_blobs: Vec::new(),
}
}
}
Expand All @@ -660,7 +656,6 @@ impl ChainManagerInfo {
pub fn add_values(&mut self, manager: &ChainManager) {
self.requested_proposed = manager.proposed.clone().map(Box::new);
self.requested_locked = manager.locked.clone().map(Box::new);
self.locked_blobs = manager.locked_blobs.values().cloned().collect();
self.requested_confirmed = manager
.confirmed_vote
.as_ref()
Expand Down
12 changes: 11 additions & 1 deletion linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use linera_base::{
crypto::CryptoHash,
data_types::{Blob, BlockHeight, Timestamp, UserApplicationDescription},
hashed::Hashed,
identifiers::{ChainId, UserApplicationId},
identifiers::{BlobId, ChainId, UserApplicationId},
};
use linera_chain::{
data_types::{Block, BlockProposal, ExecutedBlock, MessageBundle, Origin, Target},
Expand Down Expand Up @@ -142,6 +142,13 @@ where
callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
},

/// Get a blob if it belongs to the current locked block or pending proposal.
DownloadPendingBlob {
blob_id: BlobId,
#[debug(skip)]
callback: oneshot::Sender<Result<Blob, WorkerError>>,
},

/// Update the received certificate trackers to at least the given values.
UpdateReceivedCertificateTrackers {
new_trackers: BTreeMap<ValidatorName, u64>,
Expand Down Expand Up @@ -330,6 +337,9 @@ where
ChainWorkerRequest::HandleChainInfoQuery { query, callback } => callback
.send(self.worker.handle_chain_info_query(query).await)
.is_ok(),
ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } => callback
.send(self.worker.download_pending_blob(blob_id).await)
.is_ok(),
ChainWorkerRequest::UpdateReceivedCertificateTrackers {
new_trackers,
callback,
Expand Down
12 changes: 12 additions & 0 deletions linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,18 @@ where
Ok((response, actions))
}

/// Returns the requested blob, if it belongs to the current locked block or pending proposal.
pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result<Blob, WorkerError> {
let manager = self.chain.manager.get();
manager
.proposed
.as_ref()
.and_then(|proposal| proposal.blobs.iter().find(|blob| blob.id() == blob_id))
.or_else(|| manager.locked_blobs.get(&blob_id))
.cloned()
.ok_or_else(|| WorkerError::BlobsNotFound(vec![blob_id]))
}

/// Ensures that the current chain is active, returning an error otherwise.
fn ensure_is_active(&mut self) -> Result<(), WorkerError> {
if !self.knows_chain_is_active {
Expand Down
27 changes: 25 additions & 2 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1766,8 +1766,7 @@ where
}
if let Some(cert) = info.manager.requested_locked {
let hash = cert.hash();
let blobs = info.manager.locked_blobs.clone();
if let Err(err) = self.process_certificate(*cert.clone(), blobs).await {
if let Err(err) = self.try_process_locked_block_from(remote_node, cert).await {
warn!(
"Skipping certificate {hash} from validator {}: {err}",
remote_node.name
Expand All @@ -1777,6 +1776,30 @@ where
Ok(())
}

async fn try_process_locked_block_from(
&self,
remote_node: &RemoteNode<P::Node>,
certificate: Box<GenericCertificate<ValidatedBlock>>,
) -> Result<(), ChainClientError> {
let chain_id = certificate.inner().chain_id();
match self.process_certificate(*certificate.clone(), vec![]).await {
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
let mut blobs = Vec::new();
for blob_id in blob_ids {
let blob_content = remote_node
.node
.download_pending_blob(chain_id, blob_id)
.await?;
blobs.push(Blob::new(blob_content));
}
self.process_certificate(*certificate, blobs).await?;
Ok(())
}
Err(err) => Err(err.into()),
Ok(()) => Ok(()),
}
}

/// Downloads and processes from the specified validator a confirmed block certificates that
/// use the given blobs. If this succeeds, the blob will be in our storage.
async fn update_local_node_with_blobs_from(
Expand Down
10 changes: 9 additions & 1 deletion linera-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,20 @@ pub trait ValidatorNode {
/// Subscribes to receiving notifications for a collection of chains.
async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;

// Uploads a blob content. Returns an error if the validator has not seen a
// Uploads a blob. Returns an error if the validator has not seen a
// certificate using this blob.
async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;

/// Downloads a blob. Returns an error if the validator does not have the blob.
async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;

/// Downloads a blob that belongs to a pending proposal or the locked block on a chain.
async fn download_pending_blob(
&self,
chain_id: ChainId,
blob_id: BlobId,
) -> Result<BlobContent, NodeError>;

async fn download_certificate(
&self,
hash: CryptoHash,
Expand Down
26 changes: 26 additions & 0 deletions linera-core/src/unit_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ where
.await
}

async fn download_pending_blob(
&self,
chain_id: ChainId,
blob_id: BlobId,
) -> Result<BlobContent, NodeError> {
self.spawn_and_receive(move |validator, sender| {
validator.do_download_pending_blob(chain_id, blob_id, sender)
})
.await
}

async fn download_certificate(
&self,
hash: CryptoHash,
Expand Down Expand Up @@ -490,6 +501,21 @@ where
sender.send(blob.map(|blob| blob.into_content()))
}

async fn do_download_pending_blob(
self,
chain_id: ChainId,
blob_id: BlobId,
sender: oneshot::Sender<Result<BlobContent, NodeError>>,
) -> Result<(), Result<BlobContent, NodeError>> {
let validator = self.client.lock().await;
let result = validator
.state
.download_pending_blob(chain_id, blob_id)
.await
.map_err(Into::into);
sender.send(result.map(|blob| blob.into_content()))
}

async fn do_download_certificate(
self,
hash: CryptoHash,
Expand Down
26 changes: 26 additions & 0 deletions linera-core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,32 @@ where
result
}

#[instrument(skip_all, fields(
nick = self.nickname,
chain_id = format!("{:.8}", chain_id)
))]
pub async fn download_pending_blob(
&self,
chain_id: ChainId,
blob_id: BlobId,
) -> Result<Blob, WorkerError> {
trace!(
"{} <-- download_pending_blob({chain_id:8}, {blob_id:8})",
self.nickname
);
let result = self
.query_chain_worker(chain_id, move |callback| {
ChainWorkerRequest::DownloadPendingBlob { blob_id, callback }
})
.await;
trace!(
"{} --> {:?}",
self.nickname,
result.as_ref().map(|_| blob_id)
);
result
}

#[instrument(skip_all, fields(
nick = self.nickname,
chain_id = format!("{:.8}", request.target_chain_id())
Expand Down
2 changes: 1 addition & 1 deletion linera-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ pub trait BaseRuntime {
/// owner, not a super owner.
fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError>;

/// Reads a data blob content specified by a given hash.
/// Reads a data blob specified by a given hash.
fn read_data_blob(&mut self, hash: &CryptoHash) -> Result<Vec<u8>, ExecutionError>;

/// Asserts the existence of a data blob with the given hash.
Expand Down
33 changes: 27 additions & 6 deletions linera-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ service ValidatorWorker {
// Handle information queries for this chain.
rpc HandleChainInfoQuery(ChainInfoQuery) returns (ChainInfoResult);

// Download a blob that belongs to a pending block on the given chain.
rpc DownloadPendingBlob(PendingBlobRequest) returns (PendingBlobResult);

// Handle a (trusted!) cross-chain request.
rpc HandleCrossChainRequest(CrossChainRequest) returns (google.protobuf.Empty);
}
Expand Down Expand Up @@ -61,23 +64,26 @@ service ValidatorNode {
// Request the genesis configuration hash of the network this node is part of.
rpc GetGenesisConfigHash(google.protobuf.Empty) returns (CryptoHash);

// Downloads a blob content.
// Download a blob.
rpc DownloadBlob(BlobId) returns (BlobContent);

// Uploads a blob content. Returns an error if the validator has not seen a
// Download a blob that belongs to a pending block on the given chain.
rpc DownloadPendingBlob(PendingBlobRequest) returns (PendingBlobResult);

// Upload a blob. Returns an error if the validator has not seen a
// certificate using this blob.
rpc UploadBlob(BlobContent) returns (BlobId);

// Downloads a certificate.
// Download a certificate.
rpc DownloadCertificate(CryptoHash) returns (Certificate);

// Downloads a batch of certificates.
// Download a batch of certificates.
rpc DownloadCertificates(CertificatesBatchRequest) returns (CertificatesBatchResponse);

// Returns the hash of the `Certificate` that last used a blob.
// Return the hash of the `Certificate` that last used a blob.
rpc BlobLastUsedBy(BlobId) returns (CryptoHash);

// Returns the `BlobId`s that are not contained as `Blob`.
// Return the `BlobId`s that are not contained as `Blob`.
rpc MissingBlobIds(BlobIds) returns (BlobIds);
}

Expand Down Expand Up @@ -263,6 +269,21 @@ message HandleConfirmedCertificateRequest {
bool wait_for_outgoing_messages = 3;
}

// A request for a pending blob.
message PendingBlobRequest {
ChainId chain_id = 1;
BlobId blob_id = 2;
}

// A requested pending blob, or an error.
message PendingBlobResult {
oneof inner {
BlobContent blob = 1;
// a bincode wrapper around `NodeError`
bytes error = 2;
}
}

// A certified statement from the committee.
message Certificate {
// The certified value
Expand Down
19 changes: 19 additions & 0 deletions linera-rpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,25 @@ impl ValidatorNode for Client {
})
}

async fn download_pending_blob(
&self,
chain_id: ChainId,
blob_id: BlobId,
) -> Result<BlobContent, NodeError> {
Ok(match self {
Client::Grpc(grpc_client) => {
grpc_client.download_pending_blob(chain_id, blob_id).await?
}

#[cfg(with_simple_network)]
Client::Simple(simple_client) => {
simple_client
.download_pending_blob(chain_id, blob_id)
.await?
}
})
}

async fn download_certificate(
&self,
hash: CryptoHash,
Expand Down
Loading

0 comments on commit 3a61111

Please sign in to comment.