From 16ae37650346e2586d0fd1c38243bf113e6b6f28 Mon Sep 17 00:00:00 2001 From: Harsh1s Date: Fri, 8 Mar 2024 04:10:08 +0530 Subject: [PATCH] feat: Instrument await using await-tree Signed-off-by: Harsh1s --- crates/curp/Cargo.toml | 1 + crates/curp/src/rpc/connect.rs | 31 ++++++++++++++++--- crates/curp/src/server/mod.rs | 46 ++++++++++++++++++++++------ crates/xline/Cargo.toml | 1 + crates/xline/src/server/kv_server.rs | 12 ++++++-- 5 files changed, 75 insertions(+), 16 deletions(-) diff --git a/crates/curp/Cargo.toml b/crates/curp/Cargo.toml index 070a70f9f..a380a894c 100644 --- a/crates/curp/Cargo.toml +++ b/crates/curp/Cargo.toml @@ -14,6 +14,7 @@ version = "0.1.0" [dependencies] async-stream = "0.3.4" async-trait = "0.1.53" +await-tree = "0.1.2" bincode = "1.3.3" bytes = "1.4.0" clippy-utilities = "0.2.0" diff --git a/crates/curp/src/rpc/connect.rs b/crates/curp/src/rpc/connect.rs index 82714558e..56db242a5 100644 --- a/crates/curp/src/rpc/connect.rs +++ b/crates/curp/src/rpc/connect.rs @@ -8,6 +8,7 @@ use std::{ use async_stream::stream; use async_trait::async_trait; +use await_tree::InstrumentAwait; use bytes::BytesMut; use clippy_utilities::NumericCast; use engine::SnapshotApi; @@ -396,7 +397,11 @@ impl ConnectApi for Connect> { if let Some(token) = token { _ = req.metadata_mut().insert("token", token.parse()?); } - client.propose(req).await.map_err(Into::into) + client + .propose(req) + .instrument_await("client propose") + .await + .map_err(Into::into) } /// Send `ShutdownRequest` @@ -410,7 +415,11 @@ impl ConnectApi for Connect> { let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.shutdown(req).await.map_err(Into::into) + client + .shutdown(req) + .instrument_await("client shutdown") + .await + .map_err(Into::into) } /// Send `ProposeRequest` @@ -424,7 +433,11 @@ impl ConnectApi for Connect> { let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.propose_conf_change(req).await.map_err(Into::into) + client + .propose_conf_change(req) + .instrument_await("client propose conf change") + .await + .map_err(Into::into) } /// Send `PublishRequest` @@ -438,7 +451,11 @@ impl ConnectApi for Connect> { let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.publish(req).await.map_err(Into::into) + client + .publish(req) + .instrument_await("client publish") + .await + .map_err(Into::into) } /// Send `WaitSyncedRequest` @@ -452,7 +469,11 @@ impl ConnectApi for Connect> { let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); - client.wait_synced(req).await.map_err(Into::into) + client + .wait_synced(req) + .instrument_await("client propose wait synced request") + .await + .map_err(Into::into) } /// Send `FetchClusterRequest` diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs index 66470ceec..b54ad5506 100644 --- a/crates/curp/src/server/mod.rs +++ b/crates/curp/src/server/mod.rs @@ -1,5 +1,6 @@ use std::{fmt::Debug, sync::Arc}; +use await_tree::InstrumentAwait; use engine::SnapshotAllocator; use tokio::sync::broadcast; #[cfg(not(madsim))] @@ -81,7 +82,10 @@ impl crate::rpc::Protocol for Rpc { ) -> Result, tonic::Status> { request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.propose(request.into_inner()).await?, + self.inner + .propose(request.into_inner()) + .instrument_await("curp_propose") + .await?, )) } @@ -92,7 +96,10 @@ impl crate::rpc::Protocol for Rpc { ) -> Result, tonic::Status> { request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.shutdown(request.into_inner()).await?, + self.inner + .shutdown(request.into_inner()) + .instrument_await("curp_shutdown") + .await?, )) } @@ -103,7 +110,10 @@ impl crate::rpc::Protocol for Rpc { ) -> Result, tonic::Status> { request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.propose_conf_change(request.into_inner()).await?, + self.inner + .propose_conf_change(request.into_inner()) + .instrument_await("curp_propose_conf_change") + .await?, )) } @@ -125,7 +135,10 @@ impl crate::rpc::Protocol for Rpc { ) -> Result, tonic::Status> { request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.wait_synced(request.into_inner()).await?, + self.inner + .wait_synced(request.into_inner()) + .instrument_await("curp_wait_synced") + .await?, )) } @@ -155,7 +168,10 @@ impl crate::rpc::Protocol for Rpc { request: tonic::Request, ) -> Result, tonic::Status> { Ok(tonic::Response::new( - self.inner.move_leader(request.into_inner()).await?, + self.inner + .move_leader(request.into_inner()) + .instrument_await("curp_move_leader") + .await?, )) } @@ -167,7 +183,10 @@ impl crate::rpc::Protocol for Rpc { ) -> Result, tonic::Status> { let req_stream = request.into_inner(); Ok(tonic::Response::new( - self.inner.lease_keep_alive(req_stream).await?, + self.inner + .lease_keep_alive(req_stream) + .instrument_await("lease_keep_alive") + .await?, )) } } @@ -190,7 +209,10 @@ impl crate::rpc::InnerProtocol for Rpc { request: tonic::Request, ) -> Result, tonic::Status> { Ok(tonic::Response::new( - self.inner.vote(request.into_inner()).await?, + self.inner + .vote(request.into_inner()) + .instrument_await("curp_vote") + .await?, )) } @@ -211,7 +233,10 @@ impl crate::rpc::InnerProtocol for Rpc { ) -> Result, tonic::Status> { let req_stream = request.into_inner(); Ok(tonic::Response::new( - self.inner.install_snapshot(req_stream).await?, + self.inner + .install_snapshot(req_stream) + .instrument_await("curp_install_snapshot") + .await?, )) } @@ -221,7 +246,10 @@ impl crate::rpc::InnerProtocol for Rpc { request: tonic::Request, ) -> Result, tonic::Status> { Ok(tonic::Response::new( - self.inner.try_become_leader_now(request.get_ref()).await?, + self.inner + .try_become_leader_now(request.get_ref()) + .instrument_await("curp_try_become_leader_now") + .await?, )) } } diff --git a/crates/xline/Cargo.toml b/crates/xline/Cargo.toml index 9980e4405..dd21364bd 100644 --- a/crates/xline/Cargo.toml +++ b/crates/xline/Cargo.toml @@ -15,6 +15,7 @@ categories = ["KV"] anyhow = "1.0.57" async-stream = "0.3.5" async-trait = "0.1.53" +await-tree = "0.1.2" axum = "0.6.20" bytes = "1.4.0" clap = { version = "4", features = ["derive"] } diff --git a/crates/xline/src/server/kv_server.rs b/crates/xline/src/server/kv_server.rs index 2d1ea9881..44d794e88 100644 --- a/crates/xline/src/server/kv_server.rs +++ b/crates/xline/src/server/kv_server.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use await_tree::InstrumentAwait; use curp::rpc::ReadState; use dashmap::DashMap; use event_listener::Event; @@ -223,7 +224,9 @@ where let request = RequestWrapper::from(request.into_inner()); let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); if !is_serializable { - self.wait_read_state(&cmd).await?; + self.wait_read_state(&cmd) + .instrument_await("xline wait read state for range request") + .await?; // Double check whether the range request is compacted or not since the compaction request // may be executed during the process of `wait_read_state` which results in the result of // previous `check_range_request` outdated. @@ -259,6 +262,7 @@ where let is_fast_path = true; let (cmd_res, sync_res) = self .propose(request.into_inner(), auth_info, is_fast_path) + .instrument_await("xline propose put request") .await?; let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { @@ -291,6 +295,7 @@ where let is_fast_path = true; let (cmd_res, sync_res) = self .propose(request.into_inner(), auth_info, is_fast_path) + .instrument_await("xline propose delete range request") .await?; let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { @@ -331,7 +336,9 @@ where let request = RequestWrapper::from(request.into_inner()); let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); if !is_serializable { - self.wait_read_state(&cmd).await?; + self.wait_read_state(&cmd) + .instrument_await("xline wait read state for txn request") + .await?; } self.do_serializable(&cmd)? } else { @@ -386,6 +393,7 @@ where let (cmd_res, _sync_res) = self.client.propose(&cmd, None, !physical).await??; let resp = cmd_res.into_inner(); if timeout(self.compact_timeout, compact_physical_fut) + .instrument_await("xline wait compact physical event") .await .is_err() {