From f178f16bb444b0808ce3aaf05f4ee56b94232b2c Mon Sep 17 00:00:00 2001 From: ben1009 Date: Fri, 15 Sep 2023 15:44:53 +0800 Subject: [PATCH 1/5] ci: upgrade action & use nextest instead of test Signed-off-by: ben1009 --- .github/workflows/ci.yml | 19 +++++++++++++------ Makefile | 11 +++++++---- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 20a6baed..4db94ed4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,6 +3,9 @@ on: push: branches: - master +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true name: CI @@ -11,14 +14,14 @@ jobs: name: check runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Install Protoc uses: arduino/setup-protoc@v1 with: version: '3.x' repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Rust Cache - uses: Swatinem/rust-cache@v1.4.0 + uses: Swatinem/rust-cache@v2 - name: make check run: make check - name: Catch unexpected changes in the generated code @@ -31,14 +34,16 @@ jobs: CARGO_INCREMENTAL: 0 runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Install Protoc uses: arduino/setup-protoc@v1 with: version: '3.x' repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Rust Cache - uses: Swatinem/rust-cache@v1.4.0 + uses: Swatinem/rust-cache@v2 + - name: Install latest nextest release + uses: taiki-e/install-action@nextest - name: unit test run: make unit-test @@ -48,14 +53,14 @@ jobs: CARGO_INCREMENTAL: 0 runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Install Protoc uses: arduino/setup-protoc@v1 with: version: '3.x' repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Rust Cache - uses: Swatinem/rust-cache@v1.4.0 + uses: Swatinem/rust-cache@v2 - name: install tiup run: curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh - name: start tiup playground @@ -68,5 +73,7 @@ jobs: [[ "$(curl -I http://127.0.0.1:2379/pd/api/v1/regions 2>/dev/null | head -n 1 | cut -d$' ' -f2)" -ne "405" ]] || break sleep 1 done + - name: Install latest nextest release + uses: taiki-e/install-action@nextest - name: integration test run: MULTI_REGION=1 make integration-test diff --git a/Makefile b/Makefile index b57685a5..0ccdb1a4 100644 --- a/Makefile +++ b/Makefile @@ -20,12 +20,15 @@ check: generate cargo clippy --all-targets --features "${ALL_FEATURES}" -- -D clippy::all unit-test: generate - cargo test --all --no-default-features + cargo nextest run --all --no-default-features integration-test: generate - cargo test txn_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture - cargo test raw_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture - cargo test misc_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture + cargo nextest run txn_ --all ${INTEGRATION_TEST_ARGS} +#-- --nocapture + cargo nextest run raw_ --all ${INTEGRATION_TEST_ARGS} +#-- --nocapture + cargo nextest run misc_ --all ${INTEGRATION_TEST_ARGS} +#-- --nocapture test: unit-test integration-test From ae69ee9123769906d51949f7451082c4627c2698 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Fri, 15 Sep 2023 16:13:58 +0800 Subject: [PATCH 2/5] fix DCO check Signed-off-by: ben1009 From 878a39a74bfd04bd7da225020ac444ba6a730a94 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Mon, 18 Sep 2023 11:36:10 +0800 Subject: [PATCH 3/5] back to use test in IT Signed-off-by: ben1009 --- .github/workflows/ci.yml | 3 +++ Makefile | 9 +++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4db94ed4..4a4b7a55 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,6 +7,9 @@ concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} cancel-in-progress: true +env: + CARGO_TERM_COLOR: always + name: CI jobs: diff --git a/Makefile b/Makefile index 0ccdb1a4..8014352d 100644 --- a/Makefile +++ b/Makefile @@ -23,12 +23,9 @@ unit-test: generate cargo nextest run --all --no-default-features integration-test: generate - cargo nextest run txn_ --all ${INTEGRATION_TEST_ARGS} -#-- --nocapture - cargo nextest run raw_ --all ${INTEGRATION_TEST_ARGS} -#-- --nocapture - cargo nextest run misc_ --all ${INTEGRATION_TEST_ARGS} -#-- --nocapture + cargo test txn_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture + cargo test raw_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture + cargo test misc_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture test: unit-test integration-test From 0a9457c6b5ad0fcf365d64a2eece8ee2cff1e616 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sat, 7 Oct 2023 16:46:52 +0800 Subject: [PATCH 4/5] transaction: Support unsafe_destroy_range interface (#420) * add unsafe_destroy_range Signed-off-by: Ping Yu * polish Signed-off-by: Ping Yu * polish Signed-off-by: Ping Yu * fix compile error on lower version of rust Signed-off-by: Ping Yu --------- Signed-off-by: Ping Yu Signed-off-by: ben1009 --- src/backoff.rs | 1 + src/mock.rs | 6 ++- src/pd/client.rs | 14 ++++- src/pd/retry.rs | 1 - src/region_cache.rs | 64 +++++++++++++++++++++++ src/request/mod.rs | 8 ++- src/request/plan.rs | 101 ++++++++++++++++++++++++++++++++++-- src/request/plan_builder.rs | 24 ++++++++- src/store/errors.rs | 2 + src/store/mod.rs | 5 ++ src/store/request.rs | 5 ++ src/transaction/client.rs | 20 ++++++- src/transaction/lowering.rs | 5 ++ src/transaction/requests.rs | 34 +++++++++++- tests/integration_tests.rs | 60 +++++++++++++++++++++ 15 files changed, 338 insertions(+), 12 deletions(-) diff --git a/src/backoff.rs b/src/backoff.rs index 266e8cc2..61f19573 100644 --- a/src/backoff.rs +++ b/src/backoff.rs @@ -8,6 +8,7 @@ use rand::thread_rng; use rand::Rng; pub const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10); +pub const DEFAULT_STORE_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 1000, 10); pub const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10); pub const PESSIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10); diff --git a/src/mock.rs b/src/mock.rs index 3b0b157f..f9c94aef 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -19,10 +19,10 @@ use crate::proto::metapb::{self}; use crate::region::RegionId; use crate::region::RegionWithLeader; use crate::request::codec::ApiV1TxnCodec; -use crate::store::KvClient; use crate::store::KvConnect; use crate::store::RegionStore; use crate::store::Request; +use crate::store::{KvClient, Store}; use crate::Config; use crate::Error; use crate::Key; @@ -206,6 +206,10 @@ impl PdClient for MockPdClient { } } + async fn all_stores(&self) -> Result> { + Ok(vec![Store::new(Arc::new(self.client.clone()))]) + } + async fn get_timestamp(self: Arc) -> Result { Ok(Timestamp::default()) } diff --git a/src/pd/client.rs b/src/pd/client.rs index f291c62c..5461cb57 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -21,10 +21,10 @@ use crate::region::RegionVerId; use crate::region::RegionWithLeader; use crate::region_cache::RegionCache; use crate::request::codec::{ApiV1TxnCodec, Codec}; -use crate::store::KvClient; use crate::store::KvConnect; use crate::store::RegionStore; use crate::store::TikvConnect; +use crate::store::{KvClient, Store}; use crate::BoundRange; use crate::Config; use crate::Key; @@ -78,6 +78,8 @@ pub trait PdClient: Send + Sync + 'static { self.map_region_to_store(region).await } + async fn all_stores(&self) -> Result>; + fn group_keys_by_region( self: Arc, keys: impl Iterator + Send + Sync + 'static, @@ -255,6 +257,16 @@ impl PdClient for PdRpcClien Self::decode_region(region, self.enable_mvcc_codec) } + async fn all_stores(&self) -> Result> { + let pb_stores = self.region_cache.read_through_all_stores().await?; + let mut stores = Vec::with_capacity(pb_stores.len()); + for store in pb_stores { + let client = self.kv_client(&store.address).await?; + stores.push(Store::new(Arc::new(client))); + } + Ok(stores) + } + async fn get_timestamp(self: Arc) -> Result { self.pd.clone().get_timestamp().await } diff --git a/src/pd/retry.rs b/src/pd/retry.rs index 548fc269..0e65e13b 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -160,7 +160,6 @@ impl RetryClientTrait for RetryClient { }) } - #[allow(dead_code)] async fn get_all_stores(self: Arc) -> Result> { retry!(self, "get_all_stores", |cluster| async { cluster diff --git a/src/region_cache.rs b/src/region_cache.rs index 6e8fe0ab..1746d5bf 100644 --- a/src/region_cache.rs +++ b/src/region_cache.rs @@ -232,6 +232,39 @@ impl RegionCache { cache.key_to_ver_id.remove(&start_key); } } + + pub async fn read_through_all_stores(&self) -> Result> { + let stores = self + .inner_client + .clone() + .get_all_stores() + .await? + .into_iter() + .filter(is_valid_tikv_store) + .collect::>(); + for store in &stores { + self.store_cache + .write() + .await + .insert(store.id, store.clone()); + } + Ok(stores) + } +} + +const ENGINE_LABEL_KEY: &str = "engine"; +const ENGINE_LABEL_TIFLASH: &str = "tiflash"; +const ENGINE_LABEL_TIFLASH_COMPUTE: &str = "tiflash_compute"; + +fn is_valid_tikv_store(store: &metapb::Store) -> bool { + if metapb::StoreState::from_i32(store.state).unwrap() == metapb::StoreState::Tombstone { + return false; + } + let is_tiflash = store.labels.iter().any(|label| { + label.key == ENGINE_LABEL_KEY + && (label.value == ENGINE_LABEL_TIFLASH || label.value == ENGINE_LABEL_TIFLASH_COMPUTE) + }); + !is_tiflash } #[cfg(test)] @@ -253,6 +286,7 @@ mod test { use crate::proto::metapb::{self}; use crate::region::RegionId; use crate::region::RegionWithLeader; + use crate::region_cache::is_valid_tikv_store; use crate::Key; use crate::Result; @@ -512,4 +546,34 @@ mod test { region } + + #[test] + fn test_is_valid_tikv_store() { + let mut store = metapb::Store::default(); + assert!(is_valid_tikv_store(&store)); + + store.state = metapb::StoreState::Tombstone.into(); + assert!(!is_valid_tikv_store(&store)); + + store.state = metapb::StoreState::Up.into(); + assert!(is_valid_tikv_store(&store)); + + store.labels.push(metapb::StoreLabel { + key: "some_key".to_owned(), + value: "some_value".to_owned(), + }); + assert!(is_valid_tikv_store(&store)); + + store.labels.push(metapb::StoreLabel { + key: "engine".to_owned(), + value: "tiflash".to_owned(), + }); + assert!(!is_valid_tikv_store(&store)); + + store.labels[1].value = "tiflash_compute".to_owned(); + assert!(!is_valid_tikv_store(&store)); + + store.labels[1].value = "other".to_owned(); + assert!(is_valid_tikv_store(&store)); + } } diff --git a/src/request/mod.rs b/src/request/mod.rs index 429adea5..aecaf26d 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -28,8 +28,8 @@ use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; use crate::backoff::OPTIMISTIC_BACKOFF; use crate::backoff::PESSIMISTIC_BACKOFF; -use crate::store::HasKeyErrors; use crate::store::Request; +use crate::store::{HasKeyErrors, Store}; use crate::transaction::HasLocks; pub mod codec; @@ -47,6 +47,12 @@ pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static { // TODO: fn decode_response() } +/// For requests or plans which are handled at TiKV store (other than region) level. +pub trait StoreRequest { + /// Apply the request to specified TiKV store. + fn apply_store(&mut self, store: &Store); +} + #[derive(Clone, Debug, new, Eq, PartialEq)] pub struct RetryOptions { /// How to retry when there is a region error and we need to resolve regions with PD. diff --git a/src/request/plan.rs b/src/request/plan.rs index 1808078a..ab72e8aa 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -18,15 +18,15 @@ use crate::proto::errorpb; use crate::proto::errorpb::EpochNotMatch; use crate::proto::kvrpcpb; use crate::request::shard::HasNextBatch; -use crate::request::KvRequest; use crate::request::NextBatch; use crate::request::Shardable; +use crate::request::{KvRequest, StoreRequest}; use crate::stats::tikv_stats; -use crate::store::HasKeyErrors; use crate::store::HasRegionError; use crate::store::HasRegionErrors; use crate::store::KvClient; use crate::store::RegionStore; +use crate::store::{HasKeyErrors, Store}; use crate::transaction::resolve_locks; use crate::transaction::HasLocks; use crate::transaction::ResolveLocksContext; @@ -73,7 +73,19 @@ impl Plan for Dispatch { } } +impl StoreRequest for Dispatch { + fn apply_store(&mut self, store: &Store) { + self.kv_client = Some(store.client.clone()); + self.request.apply_store(store); + } +} + const MULTI_REGION_CONCURRENCY: usize = 16; +const MULTI_STORES_CONCURRENCY: usize = 16; + +fn is_grpc_error(e: &Error) -> bool { + matches!(e, Error::GrpcAPI(_) | Error::Grpc(_)) +} pub struct RetryableMultiRegion { pub(super) inner: P, @@ -150,7 +162,6 @@ where let res = plan.execute().await; drop(permit); - let is_grpc_error = |e: &Error| matches!(e, Error::GrpcAPI(_) | Error::Grpc(_)); let mut resp = match res { Ok(resp) => resp, Err(e) if is_grpc_error(&e) => { @@ -354,6 +365,90 @@ where } } +pub struct RetryableAllStores { + pub(super) inner: P, + pub pd_client: Arc, + pub backoff: Backoff, +} + +impl Clone for RetryableAllStores { + fn clone(&self) -> Self { + RetryableAllStores { + inner: self.inner.clone(), + pd_client: self.pd_client.clone(), + backoff: self.backoff.clone(), + } + } +} + +// About `HasRegionError`: +// Store requests should be return region errors. +// But as the response of only store request by now (UnsafeDestroyRangeResponse) has the `region_error` field, +// we require `HasRegionError` to check whether there is region error returned from TiKV. +#[async_trait] +impl Plan for RetryableAllStores +where + P::Result: HasKeyErrors + HasRegionError, +{ + type Result = Vec>; + + async fn execute(&self) -> Result { + let concurrency_permits = Arc::new(Semaphore::new(MULTI_STORES_CONCURRENCY)); + let stores = self.pd_client.clone().all_stores().await?; + let mut handles = Vec::with_capacity(stores.len()); + for store in stores { + let mut clone = self.inner.clone(); + clone.apply_store(&store); + let handle = tokio::spawn(Self::single_store_handler( + clone, + self.backoff.clone(), + concurrency_permits.clone(), + )); + handles.push(handle); + } + let results = try_join_all(handles).await?; + Ok(results.into_iter().collect::>()) + } +} + +impl RetryableAllStores +where + P::Result: HasKeyErrors + HasRegionError, +{ + async fn single_store_handler( + plan: P, + mut backoff: Backoff, + permits: Arc, + ) -> Result { + loop { + let permit = permits.acquire().await.unwrap(); + let res = plan.execute().await; + drop(permit); + + match res { + Ok(mut resp) => { + if let Some(e) = resp.key_errors() { + return Err(Error::MultipleKeyErrors(e)); + } else if let Some(e) = resp.region_error() { + // Store request should not return region error. + return Err(Error::RegionError(Box::new(e))); + } else { + return Ok(resp); + } + } + Err(e) if is_grpc_error(&e) => match backoff.next_delay_duration() { + Some(duration) => { + sleep(duration).await; + continue; + } + None => return Err(e), + }, + Err(e) => return Err(e), + } + } + } +} + /// A technique for merging responses into a single result (with type `Out`). pub trait Merge: Sized + Clone + Send + Sync + 'static { type Out: Send; diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 88a5e9ad..8e2329e7 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -7,9 +7,8 @@ use super::plan::PreserveShard; use crate::backoff::Backoff; use crate::pd::PdClient; use crate::request::codec::EncodedRequest; -use crate::request::plan::CleanupLocks; +use crate::request::plan::{CleanupLocks, RetryableAllStores}; use crate::request::shard::HasNextBatch; -use crate::request::DefaultProcessor; use crate::request::Dispatch; use crate::request::ExtractError; use crate::request::KvRequest; @@ -22,6 +21,7 @@ use crate::request::ProcessResponse; use crate::request::ResolveLock; use crate::request::RetryableMultiRegion; use crate::request::Shardable; +use crate::request::{DefaultProcessor, StoreRequest}; use crate::store::HasKeyErrors; use crate::store::HasRegionError; use crate::store::HasRegionErrors; @@ -194,6 +194,26 @@ impl PlanBuilder, NoTarget> { } } +impl PlanBuilder +where + P::Result: HasKeyErrors + HasRegionError, +{ + pub fn all_stores( + self, + backoff: Backoff, + ) -> PlanBuilder, Targetted> { + PlanBuilder { + pd_client: self.pd_client.clone(), + plan: RetryableAllStores { + inner: self.plan, + pd_client: self.pd_client, + backoff, + }, + phantom: PhantomData, + } + } +} + impl PlanBuilder where P::Result: HasKeyErrors, diff --git a/src/store/errors.rs b/src/store/errors.rs index e40b9f42..c9d6c774 100644 --- a/src/store/errors.rs +++ b/src/store/errors.rs @@ -52,6 +52,7 @@ has_region_error!(kvrpcpb::CheckTxnStatusResponse); has_region_error!(kvrpcpb::CheckSecondaryLocksResponse); has_region_error!(kvrpcpb::DeleteRangeResponse); has_region_error!(kvrpcpb::GcResponse); +has_region_error!(kvrpcpb::UnsafeDestroyRangeResponse); has_region_error!(kvrpcpb::RawGetResponse); has_region_error!(kvrpcpb::RawBatchGetResponse); has_region_error!(kvrpcpb::RawPutResponse); @@ -111,6 +112,7 @@ has_str_error!(kvrpcpb::RawCasResponse); has_str_error!(kvrpcpb::RawCoprocessorResponse); has_str_error!(kvrpcpb::ImportResponse); has_str_error!(kvrpcpb::DeleteRangeResponse); +has_str_error!(kvrpcpb::UnsafeDestroyRangeResponse); impl HasKeyErrors for kvrpcpb::ScanResponse { fn key_errors(&mut self) -> Option> { diff --git a/src/store/mod.rs b/src/store/mod.rs index 0989611b..a244a1bc 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -33,6 +33,11 @@ pub struct RegionStore { pub client: Arc, } +#[derive(new, Clone)] +pub struct Store { + pub client: Arc, +} + #[async_trait] pub trait KvConnectStore: KvConnect { async fn connect_to_store( diff --git a/src/store/request.rs b/src/store/request.rs index 2f1a31a0..ec7e08a4 100644 --- a/src/store/request.rs +++ b/src/store/request.rs @@ -116,3 +116,8 @@ impl_request!( ); impl_request!(GcRequest, kv_gc, "kv_gc"); impl_request!(DeleteRangeRequest, kv_delete_range, "kv_delete_range"); +impl_request!( + UnsafeDestroyRangeRequest, + unsafe_destroy_range, + "unsafe_destroy_range" +); diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 610ea065..4bcb16d9 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use log::debug; use log::info; -use crate::backoff::DEFAULT_REGION_BACKOFF; +use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF}; use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; @@ -16,6 +16,7 @@ use crate::request::Plan; use crate::timestamp::TimestampExt; use crate::transaction::lock::ResolveLocksOptions; use crate::transaction::lowering::new_scan_lock_request; +use crate::transaction::lowering::new_unsafe_destroy_range_request; use crate::transaction::ResolveLocksContext; use crate::transaction::Snapshot; use crate::transaction::Transaction; @@ -307,6 +308,23 @@ impl Client { plan.execute().await } + /// Cleans up all keys in a range and quickly reclaim disk space. + /// + /// The range can span over multiple regions. + /// + /// Note that the request will directly delete data from RocksDB, and all MVCC will be erased. + /// + /// This interface is intended for special scenarios that resemble operations like "drop table" or "drop database" in TiDB. + pub async fn unsafe_destroy_range(&self, range: impl Into) -> Result<()> { + let req = new_unsafe_destroy_range_request(range.into()); + let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); + let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) + .all_stores(DEFAULT_STORE_BACKOFF) + .merge(crate::request::Collect) + .plan(); + plan.execute().await + } + fn new_transaction( &self, timestamp: Timestamp, diff --git a/src/transaction/lowering.rs b/src/transaction/lowering.rs index c6a931cf..f2364e93 100644 --- a/src/transaction/lowering.rs +++ b/src/transaction/lowering.rs @@ -184,3 +184,8 @@ pub fn new_heart_beat_request( ) -> kvrpcpb::TxnHeartBeatRequest { requests::new_heart_beat_request(start_ts.version(), primary_lock.into(), ttl) } + +pub fn new_unsafe_destroy_range_request(range: BoundRange) -> kvrpcpb::UnsafeDestroyRangeRequest { + let (start_key, end_key) = range.into_keys(); + requests::new_unsafe_destroy_range_request(start_key.into(), end_key.unwrap_or_default().into()) +} diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index ca08d46c..5e5a0514 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -19,7 +19,6 @@ use crate::proto::kvrpcpb::TxnHeartBeatResponse; use crate::proto::kvrpcpb::TxnInfo; use crate::proto::kvrpcpb::{self}; use crate::proto::pdpb::Timestamp; -use crate::request::Batchable; use crate::request::Collect; use crate::request::CollectSingle; use crate::request::CollectWithShard; @@ -32,13 +31,14 @@ use crate::request::Process; use crate::request::ResponseWithShard; use crate::request::Shardable; use crate::request::SingleKey; +use crate::request::{Batchable, StoreRequest}; use crate::shardable_key; use crate::shardable_keys; use crate::shardable_range; -use crate::store::store_stream_for_keys; use crate::store::store_stream_for_range; use crate::store::RegionStore; use crate::store::Request; +use crate::store::{store_stream_for_keys, Store}; use crate::timestamp::TimestampExt; use crate::transaction::HasLocks; use crate::util::iter::FlatMapOkIterExt; @@ -867,6 +867,36 @@ impl HasLocks for kvrpcpb::PrewriteResponse { } } +pub fn new_unsafe_destroy_range_request( + start_key: Vec, + end_key: Vec, +) -> kvrpcpb::UnsafeDestroyRangeRequest { + let mut req = kvrpcpb::UnsafeDestroyRangeRequest::default(); + req.start_key = start_key; + req.end_key = end_key; + req +} + +impl KvRequest for kvrpcpb::UnsafeDestroyRangeRequest { + type Response = kvrpcpb::UnsafeDestroyRangeResponse; +} + +impl StoreRequest for kvrpcpb::UnsafeDestroyRangeRequest { + fn apply_store(&mut self, _store: &Store) {} +} + +impl HasLocks for kvrpcpb::UnsafeDestroyRangeResponse {} + +impl Merge for Collect { + type Out = (); + + fn merge(&self, input: Vec>) -> Result { + let _: Vec = + input.into_iter().collect::>>()?; + Ok(()) + } +} + #[cfg(test)] #[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))] mod tests { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 850f3554..71d48283 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1200,3 +1200,63 @@ async fn verify_mutate(is_pessimistic: bool) { Some(Value::from("v2".to_owned())).as_ref() ); } + +#[tokio::test] +#[serial] +async fn txn_unsafe_destroy_range() -> Result<()> { + init().await?; + let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + + const DATA_COUNT: usize = 10; + + { + let mut txn = client.begin_pessimistic().await.unwrap(); + for i in 0..DATA_COUNT { + let prefix = i % 2; + let idx = i / 2; + txn.put( + format!("prefix{}_key{}", prefix, idx).into_bytes(), + format!("value{}{}", prefix, idx).into_bytes(), + ) + .await + .unwrap(); + } + txn.commit().await.unwrap(); + + let mut snapshot = client.snapshot( + client.current_timestamp().await.unwrap(), + TransactionOptions::new_pessimistic(), + ); + let kvs = snapshot + .scan(b"prefix0".to_vec()..b"prefix2".to_vec(), 100) + .await + .unwrap() + .collect::>(); + assert_eq!(kvs.len(), DATA_COUNT); + } + + { + // destroy "prefix0" + client + .unsafe_destroy_range(b"prefix0".to_vec()..b"prefix1".to_vec()) + .await + .unwrap(); + + let mut snapshot = client.snapshot( + client.current_timestamp().await.unwrap(), + TransactionOptions::new_pessimistic(), + ); + let kvs = snapshot + .scan(b"prefix0".to_vec()..b"prefix2".to_vec(), 100) + .await + .unwrap() + .collect::>(); + assert_eq!(kvs.len(), DATA_COUNT / 2); + for (i, kv) in kvs.into_iter().enumerate() { + assert_eq!(kv.key(), &Key::from(format!("prefix1_key{}", i))); + assert_eq!(kv.value(), &format!("value1{}", i).into_bytes()); + } + } + + Ok(()) +} From b430a84438c0a15d15ab94f4fdd2a1669b7a96ba Mon Sep 17 00:00:00 2001 From: ben1009 Date: Sun, 8 Oct 2023 12:37:03 +0800 Subject: [PATCH 5/5] add install guid for nextest in readme Signed-off-by: ben1009 --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0bb8917d..6f41a99b 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,11 @@ We welcome your contributions! Contributing code is great, we also appreciate fi ## Building and testing -We use the standard Cargo workflows, e.g., `cargo build` to build and `cargo test` to run unit tests. You will need to use a nightly Rust toolchain to build and run tests. +We use the standard Cargo workflows, e.g., `cargo build` to build and `cargo test/nextest` to run unit tests. You will need to use a nightly Rust toolchain to build and run tests. Could use [nextest](https://nexte.st/index.html) to speed up ut, install nextest first. + +``` +cargo install cargo-nextest --locked +``` Running integration tests or manually testing the client with a TiKV cluster is a little bit more involved. The easiest way is to use [TiUp](https://github.com/pingcap/tiup) (>= 1.5) to initialise a cluster on your local machine: @@ -131,7 +135,7 @@ PD_ADDRS="127.0.0.1:2379" cargo test --package tikv-client --test integration_te We use a standard GitHub PR workflow. We run CI on every PR and require all PRs to build without warnings (including clippy and Rustfmt warnings), pass tests, have a DCO sign-off (use `-s` when you commit, the DCO bot will guide you through completing the DCO agreement for your first PR), and have at least one review. If any of this is difficult for you, don't worry about it and ask on the PR. -To run CI-like tests locally, we recommend you run `cargo clippy`, `cargo test`, and `cargo fmt` before submitting your PR. See above for running integration tests, but you probably won't need to worry about this for your first few PRs. +To run CI-like tests locally, we recommend you run `cargo clippy`, `cargo test/nextest run`, and `cargo fmt` before submitting your PR. See above for running integration tests, but you probably won't need to worry about this for your first few PRs. Please follow PingCAP's [Rust style guide](https://pingcap.github.io/style-guide/rust/). All code PRs should include new tests or test cases.