Skip to content

Commit

Permalink
change: use install_snapshot_v2, remove v0 and v1
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jun 5, 2024
1 parent e980c27 commit eae6532
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 63 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus

# openraft for debugging
#openraft = { git = "https://github.com/drmingdrmer/openraft", branch = "release-0.9", features = [
openraft = { version = "0.9.9", features = [
openraft = { version = "0.9.10", features = [
"serde",
"tracing-log",
"generic-snapshot-data",
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(try_blocks)]
#![feature(coroutines)]
#![feature(lazy_cell)]
#![allow(clippy::uninlined_format_args)]
Expand Down
4 changes: 2 additions & 2 deletions src/meta/service/src/meta_service/meta_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ use crate::meta_service::forwarder::MetaForwarder;
use crate::meta_service::meta_leader::MetaLeader;
use crate::meta_service::RaftServiceImpl;
use crate::metrics::server_metrics;
use crate::network::Network;
use crate::network::NetworkFactory;
use crate::request_handling::Forwarder;
use crate::request_handling::Handler;
use crate::store::RaftStore;
Expand Down Expand Up @@ -205,7 +205,7 @@ impl MetaNodeBuilder {
.take()
.ok_or_else(|| MetaStartupError::InvalidConfig(String::from("sto is not set")))?;

let net = Network::new(sto.clone());
let net = NetworkFactory::new(sto.clone());

let log_store = sto.clone();
let sm_store = sto.clone();
Expand Down
177 changes: 121 additions & 56 deletions src/meta/service/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ use anyerror::AnyError;
use async_trait::async_trait;
use backon::BackoffBuilder;
use backon::ExponentialBuilder;
use databend_common_base::base::tokio::io::AsyncReadExt;
use databend_common_base::base::tokio::sync::mpsc;
use databend_common_base::containers::ItemManager;
use databend_common_base::containers::Pool;
use databend_common_base::future::TimingFutureExt;
use databend_common_base::runtime;
use databend_common_meta_sled_store::openraft;
use databend_common_meta_sled_store::openraft::error::PayloadTooLarge;
use databend_common_meta_sled_store::openraft::error::ReplicationClosed;
Expand All @@ -34,17 +37,15 @@ use databend_common_meta_sled_store::openraft::network::RPCOption;
use databend_common_meta_sled_store::openraft::MessageSummary;
use databend_common_meta_sled_store::openraft::RaftNetworkFactory;
use databend_common_meta_types::protobuf::RaftReply;
use databend_common_meta_sled_store::openraft::StorageError;
use databend_common_meta_types::protobuf::RaftRequest;
use databend_common_meta_types::protobuf::SnapshotChunkRequest;
use databend_common_meta_types::protobuf::SnapshotChunkRequestV2;
use databend_common_meta_types::AppendEntriesRequest;
use databend_common_meta_types::AppendEntriesResponse;
use databend_common_meta_types::Endpoint;
use databend_common_meta_types::Fatal;
use databend_common_meta_types::GrpcConfig;
use databend_common_meta_types::GrpcHelper;
use databend_common_meta_types::InstallSnapshotError;
use databend_common_meta_types::InstallSnapshotRequest;
use databend_common_meta_types::InstallSnapshotResponse;
use databend_common_meta_types::MembershipNode;
use databend_common_meta_types::NetworkError;
use databend_common_meta_types::NodeId;
Expand All @@ -53,16 +54,21 @@ use databend_common_meta_types::RaftError;
use databend_common_meta_types::RemoteError;
use databend_common_meta_types::Snapshot;
use databend_common_meta_types::SnapshotResponse;
use databend_common_meta_types::StorageIOError;
use databend_common_meta_types::StreamingError;
use databend_common_meta_types::TypeConfig;
use databend_common_meta_types::Vote;
use databend_common_meta_types::VoteRequest;
use databend_common_meta_types::VoteResponse;
use databend_common_metrics::count::Count;
use futures::FutureExt;
use futures::TryStreamExt;
use log::debug;
use log::error;
use log::info;
use log::warn;
use openraft::RaftNetwork;
use tokio_stream::wrappers::ReceiverStream;
use tonic::client::GrpcService;
use tonic::transport::channel::Channel;

Expand Down Expand Up @@ -147,26 +153,26 @@ impl Default for Backoff {
}

#[derive(Clone)]
pub struct Network {
pub struct NetworkFactory {
sto: RaftStore,

conn_pool: Arc<Pool<ChannelManager>>,

backoff: Backoff,
}

impl Network {
pub fn new(sto: RaftStore) -> Network {
impl NetworkFactory {
pub fn new(sto: RaftStore) -> NetworkFactory {
let mgr = ChannelManager {};
Network {
NetworkFactory {
sto,
conn_pool: Arc::new(Pool::new(mgr, Duration::from_millis(50))),
backoff: Backoff::default(),
}
}
}

pub struct NetworkConnection {
pub struct Network {
/// This node id
id: NodeId,

Expand All @@ -188,7 +194,7 @@ pub struct NetworkConnection {
backoff: Backoff,
}

impl NetworkConnection {
impl Network {
#[logcall::logcall(err = "debug")]
#[minitrace::trace]
pub async fn make_client(&mut self) -> Result<RaftClient, Unreachable> {
Expand Down Expand Up @@ -239,6 +245,11 @@ impl NetworkConnection {
RPCError::RemoteError(remote_err)
}

/// Wrap an error with [`RemoteError`], when building return value for an RPC method.
pub(crate) fn to_remote_err<E: Error>(&self, e: E) -> RemoteError<E> {
RemoteError::new_with_node(self.target, self.target_node.clone(), e)
}

/// Create a new RaftRequest for AppendEntriesRequest,
/// if it is too large, return `PayloadTooLarge` error
/// to tell Openraft to split it in to smaller chunks.
Expand Down Expand Up @@ -320,9 +331,8 @@ impl NetworkConnection {
raft_res.map_err(|e| self.to_rpc_err(e))
}

/// Convert gRPC status to `RPCError`
fn status_to_unreachable<E>(&self, status: tonic::Status) -> RPCError<RaftError<E>>
where E: std::error::Error {
/// Convert gRPC status to `Unreachable`
fn status_to_unreachable(&self, status: tonic::Status, endpoint: Endpoint) -> Unreachable {
warn!(
"target={}, endpoint={} gRPC error: {:?}",
self.target, self.endpoint, status
Expand All @@ -331,11 +341,55 @@ impl NetworkConnection {
let any_err = AnyError::new(&status)
.add_context(|| format!("gRPC target={}, endpoint={}", self.target, self.endpoint));

RPCError::Unreachable(Unreachable::new(&any_err))

Unreachable::new(&any_err)
}

#[futures_async_stream::try_stream(boxed, ok = SnapshotChunkRequestV2, error = StreamingError<Fatal>)]
async fn snapshot_chunk_stream_v2(
vote: Vote,
snapshot: Snapshot,
cancel: impl Future<Output = ReplicationClosed> + Send + 'static,
option: RPCOption,
) {
let chunk_size = option.snapshot_chunk_size().unwrap_or(1024 * 1024);

let snapshot_meta = snapshot.meta;
let mut snapshot_data = snapshot.snapshot;

let mut c = std::pin::pin!(cancel);

loop {
// If canceled, return at once
if let Some(err) = c.as_mut().now_or_never() {
return Err(err.into());
}

let mut buf = Vec::with_capacity(chunk_size);
while buf.capacity() > buf.len() {
let n = snapshot_data.read_buf(&mut buf).await.map_err(|e| {
let io_err = StorageIOError::read_snapshot(Some(snapshot_meta.signature()), &e);
StorageError::from(io_err)
})?;
if n == 0 {
break;
}
}

if buf.is_empty() {
break;
}

let chunk = SnapshotChunkRequestV2::new_chunk(buf);
yield chunk;
}

let end = SnapshotChunkRequestV2::new_end_chunk(vote, snapshot_meta.clone());
yield end;
}
}

impl RaftNetwork<TypeConfig> for NetworkConnection {
impl RaftNetwork<TypeConfig> for Network {
#[logcall::logcall(err = "debug")]
#[minitrace::trace]
async fn append_entries(
Expand Down Expand Up @@ -376,55 +430,66 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
&mut self,
vote: Vote,
snapshot: Snapshot,
cancel: impl Future<Output = ReplicationClosed> + Send,
cancel: impl Future<Output = ReplicationClosed> + Send + 'static,
option: RPCOption,
) -> Result<SnapshotResponse, StreamingError<Fatal>> {
// This implementation just delegates to `Chunked::send_snapshot`,
// which depends on `Self::install_snapshot()` to send chunks.
use openraft::network::snapshot_transport::Chunked;
use openraft::network::snapshot_transport::SnapshotTransport;

let resp = Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?;
Ok(resp)
}

#[logcall::logcall(err = "debug")]
#[minitrace::trace]
async fn install_snapshot(
&mut self,
rpc: InstallSnapshotRequest,
_option: RPCOption,
) -> Result<InstallSnapshotResponse, RPCError<RaftError<InstallSnapshotError>>> {
info!(
id = self.id,
target = self.target,
rpc = rpc.summary();
"send_install_snapshot"
);
info!(id = self.id, target = self.target; "{}", func_name!());

let _g = snapshot_send_inflight(self.target).counter_guard();

let mut client = self.make_client().await?;

let bytes = rpc.data.len() as u64;
raft_metrics::network::incr_sendto_bytes(&self.target, bytes);
// Using strm of type `Pin<Box<Stream + Send + 'static>>` result in a higher rank lifetime error
// See:
// - https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=8c382b5a6d932aaf81815f3825efd5ed
// - https://github.com/rust-lang/rust/issues/87425
//
// Here we convert it to a concrete type `ReceiverStream` to avoid the error.
let mut res_strm = Self::snapshot_chunk_stream_v2(vote, snapshot, cancel, option);

let v1_req = SnapshotChunkRequest::new_v1(rpc);
let req = databend_common_tracing::inject_span_to_tonic_request(v1_req);
let (tx, rx) = mpsc::channel(16);

let grpc_res = client
.install_snapshot_v1(req)
.timed(observe_snapshot_send_spent(self.target))
.await;
let strm = ReceiverStream::new(rx);

info!(
"install_snapshot resp target={}: {:?}",
self.target, grpc_res
);
let target = self.target;

let res = self.parse_grpc_resp(grpc_res);
self.report_metrics_snapshot(res.is_ok());
let _forward_handle = runtime::spawn(async move {
while let Some(x) = res_strm.try_next().await? {
raft_metrics::network::incr_sendto_bytes(&target, x.chunk.len() as u64);

let send_res = tx.send(x).await;

if let Err(e) = send_res {
error!("{} error sending to snapshot stream: {}", func_name!(), e);
}
}

Ok::<_, StreamingError<Fatal>>(())
});

let res: Result<SnapshotResponse, StreamingError<Fatal>> = try {
let grpc_res = client
.install_snapshot_v2(strm)
.timed(observe_snapshot_send_spent(self.target))
.await;

info!(
"{} resp from: target={}: grpc_result: {:?}",
func_name!(),
self.target,
grpc_res,
);

let grpc_response = grpc_res.map_err(|e| self.status_to_unreachable(e, endpoint))?;

let remote_result: Result<SnapshotResponse, Fatal> =
GrpcHelper::parse_raft_reply_generic(grpc_response.into_inner())
.map_err(|serde_err| new_net_err(&serde_err, || "parse full_snapshot reply"))?;

remote_result.map_err(|e| self.to_remote_err(e))?
};

self.report_metrics_snapshot(res.is_ok());
res
}

Expand Down Expand Up @@ -460,11 +525,11 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
}
}

impl RaftNetworkFactory<TypeConfig> for Network {
type Network = NetworkConnection;
impl RaftNetworkFactory<TypeConfig> for NetworkFactory {
type Network = Network;

async fn new_client(
self: &mut Network,
self: &mut NetworkFactory,
target: NodeId,
node: &MembershipNode,
) -> Self::Network {
Expand All @@ -473,7 +538,7 @@ impl RaftNetworkFactory<TypeConfig> for Network {
self.sto.id, target, node
);

NetworkConnection {
Network {
id: self.sto.id,
target,
target_node: node.clone(),
Expand Down
17 changes: 17 additions & 0 deletions src/meta/types/src/grpc_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@ impl GrpcHelper {
}
}

/// Parse RaftReply to `Result<T,E>`
pub fn parse_raft_reply_generic<T, E>(
reply: RaftReply,
) -> Result<Result<T, E>, serde_json::Error>
where
T: serde::de::DeserializeOwned,
E: serde::Serialize + serde::de::DeserializeOwned,
{
if !reply.error.is_empty() {
let e: E = serde_json::from_str(&reply.error)?;
Ok(Err(e))
} else {
let d: T = serde_json::from_str(&reply.data)?;
Ok(Ok(d))
}
}

/// Parse tonic::Request and decode it into required type.
pub fn parse_req<T>(request: tonic::Request<RaftRequest>) -> Result<T, tonic::Status>
where T: serde::de::DeserializeOwned {
Expand Down

0 comments on commit eae6532

Please sign in to comment.