diff --git a/src/pd/cluster.rs b/src/pd/cluster.rs
index 668ec0b3..3961e108 100644
--- a/src/pd/cluster.rs
+++ b/src/pd/cluster.rs
@@ -11,6 +11,7 @@ use tonic::IntoRequest;
 use tonic::Request;
 use tracing::error;
 use tracing::info;
+use tracing::instrument;
 use tracing::warn;
 
 use super::timestamp::TimestampOracle;
@@ -103,6 +104,7 @@ impl Connection {
         Connection { security_mgr }
     }
 
+    #[instrument(name = "pd::Connection::connect_cluster", skip_all)]
     pub async fn connect_cluster(
         &self,
         endpoints: &[String],
@@ -122,6 +124,7 @@ impl Connection {
     }
 
     // Re-establish connection with PD leader in asynchronous fashion.
+    #[instrument(name = "pd::Connection::reconnect", skip_all)]
     pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
         warn!("updating pd client");
         let start = Instant::now();
diff --git a/src/pd/retry.rs b/src/pd/retry.rs
index 2dd54487..5cf173c7 100644
--- a/src/pd/retry.rs
+++ b/src/pd/retry.rs
@@ -10,6 +10,9 @@ use std::time::Instant;
 use async_trait::async_trait;
 use tokio::sync::RwLock;
 use tokio::time::sleep;
+use tracing::debug;
+use tracing::info_span;
+use tracing::instrument;
 
 use crate::pd::Cluster;
 use crate::pd::Connection;
@@ -74,7 +77,10 @@ macro_rules! retry_core {
     ($self: ident, $tag: literal, $call: expr) => {{
         let stats = pd_stats($tag);
         let mut last_err = Ok(());
-        for _ in 0..LEADER_CHANGE_RETRY {
+        for retry in 0..LEADER_CHANGE_RETRY {
+            let span = info_span!("RetryClient::retry", retry);
+            let _enter = span.enter();
+
             let res = $call;
 
             match stats.done(res) {
@@ -82,6 +88,7 @@ macro_rules! retry_core {
                 Err(e) => last_err = Err(e),
             }
 
+            debug!("retry on last_err: {:?}", last_err);
             let mut reconnect_count = MAX_REQUEST_COUNT;
             while let Err(e) = $self.reconnect(RECONNECT_INTERVAL_SEC).await {
                 reconnect_count -= 1;
@@ -142,6 +149,7 @@ impl RetryClient<Cluster> {
 impl RetryClientTrait for RetryClient<Cluster> {
     // These get_* functions will try multiple times to make a request, reconnecting as necessary.
     // It does not know about encoding. Caller should take care of it.
+    #[instrument(name = "RetryClient::get_region", skip_all)]
     async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<RegionWithLeader> {
         retry_mut!(self, "get_region", |cluster| {
             let key = key.clone();
@@ -156,6 +164,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }
 
+    #[instrument(name = "RetryClient::get_region_by_id", skip(self))]
     async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader> {
         retry_mut!(self, "get_region_by_id", |cluster| async {
             cluster
@@ -167,6 +176,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }
 
+    #[instrument(name = "RetryClient::get_store", skip(self))]
     async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
         retry_mut!(self, "get_store", |cluster| async {
             cluster
@@ -176,6 +186,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }
 
+    #[instrument(name = "RetryClient::get_all_stores", skip(self))]
     async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
         retry_mut!(self, "get_all_stores", |cluster| async {
             cluster
@@ -185,10 +196,12 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }
 
+    #[instrument(name = "RetryClient::get_timestamp", skip(self))]
     async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
         retry!(self, "get_timestamp", |cluster| cluster.get_timestamp())
     }
 
+    #[instrument(name = "RetryClient::update_safepoint", skip(self))]
     async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
         retry_mut!(self, "update_gc_safepoint", |cluster| async {
             cluster
diff --git a/src/pd/timestamp.rs b/src/pd/timestamp.rs
index b7e25fea..5fe16a12 100644
--- a/src/pd/timestamp.rs
+++ b/src/pd/timestamp.rs
@@ -27,6 +27,7 @@ use tokio::sync::Mutex;
 use tonic::transport::Channel;
 use tracing::debug;
 use tracing::info;
+use tracing::info_span;
 use tracing::instrument;
 
 use crate::internal_err;
@@ -76,6 +77,7 @@ impl TimestampOracle {
     }
 }
 
+#[instrument(name = "TimestampOracle::run_tso", skip_all)]
 async fn run_tso(
     cluster_id: u64,
     mut pd_client: PdClient<Channel>,
@@ -100,6 +102,10 @@ async fn run_tso(
     let mut responses = pd_client.tso(request_stream).await?.into_inner();
 
     while let Some(Ok(resp)) = responses.next().await {
+        let span = info_span!("handle_response");
+        let _enter = span.enter();
+        debug!("got response: {:?}", resp);
+
         let mut pending_requests = pending_requests.lock().await;
 
         // Wake up the sending future blocked by too many pending requests as we are consuming
@@ -132,6 +138,7 @@ struct TsoRequestStream {
 impl Stream for TsoRequestStream {
     type Item = TsoRequest;
 
+    #[instrument(name = "TsoRequestStream::poll_next", skip_all)]
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut this = self.project();
 
@@ -155,6 +162,12 @@ impl Stream for TsoRequestStream {
             }
         }
 
+        debug!(
+            "got requests: len {}, pending_requests {}",
+            requests.len(),
+            pending_requests.len()
+        );
+
         if !requests.is_empty() {
             let req = TsoRequest {
                 header: Some(RequestHeader {
@@ -171,6 +184,12 @@ impl Stream for TsoRequestStream {
             };
             pending_requests.push_back(request_group);
 
+            debug!(
+                "sending request to PD: {:?}, pending_requests {}",
+                req,
+                pending_requests.len()
+            );
+
             Poll::Ready(Some(req))
         } else {
             // Set the waker to the context, then the stream can be waked up after the pending queue
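Note: the spans and `debug!` events this diff adds are only emitted if the embedding application installs a `tracing` subscriber; the library does not install one itself. Below is a minimal sketch of such a setup, assuming the separate `tracing-subscriber` crate with its `env-filter` feature, and assuming the crate's default event target prefix is `tikv_client` (derived from the crate name; not part of this diff):

```rust
// Illustrative subscriber setup -- a sketch, not part of this PR.
// Requires tracing-subscriber with the "env-filter" feature.
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::EnvFilter;

fn main() {
    tracing_subscriber::fmt()
        // Emit events down to DEBUG for this crate so the new
        // `debug!("retry on last_err: ...")` and TSO request/response
        // events are visible; keep other crates at INFO.
        .with_env_filter(EnvFilter::new("info,tikv_client=debug"))
        // Log span enter and close so each `RetryClient::retry`
        // iteration and `handle_response` span shows up in the output.
        .with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE)
        .init();

    // ... build the TiKV client and issue requests here ...
}
```

On the `#[instrument]` arguments: `skip_all` records no function arguments as span fields (used where the argument is a raw key), while the `skip(self)` variants still record scalar arguments such as `region_id`, `id`, and `safepoint`, so those spans carry the request parameters without ever formatting the client itself.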