diff --git a/src/backoff.rs b/src/backoff.rs
index 266e8cc2..61f19573 100644
--- a/src/backoff.rs
+++ b/src/backoff.rs
@@ -8,6 +8,7 @@ use rand::thread_rng;
 use rand::Rng;
 
 pub const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
+pub const DEFAULT_STORE_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 1000, 10);
 pub const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
 pub const PESSIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
 
diff --git a/src/mock.rs b/src/mock.rs
index 3b0b157f..f9c94aef 100644
--- a/src/mock.rs
+++ b/src/mock.rs
@@ -19,10 +19,10 @@ use crate::proto::metapb::{self};
 use crate::region::RegionId;
 use crate::region::RegionWithLeader;
 use crate::request::codec::ApiV1TxnCodec;
-use crate::store::KvClient;
 use crate::store::KvConnect;
 use crate::store::RegionStore;
 use crate::store::Request;
+use crate::store::{KvClient, Store};
 use crate::Config;
 use crate::Error;
 use crate::Key;
@@ -206,6 +206,10 @@ impl PdClient for MockPdClient {
         }
     }
 
+    async fn all_stores(&self) -> Result<Vec<Store>> {
+        Ok(vec![Store::new(Arc::new(self.client.clone()))])
+    }
+
     async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
         Ok(Timestamp::default())
     }
diff --git a/src/pd/client.rs b/src/pd/client.rs
index f291c62c..5461cb57 100644
--- a/src/pd/client.rs
+++ b/src/pd/client.rs
@@ -21,10 +21,10 @@ use crate::region::RegionVerId;
 use crate::region::RegionWithLeader;
 use crate::region_cache::RegionCache;
 use crate::request::codec::{ApiV1TxnCodec, Codec};
-use crate::store::KvClient;
 use crate::store::KvConnect;
 use crate::store::RegionStore;
 use crate::store::TikvConnect;
+use crate::store::{KvClient, Store};
 use crate::BoundRange;
 use crate::Config;
 use crate::Key;
@@ -78,6 +78,8 @@ pub trait PdClient: Send + Sync + 'static {
         self.map_region_to_store(region).await
     }
 
+    async fn all_stores(&self) -> Result<Vec<Store>>;
+
     fn group_keys_by_region<K, K2>(
         self: Arc<Self>,
         keys: impl Iterator<Item = K> + Send + Sync + 'static,
@@ -255,6 +257,16 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cod: Codec> PdClient for PdRpcClien
         Self::decode_region(region, self.enable_mvcc_codec)
     }
 
+    async fn all_stores(&self) -> Result<Vec<Store>> {
+        let pb_stores = self.region_cache.read_through_all_stores().await?;
+        let mut stores = Vec::with_capacity(pb_stores.len());
+        for store in pb_stores {
+            let client = self.kv_client(&store.address).await?;
+            stores.push(Store::new(Arc::new(client)));
+        }
+        Ok(stores)
+    }
+
     async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
         self.pd.clone().get_timestamp().await
     }
diff --git a/src/pd/retry.rs b/src/pd/retry.rs
index 548fc269..0e65e13b 100644
--- a/src/pd/retry.rs
+++ b/src/pd/retry.rs
@@ -160,7 +160,6 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }
 
-    #[allow(dead_code)]
     async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
         retry!(self, "get_all_stores", |cluster| async {
             cluster
diff --git a/src/region_cache.rs b/src/region_cache.rs
index 6e8fe0ab..1746d5bf 100644
--- a/src/region_cache.rs
+++ b/src/region_cache.rs
@@ -232,6 +232,39 @@ impl<C: RetryClientTrait> RegionCache<C> {
             cache.key_to_ver_id.remove(&start_key);
         }
     }
+
+    pub async fn read_through_all_stores(&self) -> Result<Vec<metapb::Store>> {
+        let stores = self
+            .inner_client
+            .clone()
+            .get_all_stores()
+            .await?
+            .into_iter()
+            .filter(is_valid_tikv_store)
+            .collect::<Vec<_>>();
+        for store in &stores {
+            self.store_cache
+                .write()
+                .await
+                .insert(store.id, store.clone());
+        }
+        Ok(stores)
+    }
+}
+
+const ENGINE_LABEL_KEY: &str = "engine";
+const ENGINE_LABEL_TIFLASH: &str = "tiflash";
+const ENGINE_LABEL_TIFLASH_COMPUTE: &str = "tiflash_compute";
+
+fn is_valid_tikv_store(store: &metapb::Store) -> bool {
+    if metapb::StoreState::from_i32(store.state).unwrap() == metapb::StoreState::Tombstone {
+        return false;
+    }
+    let is_tiflash = store.labels.iter().any(|label| {
+        label.key == ENGINE_LABEL_KEY
+            && (label.value == ENGINE_LABEL_TIFLASH || label.value == ENGINE_LABEL_TIFLASH_COMPUTE)
+    });
+    !is_tiflash
 }
 
 #[cfg(test)]
@@ -253,6 +286,7 @@ mod test {
     use crate::proto::metapb::{self};
     use crate::region::RegionId;
     use crate::region::RegionWithLeader;
+    use crate::region_cache::is_valid_tikv_store;
     use crate::Key;
     use crate::Result;
@@ -512,4 +546,34 @@ mod test {
         region
     }
+
+    #[test]
+    fn test_is_valid_tikv_store() {
+        let mut store = metapb::Store::default();
+        assert!(is_valid_tikv_store(&store));
+
+        store.state = metapb::StoreState::Tombstone.into();
+        assert!(!is_valid_tikv_store(&store));
+
+        store.state = metapb::StoreState::Up.into();
+        assert!(is_valid_tikv_store(&store));
+
+        store.labels.push(metapb::StoreLabel {
+            key: "some_key".to_owned(),
+            value: "some_value".to_owned(),
+        });
+        assert!(is_valid_tikv_store(&store));
+
+        store.labels.push(metapb::StoreLabel {
+            key: "engine".to_owned(),
+            value: "tiflash".to_owned(),
+        });
+        assert!(!is_valid_tikv_store(&store));
+
+        store.labels[1].value = "tiflash_compute".to_owned();
+        assert!(!is_valid_tikv_store(&store));
+
+        store.labels[1].value = "other".to_owned();
+        assert!(is_valid_tikv_store(&store));
+    }
 }
diff --git a/src/request/mod.rs b/src/request/mod.rs
index 429adea5..aecaf26d 100644
--- a/src/request/mod.rs
+++ b/src/request/mod.rs
@@ -28,8 +28,8 @@ use crate::backoff::Backoff;
 use crate::backoff::DEFAULT_REGION_BACKOFF;
 use crate::backoff::OPTIMISTIC_BACKOFF;
 use crate::backoff::PESSIMISTIC_BACKOFF;
-use crate::store::HasKeyErrors;
 use crate::store::Request;
+use crate::store::{HasKeyErrors, Store};
 use crate::transaction::HasLocks;
 
 pub mod codec;
@@ -47,6 +47,12 @@ pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static {
     // TODO: fn decode_response()
 }
 
+/// For requests or plans that are handled at the TiKV store level (rather than the region level).
+pub trait StoreRequest {
+    /// Apply the request to the specified TiKV store.
+    fn apply_store(&mut self, store: &Store);
+}
+
 #[derive(Clone, Debug, new, Eq, PartialEq)]
 pub struct RetryOptions {
     /// How to retry when there is a region error and we need to resolve regions with PD.
diff --git a/src/request/plan.rs b/src/request/plan.rs
index 1808078a..ab72e8aa 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -18,15 +18,15 @@ use crate::proto::errorpb;
 use crate::proto::errorpb::EpochNotMatch;
 use crate::proto::kvrpcpb;
 use crate::request::shard::HasNextBatch;
-use crate::request::KvRequest;
 use crate::request::NextBatch;
 use crate::request::Shardable;
+use crate::request::{KvRequest, StoreRequest};
 use crate::stats::tikv_stats;
-use crate::store::HasKeyErrors;
 use crate::store::HasRegionError;
 use crate::store::HasRegionErrors;
 use crate::store::KvClient;
 use crate::store::RegionStore;
+use crate::store::{HasKeyErrors, Store};
 use crate::transaction::resolve_locks;
 use crate::transaction::HasLocks;
 use crate::transaction::ResolveLocksContext;
@@ -73,7 +73,19 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
     }
 }
 
+impl<Req: KvRequest + StoreRequest> StoreRequest for Dispatch<Req> {
+    fn apply_store(&mut self, store: &Store) {
+        self.kv_client = Some(store.client.clone());
+        self.request.apply_store(store);
+    }
+}
+
 const MULTI_REGION_CONCURRENCY: usize = 16;
+const MULTI_STORES_CONCURRENCY: usize = 16;
+
+fn is_grpc_error(e: &Error) -> bool {
+    matches!(e, Error::GrpcAPI(_) | Error::Grpc(_))
+}
 
 pub struct RetryableMultiRegion<P: Plan, PdC: PdClient> {
     pub(super) inner: P,
@@ -150,7 +162,6 @@ where
         let res = plan.execute().await;
         drop(permit);
 
-        let is_grpc_error = |e: &Error| matches!(e, Error::GrpcAPI(_) | Error::Grpc(_));
         let mut resp = match res {
             Ok(resp) => resp,
             Err(e) if is_grpc_error(&e) => {
@@ -354,6 +365,90 @@ where
     }
 }
 
+pub struct RetryableAllStores<P: Plan, PdC: PdClient> {
+    pub(super) inner: P,
+    pub pd_client: Arc<PdC>,
+    pub backoff: Backoff,
+}
+
+impl<P: Plan, PdC: PdClient> Clone for RetryableAllStores<P, PdC> {
+    fn clone(&self) -> Self {
+        RetryableAllStores {
+            inner: self.inner.clone(),
+            pd_client: self.pd_client.clone(),
+            backoff: self.backoff.clone(),
+        }
+    }
+}
+
+// About `HasRegionError`:
+// Store requests should not return region errors.
+// But as the response of the only store request so far (UnsafeDestroyRangeResponse) has the `region_error` field,
+// we require `HasRegionError` to check whether a region error is returned from TiKV.
+#[async_trait]
+impl<P: Plan + StoreRequest, PdC: PdClient> Plan for RetryableAllStores<P, PdC>
+where
+    P::Result: HasKeyErrors + HasRegionError,
+{
+    type Result = Vec<Result<P::Result>>;
+
+    async fn execute(&self) -> Result<Self::Result> {
+        let concurrency_permits = Arc::new(Semaphore::new(MULTI_STORES_CONCURRENCY));
+        let stores = self.pd_client.clone().all_stores().await?;
+        let mut handles = Vec::with_capacity(stores.len());
+        for store in stores {
+            let mut clone = self.inner.clone();
+            clone.apply_store(&store);
+            let handle = tokio::spawn(Self::single_store_handler(
+                clone,
+                self.backoff.clone(),
+                concurrency_permits.clone(),
+            ));
+            handles.push(handle);
+        }
+        let results = try_join_all(handles).await?;
+        Ok(results.into_iter().collect::<Vec<_>>())
+    }
+}
+
+impl<P: Plan, PdC: PdClient> RetryableAllStores<P, PdC>
+where
+    P::Result: HasKeyErrors + HasRegionError,
+{
+    async fn single_store_handler(
+        plan: P,
+        mut backoff: Backoff,
+        permits: Arc<Semaphore>,
+    ) -> Result<P::Result> {
+        loop {
+            let permit = permits.acquire().await.unwrap();
+            let res = plan.execute().await;
+            drop(permit);
+
+            match res {
+                Ok(mut resp) => {
+                    if let Some(e) = resp.key_errors() {
+                        return Err(Error::MultipleKeyErrors(e));
+                    } else if let Some(e) = resp.region_error() {
+                        // Store requests should not return region errors.
+                        return Err(Error::RegionError(Box::new(e)));
+                    } else {
+                        return Ok(resp);
+                    }
+                }
+                Err(e) if is_grpc_error(&e) => match backoff.next_delay_duration() {
+                    Some(duration) => {
+                        sleep(duration).await;
+                        continue;
+                    }
+                    None => return Err(e),
+                },
+                Err(e) => return Err(e),
+            }
+        }
+    }
+}
+
 /// A technique for merging responses into a single result (with type `Out`).
 pub trait Merge<In>: Sized + Clone + Send + Sync + 'static {
     type Out: Send;
diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs
index 88a5e9ad..8e2329e7 100644
--- a/src/request/plan_builder.rs
+++ b/src/request/plan_builder.rs
@@ -7,9 +7,8 @@ use super::plan::PreserveShard;
 use crate::backoff::Backoff;
 use crate::pd::PdClient;
 use crate::request::codec::EncodedRequest;
-use crate::request::plan::CleanupLocks;
+use crate::request::plan::{CleanupLocks, RetryableAllStores};
 use crate::request::shard::HasNextBatch;
-use crate::request::DefaultProcessor;
 use crate::request::Dispatch;
 use crate::request::ExtractError;
 use crate::request::KvRequest;
@@ -22,6 +21,7 @@ use crate::request::ProcessResponse;
 use crate::request::ResolveLock;
 use crate::request::RetryableMultiRegion;
 use crate::request::Shardable;
+use crate::request::{DefaultProcessor, StoreRequest};
 use crate::store::HasKeyErrors;
 use crate::store::HasRegionError;
 use crate::store::HasRegionErrors;
@@ -194,6 +194,26 @@ impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
     }
 }
 
+impl<PdC: PdClient, P: Plan + StoreRequest> PlanBuilder<PdC, P, NoTarget>
+where
+    P::Result: HasKeyErrors + HasRegionError,
+{
+    pub fn all_stores(
+        self,
+        backoff: Backoff,
+    ) -> PlanBuilder<PdC, RetryableAllStores<P, PdC>, Targetted> {
+        PlanBuilder {
+            pd_client: self.pd_client.clone(),
+            plan: RetryableAllStores {
+                inner: self.plan,
+                pd_client: self.pd_client,
+                backoff,
+            },
+            phantom: PhantomData,
+        }
+    }
+}
+
 impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, NoTarget>
 where
     P::Result: HasKeyErrors,
diff --git a/src/store/errors.rs b/src/store/errors.rs
index e40b9f42..c9d6c774 100644
--- a/src/store/errors.rs
+++ b/src/store/errors.rs
@@ -52,6 +52,7 @@ has_region_error!(kvrpcpb::CheckTxnStatusResponse);
 has_region_error!(kvrpcpb::CheckSecondaryLocksResponse);
 has_region_error!(kvrpcpb::DeleteRangeResponse);
 has_region_error!(kvrpcpb::GcResponse);
+has_region_error!(kvrpcpb::UnsafeDestroyRangeResponse);
 has_region_error!(kvrpcpb::RawGetResponse);
 has_region_error!(kvrpcpb::RawBatchGetResponse);
 has_region_error!(kvrpcpb::RawPutResponse);
@@ -111,6 +112,7 @@ has_str_error!(kvrpcpb::RawCasResponse);
 has_str_error!(kvrpcpb::RawCoprocessorResponse);
 has_str_error!(kvrpcpb::ImportResponse);
 has_str_error!(kvrpcpb::DeleteRangeResponse);
+has_str_error!(kvrpcpb::UnsafeDestroyRangeResponse);
 
 impl HasKeyErrors for kvrpcpb::ScanResponse {
     fn key_errors(&mut self) -> Option<Vec<Error>> {
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 0989611b..a244a1bc 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -33,6 +33,11 @@ pub struct RegionStore {
     pub client: Arc<dyn KvClient + Send + Sync>,
 }
 
+#[derive(new, Clone)]
+pub struct Store {
+    pub client: Arc<dyn KvClient + Send + Sync>,
+}
+
 #[async_trait]
 pub trait KvConnectStore: KvConnect {
     async fn connect_to_store(
diff --git a/src/store/request.rs b/src/store/request.rs
index 2f1a31a0..ec7e08a4 100644
--- a/src/store/request.rs
+++ b/src/store/request.rs
@@ -116,3 +116,8 @@ impl_request!(
 );
 impl_request!(GcRequest, kv_gc, "kv_gc");
 impl_request!(DeleteRangeRequest, kv_delete_range, "kv_delete_range");
+impl_request!(
+    UnsafeDestroyRangeRequest,
+    unsafe_destroy_range,
+    "unsafe_destroy_range"
+);
diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index 610ea065..4bcb16d9 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
 use log::debug;
 use log::info;
 
-use crate::backoff::DEFAULT_REGION_BACKOFF;
+use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
 use crate::config::Config;
 use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
@@ -16,6 +16,7 @@ use crate::request::Plan;
 use crate::timestamp::TimestampExt;
 use crate::transaction::lock::ResolveLocksOptions;
 use crate::transaction::lowering::new_scan_lock_request;
+use crate::transaction::lowering::new_unsafe_destroy_range_request;
 use crate::transaction::ResolveLocksContext;
 use crate::transaction::Snapshot;
 use crate::transaction::Transaction;
@@ -307,6 +308,23 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
         plan.execute().await
     }
 
+    /// Cleans up all keys in a range and quickly reclaims disk space.
+    ///
+    /// The range can span over multiple regions.
+    ///
+    /// Note that the request will directly delete data from RocksDB, and all MVCC versions will be erased.
+    ///
+    /// This interface is intended for special scenarios that resemble operations like "drop table" or "drop database" in TiDB.
+    pub async fn unsafe_destroy_range(&self, range: impl Into<BoundRange>) -> Result<()> {
+        let req = new_unsafe_destroy_range_request(range.into());
+        let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
+            .all_stores(DEFAULT_STORE_BACKOFF)
+            .merge(crate::request::Collect)
+            .plan();
+        plan.execute().await
+    }
+
     fn new_transaction(
         &self,
         timestamp: Timestamp,
diff --git a/src/transaction/lowering.rs b/src/transaction/lowering.rs
index c6a931cf..f2364e93 100644
--- a/src/transaction/lowering.rs
+++ b/src/transaction/lowering.rs
@@ -184,3 +184,8 @@ pub fn new_heart_beat_request(
 ) -> kvrpcpb::TxnHeartBeatRequest {
     requests::new_heart_beat_request(start_ts.version(), primary_lock.into(), ttl)
 }
+
+pub fn new_unsafe_destroy_range_request(range: BoundRange) -> kvrpcpb::UnsafeDestroyRangeRequest {
+    let (start_key, end_key) = range.into_keys();
+    requests::new_unsafe_destroy_range_request(start_key.into(), end_key.unwrap_or_default().into())
+}
diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs
index ca08d46c..5e5a0514 100644
--- a/src/transaction/requests.rs
+++ b/src/transaction/requests.rs
@@ -19,7 +19,6 @@ use crate::proto::kvrpcpb::TxnHeartBeatResponse;
 use crate::proto::kvrpcpb::TxnInfo;
 use crate::proto::kvrpcpb::{self};
 use crate::proto::pdpb::Timestamp;
-use crate::request::Batchable;
 use crate::request::Collect;
 use crate::request::CollectSingle;
 use crate::request::CollectWithShard;
@@ -32,13 +31,14 @@ use crate::request::Process;
 use crate::request::ResponseWithShard;
 use crate::request::Shardable;
 use crate::request::SingleKey;
+use crate::request::{Batchable, StoreRequest};
 use crate::shardable_key;
 use crate::shardable_keys;
 use crate::shardable_range;
-use crate::store::store_stream_for_keys;
 use crate::store::store_stream_for_range;
 use crate::store::RegionStore;
 use crate::store::Request;
+use crate::store::{store_stream_for_keys, Store};
 use crate::timestamp::TimestampExt;
 use crate::transaction::HasLocks;
 use crate::util::iter::FlatMapOkIterExt;
@@ -867,6 +867,36 @@ impl HasLocks for kvrpcpb::PrewriteResponse {
     }
 }
 
+pub fn new_unsafe_destroy_range_request(
+    start_key: Vec<u8>,
+    end_key: Vec<u8>,
+) -> kvrpcpb::UnsafeDestroyRangeRequest {
+    let mut req = kvrpcpb::UnsafeDestroyRangeRequest::default();
+    req.start_key = start_key;
+    req.end_key = end_key;
+    req
+}
+
+impl KvRequest for kvrpcpb::UnsafeDestroyRangeRequest {
+    type Response = kvrpcpb::UnsafeDestroyRangeResponse;
+}
+
+impl StoreRequest for kvrpcpb::UnsafeDestroyRangeRequest {
+    fn apply_store(&mut self, _store: &Store) {}
+}
+
+impl HasLocks for kvrpcpb::UnsafeDestroyRangeResponse {}
+
+impl Merge<kvrpcpb::UnsafeDestroyRangeResponse> for Collect {
+    type Out = ();
+
+    fn merge(&self, input: Vec<Result<kvrpcpb::UnsafeDestroyRangeResponse>>) -> Result<Self::Out> {
+        let _: Vec<kvrpcpb::UnsafeDestroyRangeResponse> =
+            input.into_iter().collect::<Result<Vec<_>>>()?;
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 #[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))]
 mod tests {
diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs
index 850f3554..71d48283 100644
--- a/tests/integration_tests.rs
+++ b/tests/integration_tests.rs
@@ -1200,3 +1200,63 @@ async fn verify_mutate(is_pessimistic: bool) {
         Some(Value::from("v2".to_owned())).as_ref()
     );
 }
+
+#[tokio::test]
+#[serial]
+async fn txn_unsafe_destroy_range() -> Result<()> {
+    init().await?;
+    let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;
+
+    const DATA_COUNT: usize = 10;
+
+    {
+        let mut txn = client.begin_pessimistic().await.unwrap();
+        for i in 0..DATA_COUNT {
+            let prefix = i % 2;
+            let idx = i / 2;
+            txn.put(
+                format!("prefix{}_key{}", prefix, idx).into_bytes(),
+                format!("value{}{}", prefix, idx).into_bytes(),
+            )
+            .await
+            .unwrap();
+        }
+        txn.commit().await.unwrap();
+
+        let mut snapshot = client.snapshot(
+            client.current_timestamp().await.unwrap(),
+            TransactionOptions::new_pessimistic(),
+        );
+        let kvs = snapshot
+            .scan(b"prefix0".to_vec()..b"prefix2".to_vec(), 100)
+            .await
+            .unwrap()
+            .collect::<Vec<_>>();
+        assert_eq!(kvs.len(), DATA_COUNT);
+    }
+
+    {
+        // destroy "prefix0"
+        client
+            .unsafe_destroy_range(b"prefix0".to_vec()..b"prefix1".to_vec())
+            .await
+            .unwrap();
+
+        let mut snapshot = client.snapshot(
+            client.current_timestamp().await.unwrap(),
+            TransactionOptions::new_pessimistic(),
+        );
+        let kvs = snapshot
+            .scan(b"prefix0".to_vec()..b"prefix2".to_vec(), 100)
+            .await
+            .unwrap()
+            .collect::<Vec<_>>();
+        assert_eq!(kvs.len(), DATA_COUNT / 2);
+        for (i, kv) in kvs.into_iter().enumerate() {
+            assert_eq!(kv.key(), &Key::from(format!("prefix1_key{}", i)));
+            assert_eq!(kv.value(), &format!("value1{}", i).into_bytes());
+        }
+    }
+
+    Ok(())
+}
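
Usage note (not part of the patch): a minimal sketch of how the new `unsafe_destroy_range` API might be called from application code, mirroring the integration test above. The PD endpoint address is an assumption for illustration; error handling is elided.

```rust
use tikv_client::TransactionClient;

#[tokio::main]
async fn main() -> Result<(), tikv_client::Error> {
    // Hypothetical PD endpoint; point this at your own cluster.
    let client = TransactionClient::new(vec!["127.0.0.1:2379"]).await?;

    // Fans the UnsafeDestroyRange RPC out to every valid TiKV store
    // (tombstoned and TiFlash stores are filtered out by `is_valid_tikv_store`).
    // This bypasses MVCC, so the deleted data is unrecoverable.
    client
        .unsafe_destroy_range(b"prefix0".to_vec()..b"prefix1".to_vec())
        .await?;

    Ok(())
}
```

Because the request targets whole stores rather than key ranges within regions, the plan is built with `all_stores(DEFAULT_STORE_BACKOFF)` instead of `retry_multi_region`, and `Collect` merges the per-store responses into `()`.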