Skip to content

Commit

Permalink
more trace for tso
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Nov 4, 2023
1 parent a0bd83f commit 1d3d074
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/pd/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tonic::IntoRequest;
use tonic::Request;
use tracing::error;
use tracing::info;
use tracing::instrument;
use tracing::warn;

use super::timestamp::TimestampOracle;
Expand Down Expand Up @@ -103,6 +104,7 @@ impl Connection {
Connection { security_mgr }
}

#[instrument(name = "pd::Connection::connect_cluster", skip_all)]
pub async fn connect_cluster(
&self,
endpoints: &[String],
Expand All @@ -122,6 +124,7 @@ impl Connection {
}

// Re-establish connection with PD leader in asynchronous fashion.
#[instrument(name = "pd::Connection::reconnect", skip_all)]
pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
warn!("updating pd client");
let start = Instant::now();
Expand Down
15 changes: 14 additions & 1 deletion src/pd/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use std::time::Instant;
use async_trait::async_trait;
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::debug;
use tracing::info_span;
use tracing::instrument;

use crate::pd::Cluster;
use crate::pd::Connection;
Expand Down Expand Up @@ -74,14 +77,18 @@ macro_rules! retry_core {
($self: ident, $tag: literal, $call: expr) => {{
let stats = pd_stats($tag);
let mut last_err = Ok(());
for _ in 0..LEADER_CHANGE_RETRY {
for retry in 0..LEADER_CHANGE_RETRY {
let span = info_span!("RetryClient::retry", retry);
let _enter = span.enter();

let res = $call;

match stats.done(res) {
Ok(r) => return Ok(r),
Err(e) => last_err = Err(e),
}

debug!("retry on last_err: {:?}", last_err);
let mut reconnect_count = MAX_REQUEST_COUNT;
while let Err(e) = $self.reconnect(RECONNECT_INTERVAL_SEC).await {
reconnect_count -= 1;
Expand Down Expand Up @@ -142,6 +149,7 @@ impl RetryClient<Cluster> {
impl RetryClientTrait for RetryClient<Cluster> {
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
// It does not know about encoding. Caller should take care of it.
#[instrument(name = "RetryClient::get_region", skip_all)]
async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<RegionWithLeader> {
retry_mut!(self, "get_region", |cluster| {
let key = key.clone();
Expand All @@ -156,6 +164,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[instrument(name = "RetryClient::get_region_by_id", skip(self))]
async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader> {
retry_mut!(self, "get_region_by_id", |cluster| async {
cluster
Expand All @@ -167,6 +176,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[instrument(name = "RetryClient::get_store", skip(self))]
async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
retry_mut!(self, "get_store", |cluster| async {
cluster
Expand All @@ -176,6 +186,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[instrument(name = "RetryClient::get_all_stores", skip(self))]
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
retry_mut!(self, "get_all_stores", |cluster| async {
cluster
Expand All @@ -185,10 +196,12 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[instrument(name = "RetryClient::get_timestamp", skip(self))]
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
retry!(self, "get_timestamp", |cluster| cluster.get_timestamp())
}

#[instrument(name = "RetryClient::update_safepoint", skip(self))]
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
retry_mut!(self, "update_gc_safepoint", |cluster| async {
cluster
Expand Down
19 changes: 19 additions & 0 deletions src/pd/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tokio::sync::Mutex;
use tonic::transport::Channel;
use tracing::debug;
use tracing::info;
use tracing::info_span;
use tracing::instrument;

use crate::internal_err;
Expand Down Expand Up @@ -76,6 +77,7 @@ impl TimestampOracle {
}
}

#[instrument(name = "TimestampOracle::run_tso", skip_all)]
async fn run_tso(
cluster_id: u64,
mut pd_client: PdClient<Channel>,
Expand All @@ -100,6 +102,10 @@ async fn run_tso(
let mut responses = pd_client.tso(request_stream).await?.into_inner();

while let Some(Ok(resp)) = responses.next().await {
let span = info_span!("handle_response");
let _enter = span.enter();
debug!("got response: {:?}", resp);

let mut pending_requests = pending_requests.lock().await;

// Wake up the sending future blocked by too many pending requests as we are consuming
Expand Down Expand Up @@ -132,6 +138,7 @@ struct TsoRequestStream {
impl Stream for TsoRequestStream {
type Item = TsoRequest;

#[instrument(name = "TsoRequestStream::poll_next", skip_all)]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = self.project();

Expand All @@ -155,6 +162,12 @@ impl Stream for TsoRequestStream {
}
}

debug!(
"got requests: len {}, pending_requests {}",
requests.len(),
pending_requests.len()
);

if !requests.is_empty() {
let req = TsoRequest {
header: Some(RequestHeader {
Expand All @@ -171,6 +184,12 @@ impl Stream for TsoRequestStream {
};
pending_requests.push_back(request_group);

debug!(
"sending request to PD: {:?}, pending_requests {}",
req,
pending_requests.len()
);

Poll::Ready(Some(req))
} else {
// Set the waker to the context, then the stream can be waked up after the pending queue
Expand Down

0 comments on commit 1d3d074

Please sign in to comment.