From 8139c408898e650548918af26e0ccf87884b335e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 7 May 2024 21:45:16 +0800 Subject: [PATCH] change: use install_snapshot_v2, remove v0 and v1 --- Cargo.lock | 8 +- Cargo.toml | 2 +- src/meta/service/src/lib.rs | 1 + .../service/src/meta_service/meta_node.rs | 4 +- src/meta/service/src/network.rs | 230 +++++++++--------- src/meta/types/src/grpc_helper.rs | 17 ++ 6 files changed, 139 insertions(+), 123 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f28ab77362806..a483948745b9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10198,9 +10198,9 @@ dependencies = [ [[package]] name = "openraft" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f29df094119e6d79274ddec81e44189b4b9f59f1a38635920151e1264b6687c" +checksum = "b3450250d289f5277dfad62c337a9e6021e7ba744ffdf7ba6605b4dd7a91dfeb" dependencies = [ "anyerror", "byte-unit", @@ -10220,9 +10220,9 @@ dependencies = [ [[package]] name = "openraft-macros" -version = "0.9.7" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c468091464f0b38c1c1a9ec72a1e582c46cf4b861b6e0c214da4c6fd96b60358" +checksum = "b92d20c7992736c364af14c9e51f03b3f2dadbf876d843ad11fdf9341118bc79" dependencies = [ "chrono", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index 88dc641931f27..99c03d3ec500b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -125,7 +125,7 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus # openraft for debugging #openraft = { git = "https://github.com/drmingdrmer/openraft", branch = "release-0.9", features = [ -openraft = { version = "0.9.9", features = [ +openraft = { version = "0.9.10", features = [ "serde", "tracing-log", "generic-snapshot-data", diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index 0e0be4499bec3..1f7a75acd925f 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(try_blocks)] #![feature(coroutines)] #![feature(lazy_cell)] #![allow(clippy::uninlined_format_args)] diff --git a/src/meta/service/src/meta_service/meta_node.rs b/src/meta/service/src/meta_service/meta_node.rs index 25bdfdbaf41b4..71e926b39f650 100644 --- a/src/meta/service/src/meta_service/meta_node.rs +++ b/src/meta/service/src/meta_service/meta_node.rs @@ -86,7 +86,7 @@ use crate::meta_service::forwarder::MetaForwarder; use crate::meta_service::meta_leader::MetaLeader; use crate::meta_service::RaftServiceImpl; use crate::metrics::server_metrics; -use crate::network::Network; +use crate::network::NetworkFactory; use crate::request_handling::Forwarder; use crate::request_handling::Handler; use crate::store::RaftStore; @@ -202,7 +202,7 @@ impl MetaNodeBuilder { .take() .ok_or_else(|| MetaStartupError::InvalidConfig(String::from("sto is not set")))?; - let net = Network::new(sto.clone()); + let net = NetworkFactory::new(sto.clone()); let log_store = sto.clone(); let sm_store = sto.clone(); diff --git a/src/meta/service/src/network.rs b/src/meta/service/src/network.rs index d6bc49371c854..30015d1a99a1f 100644 --- a/src/meta/service/src/network.rs +++ b/src/meta/service/src/network.rs @@ -23,9 +23,12 @@ use anyerror::AnyError; use async_trait::async_trait; use backon::BackoffBuilder; use backon::ExponentialBuilder; +use databend_common_base::base::tokio::io::AsyncReadExt; +use databend_common_base::base::tokio::sync::mpsc; use databend_common_base::containers::ItemManager; use databend_common_base::containers::Pool; use databend_common_base::future::TimingFutureExt; +use databend_common_base::runtime; use databend_common_meta_sled_store::openraft; use databend_common_meta_sled_store::openraft::error::PayloadTooLarge; use databend_common_meta_sled_store::openraft::error::ReplicationClosed; @@ -33,17 +36,15 @@ use databend_common_meta_sled_store::openraft::error::Unreachable; use databend_common_meta_sled_store::openraft::network::RPCOption; use databend_common_meta_sled_store::openraft::MessageSummary; use databend_common_meta_sled_store::openraft::RaftNetworkFactory; +use databend_common_meta_sled_store::openraft::StorageError; use databend_common_meta_types::protobuf::RaftRequest; -use databend_common_meta_types::protobuf::SnapshotChunkRequest; +use databend_common_meta_types::protobuf::SnapshotChunkRequestV2; use databend_common_meta_types::AppendEntriesRequest; use databend_common_meta_types::AppendEntriesResponse; use databend_common_meta_types::Endpoint; use databend_common_meta_types::Fatal; use databend_common_meta_types::GrpcConfig; use databend_common_meta_types::GrpcHelper; -use databend_common_meta_types::InstallSnapshotError; -use databend_common_meta_types::InstallSnapshotRequest; -use databend_common_meta_types::InstallSnapshotResponse; use databend_common_meta_types::MembershipNode; use databend_common_meta_types::NetworkError; use databend_common_meta_types::NodeId; @@ -52,16 +53,21 @@ use databend_common_meta_types::RaftError; use databend_common_meta_types::RemoteError; use databend_common_meta_types::Snapshot; use databend_common_meta_types::SnapshotResponse; +use databend_common_meta_types::StorageIOError; use databend_common_meta_types::StreamingError; use databend_common_meta_types::TypeConfig; use databend_common_meta_types::Vote; use databend_common_meta_types::VoteRequest; use databend_common_meta_types::VoteResponse; use databend_common_metrics::count::Count; +use futures::FutureExt; +use futures::TryStreamExt; use log::debug; +use log::error; use log::info; use log::warn; use openraft::RaftNetwork; +use tokio_stream::wrappers::ReceiverStream; use tonic::client::GrpcService; use tonic::transport::channel::Channel; @@ -146,7 +152,7 @@ impl Default for Backoff { } #[derive(Clone)] -pub struct Network { +pub struct NetworkFactory { sto: RaftStore, conn_pool: Arc>, @@ -154,10 +160,10 @@ pub struct Network { backoff: Backoff, } -impl Network { - pub fn new(sto: RaftStore) -> Network { +impl NetworkFactory { + pub fn new(sto: RaftStore) -> NetworkFactory { let mgr = ChannelManager {}; - Network { + NetworkFactory { sto, conn_pool: Arc::new(Pool::new(mgr, Duration::from_millis(50))), backoff: Backoff::default(), @@ -165,7 +171,7 @@ impl Network { } } -pub struct NetworkConnection { +pub struct Network { /// This node id id: NodeId, @@ -175,14 +181,6 @@ pub struct NetworkConnection { /// The node info to send message to. target_node: MembershipNode, - /// A counter to send snapshot via v0 API. - /// - /// v0 API should only be used during upgrading a meta cluster. - /// During this period, i.e., this counter is `>0`, - /// try to send via v0 if the remote is not upgraded. - /// When this counter reaches 0, start sending via v1 API. - install_snapshot_via_v0: u64, - sto: RaftStore, conn_pool: Arc>, @@ -190,7 +188,7 @@ pub struct NetworkConnection { backoff: Backoff, } -impl NetworkConnection { +impl Network { #[logcall::logcall(err = "debug")] #[minitrace::trace] pub async fn make_client(&self) -> Result<(RaftClient, Endpoint), Unreachable> { @@ -239,6 +237,11 @@ impl NetworkConnection { RPCError::RemoteError(remote_err) } + /// Wrap an error with [`RemoteError`], when building return value for an RPC method. + pub(crate) fn to_remote_err(&self, e: E) -> RemoteError { + RemoteError::new_with_node(self.target, self.target_node.clone(), e) + } + /// Create a new RaftRequest for AppendEntriesRequest, /// if it is too large, return `PayloadTooLarge` error /// to tell Openraft to split it in to smaller chunks. @@ -296,15 +299,8 @@ impl NetworkConnection { policy.chain(zero) } - /// Convert gRPC status to `RPCError` - fn status_to_unreachable( - &self, - status: tonic::Status, - endpoint: Endpoint, - ) -> RPCError> - where - E: std::error::Error, - { + /// Convert gRPC status to `Unreachable` + fn status_to_unreachable(&self, status: tonic::Status, endpoint: Endpoint) -> Unreachable { warn!( "target={}, endpoint={} gRPC error: {:?}", self.target, endpoint, status @@ -312,11 +308,54 @@ impl NetworkConnection { let any_err = AnyError::new(&status) .add_context(|| format!("gRPC target={}, endpoint={}", self.target, endpoint)); - RPCError::Unreachable(Unreachable::new(&any_err)) + Unreachable::new(&any_err) + } + + #[futures_async_stream::try_stream(boxed, ok = SnapshotChunkRequestV2, error = StreamingError)] + async fn snapshot_chunk_stream_v2( + vote: Vote, + snapshot: Snapshot, + cancel: impl Future + Send + 'static, + option: RPCOption, + ) { + let chunk_size = option.snapshot_chunk_size().unwrap_or(1024 * 1024); + + let snapshot_meta = snapshot.meta; + let mut snapshot_data = snapshot.snapshot; + + let mut c = std::pin::pin!(cancel); + + loop { + // If canceled, return at once + if let Some(err) = c.as_mut().now_or_never() { + return Err(err.into()); + } + + let mut buf = Vec::with_capacity(chunk_size); + while buf.capacity() > buf.len() { + let n = snapshot_data.read_buf(&mut buf).await.map_err(|e| { + let io_err = StorageIOError::read_snapshot(Some(snapshot_meta.signature()), &e); + StorageError::from(io_err) + })?; + if n == 0 { + break; + } + } + + if buf.is_empty() { + break; + } + + let chunk = SnapshotChunkRequestV2::new_chunk(buf); + yield chunk; + } + + let end = SnapshotChunkRequestV2::new_end_chunk(vote, snapshot_meta.clone()); + yield end; } } -impl RaftNetwork for NetworkConnection { +impl RaftNetwork for Network { #[logcall::logcall(err = "debug")] #[minitrace::trace] async fn append_entries( @@ -362,96 +401,67 @@ impl RaftNetwork for NetworkConnection { &mut self, vote: Vote, snapshot: Snapshot, - cancel: impl Future + Send, + cancel: impl Future + Send + 'static, option: RPCOption, ) -> Result> { - // This implementation just delegates to `Chunked::send_snapshot`, - // which depends on `Self::install_snapshot()` to send chunks. - use openraft::network::snapshot_transport::Chunked; - use openraft::network::snapshot_transport::SnapshotTransport; - - let resp = Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; - Ok(resp) - } - - #[logcall::logcall(err = "debug")] - #[minitrace::trace] - async fn install_snapshot( - &mut self, - rpc: InstallSnapshotRequest, - _option: RPCOption, - ) -> Result>> { - info!( - id = self.id, - target = self.target, - rpc = rpc.summary(); - "send_install_snapshot" - ); + info!(id = self.id, target = self.target; "{}", func_name!()); let _g = snapshot_send_inflight(self.target).counter_guard(); let (mut client, endpoint) = self.make_client().await?; - let bytes = rpc.data.len() as u64; - raft_metrics::network::incr_sendto_bytes(&self.target, bytes); + // Using strm of type `Pin>` result in a higher rank lifetime error + // See: + // - https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=8c382b5a6d932aaf81815f3825efd5ed + // - https://github.com/rust-lang/rust/issues/87425 + // + // Here we convert it to a concrete type `ReceiverStream` to avoid the error. + let mut res_strm = Self::snapshot_chunk_stream_v2(vote, snapshot, cancel, option); - // Try send via `v1` API, if the remote peer does not provide `v1` API, - // revert to `v0` API. - let v1_res = if self.install_snapshot_via_v0 == 0 { - // Send via v1 API + let (tx, rx) = mpsc::channel(16); - let v1_req = SnapshotChunkRequest::new_v1(rpc.clone()); - let req = databend_common_tracing::inject_span_to_tonic_request(v1_req); - let res = client - .install_snapshot_v1(req) - .timed(observe_snapshot_send_spent(self.target)) - .await; + let strm = ReceiverStream::new(rx); - if is_unimplemented(&res) { - warn!( - "target={} does not support install_snapshot_v1 API, fallback to v0 API for next 10 times", - self.target - ); - self.install_snapshot_via_v0 = 10; - None - } else { - Some(res) - } - } else { - None - }; + let target = self.target; - let grpc_res = if let Some(v1_res) = v1_res { - v1_res - } else { - // Via v1 API is not tried or failed, - // Send via v0 API + let _forward_handle = runtime::spawn(async move { + while let Some(x) = res_strm.try_next().await? { + raft_metrics::network::incr_sendto_bytes(&target, x.chunk.len() as u64); - self.install_snapshot_via_v0 -= 1; + let send_res = tx.send(x).await; - let req = databend_common_tracing::inject_span_to_tonic_request(rpc.clone()); - client - .install_snapshot(req) + if let Err(e) = send_res { + error!("{} error sending to snapshot stream: {}", func_name!(), e); + } + } + + Ok::<_, StreamingError>(()) + }); + + let res: Result> = try { + let grpc_res = client + .install_snapshot_v2(strm) .timed(observe_snapshot_send_spent(self.target)) - .await - }; + .await; - info!( - "install_snapshot resp target={}: {:?}", - self.target, grpc_res - ); + info!( + "{} resp from: target={}: grpc_result: {:?}", + func_name!(), + self.target, + grpc_res, + ); - let resp = grpc_res.map_err(|e| { - self.report_metrics_snapshot(false); - self.status_to_unreachable(e, endpoint) - })?; + let grpc_response = grpc_res.map_err(|e| self.status_to_unreachable(e, endpoint))?; - let raft_res = GrpcHelper::parse_raft_reply(resp) - .map_err(|serde_err| new_net_err(&serde_err, || "parse install_snapshot reply"))?; + let remote_result: Result = + GrpcHelper::parse_raft_reply_generic(grpc_response.into_inner()) + .map_err(|serde_err| new_net_err(&serde_err, || "parse full_snapshot reply"))?; - self.report_metrics_snapshot(raft_res.is_ok()); + remote_result.map_err(|e| self.to_remote_err(e))? + }; - raft_res.map_err(|e| self.to_rpc_err(e)) + self.report_metrics_snapshot(res.is_ok()); + res } #[logcall::logcall(err = "debug")] @@ -491,11 +501,11 @@ impl RaftNetwork for NetworkConnection { } } -impl RaftNetworkFactory for Network { - type Network = NetworkConnection; +impl RaftNetworkFactory for NetworkFactory { + type Network = Network; async fn new_client( - self: &mut Network, + self: &mut NetworkFactory, target: NodeId, node: &MembershipNode, ) -> Self::Network { @@ -504,11 +514,10 @@ impl RaftNetworkFactory for Network { self.sto.id, target, node ); - NetworkConnection { + Network { id: self.sto.id, target, target_node: node.clone(), - install_snapshot_via_v0: 0, sto: self.sto.clone(), conn_pool: self.conn_pool.clone(), backoff: self.backoff.clone(), @@ -541,14 +550,3 @@ fn observe_snapshot_send_spent(target: NodeId) -> impl Fn(Duration, Duration) { fn snapshot_send_inflight(target: NodeId) -> impl FnMut(i64) { move |i: i64| raft_metrics::network::incr_snapshot_sendto_inflight(&target, i) } - -/// Return true if it IS an error and the error code is Unimplemented. -/// -/// Return false if it is NOT an error or the error code is NOT Unimplemented. -fn is_unimplemented(res: &Result) -> bool { - if let Err(status) = res { - status.code() == tonic::Code::Unimplemented - } else { - false - } -} diff --git a/src/meta/types/src/grpc_helper.rs b/src/meta/types/src/grpc_helper.rs index dcc998d7a94e8..7cbafad8e4923 100644 --- a/src/meta/types/src/grpc_helper.rs +++ b/src/meta/types/src/grpc_helper.rs @@ -137,6 +137,23 @@ impl GrpcHelper { } } + /// Parse RaftReply to `Result` + pub fn parse_raft_reply_generic( + reply: RaftReply, + ) -> Result, serde_json::Error> + where + T: serde::de::DeserializeOwned, + E: serde::Serialize + serde::de::DeserializeOwned, + { + if !reply.error.is_empty() { + let e: E = serde_json::from_str(&reply.error)?; + Ok(Err(e)) + } else { + let d: T = serde_json::from_str(&reply.data)?; + Ok(Ok(d)) + } + } + /// Parse tonic::Request and decode it into required type. pub fn parse_req(request: tonic::Request) -> Result where T: serde::de::DeserializeOwned {