diff --git a/Makefile b/Makefile index 8014352d..aef0ad45 100644 --- a/Makefile +++ b/Makefile @@ -2,8 +2,8 @@ export RUSTFLAGS=-Dwarnings .PHONY: default check unit-test integration-tests test doc docker-pd docker-kv docker all -PD_ADDRS ?= "127.0.0.1:2379" -MULTI_REGION ?= 1 +export PD_ADDRS ?= 127.0.0.1:2379 +export MULTI_REGION ?= 1 ALL_FEATURES := integration-tests diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 23bfce73..0be733cf 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -16,6 +16,7 @@ use crate::proto::kvrpcpb; use crate::proto::kvrpcpb::ApiVersion; use crate::proto::metapb; use crate::proto::tikvpb::tikv_client::TikvClient; +use crate::range_request; use crate::request::plan::ResponseWithShard; use crate::request::Collect; use crate::request::CollectSingle; @@ -23,6 +24,7 @@ use crate::request::DefaultProcessor; use crate::request::KvRequest; use crate::request::Merge; use crate::request::Process; +use crate::request::RangeRequest; use crate::request::Shardable; use crate::request::SingleKey; use crate::shardable_key; @@ -227,6 +229,7 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest { type Response = kvrpcpb::RawDeleteRangeResponse; } +range_request!(kvrpcpb::RawDeleteRangeRequest); shardable_range!(kvrpcpb::RawDeleteRangeRequest); pub fn new_raw_scan_request( @@ -250,6 +253,7 @@ impl KvRequest for kvrpcpb::RawScanRequest { type Response = kvrpcpb::RawScanResponse; } +range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan. shardable_range!(kvrpcpb::RawScanRequest); impl Merge for Collect { diff --git a/src/request/mod.rs b/src/request/mod.rs index aecaf26d..8c3a45cb 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -23,6 +23,7 @@ pub use self::plan_builder::SingleKey; pub use self::shard::Batchable; pub use self::shard::HasNextBatch; pub use self::shard::NextBatch; +pub use self::shard::RangeRequest; pub use self::shard::Shardable; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; diff --git a/src/request/shard.rs b/src/request/shard.rs index 7c78743d..ec234239 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -12,6 +12,7 @@ use crate::request::KvRequest; use crate::request::Plan; use crate::request::ResolveLock; use crate::store::RegionStore; +use crate::store::Request; use crate::Result; macro_rules! impl_inner_shardable { @@ -204,6 +205,32 @@ macro_rules! shardable_keys { }; } +pub trait RangeRequest: Request { + fn is_reverse(&self) -> bool { + false + } +} + +#[doc(hidden)] +#[macro_export] +macro_rules! range_request { + ($type_: ty) => { + impl RangeRequest for $type_ {} + }; +} + +#[doc(hidden)] +#[macro_export] +macro_rules! reversible_range_request { + ($type_: ty) => { + impl RangeRequest for $type_ { + fn is_reverse(&self) -> bool { + self.reverse + } + } + }; +} + #[doc(hidden)] #[macro_export] macro_rules! shardable_range { @@ -215,8 +242,13 @@ macro_rules! shardable_range { &self, pd_client: &Arc, ) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::store::RegionStore)>> { - let start_key = self.start_key.clone().into(); - let end_key = self.end_key.clone().into(); + let mut start_key = self.start_key.clone().into(); + let mut end_key = self.end_key.clone().into(); + // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. + // Therefore, before fetching the regions from PD, it is necessary to swap the values of start_key and end_key. + if self.is_reverse() { + std::mem::swap(&mut start_key, &mut end_key); + } $crate::store::store_stream_for_range((start_key, end_key), pd_client.clone()) } @@ -227,8 +259,13 @@ macro_rules! shardable_range { ) -> $crate::Result<()> { self.set_context(store.region_with_leader.context()?); + // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. + // As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request. self.start_key = shard.0.into(); self.end_key = shard.1.into(); + if self.is_reverse() { + std::mem::swap(&mut self.start_key, &mut self.end_key); + } Ok(()) } } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 6d3c0999..4f3e1b93 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -28,10 +28,12 @@ use crate::request::KvRequest; use crate::request::Merge; use crate::request::NextBatch; use crate::request::Process; +use crate::request::RangeRequest; use crate::request::ResponseWithShard; use crate::request::Shardable; use crate::request::SingleKey; use crate::request::{Batchable, StoreRequest}; +use crate::reversible_range_request; use crate::shardable_key; use crate::shardable_keys; use crate::shardable_range; @@ -170,6 +172,7 @@ impl KvRequest for kvrpcpb::ScanRequest { type Response = kvrpcpb::ScanResponse; } +reversible_range_request!(kvrpcpb::ScanRequest); shardable_range!(kvrpcpb::ScanRequest); impl Merge for Collect { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 71d48283..82442c4b 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1028,27 +1028,108 @@ async fn txn_scan_reverse() -> Result<()> { init().await?; let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; - let k1 = b"a1".to_vec(); - let k2 = b"a2".to_vec(); - let v1 = b"b1".to_vec(); - let v2 = b"b2".to_vec(); - - let reverse_resp = vec![ - (Key::from(k2.clone()), v2.clone()), - (Key::from(k1.clone()), v1.clone()), - ]; + let k1 = b"k1".to_vec(); + let k2 = b"k2".to_vec(); + let k3 = b"k3".to_vec(); + + let v1 = b"v1".to_vec(); + let v2 = b"v2".to_vec(); + let v3 = b"v3".to_vec(); // Pessimistic option is not stable in this case. Use optimistic options instead. let option = TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn); let mut t = client.begin_with_options(option.clone()).await?; - t.put(k1.clone(), v1).await?; - t.put(k2.clone(), v2).await?; + t.put(k1.clone(), v1.clone()).await?; + t.put(k2.clone(), v2.clone()).await?; + t.put(k3.clone(), v3.clone()).await?; + t.commit().await?; + + let mut t2 = client.begin_with_options(option).await?; + { + // For [k1, k3]: + let bound_range: BoundRange = (k1.clone()..=k3.clone()).into(); + let resp = t2 + .scan_reverse(bound_range, 3) + .await? + .map(|kv| (kv.0, kv.1)) + .collect::)>>(); + assert_eq!( + resp, + vec![ + (Key::from(k3.clone()), v3.clone()), + (Key::from(k2.clone()), v2.clone()), + (Key::from(k1.clone()), v1.clone()), + ] + ); + } + { + // For [k1, k3): + let bound_range: BoundRange = (k1.clone()..k3.clone()).into(); + let resp = t2 + .scan_reverse(bound_range, 3) + .await? + .map(|kv| (kv.0, kv.1)) + .collect::)>>(); + assert_eq!( + resp, + vec![ + (Key::from(k2.clone()), v2.clone()), + (Key::from(k1.clone()), v1), + ] + ); + } + { + // For (k1, k3): + let mut start_key = k1.clone(); + start_key.push(0); + let bound_range: BoundRange = (start_key..k3).into(); + let resp = t2 + .scan_reverse(bound_range, 3) + .await? + .map(|kv| (kv.0, kv.1)) + .collect::)>>(); + assert_eq!(resp, vec![(Key::from(k2), v2),]); + } + t2.commit().await?; + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn txn_scan_reverse_multi_regions() -> Result<()> { + init().await?; + let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + + // Keys in `keys` should locate in different regions. See `init()` for boundary of regions. + let keys: Vec = vec![ + 0x00000000_u32.to_be_bytes().to_vec(), + 0x40000000_u32.to_be_bytes().to_vec(), + 0x80000000_u32.to_be_bytes().to_vec(), + 0xC0000000_u32.to_be_bytes().to_vec(), + ] + .into_iter() + .map(Into::into) + .collect(); + let values: Vec> = (0..keys.len()) + .map(|i| format!("v{}", i).into_bytes()) + .collect(); + let bound_range: BoundRange = + (keys.first().unwrap().clone()..=keys.last().unwrap().clone()).into(); + + // Pessimistic option is not stable in this case. Use optimistic options instead. + let option = TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn); + let mut t = client.begin_with_options(option.clone()).await?; + let mut reverse_resp = Vec::with_capacity(keys.len()); + for (k, v) in keys.into_iter().zip(values.into_iter()).rev() { + t.put(k.clone(), v.clone()).await?; + reverse_resp.push((k, v)); + } t.commit().await?; let mut t2 = client.begin_with_options(option).await?; - let bound_range: BoundRange = (k1..=k2).into(); let resp = t2 - .scan_reverse(bound_range, 2) + .scan_reverse(bound_range, 100) .await? .map(|kv| (kv.0, kv.1)) .collect::)>>();