diff --git a/linera-base/src/data_types.rs b/linera-base/src/data_types.rs index 5bc7a9e8a83..a620aafa95e 100644 --- a/linera-base/src/data_types.rs +++ b/linera-base/src/data_types.rs @@ -1092,7 +1092,7 @@ impl Blob { self.content.into_bytes() } - /// Loads data blob content from a file. + /// Loads data blob from a file. pub async fn load_data_blob_from_file(path: impl AsRef) -> io::Result { Ok(Self::new_data(fs::read(path)?)) } diff --git a/linera-chain/src/manager.rs b/linera-chain/src/manager.rs index 537a39bec7f..f2ee8435312 100644 --- a/linera-chain/src/manager.rs +++ b/linera-chain/src/manager.rs @@ -624,9 +624,6 @@ pub struct ChainManagerInfo { /// The timestamp when the current round times out. #[debug(skip_if = Option::is_none)] pub round_timeout: Option, - /// These are blobs belonging to the locked block. - #[debug(skip_if = Vec::is_empty)] - pub locked_blobs: Vec, } impl From<&ChainManager> for ChainManagerInfo { @@ -650,7 +647,6 @@ impl From<&ChainManager> for ChainManagerInfo { current_round, leader: manager.round_leader(current_round).cloned(), round_timeout: manager.round_timeout, - locked_blobs: Vec::new(), } } } @@ -660,7 +656,6 @@ impl ChainManagerInfo { pub fn add_values(&mut self, manager: &ChainManager) { self.requested_proposed = manager.proposed.clone().map(Box::new); self.requested_locked = manager.locked.clone().map(Box::new); - self.locked_blobs = manager.locked_blobs.values().cloned().collect(); self.requested_confirmed = manager .confirmed_vote .as_ref() diff --git a/linera-core/src/chain_worker/actor.rs b/linera-core/src/chain_worker/actor.rs index 269e41e28c9..0be483ac42d 100644 --- a/linera-core/src/chain_worker/actor.rs +++ b/linera-core/src/chain_worker/actor.rs @@ -14,7 +14,7 @@ use linera_base::{ crypto::CryptoHash, data_types::{Blob, BlockHeight, Timestamp, UserApplicationDescription}, hashed::Hashed, - identifiers::{ChainId, UserApplicationId}, + identifiers::{BlobId, ChainId, UserApplicationId}, }; use linera_chain::{ data_types::{Block, BlockProposal, ExecutedBlock, MessageBundle, Origin, Target}, @@ -142,6 +142,13 @@ where callback: oneshot::Sender>, }, + /// Get a blob if it belongs to the current locked block or pending proposal. + DownloadPendingBlob { + blob_id: BlobId, + #[debug(skip)] + callback: oneshot::Sender>, + }, + /// Update the received certificate trackers to at least the given values. UpdateReceivedCertificateTrackers { new_trackers: BTreeMap, @@ -330,6 +337,9 @@ where ChainWorkerRequest::HandleChainInfoQuery { query, callback } => callback .send(self.worker.handle_chain_info_query(query).await) .is_ok(), + ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } => callback + .send(self.worker.download_pending_blob(blob_id).await) + .is_ok(), ChainWorkerRequest::UpdateReceivedCertificateTrackers { new_trackers, callback, diff --git a/linera-core/src/chain_worker/state/mod.rs b/linera-core/src/chain_worker/state/mod.rs index 6fe35048cff..abc0e08cb98 100644 --- a/linera-core/src/chain_worker/state/mod.rs +++ b/linera-core/src/chain_worker/state/mod.rs @@ -294,6 +294,18 @@ where Ok((response, actions)) } + /// Returns the requested blob, if it belongs to the current locked block or pending proposal. + pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result { + let manager = self.chain.manager.get(); + manager + .proposed + .as_ref() + .and_then(|proposal| proposal.blobs.iter().find(|blob| blob.id() == blob_id)) + .or_else(|| manager.locked_blobs.get(&blob_id)) + .cloned() + .ok_or_else(|| WorkerError::BlobsNotFound(vec![blob_id])) + } + /// Ensures that the current chain is active, returning an error otherwise. fn ensure_is_active(&mut self) -> Result<(), WorkerError> { if !self.knows_chain_is_active { diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index a0a3c755392..30bc6f69789 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -1766,8 +1766,7 @@ where } if let Some(cert) = info.manager.requested_locked { let hash = cert.hash(); - let blobs = info.manager.locked_blobs.clone(); - if let Err(err) = self.process_certificate(*cert.clone(), blobs).await { + if let Err(err) = self.try_process_locked_block_from(remote_node, cert).await { warn!( "Skipping certificate {hash} from validator {}: {err}", remote_node.name @@ -1777,6 +1776,30 @@ where Ok(()) } + async fn try_process_locked_block_from( + &self, + remote_node: &RemoteNode, + certificate: Box>, + ) -> Result<(), ChainClientError> { + let chain_id = certificate.inner().chain_id(); + match self.process_certificate(*certificate.clone(), vec![]).await { + Err(LocalNodeError::BlobsNotFound(blob_ids)) => { + let mut blobs = Vec::new(); + for blob_id in blob_ids { + let blob_content = remote_node + .node + .download_pending_blob(chain_id, blob_id) + .await?; + blobs.push(Blob::new(blob_content)); + } + self.process_certificate(*certificate, blobs).await?; + Ok(()) + } + Err(err) => Err(err.into()), + Ok(()) => Ok(()), + } + } + /// Downloads and processes from the specified validator a confirmed block certificates that /// use the given blobs. If this succeeds, the blob will be in our storage. async fn update_local_node_with_blobs_from( diff --git a/linera-core/src/node.rs b/linera-core/src/node.rs index 8c3bc6f3143..91ce36173c3 100644 --- a/linera-core/src/node.rs +++ b/linera-core/src/node.rs @@ -102,12 +102,20 @@ pub trait ValidatorNode { /// Subscribes to receiving notifications for a collection of chains. async fn subscribe(&self, chains: Vec) -> Result; - // Uploads a blob content. Returns an error if the validator has not seen a + // Uploads a blob. Returns an error if the validator has not seen a // certificate using this blob. async fn upload_blob(&self, content: BlobContent) -> Result; + /// Downloads a blob. Returns an error if the validator does not have the blob. async fn download_blob(&self, blob_id: BlobId) -> Result; + /// Downloads a blob that belongs to a pending proposal or the locked block on a chain. + async fn download_pending_blob( + &self, + chain_id: ChainId, + blob_id: BlobId, + ) -> Result; + async fn download_certificate( &self, hash: CryptoHash, diff --git a/linera-core/src/unit_tests/test_utils.rs b/linera-core/src/unit_tests/test_utils.rs index 09b937593fc..fd6b12d57c5 100644 --- a/linera-core/src/unit_tests/test_utils.rs +++ b/linera-core/src/unit_tests/test_utils.rs @@ -189,6 +189,17 @@ where .await } + async fn download_pending_blob( + &self, + chain_id: ChainId, + blob_id: BlobId, + ) -> Result { + self.spawn_and_receive(move |validator, sender| { + validator.do_download_pending_blob(chain_id, blob_id, sender) + }) + .await + } + async fn download_certificate( &self, hash: CryptoHash, @@ -490,6 +501,21 @@ where sender.send(blob.map(|blob| blob.into_content())) } + async fn do_download_pending_blob( + self, + chain_id: ChainId, + blob_id: BlobId, + sender: oneshot::Sender>, + ) -> Result<(), Result> { + let validator = self.client.lock().await; + let result = validator + .state + .download_pending_blob(chain_id, blob_id) + .await + .map_err(Into::into); + sender.send(result.map(|blob| blob.into_content())) + } + async fn do_download_certificate( self, hash: CryptoHash, diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 2ae841e8998..41a6168502f 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -923,6 +923,32 @@ where result } + #[instrument(skip_all, fields( + nick = self.nickname, + chain_id = format!("{:.8}", chain_id) + ))] + pub async fn download_pending_blob( + &self, + chain_id: ChainId, + blob_id: BlobId, + ) -> Result { + trace!( + "{} <-- download_pending_blob({chain_id:8}, {blob_id:8})", + self.nickname + ); + let result = self + .query_chain_worker(chain_id, move |callback| { + ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } + }) + .await; + trace!( + "{} --> {:?}", + self.nickname, + result.as_ref().map(|_| blob_id) + ); + result + } + #[instrument(skip_all, fields( nick = self.nickname, chain_id = format!("{:.8}", request.target_chain_id()) diff --git a/linera-execution/src/lib.rs b/linera-execution/src/lib.rs index 4af459ba7f4..ec234233ea7 100644 --- a/linera-execution/src/lib.rs +++ b/linera-execution/src/lib.rs @@ -618,7 +618,7 @@ pub trait BaseRuntime { /// owner, not a super owner. fn assert_before(&mut self, timestamp: Timestamp) -> Result<(), ExecutionError>; - /// Reads a data blob content specified by a given hash. + /// Reads a data blob specified by a given hash. fn read_data_blob(&mut self, hash: &CryptoHash) -> Result, ExecutionError>; /// Asserts the existence of a data blob with the given hash. diff --git a/linera-rpc/proto/rpc.proto b/linera-rpc/proto/rpc.proto index 6420a48d03a..19d1ad66794 100644 --- a/linera-rpc/proto/rpc.proto +++ b/linera-rpc/proto/rpc.proto @@ -31,6 +31,9 @@ service ValidatorWorker { // Handle information queries for this chain. rpc HandleChainInfoQuery(ChainInfoQuery) returns (ChainInfoResult); + // Download a blob that belongs to a pending block on the given chain. + rpc DownloadPendingBlob(PendingBlobRequest) returns (PendingBlobResult); + // Handle a (trusted!) cross-chain request. rpc HandleCrossChainRequest(CrossChainRequest) returns (google.protobuf.Empty); } @@ -61,23 +64,26 @@ service ValidatorNode { // Request the genesis configuration hash of the network this node is part of. rpc GetGenesisConfigHash(google.protobuf.Empty) returns (CryptoHash); - // Downloads a blob content. + // Download a blob. rpc DownloadBlob(BlobId) returns (BlobContent); - // Uploads a blob content. Returns an error if the validator has not seen a + // Download a blob that belongs to a pending block on the given chain. + rpc DownloadPendingBlob(PendingBlobRequest) returns (PendingBlobResult); + + // Upload a blob. Returns an error if the validator has not seen a // certificate using this blob. rpc UploadBlob(BlobContent) returns (BlobId); - // Downloads a certificate. + // Download a certificate. rpc DownloadCertificate(CryptoHash) returns (Certificate); - // Downloads a batch of certificates. + // Download a batch of certificates. rpc DownloadCertificates(CertificatesBatchRequest) returns (CertificatesBatchResponse); - // Returns the hash of the `Certificate` that last used a blob. + // Return the hash of the `Certificate` that last used a blob. rpc BlobLastUsedBy(BlobId) returns (CryptoHash); - // Returns the `BlobId`s that are not contained as `Blob`. + // Return the `BlobId`s that are not contained as `Blob`. rpc MissingBlobIds(BlobIds) returns (BlobIds); } @@ -263,6 +269,21 @@ message HandleConfirmedCertificateRequest { bool wait_for_outgoing_messages = 3; } +// A request for a pending blob. +message PendingBlobRequest { + ChainId chain_id = 1; + BlobId blob_id = 2; +} + +// A requested pending blob, or an error. +message PendingBlobResult { + oneof inner { + BlobContent blob = 1; + // a bincode wrapper around `NodeError` + bytes error = 2; + } +} + // A certified statement from the committee. message Certificate { // The certified value diff --git a/linera-rpc/src/client.rs b/linera-rpc/src/client.rs index 727c86fa0ab..7cc2de0dacc 100644 --- a/linera-rpc/src/client.rs +++ b/linera-rpc/src/client.rs @@ -190,6 +190,25 @@ impl ValidatorNode for Client { }) } + async fn download_pending_blob( + &self, + chain_id: ChainId, + blob_id: BlobId, + ) -> Result { + Ok(match self { + Client::Grpc(grpc_client) => { + grpc_client.download_pending_blob(chain_id, blob_id).await? + } + + #[cfg(with_simple_network)] + Client::Simple(simple_client) => { + simple_client + .download_pending_blob(chain_id, blob_id) + .await? + } + }) + } + async fn download_certificate( &self, hash: CryptoHash, diff --git a/linera-rpc/src/grpc/client.rs b/linera-rpc/src/grpc/client.rs index 199162dce77..dc0b2291203 100644 --- a/linera-rpc/src/grpc/client.rs +++ b/linera-rpc/src/grpc/client.rs @@ -32,10 +32,7 @@ use { }; use super::{ - api::{ - self, chain_info_result::Inner, validator_node_client::ValidatorNodeClient, - SubscriptionRequest, - }, + api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest}, transport, GRPC_MAX_MESSAGE_SIZE, }; use crate::{ @@ -146,22 +143,40 @@ impl GrpcClient { fn try_into_chain_info( result: api::ChainInfoResult, ) -> Result { - let inner = result.inner.ok_or(NodeError::GrpcError { + let inner = result.inner.ok_or_else(|| NodeError::GrpcError { error: "missing body from response".to_string(), })?; match inner { - Inner::ChainInfoResponse(response) => { + api::chain_info_result::Inner::ChainInfoResponse(response) => { Ok(response.try_into().map_err(|err| NodeError::GrpcError { error: format!("failed to unmarshal response: {}", err), })?) } - Inner::Error(error) => { - Err( - bincode::deserialize(&error).map_err(|err| NodeError::GrpcError { - error: format!("failed to unmarshal error message: {}", err), - })?, - ) + api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error) + .map_err(|err| NodeError::GrpcError { + error: format!("failed to unmarshal error message: {}", err), + })?), + } + } +} + +impl TryFrom for BlobContent { + type Error = NodeError; + + fn try_from(result: api::PendingBlobResult) -> Result { + let inner = result.inner.ok_or_else(|| NodeError::GrpcError { + error: "missing body from response".to_string(), + })?; + match inner { + api::pending_blob_result::Inner::Blob(blob) => { + Ok(blob.try_into().map_err(|err| NodeError::GrpcError { + error: format!("failed to unmarshal response: {}", err), + })?) } + api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error) + .map_err(|err| NodeError::GrpcError { + error: format!("failed to unmarshal error message: {}", err), + })?), } } } @@ -359,6 +374,16 @@ impl ValidatorNode for GrpcClient { Ok(client_delegate!(self, download_blob, req)?.try_into()?) } + #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))] + async fn download_pending_blob( + &self, + chain_id: ChainId, + blob_id: BlobId, + ) -> Result { + let req = api::PendingBlobRequest::try_from((chain_id, blob_id))?; + client_delegate!(self, download_pending_blob, req)?.try_into() + } + #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))] async fn download_certificate( &self, @@ -459,12 +484,12 @@ impl mass_client::MassClient for GrpcClient { .inner .ok_or(GrpcProtoConversionError::MissingField)? { - Inner::ChainInfoResponse(chain_info_response) => { + api::chain_info_result::Inner::ChainInfoResponse(chain_info_response) => { Ok(Some(RpcMessage::ChainInfoResponse(Box::new( chain_info_response.try_into()?, )))) } - Inner::Error(error) => { + api::chain_info_result::Inner::Error(error) => { let error = bincode::deserialize::(&error) .map_err(GrpcProtoConversionError::BincodeError)?; tracing::error!(?error, "received error response"); diff --git a/linera-rpc/src/grpc/conversions.rs b/linera-rpc/src/grpc/conversions.rs index 612b5e57d9d..f99f29a57b9 100644 --- a/linera-rpc/src/grpc/conversions.rs +++ b/linera-rpc/src/grpc/conversions.rs @@ -24,7 +24,7 @@ use linera_execution::committee::ValidatorName; use thiserror::Error; use tonic::{Code, Status}; -use super::api; +use super::api::{self, PendingBlobRequest}; use crate::{ HandleConfirmedCertificateRequest, HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest, @@ -690,6 +690,49 @@ impl TryFrom for ChainInfoResponse { } } +impl TryFrom<(ChainId, BlobId)> for api::PendingBlobRequest { + type Error = GrpcProtoConversionError; + + fn try_from((chain_id, blob_id): (ChainId, BlobId)) -> Result { + Ok(Self { + chain_id: Some(chain_id.into()), + blob_id: Some(blob_id.try_into()?), + }) + } +} + +impl TryFrom for (ChainId, BlobId) { + type Error = GrpcProtoConversionError; + + fn try_from(request: PendingBlobRequest) -> Result { + Ok(( + try_proto_convert(request.chain_id)?, + try_proto_convert(request.blob_id)?, + )) + } +} + +impl TryFrom for api::PendingBlobResult { + type Error = GrpcProtoConversionError; + + fn try_from(blob: BlobContent) -> Result { + Ok(Self { + inner: Some(api::pending_blob_result::Inner::Blob(blob.try_into()?)), + }) + } +} + +impl TryFrom for api::PendingBlobResult { + type Error = GrpcProtoConversionError; + + fn try_from(node_error: NodeError) -> Result { + let error = bincode::serialize(&node_error)?; + Ok(api::PendingBlobResult { + inner: Some(api::pending_blob_result::Inner::Error(error)), + }) + } +} + impl From for api::BlockHeight { fn from(block_height: BlockHeight) -> Self { Self { diff --git a/linera-rpc/src/grpc/server.rs b/linera-rpc/src/grpc/server.rs index 7df8dfdb24d..e0fcb93cb50 100644 --- a/linera-rpc/src/grpc/server.rs +++ b/linera-rpc/src/grpc/server.rs @@ -43,6 +43,7 @@ use super::{ validator_worker_client::ValidatorWorkerClient, validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer}, BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest, LiteCertificate, + PendingBlobRequest, PendingBlobResult, }, pool::GrpcConnectionPool, GrpcError, GRPC_MAX_MESSAGE_SIZE, @@ -685,6 +686,45 @@ where } } + #[instrument( + target = "grpc_server", + skip_all, + err, + fields( + nickname = self.state.nickname(), + chain_id = ?request.get_ref().chain_id() + ) + )] + async fn download_pending_blob( + &self, + request: Request, + ) -> Result, Status> { + let start = Instant::now(); + let (chain_id, blob_id) = request.into_inner().try_into()?; + trace!(?chain_id, ?blob_id, "Download pending blob"); + match self + .state + .clone() + .download_pending_blob(chain_id, blob_id) + .await + { + Ok(blob) => { + Self::log_request_success_and_latency(start, "download_pending_blob"); + Ok(Response::new(blob.into_content().try_into()?)) + } + Err(error) => { + #[cfg(with_metrics)] + { + SERVER_REQUEST_ERROR + .with_label_values(&["download_pending_blob"]) + .inc(); + } + error!(nickname = self.state.nickname(), %error, "Failed to download pending blob"); + Ok(Response::new(NodeError::from(error).try_into()?)) + } + } + } + #[instrument( target = "grpc_server", skip_all, @@ -762,6 +802,12 @@ impl GrpcProxyable for ChainInfoQuery { } } +impl GrpcProxyable for PendingBlobRequest { + fn chain_id(&self) -> Option { + self.chain_id.clone()?.try_into().ok() + } +} + impl GrpcProxyable for CrossChainRequest { fn chain_id(&self) -> Option { use super::api::cross_chain_request::Inner; diff --git a/linera-rpc/src/message.rs b/linera-rpc/src/message.rs index e03eb468479..2c8cac9b1da 100644 --- a/linera-rpc/src/message.rs +++ b/linera-rpc/src/message.rs @@ -35,6 +35,7 @@ pub enum RpcMessage { ChainInfoQuery(Box), UploadBlob(Box), DownloadBlob(Box), + DownloadPendingBlob(Box<(ChainId, BlobId)>), DownloadConfirmedBlock(Box), DownloadCertificates(Vec), BlobLastUsedBy(Box), @@ -50,6 +51,7 @@ pub enum RpcMessage { GenesisConfigHashResponse(Box), UploadBlobResponse(Box), DownloadBlobResponse(Box), + DownloadPendingBlobResponse(Box), DownloadConfirmedBlockResponse(Box), DownloadCertificatesResponse(Vec), BlobLastUsedByResponse(Box), @@ -74,6 +76,7 @@ impl RpcMessage { ConfirmedCertificate(request) => request.certificate.inner().chain_id(), ChainInfoQuery(query) => query.chain_id, CrossChainRequest(request) => request.target_chain_id(), + DownloadPendingBlob(request) => request.0, Vote(_) | Error(_) | ChainInfoResponse(_) @@ -85,6 +88,7 @@ impl RpcMessage { | UploadBlobResponse(_) | DownloadBlob(_) | DownloadBlobResponse(_) + | DownloadPendingBlobResponse(_) | DownloadConfirmedBlock(_) | DownloadConfirmedBlockResponse(_) | DownloadCertificates(_) @@ -127,6 +131,8 @@ impl RpcMessage { | VersionInfoResponse(_) | GenesisConfigHashResponse(_) | UploadBlobResponse(_) + | DownloadPendingBlob(_) + | DownloadPendingBlobResponse(_) | DownloadBlobResponse(_) | DownloadConfirmedBlockResponse(_) | BlobLastUsedByResponse(_) @@ -162,7 +168,8 @@ impl TryFrom for BlobContent { type Error = NodeError; fn try_from(message: RpcMessage) -> Result { match message { - RpcMessage::DownloadBlobResponse(blob) => Ok(*blob), + RpcMessage::DownloadBlobResponse(blob) + | RpcMessage::DownloadPendingBlobResponse(blob) => Ok(*blob), RpcMessage::Error(error) => Err(*error), _ => Err(NodeError::UnexpectedMessage), } diff --git a/linera-rpc/src/simple/client.rs b/linera-rpc/src/simple/client.rs index a9bb261f5a7..03e227609ca 100644 --- a/linera-rpc/src/simple/client.rs +++ b/linera-rpc/src/simple/client.rs @@ -171,6 +171,17 @@ impl ValidatorNode for SimpleClient { .await } + async fn download_pending_blob( + &self, + chain_id: ChainId, + blob_id: BlobId, + ) -> Result { + self.query(RpcMessage::DownloadPendingBlob(Box::new(( + chain_id, blob_id, + )))) + .await + } + async fn download_certificate( &self, hash: CryptoHash, diff --git a/linera-rpc/src/simple/server.rs b/linera-rpc/src/simple/server.rs index 81f7b48b93a..387347247a0 100644 --- a/linera-rpc/src/simple/server.rs +++ b/linera-rpc/src/simple/server.rs @@ -339,6 +339,24 @@ where // No user to respond to. Ok(None) } + RpcMessage::DownloadPendingBlob(request) => { + let (chain_id, blob_id) = *request; + match self + .server + .state + .download_pending_blob(chain_id, blob_id) + .await + { + Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new( + blob.into(), + )))), + Err(error) => { + let nickname = self.server.state.nickname(); + error!(nickname, %error, "Failed to handle pending blob request"); + Err(error.into()) + } + } + } RpcMessage::VersionInfoQuery => { Ok(Some(RpcMessage::VersionInfoResponse(Box::default()))) @@ -352,6 +370,7 @@ where | RpcMessage::GenesisConfigHashResponse(_) | RpcMessage::DownloadBlob(_) | RpcMessage::DownloadBlobResponse(_) + | RpcMessage::DownloadPendingBlobResponse(_) | RpcMessage::DownloadConfirmedBlock(_) | RpcMessage::DownloadConfirmedBlockResponse(_) | RpcMessage::BlobLastUsedBy(_) diff --git a/linera-rpc/tests/snapshots/format__format.yaml.snap b/linera-rpc/tests/snapshots/format__format.yaml.snap index 3f1c17018f1..74289dea678 100644 --- a/linera-rpc/tests/snapshots/format__format.yaml.snap +++ b/linera-rpc/tests/snapshots/format__format.yaml.snap @@ -285,9 +285,6 @@ ChainManagerInfo: - round_timeout: OPTION: TYPENAME: Timestamp - - locked_blobs: - SEQ: - TYPENAME: BlobContent ChainOwnership: STRUCT: - super_owners: @@ -806,74 +803,84 @@ RpcMessage: NEWTYPE: TYPENAME: BlobId 8: + DownloadPendingBlob: + NEWTYPE: + TUPLE: + - TYPENAME: ChainId + - TYPENAME: BlobId + 9: DownloadConfirmedBlock: NEWTYPE: TYPENAME: CryptoHash - 9: + 10: DownloadCertificates: NEWTYPE: SEQ: TYPENAME: CryptoHash - 10: + 11: BlobLastUsedBy: NEWTYPE: TYPENAME: BlobId - 11: + 12: MissingBlobIds: NEWTYPE: SEQ: TYPENAME: BlobId - 12: - VersionInfoQuery: UNIT 13: - GenesisConfigHashQuery: UNIT + VersionInfoQuery: UNIT 14: + GenesisConfigHashQuery: UNIT + 15: Vote: NEWTYPE: TYPENAME: LiteVote - 15: + 16: ChainInfoResponse: NEWTYPE: TYPENAME: ChainInfoResponse - 16: + 17: Error: NEWTYPE: TYPENAME: NodeError - 17: + 18: VersionInfoResponse: NEWTYPE: TYPENAME: VersionInfo - 18: + 19: GenesisConfigHashResponse: NEWTYPE: TYPENAME: CryptoHash - 19: + 20: UploadBlobResponse: NEWTYPE: TYPENAME: BlobId - 20: + 21: DownloadBlobResponse: NEWTYPE: TYPENAME: BlobContent - 21: + 22: + DownloadPendingBlobResponse: + NEWTYPE: + TYPENAME: BlobContent + 23: DownloadConfirmedBlockResponse: NEWTYPE: TYPENAME: ExecutedBlock - 22: + 24: DownloadCertificatesResponse: NEWTYPE: SEQ: TYPENAME: ConfirmedBlockCertificate - 23: + 25: BlobLastUsedByResponse: NEWTYPE: TYPENAME: CryptoHash - 24: + 26: MissingBlobIdsResponse: NEWTYPE: SEQ: TYPENAME: BlobId - 25: + 27: CrossChainRequest: NEWTYPE: TYPENAME: CrossChainRequest diff --git a/linera-service/src/proxy/grpc.rs b/linera-service/src/proxy/grpc.rs index 8e186e2a740..04783e8b771 100644 --- a/linera-service/src/proxy/grpc.rs +++ b/linera-service/src/proxy/grpc.rs @@ -34,7 +34,8 @@ use linera_rpc::{ validator_worker_client::ValidatorWorkerClient, BlobContent, BlobId, BlobIds, BlockProposal, Certificate, CertificatesBatchRequest, CertificatesBatchResponse, ChainInfoQuery, ChainInfoResult, CryptoHash, - LiteCertificate, Notification, SubscriptionRequest, VersionInfo, + LiteCertificate, Notification, PendingBlobRequest, PendingBlobResult, + SubscriptionRequest, VersionInfo, }, pool::GrpcConnectionPool, GrpcProtoConversionError, GrpcProxyable, GRPC_CHUNKED_MESSAGE_FILL_LIMIT, @@ -502,6 +503,31 @@ where Ok(Response::new(blob.into_content().try_into()?)) } + #[instrument(skip_all, err(Display))] + async fn download_pending_blob( + &self, + request: Request, + ) -> Result, Status> { + let (mut client, inner) = self.worker_client(request).await?; + #[cfg_attr(not(with_metrics), expect(clippy::needless_match))] + match client.download_pending_blob(inner).await { + Ok(blob_result) => { + #[cfg(with_metrics)] + PROXY_REQUEST_SUCCESS + .with_label_values(&["download_pending_blob"]) + .inc(); + Ok(blob_result) + } + Err(status) => { + #[cfg(with_metrics)] + PROXY_REQUEST_ERROR + .with_label_values(&["download_pending_blob"]) + .inc(); + Err(status) + } + } + } + #[instrument(skip_all, err(Display))] async fn download_certificate( &self, diff --git a/linera-service/src/proxy/main.rs b/linera-service/src/proxy/main.rs index bb22fcbf7e5..5a1c7168493 100644 --- a/linera-service/src/proxy/main.rs +++ b/linera-service/src/proxy/main.rs @@ -344,6 +344,8 @@ where | VersionInfoResponse(_) | GenesisConfigHashResponse(_) | DownloadBlobResponse(_) + | DownloadPendingBlob(_) + | DownloadPendingBlobResponse(_) | BlobLastUsedByResponse(_) | MissingBlobIdsResponse(_) | DownloadConfirmedBlockResponse(_) diff --git a/linera-service/src/schema_export.rs b/linera-service/src/schema_export.rs index 366375bdfc3..7105224fcd5 100644 --- a/linera-service/src/schema_export.rs +++ b/linera-service/src/schema_export.rs @@ -84,6 +84,10 @@ impl ValidatorNode for DummyValidatorNode { Err(NodeError::UnexpectedMessage) } + async fn download_pending_blob(&self, _: ChainId, _: BlobId) -> Result { + Err(NodeError::UnexpectedMessage) + } + async fn subscribe(&self, _: Vec) -> Result { Err(NodeError::UnexpectedMessage) }