diff --git a/crates/curp/Cargo.toml b/crates/curp/Cargo.toml index a9e6a4d55..554e89f49 100644 --- a/crates/curp/Cargo.toml +++ b/crates/curp/Cargo.toml @@ -14,6 +14,7 @@ version = "0.1.0" [dependencies] async-stream = "0.3.4" async-trait = "0.1.80" +await-tree = "0.1.2" bincode = "1.3.3" bytes = "1.4.0" clippy-utilities = "0.2.0" diff --git a/crates/curp/src/rpc/connect.rs b/crates/curp/src/rpc/connect.rs index cc4a4d175..62eb736b1 100644 --- a/crates/curp/src/rpc/connect.rs +++ b/crates/curp/src/rpc/connect.rs @@ -8,6 +8,7 @@ use std::{ use async_stream::stream; use async_trait::async_trait; +use await_tree::InstrumentAwait; use bytes::BytesMut; use clippy_utilities::NumericCast; use engine::SnapshotApi; @@ -396,7 +397,11 @@ impl ConnectApi for Connect> { if let Some(token) = token { _ = req.metadata_mut().insert("token", token.parse()?); } - client.propose(req).await.map_err(Into::into) + client + .propose(req) + .instrument_await("client propose") + .await + .map_err(Into::into) } /// Send `ShutdownRequest` @@ -410,7 +415,11 @@ impl ConnectApi for Connect> { let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.shutdown(req).await.map_err(Into::into) + client + .shutdown(req) + .instrument_await("client shutdown") + .await + .map_err(Into::into) } /// Send `ProposeRequest` @@ -424,7 +433,11 @@ impl ConnectApi for Connect> { let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.propose_conf_change(req).await.map_err(Into::into) + client + .propose_conf_change(req) + .instrument_await("client propose conf change") + .await + .map_err(Into::into) } /// Send `PublishRequest` @@ -438,7 +451,11 @@ impl ConnectApi for Connect> { let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.publish(req).await.map_err(Into::into) + client + .publish(req) + .instrument_await("client publish") + .await + .map_err(Into::into) } /// Send `WaitSyncedRequest` @@ -452,7 +469,11 @@ impl ConnectApi for Connect> { let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.wait_synced(req).await.map_err(Into::into) + client + .wait_synced(req) + .instrument_await("client propose wait synced request") + .await + .map_err(Into::into) } /// Send `FetchClusterRequest` diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs index a9933cbe3..ff083ca76 100644 --- a/crates/curp/src/server/mod.rs +++ b/crates/curp/src/server/mod.rs @@ -1,5 +1,6 @@ use std::{fmt::Debug, sync::Arc}; +use await_tree::InstrumentAwait; use engine::SnapshotAllocator; use tokio::sync::broadcast; #[cfg(not(madsim))] @@ -84,7 +85,10 @@ impl crate::rpc::Protocol for Rpc { ) -> Result, tonic::Status> { request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.propose(request.into_inner()).await?, + self.inner + .propose(request.into_inner()) + .instrument_await("curp_propose") + .await?, )) } @@ -95,7 +99,10 @@ impl crate::rpc::Protocol for Rpc { ) -> Result, tonic::Status> { request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.shutdown(request.into_inner()).await?, + self.inner + .shutdown(request.into_inner()) + .instrument_await("curp_shutdown") + .await?, )) } @@ -106,7 +113,10 @@ impl crate::rpc::Protocol for Rpc { ) -> Result, tonic::Status> { request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.propose_conf_change(request.into_inner()).await?, + self.inner + .propose_conf_change(request.into_inner()) + .instrument_await("curp_propose_conf_change") + .await?, )) } @@ -128,7 +138,10 @@ impl crate::rpc::Protocol for Rpc { ) -> Result, tonic::Status> { request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.wait_synced(request.into_inner()).await?, + self.inner + .wait_synced(request.into_inner()) + .instrument_await("curp_wait_synced") + .await?, )) } @@ -158,7 +171,10 @@ impl crate::rpc::Protocol for Rpc { request: tonic::Request, ) -> Result, tonic::Status> { Ok(tonic::Response::new( - self.inner.move_leader(request.into_inner()).await?, + self.inner + .move_leader(request.into_inner()) + .instrument_await("curp_move_leader") + .await?, )) } @@ -170,7 +186,10 @@ impl crate::rpc::Protocol for Rpc { ) -> Result, tonic::Status> { let req_stream = request.into_inner(); Ok(tonic::Response::new( - self.inner.lease_keep_alive(req_stream).await?, + self.inner + .lease_keep_alive(req_stream) + .instrument_await("lease_keep_alive") + .await?, )) } } @@ -193,7 +212,10 @@ impl crate::rpc::InnerProtocol for Rpc { request: tonic::Request, ) -> Result, tonic::Status> { Ok(tonic::Response::new( - self.inner.vote(request.into_inner()).await?, + self.inner + .vote(request.into_inner()) + .instrument_await("curp_vote") + .await?, )) } @@ -214,7 +236,10 @@ impl crate::rpc::InnerProtocol for Rpc { ) -> Result, tonic::Status> { let req_stream = request.into_inner(); Ok(tonic::Response::new( - self.inner.install_snapshot(req_stream).await?, + self.inner + .install_snapshot(req_stream) + .instrument_await("curp_install_snapshot") + .await?, )) } @@ -224,7 +249,10 @@ impl crate::rpc::InnerProtocol for Rpc { request: tonic::Request, ) -> Result, tonic::Status> { Ok(tonic::Response::new( - self.inner.try_become_leader_now(request.get_ref()).await?, + self.inner + .try_become_leader_now(request.get_ref()) + .instrument_await("curp_try_become_leader_now") + .await?, )) } } diff --git a/crates/xline/Cargo.toml b/crates/xline/Cargo.toml index 94429bd34..4fc6d17df 100644 --- a/crates/xline/Cargo.toml +++ b/crates/xline/Cargo.toml @@ -15,6 +15,7 @@ categories = ["KV"] anyhow = "1.0.82" async-stream = "0.3.5" async-trait = "0.1.80" +await-tree = "0.1.2" axum = "0.6.20" bytes = "1.4.0" clap = { version = "4", features = ["derive"] } diff --git a/crates/xline/src/server/kv_server.rs b/crates/xline/src/server/kv_server.rs index b9d51060a..f17af319f 100644 --- a/crates/xline/src/server/kv_server.rs +++ b/crates/xline/src/server/kv_server.rs @@ -6,6 +6,7 @@ use std::{ time::Duration, }; +use await_tree::InstrumentAwait; use curp::rpc::ReadState; use dashmap::DashMap; use event_listener::Event; @@ -216,7 +217,9 @@ where let request = RequestWrapper::from(request.into_inner()); let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); if !is_serializable { - self.wait_read_state(&cmd).await?; + self.wait_read_state(&cmd) + .instrument_await("xline wait read state for range request") + .await?; // Double check whether the range request is compacted or not since the compaction request // may be executed during the process of `wait_read_state` which results in the result of // previous `check_range_request` outdated. @@ -249,6 +252,7 @@ where let is_fast_path = true; let (cmd_res, sync_res) = self .propose(request.into_inner(), auth_info, is_fast_path) + .instrument_await("xline propose put request") .await?; let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { @@ -278,6 +282,7 @@ where let is_fast_path = true; let (cmd_res, sync_res) = self .propose(request.into_inner(), auth_info, is_fast_path) + .instrument_await("xline propose delete range request") .await?; let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { @@ -315,7 +320,9 @@ where let request = RequestWrapper::from(request.into_inner()); let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); if !is_serializable { - self.wait_read_state(&cmd).await?; + self.wait_read_state(&cmd) + .instrument_await("xline wait read state for txn request") + .await?; } self.do_serializable(&cmd)? } else { @@ -367,6 +374,7 @@ where let (cmd_res, _sync_res) = self.client.propose(&cmd, None, !physical).await??; let resp = cmd_res.into_inner(); if timeout(self.compact_timeout, compact_physical_fut) + .instrument_await("xline wait compact physical event") .await .is_err() {