Skip to content

Commit

Permalink
added logs (#294)
Browse files Browse the repository at this point in the history
Signed-off-by: Shashwat Jaiswal <[email protected]>

Co-authored-by: Ziqian Qin <[email protected]>
  • Loading branch information
shashwatj07 and ekexium authored May 31, 2021
1 parent ad01f59 commit b7b8b8b
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
request::{Collect, Plan},
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
};
use log::debug;
use std::{sync::Arc, u32};

const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
Expand Down Expand Up @@ -147,6 +148,7 @@ impl Client {
/// # });
/// ```
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking raw get request");
let request = new_raw_get_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
Expand Down Expand Up @@ -179,6 +181,7 @@ impl Client {
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
debug!("invoking raw batch_get request");
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
Expand Down Expand Up @@ -207,6 +210,7 @@ impl Client {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!("invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
Expand Down Expand Up @@ -239,6 +243,7 @@ impl Client {
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
debug!("invoking raw batch_put request");
let request = new_raw_batch_put_request(
pairs.into_iter().map(Into::into),
self.cf.clone(),
Expand Down Expand Up @@ -271,6 +276,7 @@ impl Client {
/// # });
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
debug!("invoking raw delete request");
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
Expand Down Expand Up @@ -300,6 +306,7 @@ impl Client {
/// # });
/// ```
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
debug!("invoking raw batch_delete request");
self.assert_non_atomic()?;
let request =
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
Expand Down Expand Up @@ -328,6 +335,7 @@ impl Client {
/// # });
/// ```
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
debug!("invoking raw delete_range request");
self.assert_non_atomic()?;
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
Expand Down Expand Up @@ -359,6 +367,7 @@ impl Client {
/// # });
/// ```
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
debug!("invoking raw scan request");
self.scan_inner(range.into(), limit, false).await
}

Expand All @@ -382,6 +391,7 @@ impl Client {
/// # });
/// ```
pub async fn scan_keys(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<Key>> {
debug!("invoking raw scan_keys request");
Ok(self
.scan_inner(range, limit, true)
.await?
Expand Down Expand Up @@ -418,6 +428,7 @@ impl Client {
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
) -> Result<Vec<KvPair>> {
debug!("invoking raw batch_scan request");
self.batch_scan_inner(ranges, each_limit, false).await
}

Expand Down
8 changes: 8 additions & 0 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
transaction::{Snapshot, Transaction, TransactionOptions},
Result,
};
use log::debug;
use std::{mem, sync::Arc};
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};

Expand Down Expand Up @@ -53,6 +54,7 @@ impl Client {
/// # });
/// ```
pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Client> {
debug!("creating transactional client");
Self::new_with_config(pd_endpoints, Config::default()).await
}

Expand All @@ -79,6 +81,7 @@ impl Client {
pd_endpoints: Vec<S>,
config: Config,
) -> Result<Client> {
debug!("creating transactional client with custom configuration");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, true).await?);
Ok(Client { pd })
Expand All @@ -105,6 +108,7 @@ impl Client {
/// # });
/// ```
pub async fn begin_optimistic(&self) -> Result<Transaction> {
debug!("creating new optimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
}
Expand All @@ -127,6 +131,7 @@ impl Client {
/// # });
/// ```
pub async fn begin_pessimistic(&self) -> Result<Transaction> {
debug!("creating new pessimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
}
Expand All @@ -149,12 +154,14 @@ impl Client {
/// # });
/// ```
pub async fn begin_with_options(&self, options: TransactionOptions) -> Result<Transaction> {
debug!("creating new customized transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, options))
}

/// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp).
pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot {
debug!("creating new snapshot");
Snapshot::new(self.new_transaction(timestamp, options.read_only()))
}

Expand Down Expand Up @@ -187,6 +194,7 @@ impl Client {
/// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
/// We skip the second step "delete ranges" which is an optimization for TiDB.
pub async fn gc(&self, safepoint: Timestamp) -> Result<bool> {
debug!("invoking transactional gc request");
// scan all locks with ts <= safepoint
let mut locks: Vec<kvrpcpb::LockInfo> = vec![];
let mut start_key = vec![];
Expand Down
6 changes: 5 additions & 1 deletion src/transaction/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
transaction::requests,
Error, Result,
};
use log::debug;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand All @@ -28,6 +29,7 @@ pub async fn resolve_locks(
locks: Vec<kvrpcpb::LockInfo>,
pd_client: Arc<impl PdClient>,
) -> Result<bool> {
debug!("resolving locks");
let ts = pd_client.clone().get_timestamp().await?;
let mut has_live_locks = false;
let expired_locks = locks.into_iter().filter(|lock| {
Expand Down Expand Up @@ -94,9 +96,11 @@ async fn resolve_lock_with_retry(
commit_version: u64,
pd_client: Arc<impl PdClient>,
) -> Result<RegionVerId> {
debug!("resolving locks with retry");
// FIXME: Add backoff
let mut error = None;
for _ in 0..RESOLVE_LOCK_RETRY_LIMIT {
for i in 0..RESOLVE_LOCK_RETRY_LIMIT {
debug!("resolving locks: attempt {}", (i + 1));
let store = pd_client.clone().store_for_key(key.into()).await?;
let ver_id = store.region.ver_id();
let request = requests::new_resolve_lock_request(start_version, commit_version);
Expand Down
7 changes: 7 additions & 0 deletions src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::{BoundRange, Key, KvPair, Result, Transaction, Value};
use derive_new::new;
use futures::stream::BoxStream;
use log::debug;
use std::ops::RangeBounds;

/// A read-only transaction which reads at the given timestamp.
Expand All @@ -20,11 +21,13 @@ pub struct Snapshot {
impl Snapshot {
/// Get the value associated with the given key.
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking get request on snapshot");
self.transaction.get(key).await
}

/// Check whether the key exists.
pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
debug!("invoking key_exists request on snapshot");
self.transaction.key_exists(key).await
}

Expand All @@ -33,6 +36,7 @@ impl Snapshot {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = KvPair>> {
debug!("invoking batch_get request on snapshot");
self.transaction.batch_get(keys).await
}

Expand All @@ -42,6 +46,7 @@ impl Snapshot {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
debug!("invoking scan request on snapshot");
self.transaction.scan(range, limit).await
}

Expand All @@ -51,12 +56,14 @@ impl Snapshot {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = Key>> {
debug!("invoking scan_keys request on snapshot");
self.transaction.scan_keys(range, limit).await
}

/// Unimplemented. Similar to scan, but in the reverse direction.
#[allow(dead_code)]
fn scan_reverse(&mut self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
debug!("invoking scan_reverse request on snapshot");
self.transaction.scan_reverse(range)
}
}
19 changes: 19 additions & 0 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
use derive_new::new;
use fail::fail_point;
use futures::{prelude::*, stream::BoxStream};
use log::debug;
use std::{iter, ops::RangeBounds, sync::Arc, time::Instant};
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
use tokio::{sync::RwLock, time::Duration};
Expand Down Expand Up @@ -104,6 +105,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking transactional get request");
self.check_allow_operation().await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
Expand Down Expand Up @@ -168,6 +170,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking transactional get_for_update request");
self.check_allow_operation().await?;
if !self.is_pessimistic() {
let key = key.into();
Expand Down Expand Up @@ -198,6 +201,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
debug!("invoking transactional key_exists request");
let key = key.into();
Ok(self.scan_keys(key.clone()..=key, 1).await?.next().is_some())
}
Expand Down Expand Up @@ -234,6 +238,7 @@ impl<PdC: PdClient> Transaction<PdC> {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = KvPair>> {
debug!("invoking transactional batch_get request");
self.check_allow_operation().await?;
let timestamp = self.timestamp.clone();
let rpc = self.rpc.clone();
Expand Down Expand Up @@ -286,6 +291,7 @@ impl<PdC: PdClient> Transaction<PdC> {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
debug!("invoking transactional batch_get_for_update request");
self.check_allow_operation().await?;
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
if !self.is_pessimistic() {
Expand Down Expand Up @@ -329,6 +335,7 @@ impl<PdC: PdClient> Transaction<PdC> {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
debug!("invoking transactional scan request");
self.scan_inner(range, limit, false).await
}

Expand Down Expand Up @@ -364,6 +371,7 @@ impl<PdC: PdClient> Transaction<PdC> {
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = Key>> {
debug!("invoking transactional scan_keys request");
Ok(self
.scan_inner(range, limit, true)
.await?
Expand All @@ -374,6 +382,7 @@ impl<PdC: PdClient> Transaction<PdC> {
///
/// Similar to [`scan`](Transaction::scan), but scans in the reverse direction.
pub(crate) fn scan_reverse(&self, _range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
debug!("invoking transactional scan_reverse request");
unimplemented!()
}

Expand All @@ -394,6 +403,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!("invoking transactional put request");
self.check_allow_operation().await?;
let key = key.into();
if self.is_pessimistic() {
Expand Down Expand Up @@ -424,6 +434,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!("invoking transactional insert request");
self.check_allow_operation().await?;
let key = key.into();
if self.buffer.get(&key).is_some() {
Expand Down Expand Up @@ -458,6 +469,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn delete(&mut self, key: impl Into<Key>) -> Result<()> {
debug!("invoking transactional delete request");
self.check_allow_operation().await?;
let key = key.into();
if self.is_pessimistic() {
Expand Down Expand Up @@ -495,6 +507,7 @@ impl<PdC: PdClient> Transaction<PdC> {
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<()> {
debug!("invoking transactional lock_keys request");
self.check_allow_operation().await?;
match self.options.kind {
TransactionKind::Optimistic => {
Expand Down Expand Up @@ -526,6 +539,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
debug!("commiting transaction");
{
let mut status = self.status.write().await;
if !matches!(
Expand Down Expand Up @@ -582,6 +596,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// # });
/// ```
pub async fn rollback(&mut self) -> Result<()> {
debug!("rolling back transaction");
{
let status = self.status.read().await;
if !matches!(
Expand Down Expand Up @@ -625,6 +640,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// Returns the TTL set on the transaction's locks by TiKV.
#[doc(hidden)]
pub async fn send_heart_beat(&mut self) -> Result<u64> {
debug!("sending heart_beat");
self.check_allow_operation().await?;
let primary_key = match self.buffer.get_primary_key() {
Some(k) => k,
Expand Down Expand Up @@ -686,6 +702,7 @@ impl<PdC: PdClient> Transaction<PdC> {
keys: impl IntoIterator<Item = impl PessimisticLock>,
need_value: bool,
) -> Result<Vec<KvPair>> {
debug!("acquiring pessimistic lock");
assert!(
matches!(self.options.kind, TransactionKind::Pessimistic(_)),
"`pessimistic_lock` is only valid to use with pessimistic transactions"
Expand Down Expand Up @@ -752,6 +769,7 @@ impl<PdC: PdClient> Transaction<PdC> {
}

async fn start_auto_heartbeat(&mut self) {
debug!("starting auto_heartbeat");
if !self.options.heartbeat_option.is_auto_heartbeat() || self.is_heartbeat_started {
return;
}
Expand Down Expand Up @@ -810,6 +828,7 @@ impl<PdC: PdClient> Transaction<PdC> {

impl<PdC: PdClient> Drop for Transaction<PdC> {
fn drop(&mut self) {
debug!("dropping transaction");
if std::thread::panicking() {
return;
}
Expand Down
Loading

0 comments on commit b7b8b8b

Please sign in to comment.