From c14f23a545cc5cce7a5a5a4e51442e39617ec63d Mon Sep 17 00:00:00 2001 From: Ziqian Qin Date: Wed, 21 Jul 2021 15:33:42 +0800 Subject: [PATCH] The minimal region cache (#291) --- .github/workflows/ci.yml | 4 +- Cargo.toml | 5 +- Makefile | 4 +- src/backoff.rs | 5 + src/lib.rs | 1 + src/mock.rs | 63 ++-- src/pd/client.rs | 121 ++++---- src/pd/mod.rs | 2 +- src/pd/retry.rs | 105 ++++--- src/raw/client.rs | 51 ++-- src/raw/requests.rs | 39 ++- src/region.rs | 8 +- src/region_cache.rs | 494 +++++++++++++++++++++++++++++++ src/request/mod.rs | 58 ++-- src/request/plan.rs | 326 ++++++++++++++------ src/request/plan_builder.rs | 52 ++-- src/request/shard.rs | 61 +++- src/store.rs | 24 +- src/transaction/buffer.rs | 5 +- src/transaction/client.rs | 14 +- src/transaction/lock.rs | 45 +-- src/transaction/requests.rs | 75 ++++- src/transaction/transaction.rs | 82 ++--- tests/common/mod.rs | 15 +- tests/integration_tests.rs | 9 +- tikv-client-common/Cargo.toml | 3 +- tikv-client-common/src/errors.rs | 20 +- tikv-client-store/src/errors.rs | 113 +++---- tikv-client-store/src/lib.rs | 2 +- 29 files changed, 1328 insertions(+), 478 deletions(-) create mode 100644 src/region_cache.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7e74dbc7..ea999eed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,6 +20,7 @@ jobs: - uses: actions-rs/cargo@v1 with: command: check + args: --all-targets --all-features fmt: name: rustfmt @@ -49,7 +50,7 @@ jobs: - uses: actions-rs/clippy-check@v1 with: token: ${{ secrets.GITHUB_TOKEN }} - args: --all-features + args: --all-targets --all-features -- -D clippy::all name: Clippy Output unit-test: name: unit test @@ -98,7 +99,6 @@ jobs: path: | ~/.cargo/.crates.toml ~/.cargo/.crates2.json - ~/.cargo/bin ~/.cargo/registry/index ~/.cargo/registry/cache key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('Cargo.lock') }} diff --git a/Cargo.toml b/Cargo.toml index 84790ace..52e95bd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,8 @@ serde_derive = "1.0" slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } slog-term = { version = "2.4" } thiserror = "1" -tokio = { version = "1.0", features = [ "sync", "time" ] } +tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] } +async-recursion = "0.3" tikv-client-common = { version = "0.1.0", path = "tikv-client-common" } tikv-client-pd = { version = "0.1.0", path = "tikv-client-pd" } @@ -50,7 +51,7 @@ proptest = "1" proptest-derive = "0.3" serial_test = "0.5.0" simple_logger = "1" -tokio = { version = "1.0", features = [ "sync", "rt-multi-thread", "macros" ] } +tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] } reqwest = {version = "0.11", default-features = false, features = ["native-tls-vendored"]} serde_json = "1" diff --git a/Makefile b/Makefile index 23a8db5f..d6c7dc3f 100644 --- a/Makefile +++ b/Makefile @@ -3,9 +3,9 @@ default: check check: - cargo check --all + cargo check --all --all-targets --all-features cargo fmt -- --check - cargo clippy -- -D clippy::all + cargo clippy --all-targets --all-features -- -D clippy::all unit-test: cargo test --all diff --git a/src/backoff.rs b/src/backoff.rs index 5b957a33..dc28c7bd 100644 --- a/src/backoff.rs +++ b/src/backoff.rs @@ -76,6 +76,11 @@ impl Backoff { self.kind == BackoffKind::None } + /// Returns the number of attempts + pub fn current_attempts(&self) -> u32 { + self.current_attempts + } + /// Don't wait. 
Usually indicates that we should not retry a request. pub const fn no_backoff() -> Backoff { Backoff { diff --git a/src/lib.rs b/src/lib.rs index d1c6480c..e94785b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -104,6 +104,7 @@ mod pd; #[doc(hidden)] pub mod raw; mod region; +mod region_cache; mod stats; mod store; mod timestamp; diff --git a/src/mock.rs b/src/mock.rs index 12707373..70f186da 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -7,12 +7,13 @@ use crate::{ pd::{PdClient, PdRpcClient, RetryClient}, - region::{Region, RegionId}, - store::Store, + region::{RegionId, RegionWithLeader}, + store::RegionStore, Config, Error, Key, Result, Timestamp, }; use async_trait::async_trait; use derive_new::new; +use slog::{Drain, Logger}; use std::{any::Any, sync::Arc}; use tikv_client_proto::metapb; use tikv_client_store::{KvClient, KvConnect, Request}; @@ -21,8 +22,16 @@ use tikv_client_store::{KvClient, KvConnect, Request}; /// client can be tested without doing any RPC calls. pub async fn pd_rpc_client() -> PdRpcClient { let config = Config::default(); + let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); + let logger = Logger::root( + slog_term::FullFormat::new(plain) + .build() + .filter_level(slog::Level::Info) + .fuse(), + o!(), + ); PdRpcClient::new( - &config, + config.clone(), |_, _| MockKvConnect, |e, sm| { futures::future::ok(RetryClient::new_with_cluster( @@ -33,11 +42,13 @@ pub async fn pd_rpc_client() -> PdRpcClient { )) }, false, + logger, ) .await .unwrap() } +#[allow(clippy::type_complexity)] #[derive(new, Default, Clone)] pub struct MockKvClient { pub addr: String, @@ -93,27 +104,31 @@ impl MockPdClient { } } - pub fn region1() -> Region { - let mut region = Region::default(); + pub fn region1() -> RegionWithLeader { + let mut region = RegionWithLeader::default(); region.region.id = 1; region.region.set_start_key(vec![0]); region.region.set_end_key(vec![10]); - let mut leader = metapb::Peer::default(); - leader.store_id = 41; + let leader = metapb::Peer { + store_id: 41, + ..Default::default() + }; region.leader = Some(leader); region } - pub fn region2() -> Region { - let mut region = Region::default(); + pub fn region2() -> RegionWithLeader { + let mut region = RegionWithLeader::default(); region.region.id = 2; region.region.set_start_key(vec![10]); region.region.set_end_key(vec![250, 250]); - let mut leader = metapb::Peer::default(); - leader.store_id = 42; + let leader = metapb::Peer { + store_id: 42, + ..Default::default() + }; region.leader = Some(leader); region @@ -124,11 +139,11 @@ impl MockPdClient { impl PdClient for MockPdClient { type KvClient = MockKvClient; - async fn map_region_to_store(self: Arc, region: Region) -> Result { - Ok(Store::new(region, Arc::new(self.client.clone()))) + async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { + Ok(RegionStore::new(region, Arc::new(self.client.clone()))) } - async fn region_for_key(&self, key: &Key) -> Result { + async fn region_for_key(&self, key: &Key) -> Result { let bytes: &[_] = key.into(); let region = if bytes.is_empty() || bytes[0] < 10 { Self::region1() @@ -139,11 +154,11 @@ impl PdClient for MockPdClient { Ok(region) } - async fn region_for_id(&self, id: RegionId) -> Result { + async fn region_for_id(&self, id: RegionId) -> Result { match id { 1 => Ok(Self::region1()), 2 => Ok(Self::region2()), - _ => Err(Error::RegionNotFound { region_id: id }), + _ => Err(Error::RegionNotFoundInResponse { region_id: id }), } } @@ -154,11 +169,21 @@ impl PdClient for MockPdClient { async fn 
update_safepoint(self: Arc, _safepoint: u64) -> Result { unimplemented!() } + + async fn update_leader( + &self, + _ver_id: crate::region::RegionVerId, + _leader: metapb::Peer, + ) -> Result<()> { + todo!() + } + + async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {} } -pub fn mock_store() -> Store { - Store { - region: Region::default(), +pub fn mock_store() -> RegionStore { + RegionStore { + region_with_leader: RegionWithLeader::default(), client: Arc::new(MockKvClient::new("foo".to_owned(), None)), } } diff --git a/src/pd/client.rs b/src/pd/client.rs index 29746bd8..2d49ac12 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -3,23 +3,21 @@ use crate::{ compat::stream_fn, kv::codec, - pd::RetryClient, - region::{Region, RegionId}, - store::Store, + pd::{retry::RetryClientTrait, RetryClient}, + region::{RegionId, RegionVerId, RegionWithLeader}, + region_cache::RegionCache, + store::RegionStore, BoundRange, Config, Key, Result, SecurityManager, Timestamp, }; use async_trait::async_trait; use futures::{prelude::*, stream::BoxStream}; use grpcio::{EnvBuilder, Environment}; -use slog::{Drain, Logger}; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, - thread, -}; +use slog::Logger; +use std::{collections::HashMap, sync::Arc, thread}; use tikv_client_pd::Cluster; -use tikv_client_proto::kvrpcpb; +use tikv_client_proto::{kvrpcpb, metapb}; use tikv_client_store::{KvClient, KvConnect, TikvConnect}; +use tokio::sync::RwLock; const CQ_COUNT: usize = 1; const CLIENT_PREFIX: &str = "tikv-client"; @@ -46,25 +44,25 @@ pub trait PdClient: Send + Sync + 'static { type KvClient: KvClient + Send + Sync + 'static; /// In transactional API, `region` is decoded (keys in raw format). - async fn map_region_to_store(self: Arc, region: Region) -> Result; + async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result; /// In transactional API, the key and returned region are both decoded (keys in raw format). - async fn region_for_key(&self, key: &Key) -> Result; + async fn region_for_key(&self, key: &Key) -> Result; /// In transactional API, the returned region is decoded (keys in raw format) - async fn region_for_id(&self, id: RegionId) -> Result; + async fn region_for_id(&self, id: RegionId) -> Result; async fn get_timestamp(self: Arc) -> Result; async fn update_safepoint(self: Arc, safepoint: u64) -> Result; /// In transactional API, `key` is in raw format - async fn store_for_key(self: Arc, key: &Key) -> Result { + async fn store_for_key(self: Arc, key: &Key) -> Result { let region = self.region_for_key(key).await?; self.map_region_to_store(region).await } - async fn store_for_id(self: Arc, id: RegionId) -> Result { + async fn store_for_id(self: Arc, id: RegionId) -> Result { let region = self.region_for_id(id).await?; self.map_region_to_store(region).await } @@ -101,7 +99,10 @@ pub trait PdClient: Send + Sync + 'static { } /// Returns a Stream which iterates over the contexts for each region covered by range. 
- fn stores_for_range(self: Arc, range: BoundRange) -> BoxStream<'static, Result> { + fn stores_for_range( + self: Arc, + range: BoundRange, + ) -> BoxStream<'static, Result> { let (start_key, end_key) = range.into_keys(); stream_fn(Some(start_key), move |start_key| { let end_key = end_key.clone(); @@ -192,13 +193,17 @@ pub trait PdClient: Send + Sync + 'static { .boxed() } - fn decode_region(mut region: Region, enable_codec: bool) -> Result { + fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result { if enable_codec { codec::decode_bytes_in_place(&mut region.region.mut_start_key(), false)?; codec::decode_bytes_in_place(&mut region.region.mut_end_key(), false)?; } Ok(region) } + + async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>; + + async fn invalidate_region_cache(&self, ver_id: RegionVerId); } /// This client converts requests for the logical TiKV cluster into requests @@ -208,6 +213,7 @@ pub struct PdRpcClient>>, enable_codec: bool, + region_cache: RegionCache>, logger: Logger, } @@ -215,26 +221,27 @@ pub struct PdRpcClient PdClient for PdRpcClient { type KvClient = KvC::KvClient; - async fn map_region_to_store(self: Arc, region: Region) -> Result { + async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { let store_id = region.get_store_id()?; - let store = self.pd.clone().get_store(store_id).await?; - let kv_client = self.kv_client(store.get_address())?; - Ok(Store::new(region, Arc::new(kv_client))) + let store = self.region_cache.get_store_by_id(store_id).await?; + let kv_client = self.kv_client(store.get_address()).await?; + Ok(RegionStore::new(region, Arc::new(kv_client))) } - async fn region_for_key(&self, key: &Key) -> Result { + async fn region_for_key(&self, key: &Key) -> Result { let enable_codec = self.enable_codec; let key = if enable_codec { - key.to_encoded().into() + key.to_encoded() } else { - key.clone().into() + key.clone() }; - let region = self.pd.clone().get_region(key).await?; + + let region = self.region_cache.get_region_by_key(&key).await?; Self::decode_region(region, enable_codec) } - async fn region_for_id(&self, id: RegionId) -> Result { - let region = self.pd.clone().get_region_by_id(id).await?; + async fn region_for_id(&self, id: RegionId) -> Result { + let region = self.region_cache.get_region_by_id(id).await?; Self::decode_region(region, self.enable_codec) } @@ -245,21 +252,31 @@ impl PdClient for PdRpcClient { async fn update_safepoint(self: Arc, safepoint: u64) -> Result { self.pd.clone().update_safepoint(safepoint).await } + + async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()> { + self.region_cache.update_leader(ver_id, leader).await + } + + async fn invalidate_region_cache(&self, ver_id: RegionVerId) { + self.region_cache.invalidate_region_cache(ver_id).await + } } impl PdRpcClient { pub async fn connect( pd_endpoints: &[String], - config: &Config, + config: Config, enable_codec: bool, + logger: Logger, ) -> Result { PdRpcClient::new( - config, + config.clone(), |env, security_mgr| TikvConnect::new(env, security_mgr, config.timeout), |env, security_mgr| { RetryClient::connect(env, pd_endpoints, security_mgr, config.timeout) }, enable_codec, + logger, ) .await } @@ -276,19 +293,17 @@ fn thread_name(prefix: &str) -> String { impl PdRpcClient { pub async fn new( - config: &Config, + config: Config, kv_connect: MakeKvC, pd: MakePd, enable_codec: bool, + logger: Logger, ) -> Result> where PdFut: Future>>, MakeKvC: FnOnce(Arc, Arc) -> KvC, 
MakePd: FnOnce(Arc, Arc) -> PdFut, { - let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); - let logger = Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!()); - info!(logger, "Logging ready!"); let env = Arc::new( EnvBuilder::new() .cq_count(CQ_COUNT) @@ -308,26 +323,30 @@ impl PdRpcClient { let pd = Arc::new(pd(env.clone(), security_mgr.clone()).await?); let kv_client_cache = Default::default(); Ok(PdRpcClient { - pd, + pd: pd.clone(), kv_client_cache, kv_connect: kv_connect(env, security_mgr), enable_codec, + region_cache: RegionCache::new(pd), logger, }) } - fn kv_client(&self, address: &str) -> Result { - if let Some(client) = self.kv_client_cache.read().unwrap().get(address) { + async fn kv_client(&self, address: &str) -> Result { + if let Some(client) = self.kv_client_cache.read().await.get(address) { return Ok(client.clone()); }; info!(self.logger, "connect to tikv endpoint: {:?}", address); - self.kv_connect.connect(address).map(|client| { - self.kv_client_cache - .write() - .unwrap() - .insert(address.to_owned(), client.clone()); - client - }) + match self.kv_connect.connect(address) { + Ok(client) => { + self.kv_client_cache + .write() + .await + .insert(address.to_owned(), client.clone()); + Ok(client) + } + Err(e) => Err(e), + } } } @@ -338,16 +357,16 @@ pub mod test { use futures::{executor, executor::block_on}; - #[test] - fn test_kv_client_caching() { + #[tokio::test] + async fn test_kv_client_caching() { let client = block_on(pd_rpc_client()); let addr1 = "foo"; let addr2 = "bar"; - let kv1 = client.kv_client(&addr1).unwrap(); - let kv2 = client.kv_client(&addr2).unwrap(); - let kv3 = client.kv_client(&addr2).unwrap(); + let kv1 = client.kv_client(addr1).await.unwrap(); + let kv2 = client.kv_client(addr2).await.unwrap(); + let kv3 = client.kv_client(addr2).await.unwrap(); assert!(kv1.addr != kv2.addr); assert_eq!(kv2.addr, kv3.addr); } @@ -395,13 +414,13 @@ pub mod test { let k3: Key = vec![11, 4].into(); let range1 = (k1, k2.clone()).into(); let mut stream = executor::block_on_stream(client.clone().stores_for_range(range1)); - assert_eq!(stream.next().unwrap().unwrap().region.id(), 1); + assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1); assert!(stream.next().is_none()); let range2 = (k2, k3).into(); let mut stream = executor::block_on_stream(client.stores_for_range(range2)); - assert_eq!(stream.next().unwrap().unwrap().region.id(), 1); - assert_eq!(stream.next().unwrap().unwrap().region.id(), 2); + assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1); + assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 2); assert!(stream.next().is_none()); } diff --git a/src/pd/mod.rs b/src/pd/mod.rs index 3e3be6e1..de376b30 100644 --- a/src/pd/mod.rs +++ b/src/pd/mod.rs @@ -2,4 +2,4 @@ mod client; mod retry; pub use client::{PdClient, PdRpcClient}; -pub use retry::RetryClient; +pub use retry::{RetryClient, RetryClientTrait}; diff --git a/src/pd/retry.rs b/src/pd/retry.rs index ba42f601..f6524cad 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -3,7 +3,7 @@ //! A utility module for managing and retrying PD requests. 
use crate::{ - region::{Region, RegionId, StoreId}, + region::{RegionId, RegionWithLeader, StoreId}, stats::pd_stats, Error, Result, SecurityManager, }; @@ -28,6 +28,22 @@ const RECONNECT_INTERVAL_SEC: u64 = 1; const MAX_REQUEST_COUNT: usize = 5; const LEADER_CHANGE_RETRY: usize = 10; +#[async_trait] +pub trait RetryClientTrait { + // These get_* functions will try multiple times to make a request, reconnecting as necessary. + // It does not know about encoding. Caller should take care of it. + async fn get_region(self: Arc, key: Vec) -> Result; + + async fn get_region_by_id(self: Arc, region_id: RegionId) -> Result; + + async fn get_store(self: Arc, id: StoreId) -> Result; + + async fn get_all_stores(self: Arc) -> Result>; + + async fn get_timestamp(self: Arc) -> Result; + + async fn update_safepoint(self: Arc, safepoint: u64) -> Result; +} /// Client for communication with a PD cluster. Has the facility to reconnect to the cluster. pub struct RetryClient { // Tuple is the cluster and the time of the cluster's last reconnect. @@ -104,10 +120,13 @@ impl RetryClient { timeout, }) } +} +#[async_trait] +impl RetryClientTrait for RetryClient { // These get_* functions will try multiple times to make a request, reconnecting as necessary. // It does not know about encoding. Caller should take care of it. - pub async fn get_region(self: Arc, key: Vec) -> Result { + async fn get_region(self: Arc, key: Vec) -> Result { retry!(self, "get_region", |cluster| { let key = key.clone(); async { @@ -121,16 +140,18 @@ impl RetryClient { }) } - pub async fn get_region_by_id(self: Arc, region_id: RegionId) -> Result { + async fn get_region_by_id(self: Arc, region_id: RegionId) -> Result { retry!(self, "get_region_by_id", |cluster| async { cluster .get_region_by_id(region_id, self.timeout) .await - .and_then(|resp| region_from_response(resp, || Error::RegionNotFound { region_id })) + .and_then(|resp| { + region_from_response(resp, || Error::RegionNotFoundInResponse { region_id }) + }) }) } - pub async fn get_store(self: Arc, id: StoreId) -> Result { + async fn get_store(self: Arc, id: StoreId) -> Result { retry!(self, "get_store", |cluster| async { cluster .get_store(id, self.timeout) @@ -140,7 +161,7 @@ impl RetryClient { } #[allow(dead_code)] - pub async fn get_all_stores(self: Arc) -> Result> { + async fn get_all_stores(self: Arc) -> Result> { retry!(self, "get_all_stores", |cluster| async { cluster .get_all_stores(self.timeout) @@ -149,11 +170,11 @@ impl RetryClient { }) } - pub async fn get_timestamp(self: Arc) -> Result { + async fn get_timestamp(self: Arc) -> Result { retry!(self, "get_timestamp", |cluster| cluster.get_timestamp()) } - pub async fn update_safepoint(self: Arc, safepoint: u64) -> Result { + async fn update_safepoint(self: Arc, safepoint: u64) -> Result { retry!(self, "update_gc_safepoint", |cluster| async { cluster .update_safepoint(safepoint, self.timeout) @@ -174,9 +195,9 @@ impl fmt::Debug for RetryClient { fn region_from_response( resp: pdpb::GetRegionResponse, err: impl FnOnce() -> Error, -) -> Result { +) -> Result { let region = resp.region.ok_or_else(err)?; - Ok(Region::new(region, resp.leader)) + Ok(RegionWithLeader::new(region, resp.leader)) } // A node-like thing that can be connected to. 
@@ -209,13 +230,16 @@ impl Reconnect for RetryClient { mod test { use super::*; use futures::{executor, future::ready}; - use std::sync::Mutex; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Mutex, + }; use tikv_client_common::internal_err; #[test] fn test_reconnect() { struct MockClient { - reconnect_count: Mutex, + reconnect_count: AtomicUsize, cluster: RwLock<((), Instant)>, } @@ -224,7 +248,8 @@ mod test { type Cl = (); async fn reconnect(&self, _: u64) -> Result<()> { - *self.reconnect_count.lock().unwrap() += 1; + self.reconnect_count + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); // Not actually unimplemented, we just don't care about the error. Err(Error::Unimplemented) } @@ -240,23 +265,35 @@ mod test { executor::block_on(async { let client = Arc::new(MockClient { - reconnect_count: Mutex::new(0), + reconnect_count: AtomicUsize::new(0), cluster: RwLock::new(((), Instant::now())), }); assert!(retry_err(client.clone()).await.is_err()); - assert_eq!(*client.reconnect_count.lock().unwrap(), MAX_REQUEST_COUNT); + assert_eq!( + client + .reconnect_count + .load(std::sync::atomic::Ordering::SeqCst), + MAX_REQUEST_COUNT + ); - *client.reconnect_count.lock().unwrap() = 0; + client + .reconnect_count + .store(0, std::sync::atomic::Ordering::SeqCst); assert!(retry_ok(client.clone()).await.is_ok()); - assert_eq!(*client.reconnect_count.lock().unwrap(), 0); + assert_eq!( + client + .reconnect_count + .load(std::sync::atomic::Ordering::SeqCst), + 0 + ); }) } #[test] fn test_retry() { struct MockClient { - cluster: RwLock<(Mutex, Instant)>, + cluster: RwLock<(AtomicUsize, Instant)>, } #[async_trait] @@ -270,15 +307,13 @@ mod test { async fn retry_max_err( client: Arc, - max_retries: Arc>, + max_retries: Arc, ) -> Result<()> { retry!(client, "test", |c| { - let mut c = c.lock().unwrap(); - *c += 1; + c.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let mut max_retries = max_retries.lock().unwrap(); - *max_retries -= 1; - if *max_retries == 0 { + let max_retries = max_retries.fetch_sub(1, Ordering::SeqCst) - 1; + if max_retries == 0 { ready(Ok(())) } else { ready(Err(internal_err!("whoops"))) @@ -288,15 +323,13 @@ mod test { async fn retry_max_ok( client: Arc, - max_retries: Arc>, + max_retries: Arc, ) -> Result<()> { retry!(client, "test", |c| { - let mut c = c.lock().unwrap(); - *c += 1; + c.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let mut max_retries = max_retries.lock().unwrap(); - *max_retries -= 1; - if *max_retries == 0 { + let max_retries = max_retries.fetch_sub(1, Ordering::SeqCst) - 1; + if max_retries == 0 { ready(Ok(())) } else { ready(Err(internal_err!("whoops"))) @@ -306,23 +339,23 @@ mod test { executor::block_on(async { let client = Arc::new(MockClient { - cluster: RwLock::new((Mutex::new(0), Instant::now())), + cluster: RwLock::new((AtomicUsize::new(0), Instant::now())), }); - let max_retries = Arc::new(Mutex::new(1000)); + let max_retries = Arc::new(AtomicUsize::new(1000)); assert!(retry_max_err(client.clone(), max_retries).await.is_err()); assert_eq!( - *client.cluster.read().await.0.lock().unwrap(), + client.cluster.read().await.0.load(Ordering::SeqCst), LEADER_CHANGE_RETRY ); let client = Arc::new(MockClient { - cluster: RwLock::new((Mutex::new(0), Instant::now())), + cluster: RwLock::new((AtomicUsize::new(0), Instant::now())), }); - let max_retries = Arc::new(Mutex::new(2)); + let max_retries = Arc::new(AtomicUsize::new(2)); assert!(retry_max_ok(client.clone(), max_retries).await.is_ok()); - 
assert_eq!(*client.cluster.read().await.0.lock().unwrap(), 2); + assert_eq!(client.cluster.read().await.0.load(Ordering::SeqCst), 2); }) } } diff --git a/src/raw/client.rs b/src/raw/client.rs index 51ba7bef..c5f915e3 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -7,7 +7,7 @@ use crate::{ config::Config, pd::PdRpcClient, raw::lowering::*, - request::{Collect, Plan}, + request::{Collect, CollectSingle, Plan}, BoundRange, ColumnFamily, Key, KvPair, Result, Value, }; use slog::{Drain, Logger}; @@ -81,11 +81,18 @@ impl Client { ) -> Result { let logger = optional_logger.unwrap_or_else(|| { let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); - Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!()) + Logger::root( + slog_term::FullFormat::new(plain) + .build() + .filter_level(slog::Level::Info) + .fuse(), + o!(), + ) }); debug!(logger, "creating new raw client"); let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, false).await?); + let rpc = + Arc::new(PdRpcClient::connect(&pd_endpoints, config, false, logger.clone()).await?); Ok(Client { rpc, cf: None, @@ -165,9 +172,8 @@ impl Client { debug!(self.logger, "invoking raw get request"); let request = new_raw_get_request(key.into(), self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .single_region() - .await? - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(CollectSingle) .post_process_default() .plan(); plan.execute().await @@ -198,8 +204,7 @@ impl Client { debug!(self.logger, "invoking raw batch_get request"); let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .multi_region() - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) .plan(); plan.execute() @@ -227,9 +232,8 @@ impl Client { debug!(self.logger, "invoking raw put request"); let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .single_region() - .await? - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(CollectSingle) .extract_error() .plan(); plan.execute().await?; @@ -264,8 +268,7 @@ impl Client { self.atomic, ); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .multi_region() - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() .plan(); plan.execute().await?; @@ -293,9 +296,8 @@ impl Client { debug!(self.logger, "invoking raw delete request"); let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .single_region() - .await? 
- .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(CollectSingle) .extract_error() .plan(); plan.execute().await?; @@ -325,8 +327,7 @@ impl Client { let request = new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .multi_region() - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() .plan(); plan.execute().await?; @@ -353,8 +354,7 @@ impl Client { self.assert_non_atomic()?; let request = new_raw_delete_range_request(range.into(), self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .multi_region() - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() .plan(); plan.execute().await?; @@ -510,9 +510,8 @@ impl Client { self.cf.clone(), ); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req) - .single_region() - .await? - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(CollectSingle) .post_process_default() .plan(); plan.execute().await @@ -533,8 +532,7 @@ impl Client { let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .multi_region() - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) .plan(); let res = plan.execute().await; @@ -564,8 +562,7 @@ impl Client { self.cf.clone(), ); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .multi_region() - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) .plan(); plan.execute().await diff --git a/src/raw/requests.rs b/src/raw/requests.rs index e7c17b90..372d7147 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -2,9 +2,12 @@ use super::RawRpcRequest; use crate::{ + collect_first, pd::PdClient, - request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey}, - store::{store_stream_for_keys, store_stream_for_ranges, Store}, + request::{ + Collect, CollectSingle, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey, + }, + store::{store_stream_for_keys, store_stream_for_ranges, RegionStore}, transaction::HasLocks, util::iter::FlatMapOkIterExt, ColumnFamily, KvPair, Result, Value, @@ -25,6 +28,9 @@ impl KvRequest for kvrpcpb::RawGetRequest { type Response = kvrpcpb::RawGetResponse; } +shardable_key!(kvrpcpb::RawGetRequest); +collect_first!(kvrpcpb::RawGetResponse); + impl SingleKey for kvrpcpb::RawGetRequest { fn key(&self) -> &Vec { &self.key @@ -91,6 +97,8 @@ impl KvRequest for kvrpcpb::RawPutRequest { type Response = kvrpcpb::RawPutResponse; } +shardable_key!(kvrpcpb::RawPutRequest); +collect_first!(kvrpcpb::RawPutResponse); impl SingleKey for kvrpcpb::RawPutRequest { fn key(&self) -> &Vec { &self.key @@ -120,7 +128,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { let mut pairs = self.pairs.clone(); pairs.sort_by(|a, b| a.key.cmp(&b.key)); store_stream_for_keys( @@ -129,8 +137,8 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { ) } - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { - self.set_context(store.region.context()?); + fn apply_shard(&mut self, 
shard: Self::Shard, store: &RegionStore) -> Result<()> { + self.set_context(store.region_with_leader.context()?); self.set_pairs(shard); Ok(()) } @@ -153,6 +161,8 @@ impl KvRequest for kvrpcpb::RawDeleteRequest { type Response = kvrpcpb::RawDeleteResponse; } +shardable_key!(kvrpcpb::RawDeleteRequest); +collect_first!(kvrpcpb::RawDeleteResponse); impl SingleKey for kvrpcpb::RawDeleteRequest { fn key(&self) -> &Vec { &self.key @@ -254,12 +264,12 @@ impl Shardable for kvrpcpb::RawBatchScanRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { store_stream_for_ranges(self.ranges.clone(), pd_client.clone()) } - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { - self.set_context(store.region.context()?); + fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + self.set_context(store.region_with_leader.context()?); self.set_ranges(shard); Ok(()) } @@ -297,6 +307,8 @@ impl KvRequest for kvrpcpb::RawCasRequest { type Response = kvrpcpb::RawCasResponse; } +shardable_key!(kvrpcpb::RawCasRequest); +collect_first!(kvrpcpb::RawCasResponse); impl SingleKey for kvrpcpb::RawCasRequest { fn key(&self) -> &Vec { &self.key @@ -372,8 +384,10 @@ mod test { let mut resp = kvrpcpb::RawScanResponse::default(); for i in req.start_key[0]..req.end_key[0] { - let mut kv = kvrpcpb::KvPair::default(); - kv.key = vec![i]; + let kv = kvrpcpb::KvPair { + key: vec![i], + ..Default::default() + }; resp.kvs.push(kv); } @@ -390,10 +404,9 @@ mod test { key_only: true, ..Default::default() }; - let plan = crate::request::PlanBuilder::new(client.clone(), scan) + let plan = crate::request::PlanBuilder::new(client, scan) .resolve_lock(OPTIMISTIC_BACKOFF) - .multi_region() - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) .plan(); let scan = executor::block_on(async { plan.execute().await }).unwrap(); diff --git a/src/region.rs b/src/region.rs index 0c4cb3f9..b15086b9 100644 --- a/src/region.rs +++ b/src/region.rs @@ -1,3 +1,5 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. + use crate::{Error, Key, Result}; use derive_new::new; use tikv_client_proto::{kvrpcpb, metapb}; @@ -22,12 +24,14 @@ pub struct RegionVerId { /// /// In TiKV all data is partitioned by range. Each partition is called a region. #[derive(new, Clone, Default, Debug, PartialEq)] -pub struct Region { +pub struct RegionWithLeader { pub region: metapb::Region, pub leader: Option, } -impl Region { +impl Eq for RegionWithLeader {} + +impl RegionWithLeader { pub fn contains(&self, key: &Key) -> bool { let key: &[u8] = key.into(); let start_key = self.region.get_start_key(); diff --git a/src/region_cache.rs b/src/region_cache.rs new file mode 100644 index 00000000..e7584788 --- /dev/null +++ b/src/region_cache.rs @@ -0,0 +1,494 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::{ + pd::{RetryClient, RetryClientTrait}, + region::{RegionId, RegionVerId, RegionWithLeader, StoreId}, + Key, Result, +}; +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + sync::Arc, +}; +use tikv_client_common::Error; +use tikv_client_pd::Cluster; +use tikv_client_proto::metapb::{self, Store}; +use tokio::sync::{Notify, RwLock}; + +const MAX_RETRY_WAITING_CONCURRENT_REQUEST: usize = 4; + +struct RegionCacheMap { + /// RegionVerID -> Region. It stores the concrete region caches. 
+ /// RegionVerID is the unique identifer of a region *across time*. + // TODO: does it need TTL? + ver_id_to_region: HashMap, + /// Start_key -> RegionVerID + /// + /// Invariant: there are no intersecting regions in the map at any time. + key_to_ver_id: BTreeMap, + /// RegionID -> RegionVerID. Note: regions with identical ID doesn't necessarily + /// mean they are the same, they can be different regions across time. + id_to_ver_id: HashMap, + /// We don't want to spawn multiple queries querying a same region id. If a + /// request is on its way, others will wait for its completion. + on_my_way_id: HashMap>, +} + +impl RegionCacheMap { + fn new() -> RegionCacheMap { + RegionCacheMap { + ver_id_to_region: HashMap::new(), + key_to_ver_id: BTreeMap::new(), + id_to_ver_id: HashMap::new(), + on_my_way_id: HashMap::new(), + } + } +} + +pub struct RegionCache> { + region_cache: RwLock, + store_cache: RwLock>, + inner_client: Arc, +} + +impl RegionCache { + pub fn new(inner_client: Arc) -> RegionCache { + RegionCache { + region_cache: RwLock::new(RegionCacheMap::new()), + store_cache: RwLock::new(HashMap::new()), + inner_client, + } + } +} + +impl RegionCache { + // Retrieve cache entry by key. If there's no entry, query PD and update cache. + pub async fn get_region_by_key(&self, key: &Key) -> Result { + let region_cache_guard = self.region_cache.read().await; + let res = { + region_cache_guard + .key_to_ver_id + .range(..=key) + .next_back() + .map(|(x, y)| (x.clone(), y.clone())) + }; + + if let Some((_, candidate_region_ver_id)) = res { + let region = region_cache_guard + .ver_id_to_region + .get(&candidate_region_ver_id) + .unwrap(); + + if region.contains(key) { + return Ok(region.clone()); + } + } + drop(region_cache_guard); + self.read_through_region_by_key(key.clone()).await + } + + // Retrieve cache entry by RegionId. If there's no entry, query PD and update cache. 
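// Concurrent misses for the same region id are deduplicated: the first caller registers a
// `Notify` in `on_my_way_id` and reads through to PD; later callers wait on that notification
// and re-check the cache, bailing out with an error after MAX_RETRY_WAITING_CONCURRENT_REQUEST
// unsuccessful rounds.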
+ pub async fn get_region_by_id(&self, id: RegionId) -> Result { + for _ in 0..=MAX_RETRY_WAITING_CONCURRENT_REQUEST { + let region_cache_guard = self.region_cache.read().await; + + // check cache + let ver_id = region_cache_guard.id_to_ver_id.get(&id); + if let Some(ver_id) = ver_id { + let region = region_cache_guard.ver_id_to_region.get(ver_id).unwrap(); + return Ok(region.clone()); + } + + // check concurrent requests + let notify = region_cache_guard.on_my_way_id.get(&id).cloned(); + drop(region_cache_guard); + + if let Some(n) = notify { + n.notified().await; + continue; + } else { + return self.read_through_region_by_id(id).await; + } + } + Err(Error::StringError(format!( + "Concurrent PD requests failed for {} times", + MAX_RETRY_WAITING_CONCURRENT_REQUEST + ))) + } + + pub async fn get_store_by_id(&self, id: StoreId) -> Result { + let store = self.store_cache.read().await.get(&id).cloned(); + match store { + Some(store) => Ok(store), + None => self.read_through_store_by_id(id).await, + } + } + + /// Force read through (query from PD) and update cache + pub async fn read_through_region_by_key(&self, key: Key) -> Result { + let region = self.inner_client.clone().get_region(key.into()).await?; + self.add_region(region.clone()).await; + Ok(region) + } + + /// Force read through (query from PD) and update cache + async fn read_through_region_by_id(&self, id: RegionId) -> Result { + // put a notify to let others know the region id is being queried + let notify = Arc::new(Notify::new()); + { + let mut region_cache_guard = self.region_cache.write().await; + region_cache_guard.on_my_way_id.insert(id, notify.clone()); + } + + let region = self.inner_client.clone().get_region_by_id(id).await?; + self.add_region(region.clone()).await; + + // notify others + { + let mut region_cache_guard = self.region_cache.write().await; + notify.notify_waiters(); + region_cache_guard.on_my_way_id.remove(&id); + } + + Ok(region) + } + + async fn read_through_store_by_id(&self, id: StoreId) -> Result { + let store = self.inner_client.clone().get_store(id).await?; + self.store_cache.write().await.insert(id, store.clone()); + Ok(store) + } + + pub async fn add_region(&self, region: RegionWithLeader) { + // FIXME: will it be the performance bottleneck? + let mut cache = self.region_cache.write().await; + + let end_key = region.end_key(); + let mut to_be_removed: HashSet = HashSet::new(); + + if let Some(ver_id) = cache.id_to_ver_id.get(®ion.id()) { + if ver_id != ®ion.ver_id() { + to_be_removed.insert(ver_id.clone()); + } + } + + let mut search_range = { + if end_key.is_empty() { + cache.key_to_ver_id.range(..) 
+ } else { + cache.key_to_ver_id.range(..end_key) + } + }; + while let Some((_, ver_id_in_cache)) = search_range.next_back() { + let region_in_cache = cache.ver_id_to_region.get(ver_id_in_cache).unwrap(); + + if region_in_cache.region.end_key > region.region.start_key { + to_be_removed.insert(ver_id_in_cache.clone()); + } else { + break; + } + } + + for ver_id in to_be_removed { + let region_to_remove = cache.ver_id_to_region.remove(&ver_id).unwrap(); + cache.key_to_ver_id.remove(®ion_to_remove.start_key()); + cache.id_to_ver_id.remove(®ion_to_remove.id()); + } + cache + .key_to_ver_id + .insert(region.start_key(), region.ver_id()); + cache.id_to_ver_id.insert(region.id(), region.ver_id()); + cache.ver_id_to_region.insert(region.ver_id(), region); + } + + pub async fn update_leader( + &self, + ver_id: crate::region::RegionVerId, + leader: metapb::Peer, + ) -> Result<()> { + let mut cache = self.region_cache.write().await; + let region_entry = cache + .ver_id_to_region + .get_mut(&ver_id) + .ok_or(Error::EntryNotFoundInRegionCache)?; + region_entry.leader = Some(leader); + Ok(()) + } + + pub async fn invalidate_region_cache(&self, ver_id: crate::region::RegionVerId) { + let mut cache = self.region_cache.write().await; + let region_entry = cache.ver_id_to_region.get(&ver_id); + if let Some(region) = region_entry { + let id = region.id(); + let start_key = region.start_key(); + cache.ver_id_to_region.remove(&ver_id); + cache.id_to_ver_id.remove(&id); + cache.key_to_ver_id.remove(&start_key); + } + } +} + +#[cfg(test)] +mod test { + use super::RegionCache; + use crate::{ + pd::RetryClientTrait, + region::{RegionId, RegionWithLeader}, + Key, Result, + }; + use async_trait::async_trait; + use std::{ + collections::{BTreeMap, HashMap, HashSet}, + sync::{ + atomic::{AtomicU64, Ordering::SeqCst}, + Arc, + }, + }; + use tikv_client_common::Error; + use tikv_client_proto::metapb; + use tokio::sync::Mutex; + + #[derive(Default)] + struct MockRetryClient { + pub regions: Mutex>, + pub get_region_count: AtomicU64, + } + + #[async_trait] + impl RetryClientTrait for MockRetryClient { + async fn get_region( + self: Arc, + key: Vec, + ) -> Result { + self.get_region_count.fetch_add(1, SeqCst); + self.regions + .lock() + .await + .iter() + .filter(|(_, r)| r.contains(&key.clone().into())) + .map(|(_, r)| r.clone()) + .next() + .ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned())) + } + + async fn get_region_by_id( + self: Arc, + region_id: crate::region::RegionId, + ) -> Result { + self.get_region_count.fetch_add(1, SeqCst); + self.regions + .lock() + .await + .iter() + .filter(|(id, _)| id == &®ion_id) + .map(|(_, r)| r.clone()) + .next() + .ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned())) + } + + async fn get_store( + self: Arc, + _id: crate::region::StoreId, + ) -> Result { + todo!() + } + + async fn get_all_stores(self: Arc) -> Result> { + todo!() + } + + async fn get_timestamp(self: Arc) -> Result { + todo!() + } + + async fn update_safepoint(self: Arc, _safepoint: u64) -> Result { + todo!() + } + } + + #[tokio::test] + async fn cache_is_used() -> Result<()> { + let retry_client = Arc::new(MockRetryClient::default()); + let cache = RegionCache::new(retry_client.clone()); + retry_client.regions.lock().await.insert( + 1, + RegionWithLeader { + region: metapb::Region { + id: 1, + start_key: vec![], + end_key: vec![100], + ..Default::default() + }, + leader: Some(metapb::Peer { + store_id: 1, + ..Default::default() + }), + }, + ); + 
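// A second, disjoint region covering [101, +inf) whose leader lives on a different store;
// it is used below to check that `update_leader` takes effect.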
retry_client.regions.lock().await.insert( + 2, + RegionWithLeader { + region: metapb::Region { + id: 2, + start_key: vec![101], + end_key: vec![], + ..Default::default() + }, + leader: Some(metapb::Peer { + store_id: 2, + ..Default::default() + }), + }, + ); + + assert_eq!(retry_client.get_region_count.load(SeqCst), 0); + + // first query, read through + assert_eq!(cache.get_region_by_id(1).await?.end_key(), vec![100].into()); + assert_eq!(retry_client.get_region_count.load(SeqCst), 1); + + // should read from cache + assert_eq!(cache.get_region_by_id(1).await?.end_key(), vec![100].into()); + assert_eq!(retry_client.get_region_count.load(SeqCst), 1); + + // invalidate, should read through + cache + .invalidate_region_cache(cache.get_region_by_id(1).await?.ver_id()) + .await; + assert_eq!(cache.get_region_by_id(1).await?.end_key(), vec![100].into()); + assert_eq!(retry_client.get_region_count.load(SeqCst), 2); + + // update leader should work + cache + .update_leader( + cache.get_region_by_id(2).await?.ver_id(), + metapb::Peer { + store_id: 102, + ..Default::default() + }, + ) + .await?; + assert_eq!( + cache.get_region_by_id(2).await?.leader.unwrap().store_id, + 102 + ); + + Ok(()) + } + + #[tokio::test] + async fn test_add_disjoint_regions() { + let retry_client = Arc::new(MockRetryClient::default()); + let cache = RegionCache::new(retry_client.clone()); + let region1 = region(1, vec![], vec![10]); + let region2 = region(2, vec![10], vec![20]); + let region3 = region(3, vec![30], vec![]); + cache.add_region(region1.clone()).await; + cache.add_region(region2.clone()).await; + cache.add_region(region3.clone()).await; + + let mut expected_cache = BTreeMap::new(); + expected_cache.insert(vec![].into(), region1); + expected_cache.insert(vec![10].into(), region2); + expected_cache.insert(vec![30].into(), region3); + + assert(&cache, &expected_cache).await + } + + #[tokio::test] + async fn test_add_intersecting_regions() { + let retry_client = Arc::new(MockRetryClient::default()); + let cache = RegionCache::new(retry_client.clone()); + + cache.add_region(region(1, vec![], vec![10])).await; + cache.add_region(region(2, vec![10], vec![20])).await; + cache.add_region(region(3, vec![30], vec![40])).await; + cache.add_region(region(4, vec![50], vec![60])).await; + cache.add_region(region(5, vec![20], vec![35])).await; + + let mut expected_cache: BTreeMap = BTreeMap::new(); + expected_cache.insert(vec![].into(), region(1, vec![], vec![10])); + expected_cache.insert(vec![10].into(), region(2, vec![10], vec![20])); + expected_cache.insert(vec![20].into(), region(5, vec![20], vec![35])); + expected_cache.insert(vec![50].into(), region(4, vec![50], vec![60])); + assert(&cache, &expected_cache).await; + + cache.add_region(region(6, vec![15], vec![25])).await; + let mut expected_cache = BTreeMap::new(); + expected_cache.insert(vec![].into(), region(1, vec![], vec![10])); + expected_cache.insert(vec![15].into(), region(6, vec![15], vec![25])); + expected_cache.insert(vec![50].into(), region(4, vec![50], vec![60])); + assert(&cache, &expected_cache).await; + + cache.add_region(region(7, vec![20], vec![])).await; + let mut expected_cache = BTreeMap::new(); + expected_cache.insert(vec![].into(), region(1, vec![], vec![10])); + expected_cache.insert(vec![20].into(), region(7, vec![20], vec![])); + assert(&cache, &expected_cache).await; + + cache.add_region(region(8, vec![], vec![15])).await; + let mut expected_cache = BTreeMap::new(); + expected_cache.insert(vec![].into(), region(8, vec![], vec![15])); + 
expected_cache.insert(vec![20].into(), region(7, vec![20], vec![])); + assert(&cache, &expected_cache).await; + } + + #[tokio::test] + async fn test_get_region_by_key() -> Result<()> { + let retry_client = Arc::new(MockRetryClient::default()); + let cache = RegionCache::new(retry_client.clone()); + + let region1 = region(1, vec![], vec![10]); + let region2 = region(2, vec![10], vec![20]); + let region3 = region(3, vec![30], vec![40]); + let region4 = region(4, vec![50], vec![]); + cache.add_region(region1.clone()).await; + cache.add_region(region2.clone()).await; + cache.add_region(region3.clone()).await; + cache.add_region(region4.clone()).await; + + assert_eq!( + cache.get_region_by_key(&vec![].into()).await?, + region1.clone() + ); + assert_eq!( + cache.get_region_by_key(&vec![5].into()).await?, + region1.clone() + ); + assert_eq!( + cache.get_region_by_key(&vec![10].into()).await?, + region2.clone() + ); + assert!(cache.get_region_by_key(&vec![20].into()).await.is_err()); + assert!(cache.get_region_by_key(&vec![25].into()).await.is_err()); + assert_eq!(cache.get_region_by_key(&vec![60].into()).await?, region4); + Ok(()) + } + + // a helper function to assert the cache is in expected state + async fn assert( + cache: &RegionCache, + expected_cache: &BTreeMap, + ) { + let guard = cache.region_cache.read().await; + let mut actual_keys = guard.ver_id_to_region.values().collect::>(); + let mut expected_keys = expected_cache.values().collect::>(); + actual_keys.sort_by_cached_key(|r| r.id()); + expected_keys.sort_by_cached_key(|r| r.id()); + + assert_eq!(actual_keys, expected_keys); + assert_eq!( + guard.key_to_ver_id.keys().collect::>(), + expected_cache.keys().collect::>() + ) + } + + fn region(id: RegionId, start_key: Vec, end_key: Vec) -> RegionWithLeader { + let mut region = RegionWithLeader::default(); + region.region.set_id(id); + region.region.set_start_key(start_key); + region.region.set_end_key(end_key); + // We don't care about other fields here + + region + } +} diff --git a/src/request/mod.rs b/src/request/mod.rs index 0f21efbc..4d43c00c 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -6,13 +6,13 @@ use crate::{ }; use async_trait::async_trait; use derive_new::new; -use tikv_client_store::{HasError, Request}; +use tikv_client_store::{HasKeyErrors, Request}; pub use self::{ plan::{ - Collect, CollectAndMatchKey, CollectError, DefaultProcessor, Dispatch, ExtractError, - HasKeys, Merge, MergeResponse, MultiRegion, Plan, PreserveKey, Process, ProcessResponse, - ResolveLock, RetryRegion, + Collect, CollectAndMatchKey, CollectError, CollectSingle, DefaultProcessor, Dispatch, + ExtractError, HasKeys, Merge, MergeResponse, Plan, PreserveKey, Process, ProcessResponse, + ResolveLock, RetryableMultiRegion, }, plan_builder::{PlanBuilder, SingleKey}, shard::Shardable, @@ -27,7 +27,7 @@ mod shard; #[async_trait] pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static { /// The expected response to the request. 
- type Response: HasError + HasLocks + Clone + Send + 'static; + type Response: HasKeyErrors + HasLocks + Clone + Send + 'static; } #[derive(Clone, Debug, new, Eq, PartialEq)] @@ -70,30 +70,29 @@ mod test { transaction::lowering::new_commit_request, Error, Key, Result, }; - use futures::executor; use grpcio::CallOption; use std::{ any::Any, iter, - sync::{Arc, Mutex}, + sync::{atomic::AtomicUsize, Arc}, }; use tikv_client_proto::{kvrpcpb, pdpb::Timestamp, tikvpb::TikvClient}; use tikv_client_store::HasRegionError; - #[test] - fn test_region_retry() { + #[tokio::test] + async fn test_region_retry() { #[derive(Clone)] struct MockRpcResponse; - impl HasError for MockRpcResponse { - fn error(&mut self) -> Option { + impl HasKeyErrors for MockRpcResponse { + fn key_errors(&mut self) -> Option> { None } } impl HasRegionError for MockRpcResponse { - fn region_error(&mut self) -> Option { - Some(Error::RegionNotFound { region_id: 1 }) + fn region_error(&mut self) -> Option { + Some(tikv_client_proto::errorpb::Error::default()) } } @@ -101,7 +100,7 @@ mod test { #[derive(Clone)] struct MockKvRequest { - test_invoking_count: Arc>, + test_invoking_count: Arc, } #[async_trait] @@ -136,11 +135,11 @@ mod test { pd_client: &std::sync::Arc, ) -> futures::stream::BoxStream< 'static, - crate::Result<(Self::Shard, crate::store::Store)>, + crate::Result<(Self::Shard, crate::store::RegionStore)>, > { // Increases by 1 for each call. - let mut test_invoking_count = self.test_invoking_count.lock().unwrap(); - *test_invoking_count += 1; + self.test_invoking_count + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); store_stream_for_keys( Some(Key::from("mock_key".to_owned())).into_iter(), pd_client.clone(), @@ -150,13 +149,13 @@ mod test { fn apply_shard( &mut self, _shard: Self::Shard, - _store: &crate::store::Store, + _store: &crate::store::RegionStore, ) -> crate::Result<()> { Ok(()) } } - let invoking_count = Arc::new(Mutex::new(0)); + let invoking_count = Arc::new(AtomicUsize::new(0)); let request = MockKvRequest { test_invoking_count: invoking_count.clone(), @@ -168,18 +167,17 @@ mod test { let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3)) - .multi_region() - .retry_region(Backoff::no_jitter_backoff(1, 1, 3)) + .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3)) .extract_error() .plan(); - let _ = executor::block_on(async { plan.execute().await }); + let _ = plan.execute().await; // Original call plus the 3 retries - assert_eq!(*invoking_count.lock().unwrap(), 4); + assert_eq!(invoking_count.load(std::sync::atomic::Ordering::SeqCst), 4); } - #[test] - fn test_extract_error() { + #[tokio::test] + async fn test_extract_error() { let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |_: &dyn Any| { Ok(Box::new(kvrpcpb::CommitResponse { @@ -206,18 +204,16 @@ mod test { // does not extract error let plan = crate::request::PlanBuilder::new(pd_client.clone(), req.clone()) .resolve_lock(OPTIMISTIC_BACKOFF) - .multi_region() - .retry_region(OPTIMISTIC_BACKOFF) + .retry_multi_region(OPTIMISTIC_BACKOFF) .plan(); - assert!(executor::block_on(async { plan.execute().await }).is_ok()); + assert!(plan.execute().await.is_ok()); // extract error let plan = crate::request::PlanBuilder::new(pd_client.clone(), req) .resolve_lock(OPTIMISTIC_BACKOFF) - .multi_region() - .retry_region(OPTIMISTIC_BACKOFF) + .retry_multi_region(OPTIMISTIC_BACKOFF) .extract_error() .plan(); - assert!(executor::block_on(async { plan.execute().await 
}).is_err()); + assert!(plan.execute().await.is_err()); } } diff --git a/src/request/plan.rs b/src/request/plan.rs index 715a9924..46f17840 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -5,15 +5,18 @@ use crate::{ pd::PdClient, request::{KvRequest, Shardable}, stats::tikv_stats, + store::RegionStore, transaction::{resolve_locks, HasLocks}, util::iter::FlatMapOkIterExt, Error, Key, KvPair, Result, Value, }; +use async_recursion::async_recursion; use async_trait::async_trait; -use futures::{prelude::*, stream::StreamExt}; +use futures::{future::try_join_all, stream::StreamExt}; use std::{marker::PhantomData, sync::Arc}; -use tikv_client_proto::kvrpcpb; -use tikv_client_store::{HasError, HasRegionError, KvClient}; +use tikv_client_proto::{errorpb::EpochNotMatch, kvrpcpb}; +use tikv_client_store::{HasKeyErrors, HasRegionError, HasRegionErrors, KvClient}; +use tokio::sync::Semaphore; /// A plan for how to execute a request. A user builds up a plan with various /// options, then exectutes it. @@ -63,42 +66,214 @@ impl HasKeys for Dispatch { } } -pub struct MultiRegion { +const MULTI_REGION_CONCURRENCY: usize = 16; + +pub struct RetryableMultiRegion { pub(super) inner: P, pub pd_client: Arc, + pub backoff: Backoff, +} + +impl RetryableMultiRegion +where + P::Result: HasKeyErrors + HasRegionError, +{ + // A plan may involve multiple shards + #[async_recursion] + async fn single_plan_handler( + pd_client: Arc, + current_plan: P, + backoff: Backoff, + permits: Arc, + ) -> Result<::Result> { + let shards = current_plan.shards(&pd_client).collect::>().await; + let mut handles = Vec::new(); + for shard in shards { + let (shard, region_store) = shard?; + let mut clone = current_plan.clone(); + clone.apply_shard(shard, ®ion_store)?; + let handle = tokio::spawn(Self::single_shard_handler( + pd_client.clone(), + clone, + region_store, + backoff.clone(), + permits.clone(), + )); + handles.push(handle); + } + Ok(try_join_all(handles) + .await? + .into_iter() + .collect::>>()? + .into_iter() + .flatten() + .collect()) + } + + #[async_recursion] + async fn single_shard_handler( + pd_client: Arc, + plan: P, + region_store: RegionStore, + mut backoff: Backoff, + permits: Arc, + ) -> Result<::Result> { + // limit concurrent requests + let permit = permits.acquire().await.unwrap(); + let mut resp = plan.execute().await?; + drop(permit); + + if let Some(e) = resp.key_errors() { + Ok(vec![Err(Error::MultipleKeyErrors(e))]) + } else if let Some(e) = resp.region_error() { + match backoff.next_delay_duration() { + Some(duration) => { + let region_error_resolved = + Self::handle_region_error(pd_client.clone(), e, region_store).await?; + // don't sleep if we have resolved the region error + if !region_error_resolved { + futures_timer::Delay::new(duration).await; + } + Self::single_plan_handler(pd_client, plan, backoff, permits).await + } + None => Err(Error::RegionError(e)), + } + } else { + Ok(vec![Ok(resp)]) + } + } + + // Returns + // 1. Ok(true): error has been resolved, retry immediately + // 2. Ok(false): backoff, and then retry + // 3. 
Err(Error): can't be resolved, return the error to upper level + async fn handle_region_error( + pd_client: Arc, + mut e: tikv_client_proto::errorpb::Error, + region_store: RegionStore, + ) -> Result { + let ver_id = region_store.region_with_leader.ver_id(); + if e.has_not_leader() { + let not_leader = e.get_not_leader(); + if not_leader.has_leader() { + match pd_client + .update_leader( + region_store.region_with_leader.ver_id(), + not_leader.get_leader().clone(), + ) + .await + { + Ok(_) => Ok(true), + Err(e) => { + pd_client.invalidate_region_cache(ver_id).await; + Err(e) + } + } + } else { + // The peer doesn't know who is the current leader. Generally it's because + // the Raft group is in an election, but it's possible that the peer is + // isolated and removed from the Raft group. So it's necessary to reload + // the region from PD. + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } + } else if e.has_store_not_match() { + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } else if e.has_epoch_not_match() { + Self::on_region_epoch_not_match( + pd_client.clone(), + region_store, + e.take_epoch_not_match(), + ) + .await + } else if e.has_stale_command() || e.has_region_not_found() { + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } else if e.has_server_is_busy() + || e.has_raft_entry_too_large() + || e.has_max_timestamp_not_synced() + { + Err(Error::RegionError(e)) + } else { + // TODO: pass the logger around + // info!("unknwon region error: {:?}", e); + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } + } + + // Returns + // 1. Ok(true): error has been resolved, retry immediately + // 2. Ok(false): backoff, and then retry + // 3. Err(Error): can't be resolved, return the error to upper level + async fn on_region_epoch_not_match( + pd_client: Arc, + region_store: RegionStore, + error: EpochNotMatch, + ) -> Result { + let ver_id = region_store.region_with_leader.ver_id(); + if error.get_current_regions().is_empty() { + pd_client.invalidate_region_cache(ver_id).await; + return Ok(true); + } + + for r in error.get_current_regions() { + if r.get_id() == region_store.region_with_leader.id() { + let returned_conf_ver = r.get_region_epoch().get_conf_ver(); + let returned_version = r.get_region_epoch().get_version(); + let current_conf_ver = region_store + .region_with_leader + .region + .get_region_epoch() + .get_conf_ver(); + let current_version = region_store + .region_with_leader + .region + .get_region_epoch() + .get_version(); + + // Find whether the current region is ahead of TiKV's. If so, backoff. 
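// (`conf_ver` advances on membership changes and `version` advances on splits/merges, so a
// strictly smaller value from TiKV means the cached epoch is the newer one; back off and retry
// rather than invalidate the cache entry.)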
+ if returned_conf_ver < current_conf_ver || returned_version < current_version { + return Ok(false); + } + } + } + // TODO: finer grained processing + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } } -impl Clone for MultiRegion { +impl Clone for RetryableMultiRegion { fn clone(&self) -> Self { - MultiRegion { + RetryableMultiRegion { inner: self.inner.clone(), pd_client: self.pd_client.clone(), + backoff: self.backoff.clone(), } } } #[async_trait] -impl Plan for MultiRegion +impl Plan for RetryableMultiRegion where - P::Result: HasError, + P::Result: HasKeyErrors + HasRegionError, { type Result = Vec>; async fn execute(&self) -> Result { - Ok(self - .inner - .shards(&self.pd_client) - .and_then(move |(shard, store)| async move { - let mut clone = self.inner.clone(); - clone.apply_shard(shard, &store)?; - let mut response = clone.execute().await?; - match response.error() { - Some(e) => Err(e), - None => Ok(response), - } - }) - .collect() - .await) + // Limit the maximum concurrency of multi-region request. If there are + // too many concurrent requests, TiKV is more likely to return a "TiKV + // is busy" error + let concurrency_permits = Arc::new(Semaphore::new(MULTI_REGION_CONCURRENCY)); + Self::single_plan_handler( + self.pd_client.clone(), + self.inner.clone(), + self.backoff.clone(), + concurrency_permits.clone(), + ) + .await } } @@ -131,6 +306,25 @@ impl>>, M: Me #[derive(Clone, Copy)] pub struct Collect; +/// A merge strategy that only takes the first element. It's used for requests +/// that should have exactly one response, e.g. a get request. +#[derive(Clone, Copy)] +pub struct CollectSingle; + +#[macro_export] +macro_rules! collect_first { + ($type_: ty) => { + impl Merge<$type_> for CollectSingle { + type Out = $type_; + + fn merge(&self, mut input: Vec>) -> Result { + assert!(input.len() == 1); + input.pop().unwrap() + } + } + }; +} + /// A merge strategy to be used with /// [`preserve_keys`](super::plan_builder::PlanBuilder::preserve_keys). /// It matches the keys preserved before and the values returned in the response. @@ -178,46 +372,6 @@ impl, Pr: Process> P #[derive(Clone, Copy, Debug)] pub struct DefaultProcessor; -pub struct RetryRegion { - pub inner: P, - pub pd_client: Arc, - pub backoff: Backoff, -} - -impl Clone for RetryRegion { - fn clone(&self) -> Self { - RetryRegion { - inner: self.inner.clone(), - pd_client: self.pd_client.clone(), - backoff: self.backoff.clone(), - } - } -} - -#[async_trait] -impl Plan for RetryRegion -where - P::Result: HasError, -{ - type Result = P::Result; - - async fn execute(&self) -> Result { - let mut result = self.inner.execute().await?; - let mut clone = self.clone(); - while let Some(region_error) = result.region_error() { - match clone.backoff.next_delay_duration() { - None => return Err(region_error), - Some(delay_duration) => { - futures_timer::Delay::new(delay_duration).await; - result = clone.inner.execute().await?; - } - } - } - - Ok(result) - } -} - pub struct ResolveLock { pub inner: P, pub pd_client: Arc, @@ -299,16 +453,18 @@ impl Clone for ExtractError

{ #[async_trait] impl<P: Plan> Plan for ExtractError<P>

where - P::Result: HasError, + P::Result: HasKeyErrors + HasRegionErrors, { type Result = P::Result; async fn execute(&self) -> Result { let mut result = self.inner.execute().await?; - if let Some(error) = result.error() { - Err(error) - } else if let Some(error) = result.region_error() { - Err(error) + if let Some(errors) = result.key_errors() { + Err(Error::ExtractedErrors(errors)) + } else if let Some(errors) = result.region_errors() { + Err(Error::ExtractedErrors( + errors.into_iter().map(Error::RegionError).collect(), + )) } else { Ok(result) } @@ -355,9 +511,9 @@ pub trait HasKeys { #[derive(Debug, Clone)] pub struct ResponseAndKeys(Resp, Vec); -impl HasError for ResponseAndKeys { - fn error(&mut self) -> Option { - self.0.error() +impl HasKeyErrors for ResponseAndKeys { + fn key_errors(&mut self) -> Option> { + self.0.key_errors() } } @@ -368,7 +524,7 @@ impl HasLocks for ResponseAndKeys { } impl HasRegionError for ResponseAndKeys { - fn region_error(&mut self) -> Option { + fn region_error(&mut self) -> Option { self.0.region_error() } } @@ -411,7 +567,10 @@ impl Merge> for CollectAndMatc mod test { use super::*; use crate::mock::{mock_store, MockPdClient}; - use futures::stream::BoxStream; + use futures::{ + stream::{self, BoxStream}, + TryStreamExt, + }; use tikv_client_proto::kvrpcpb::BatchGetResponse; #[derive(Clone)] @@ -432,35 +591,28 @@ mod test { fn shards( &self, _: &Arc, - ) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::Store)>> { + ) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::RegionStore)>> { Box::pin(stream::iter(1..=3).map(|_| Err(Error::Unimplemented))) .map_ok(|_: u8| (42, mock_store())) .boxed() } - fn apply_shard(&mut self, _: Self::Shard, _: &crate::store::Store) -> Result<()> { + fn apply_shard(&mut self, _: Self::Shard, _: &crate::store::RegionStore) -> Result<()> { Ok(()) } } #[tokio::test] async fn test_err() { - let plan = RetryRegion { - inner: MultiRegion { - inner: ResolveLock { - inner: ErrPlan, - backoff: Backoff::no_backoff(), - pd_client: Arc::new(MockPdClient::default()), - }, + let plan = RetryableMultiRegion { + inner: ResolveLock { + inner: ErrPlan, + backoff: Backoff::no_backoff(), pd_client: Arc::new(MockPdClient::default()), }, - backoff: Backoff::no_backoff(), pd_client: Arc::new(MockPdClient::default()), + backoff: Backoff::no_backoff(), }; - plan.execute() - .await - .unwrap() - .iter() - .for_each(|r| assert!(r.is_err())); + assert!(plan.execute().await.is_err()) } } diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 5df26c94..7779017e 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -5,15 +5,15 @@ use crate::{ backoff::Backoff, pd::PdClient, request::{ - DefaultProcessor, Dispatch, ExtractError, HasKeys, KvRequest, Merge, MergeResponse, - MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable, + DefaultProcessor, Dispatch, ExtractError, HasKeys, KvRequest, Merge, MergeResponse, Plan, + Process, ProcessResponse, ResolveLock, RetryableMultiRegion, Shardable, }, - store::Store, + store::RegionStore, transaction::HasLocks, Result, }; use std::{marker::PhantomData, sync::Arc}; -use tikv_client_store::HasError; +use tikv_client_store::{HasKeyErrors, HasRegionError, HasRegionErrors}; /// Builder type for plans (see that module for more). pub struct PlanBuilder { @@ -68,24 +68,6 @@ impl PlanBuilder { } } - /// If there is a region error, re-shard the request and re-resolve regions, then retry. 
- /// - /// Note that this plan must wrap a multi-region plan if the request should be re-sharded. - pub fn retry_region(self, backoff: Backoff) -> PlanBuilder, Ph> - where - P::Result: HasError, - { - PlanBuilder { - pd_client: self.pd_client.clone(), - plan: RetryRegion { - inner: self.plan, - backoff, - pd_client: self.pd_client, - }, - phantom: PhantomData, - } - } - /// Merge the results of a request. Usually used where a request is sent to multiple regions /// to combine the responses from each region. pub fn merge>(self, merge: M) -> PlanBuilder, Ph> @@ -128,15 +110,19 @@ impl PlanBuilder { impl PlanBuilder where - P::Result: HasError, + P::Result: HasKeyErrors + HasRegionError, { /// Split the request into shards sending a request to the region of each shard. - pub fn multi_region(self) -> PlanBuilder, Targetted> { + pub fn retry_multi_region( + self, + backoff: Backoff, + ) -> PlanBuilder, Targetted> { PlanBuilder { pd_client: self.pd_client.clone(), - plan: MultiRegion { + plan: RetryableMultiRegion { inner: self.plan, pd_client: self.pd_client, + backoff, }, phantom: PhantomData, } @@ -144,9 +130,12 @@ where } impl PlanBuilder, NoTarget> { - /// Target the request at a single region. + /// Target the request at a single region. *Note*: single region plan will + /// cannot automatically retry on region errors. It's only used for requests + /// that target at a specific region but not keys (e.g. ResolveLockRequest). pub async fn single_region(self) -> Result, Targetted>> { let key = self.plan.request.key(); + // TODO: retry when region error occurred let store = self.pd_client.clone().store_for_key(key.into()).await?; set_single_region_store(self.plan, store, self.pd_client) } @@ -156,7 +145,7 @@ impl PlanBuilder, NoTarget> { /// Target the request at a single region; caller supplies the store to target. pub async fn single_region_with_store( self, - store: Store, + store: RegionStore, ) -> Result, Targetted>> { set_single_region_store(self.plan, store, self.pd_client) } @@ -164,7 +153,7 @@ impl PlanBuilder, NoTarget> { impl PlanBuilder where - P::Result: HasError, + P::Result: HasKeyErrors, { pub fn preserve_keys(self) -> PlanBuilder, NoTarget> { PlanBuilder { @@ -177,7 +166,7 @@ where impl PlanBuilder where - P::Result: HasError, + P::Result: HasKeyErrors + HasRegionErrors, { pub fn extract_error(self) -> PlanBuilder, Targetted> { PlanBuilder { @@ -190,10 +179,11 @@ where fn set_single_region_store( mut plan: Dispatch, - store: Store, + store: RegionStore, pd_client: Arc, ) -> Result, Targetted>> { - plan.request.set_context(store.region.context()?); + plan.request + .set_context(store.region_with_leader.context()?); plan.kv_client = Some(store.client); Ok(PlanBuilder { plan, diff --git a/src/request/shard.rs b/src/request/shard.rs index e1300c5d..ec7ab37e 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -2,8 +2,8 @@ use crate::{ pd::PdClient, - request::{Dispatch, HasKeys, KvRequest, Plan, PreserveKey, ResolveLock, RetryRegion}, - store::Store, + request::{Dispatch, HasKeys, KvRequest, Plan, PreserveKey, ResolveLock}, + store::RegionStore, Result, }; use futures::stream::BoxStream; @@ -16,11 +16,11 @@ macro_rules! 
impl_inner_shardable { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { self.inner.shards(pd_client) } - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { + fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { self.inner.apply_shard(shard, store) } }; @@ -32,9 +32,9 @@ pub trait Shardable { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>>; + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>>; - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()>; + fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>; } impl Shardable for Dispatch { @@ -43,11 +43,11 @@ impl Shardable for Dispatch { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { self.request.shards(pd_client) } - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { + fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { self.kv_client = Some(store.client.clone()); self.request.apply_shard(shard, store) } @@ -61,8 +61,37 @@ impl Shardable for PreserveKey

{ impl_inner_shardable!(); } -impl Shardable for RetryRegion { - impl_inner_shardable!(); +#[macro_export] +macro_rules! shardable_key { + ($type_: ty) => { + impl Shardable for $type_ { + type Shard = Vec>; + + fn shards( + &self, + pd_client: &std::sync::Arc, + ) -> futures::stream::BoxStream< + 'static, + crate::Result<(Self::Shard, crate::store::RegionStore)>, + > { + crate::store::store_stream_for_keys( + std::iter::once(self.key.clone()), + pd_client.clone(), + ) + } + + fn apply_shard( + &mut self, + mut shard: Self::Shard, + store: &crate::store::RegionStore, + ) -> crate::Result<()> { + self.set_context(store.region_with_leader.context()?); + assert!(shard.len() == 1); + self.set_key(shard.pop().unwrap()); + Ok(()) + } + } + }; } #[macro_export] @@ -76,7 +105,7 @@ macro_rules! shardable_keys { pd_client: &std::sync::Arc, ) -> futures::stream::BoxStream< 'static, - crate::Result<(Self::Shard, crate::store::Store)>, + crate::Result<(Self::Shard, crate::store::RegionStore)>, > { let mut keys = self.keys.clone(); keys.sort(); @@ -86,9 +115,9 @@ macro_rules! shardable_keys { fn apply_shard( &mut self, shard: Self::Shard, - store: &crate::store::Store, + store: &crate::store::RegionStore, ) -> crate::Result<()> { - self.set_context(store.region.context()?); + self.set_context(store.region_with_leader.context()?); self.set_keys(shard.into_iter().map(Into::into).collect()); Ok(()) } @@ -105,7 +134,7 @@ macro_rules! shardable_range { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::Store)>> { + ) -> BoxStream<'static, crate::Result<(Self::Shard, crate::store::RegionStore)>> { let start_key = self.start_key.clone().into(); let end_key = self.end_key.clone().into(); crate::store::store_stream_for_range((start_key, end_key), pd_client.clone()) @@ -114,9 +143,9 @@ macro_rules! shardable_range { fn apply_shard( &mut self, shard: Self::Shard, - store: &crate::store::Store, + store: &crate::store::RegionStore, ) -> crate::Result<()> { - self.set_context(store.region.context()?); + self.set_context(store.region_with_leader.context()?); self.set_start_key(shard.0.into()); self.set_end_key(shard.1.into()); diff --git a/src/store.rs b/src/store.rs index 3487330b..21edbe54 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,6 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
-use crate::{pd::PdClient, region::Region, BoundRange, Key, Result}; +use crate::{pd::PdClient, region::RegionWithLeader, BoundRange, Key, Result}; use derive_new::new; use futures::{prelude::*, stream::BoxStream}; use std::{ @@ -10,17 +10,17 @@ use std::{ use tikv_client_proto::kvrpcpb; use tikv_client_store::{KvClient, KvConnect, TikvConnect}; -#[derive(new)] -pub struct Store { - pub region: Region, +#[derive(new, Clone)] +pub struct RegionStore { + pub region_with_leader: RegionWithLeader, pub client: Arc, } pub trait KvConnectStore: KvConnect { - fn connect_to_store(&self, region: Region, address: String) -> Result { + fn connect_to_store(&self, region: RegionWithLeader, address: String) -> Result { log::info!("connect to tikv endpoint: {:?}", &address); let client = self.connect(address.as_str())?; - Ok(Store::new(region, Arc::new(client))) + Ok(RegionStore::new(region, Arc::new(client))) } } @@ -30,7 +30,7 @@ impl KvConnectStore for TikvConnect {} pub fn store_stream_for_keys( key_data: impl Iterator + Send + Sync + 'static, pd_client: Arc, -) -> BoxStream<'static, Result<(Vec, Store)>> +) -> BoxStream<'static, Result<(Vec, RegionStore)>> where PdC: PdClient, K: AsRef + Into + Send + Sync + 'static, @@ -52,12 +52,12 @@ where pub fn store_stream_for_range( range: (Vec, Vec), pd_client: Arc, -) -> BoxStream<'static, Result<((Vec, Vec), Store)>> { +) -> BoxStream<'static, Result<((Vec, Vec), RegionStore)>> { let bnd_range = BoundRange::from(range.clone()); pd_client .stores_for_range(bnd_range) .map_ok(move |store| { - let region_range = store.region.range(); + let region_range = store.region_with_leader.range(); let result_range = range_intersection( region_range, (range.0.clone().into(), range.1.clone().into()), @@ -70,12 +70,12 @@ pub fn store_stream_for_range( pub fn store_stream_for_range_by_start_key( start_key: Key, pd_client: Arc, -) -> BoxStream<'static, Result<(Vec, Store)>> { +) -> BoxStream<'static, Result<(Vec, RegionStore)>> { let bnd_range = BoundRange::range_from(start_key.clone()); pd_client .stores_for_range(bnd_range) .map_ok(move |store| { - let region_range = store.region.range(); + let region_range = store.region_with_leader.range(); ( range_intersection(region_range, (start_key.clone(), vec![].into())) .0 @@ -102,7 +102,7 @@ fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) pub fn store_stream_for_ranges( ranges: Vec, pd_client: Arc, -) -> BoxStream<'static, Result<(Vec, Store)>> { +) -> BoxStream<'static, Result<(Vec, RegionStore)>> { pd_client .clone() .group_ranges_by_region(ranges) diff --git a/src/transaction/buffer.rs b/src/transaction/buffer.rs index 583599cc..0f1d2428 100644 --- a/src/transaction/buffer.rs +++ b/src/transaction/buffer.rs @@ -401,10 +401,7 @@ mod tests { )) .unwrap() .collect::>(), - vec![KvPair( - Key::from(b"key1".to_vec()), - Value::from(b"value".to_vec()), - ),] + vec![KvPair(Key::from(b"key1".to_vec()), b"value".to_vec(),),] ); } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index f31db490..c7b83a96 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -89,11 +89,17 @@ impl Client { ) -> Result { let logger = optional_logger.unwrap_or_else(|| { let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); - Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!()) + Logger::root( + slog_term::FullFormat::new(plain) + .build() + .filter_level(slog::Level::Info) + .fuse(), + o!(), + ) }); debug!(logger, "creating new transactional client"); let 
pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, true).await?); + let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true, logger.clone()).await?); Ok(Client { pd, logger }) } @@ -220,8 +226,7 @@ impl Client { let plan = crate::request::PlanBuilder::new(self.pd.clone(), req) .resolve_lock(OPTIMISTIC_BACKOFF) - .multi_region() - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(crate::request::Collect) .plan(); let res: Vec = plan.execute().await?; @@ -235,6 +240,7 @@ impl Client { } // resolve locks + // FIXME: (1) this is inefficient (2) when region error occurred resolve_locks(locks, self.pd.clone()).await?; // update safepoint to PD diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index c10f3837..871effc6 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -4,7 +4,7 @@ use crate::{ backoff::{Backoff, DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF}, pd::PdClient, region::RegionVerId, - request::Plan, + request::{CollectSingle, Plan}, timestamp::TimestampExt, transaction::requests, Error, Result, @@ -63,10 +63,9 @@ pub async fn resolve_locks( None => { let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version); let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) - .single_region() - .await? .resolve_lock(OPTIMISTIC_BACKOFF) - .retry_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(CollectSingle) .post_process_default() .plan(); let commit_version = plan.execute().await?; @@ -102,23 +101,30 @@ async fn resolve_lock_with_retry( for i in 0..RESOLVE_LOCK_RETRY_LIMIT { debug!("resolving locks: attempt {}", (i + 1)); let store = pd_client.clone().store_for_key(key.into()).await?; - let ver_id = store.region.ver_id(); + let ver_id = store.region_with_leader.ver_id(); let request = requests::new_resolve_lock_request(start_version, commit_version); + // The only place where single-region is used let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) .single_region_with_store(store) .await? 
.resolve_lock(Backoff::no_backoff()) - .retry_region(Backoff::no_backoff()) .extract_error() .plan(); match plan.execute().await { Ok(_) => { return Ok(ver_id); } - Err(e @ Error::RegionError(_)) => { - // Retry on region error - error = Some(e); - continue; + // Retry on region error + Err(Error::ExtractedErrors(mut errors)) => { + // ResolveLockResponse can have at most 1 error + match errors.pop() { + e @ Some(Error::RegionError(_)) => { + error = e; + continue; + } + Some(e) => return Err(e), + None => unreachable!(), + } } Err(e) => return Err(e), } @@ -136,20 +142,21 @@ pub trait HasLocks { mod tests { use super::*; use crate::mock::{MockKvClient, MockPdClient}; - use futures::executor; use std::any::Any; use tikv_client_proto::errorpb; - #[test] - fn test_resolve_lock_with_retry() { + #[tokio::test] + async fn test_resolve_lock_with_retry() { // Test resolve lock within retry limit fail::cfg("region-error", "9*return").unwrap(); let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |_: &dyn Any| { fail::fail_point!("region-error", |_| { - let mut resp = kvrpcpb::ResolveLockResponse::default(); - resp.region_error = Some(errorpb::Error::default()); + let resp = kvrpcpb::ResolveLockResponse { + region_error: Some(errorpb::Error::default()), + ..Default::default() + }; Ok(Box::new(resp) as Box) }); Ok(Box::new(kvrpcpb::ResolveLockResponse::default()) as Box) @@ -158,14 +165,16 @@ mod tests { let key = vec![1]; let region1 = MockPdClient::region1(); - let resolved_region = - executor::block_on(resolve_lock_with_retry(&key, 1, 2, client.clone())).unwrap(); + let resolved_region = resolve_lock_with_retry(&key, 1, 2, client.clone()) + .await + .unwrap(); assert_eq!(region1.ver_id(), resolved_region); // Test resolve lock over retry limit fail::cfg("region-error", "10*return").unwrap(); let key = vec![100]; - executor::block_on(resolve_lock_with_retry(&key, 3, 4, client)) + resolve_lock_with_retry(&key, 3, 4, client) + .await .expect_err("should return error"); } } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 1de00ec1..072c9e39 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -1,11 +1,13 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. 
use crate::{ + collect_first, pd::PdClient, request::{ - Collect, DefaultProcessor, HasKeys, KvRequest, Merge, Process, Shardable, SingleKey, + Collect, CollectSingle, DefaultProcessor, HasKeys, KvRequest, Merge, Process, Shardable, + SingleKey, }, - store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store}, + store::{store_stream_for_keys, store_stream_for_range_by_start_key, RegionStore}, timestamp::TimestampExt, transaction::HasLocks, util::iter::FlatMapOkIterExt, @@ -13,7 +15,10 @@ use crate::{ }; use futures::stream::BoxStream; use std::{collections::HashMap, iter, sync::Arc}; -use tikv_client_proto::{kvrpcpb, pdpb::Timestamp}; +use tikv_client_proto::{ + kvrpcpb::{self, TxnHeartBeatResponse}, + pdpb::Timestamp, +}; // implement HasLocks for a response type that has a `pairs` field, // where locks can be extracted from both the `pairs` and `error` fields @@ -67,6 +72,8 @@ impl KvRequest for kvrpcpb::GetRequest { type Response = kvrpcpb::GetResponse; } +shardable_key!(kvrpcpb::GetRequest); +collect_first!(kvrpcpb::GetResponse); impl SingleKey for kvrpcpb::GetRequest { fn key(&self) -> &Vec { &self.key @@ -154,6 +161,10 @@ pub fn new_resolve_lock_request( req } +// Note: ResolveLockRequest is a special one: it can be sent to a specified +// region without keys. So it's not Shardable. And we don't automatically retry +// on its region errors (in the Plan level). The region error must be manually +// handled (in the upper level). impl KvRequest for kvrpcpb::ResolveLockRequest { type Response = kvrpcpb::ResolveLockResponse; } @@ -170,6 +181,8 @@ impl KvRequest for kvrpcpb::CleanupRequest { type Response = kvrpcpb::CleanupResponse; } +shardable_key!(kvrpcpb::CleanupRequest); +collect_first!(kvrpcpb::CleanupResponse); impl SingleKey for kvrpcpb::CleanupRequest { fn key(&self) -> &Vec { &self.key @@ -225,14 +238,14 @@ impl Shardable for kvrpcpb::PrewriteRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { let mut mutations = self.mutations.clone(); mutations.sort_by(|a, b| a.key.cmp(&b.key)); store_stream_for_keys(mutations.into_iter(), pd_client.clone()) } - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { - self.set_context(store.region.context()?); + fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + self.set_context(store.region_with_leader.context()?); // Only need to set secondary keys if we're sending the primary key. 
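        // (With async commit, the primary lock records the keys of all
        // secondaries so the transaction's final status can be recovered
        // from the primary lock alone.)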
if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) { @@ -348,14 +361,14 @@ impl Shardable for kvrpcpb::PessimisticLockRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { let mut mutations = self.mutations.clone(); mutations.sort_by(|a, b| a.key.cmp(&b.key)); store_stream_for_keys(mutations.into_iter(), pd_client.clone()) } - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { - self.set_context(store.region.context()?); + fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + self.set_context(store.region_with_leader.context()?); self.set_mutations(shard); Ok(()) } @@ -428,12 +441,12 @@ impl Shardable for kvrpcpb::ScanLockRequest { fn shards( &self, pd_client: &Arc, - ) -> BoxStream<'static, Result<(Self::Shard, Store)>> { + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { store_stream_for_range_by_start_key(self.start_key.clone().into(), pd_client.clone()) } - fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> { - self.set_context(store.region.context()?); + fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { + self.set_context(store.region_with_leader.context()?); self.set_start_key(shard); Ok(()) } @@ -466,6 +479,26 @@ impl KvRequest for kvrpcpb::TxnHeartBeatRequest { type Response = kvrpcpb::TxnHeartBeatResponse; } +impl Shardable for kvrpcpb::TxnHeartBeatRequest { + type Shard = Vec>; + + fn shards( + &self, + pd_client: &Arc, + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone()) + } + + fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { + self.set_context(store.region_with_leader.context()?); + assert!(shard.len() == 1); + self.primary_lock = shard.pop().unwrap(); + Ok(()) + } +} + +collect_first!(TxnHeartBeatResponse); + impl SingleKey for kvrpcpb::TxnHeartBeatRequest { fn key(&self) -> &Vec { &self.primary_lock @@ -484,6 +517,24 @@ impl KvRequest for kvrpcpb::CheckTxnStatusRequest { type Response = kvrpcpb::CheckTxnStatusResponse; } +impl Shardable for kvrpcpb::CheckTxnStatusRequest { + type Shard = Vec>; + + fn shards( + &self, + pd_client: &Arc, + ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { + crate::store::store_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone()) + } + + fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { + self.set_context(store.region_with_leader.context()?); + assert!(shard.len() == 1); + self.set_primary_key(shard.pop().unwrap()); + Ok(()) + } +} + impl SingleKey for kvrpcpb::CheckTxnStatusRequest { fn key(&self) -> &Vec { &self.primary_key diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 075eb9d9..3610fa3b 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1,9 +1,11 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
use crate::{ - backoff::Backoff, + backoff::{Backoff, DEFAULT_REGION_BACKOFF}, pd::{PdClient, PdRpcClient}, - request::{Collect, CollectAndMatchKey, CollectError, Plan, PlanBuilder, RetryOptions}, + request::{ + Collect, CollectAndMatchKey, CollectError, CollectSingle, Plan, PlanBuilder, RetryOptions, + }, timestamp::TimestampExt, transaction::{buffer::Buffer, lowering::*}, BoundRange, Error, Key, KvPair, Result, Value, @@ -119,10 +121,9 @@ impl Transaction { .get_or_else(key, |key| async move { let request = new_get_request(key, timestamp); let plan = PlanBuilder::new(rpc, request) - .single_region() - .await? .resolve_lock(retry_options.lock_backoff) - .retry_region(retry_options.region_backoff) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(CollectSingle) .post_process_default() .plan(); plan.execute().await @@ -252,8 +253,7 @@ impl Transaction { let request = new_batch_get_request(keys, timestamp); let plan = PlanBuilder::new(rpc, request) .resolve_lock(retry_options.lock_backoff) - .multi_region() - .retry_region(retry_options.region_backoff) + .retry_multi_region(retry_options.region_backoff) .merge(Collect) .plan(); plan.execute() @@ -522,8 +522,8 @@ impl Transaction { } } TransactionKind::Pessimistic(_) => { - let keys: Vec = keys.into_iter().map(|k| k.into()).collect(); - self.pessimistic_lock(keys.into_iter(), false).await?; + self.pessimistic_lock(keys.into_iter().map(|k| k.into()), false) + .await?; } } Ok(()) @@ -665,10 +665,9 @@ impl Transaction { self.start_instant.elapsed().as_millis() as u64 + DEFAULT_LOCK_TTL, ); let plan = PlanBuilder::new(self.rpc.clone(), request) - .single_region() - .await? .resolve_lock(self.options.retry_options.lock_backoff.clone()) - .retry_region(self.options.retry_options.region_backoff.clone()) + .retry_multi_region(self.options.retry_options.region_backoff.clone()) + .merge(CollectSingle) .post_process_default() .plan(); plan.execute().await @@ -693,8 +692,7 @@ impl Transaction { let request = new_scan_request(new_range, timestamp, new_limit, key_only); let plan = PlanBuilder::new(rpc, request) .resolve_lock(retry_options.lock_backoff) - .multi_region() - .retry_region(retry_options.region_backoff) + .retry_multi_region(retry_options.region_backoff) .merge(Collect) .plan(); plan.execute() @@ -750,8 +748,7 @@ impl Transaction { let plan = PlanBuilder::new(self.rpc.clone(), request) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .preserve_keys() - .multi_region() - .retry_region(self.options.retry_options.region_backoff.clone()) + .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectAndMatchKey) .plan(); let pairs = plan.execute().await; @@ -826,9 +823,8 @@ impl Transaction { start_instant.elapsed().as_millis() as u64 + DEFAULT_LOCK_TTL, ); let plan = PlanBuilder::new(rpc.clone(), request) - .single_region() - .await? - .retry_region(region_backoff.clone()) + .retry_multi_region(region_backoff.clone()) + .merge(CollectSingle) .plan(); plan.execute().await?; } @@ -1045,6 +1041,7 @@ impl HeartbeatOption { /// The `commit` phase is to mark all written data as successfully committed. /// /// The committer implements `prewrite`, `commit` and `rollback` functions. 
+#[allow(clippy::too_many_arguments)] #[derive(new)] struct Committer { primary_key: Option, @@ -1128,8 +1125,7 @@ impl Committer { let plan = PlanBuilder::new(self.rpc.clone(), request) .resolve_lock(self.options.retry_options.lock_backoff.clone()) - .multi_region() - .retry_region(self.options.retry_options.region_backoff.clone()) + .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectError) .extract_error() .plan(); @@ -1169,8 +1165,7 @@ impl Committer { ); let plan = PlanBuilder::new(self.rpc.clone(), req) .resolve_lock(self.options.retry_options.lock_backoff.clone()) - .multi_region() - .retry_region(self.options.retry_options.region_backoff.clone()) + .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() .plan(); plan.execute() @@ -1207,8 +1202,7 @@ impl Committer { }; let plan = PlanBuilder::new(self.rpc, req) .resolve_lock(self.options.retry_options.lock_backoff) - .multi_region() - .retry_region(self.options.retry_options.region_backoff) + .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); plan.execute().await?; @@ -1229,8 +1223,7 @@ impl Committer { let req = new_batch_rollback_request(keys, self.start_version); let plan = PlanBuilder::new(self.rpc, req) .resolve_lock(self.options.retry_options.lock_backoff) - .multi_region() - .retry_region(self.options.retry_options.region_backoff) + .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); plan.execute().await?; @@ -1239,8 +1232,7 @@ impl Committer { let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts); let plan = PlanBuilder::new(self.rpc, req) .resolve_lock(self.options.retry_options.lock_backoff) - .multi_region() - .retry_region(self.options.retry_options.region_backoff) + .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); plan.execute().await?; @@ -1301,18 +1293,23 @@ mod tests { #[tokio::test] async fn test_optimistic_heartbeat() -> Result<(), io::Error> { let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); - let logger = Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!()); - info!(logger, "Testing: test_optimistic_heartbeat"); + let logger = Logger::root( + slog_term::FullFormat::new(plain) + .build() + .filter_level(slog::Level::Info) + .fuse(), + o!(), + ); let scenario = FailScenario::setup(); fail::cfg("after-prewrite", "sleep(1500)").unwrap(); let heartbeats = Arc::new(AtomicUsize::new(0)); let heartbeats_cloned = heartbeats.clone(); let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( move |req: &dyn Any| { - if let Some(_) = req.downcast_ref::() { + if req.downcast_ref::().is_some() { heartbeats_cloned.fetch_add(1, Ordering::SeqCst); Ok(Box::new(kvrpcpb::TxnHeartBeatResponse::default()) as Box) - } else if let Some(_) = req.downcast_ref::() { + } else if req.downcast_ref::().is_some() { Ok(Box::new(kvrpcpb::PrewriteResponse::default()) as Box) } else { Ok(Box::new(kvrpcpb::CommitResponse::default()) as Box) @@ -1341,18 +1338,27 @@ mod tests { #[tokio::test] async fn test_pessimistic_heartbeat() -> Result<(), io::Error> { let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); - let logger = Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!()); - info!(logger, "Testing: test_pessimistic_heartbeat"); + + let logger = Logger::root( + slog_term::FullFormat::new(plain) + .build() + .filter_level(slog::Level::Info) + .fuse(), + o!(), + 
); let heartbeats = Arc::new(AtomicUsize::new(0)); let heartbeats_cloned = heartbeats.clone(); let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( move |req: &dyn Any| { - if let Some(_) = req.downcast_ref::() { + if req.downcast_ref::().is_some() { heartbeats_cloned.fetch_add(1, Ordering::SeqCst); Ok(Box::new(kvrpcpb::TxnHeartBeatResponse::default()) as Box) - } else if let Some(_) = req.downcast_ref::() { + } else if req.downcast_ref::().is_some() { Ok(Box::new(kvrpcpb::PrewriteResponse::default()) as Box) - } else if let Some(_) = req.downcast_ref::() { + } else if req + .downcast_ref::() + .is_some() + { Ok(Box::new(kvrpcpb::PessimisticLockResponse::default()) as Box) } else { Ok(Box::new(kvrpcpb::CommitResponse::default()) as Box) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index ed41b53b..32c77c92 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -25,11 +25,6 @@ pub async fn clear_tikv() { // To test with multiple regions, prewrite some data. Tests that hope to test // with multiple regions should use keys in the corresponding ranges. pub async fn init() -> Result<()> { - // ignore SetLoggerError - let _ = simple_logger::SimpleLogger::new() - .with_level(log::LevelFilter::Warn) - .init(); - if env::var(ENV_ENABLE_MULIT_REGION).is_ok() { // 1000 keys: 0..1000 let keys_1 = std::iter::successors(Some(0u32), |x| Some(x + 1)) @@ -88,8 +83,14 @@ async fn ensure_region_split( pub fn pd_addrs() -> Vec { env::var(ENV_PD_ADDRS) - .expect(&format!("Expected {}:", ENV_PD_ADDRS)) - .split(",") + .unwrap_or_else(|_| { + info!( + "Environment variable {} is not found. Using {:?} as default.", + ENV_PD_ADDRS, "127.0.0.1:2379" + ); + "127.0.0.1:2379".to_owned() + }) + .split(',') .map(From::from) .collect() } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index c3f14862..a0bc6807 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -309,8 +309,8 @@ async fn txn_read() -> Result<()> { .collect::>(); let mut txn = client.begin_pessimistic().await?; - let res = txn.batch_get(keys.clone()).await?.collect::>(); - assert_eq!(res.len(), keys.len()); + let res = txn.batch_get(keys.clone()).await?; + assert_eq!(res.count(), keys.len()); let res = txn.batch_get_for_update(keys.clone()).await?; assert_eq!(res.len(), keys.len()); @@ -322,7 +322,6 @@ async fn txn_read() -> Result<()> { // FIXME: the test is temporarily ingnored since it's easy to fail when scheduling is frequent. #[tokio::test] #[serial] -#[ignore] async fn txn_bank_transfer() -> Result<()> { init().await?; let client = TransactionClient::new(pd_addrs(), None).await?; @@ -699,7 +698,7 @@ async fn txn_get_for_update() -> Result<()> { assert!(t1.get_for_update(key1.clone()).await?.unwrap() == value1); t1.commit().await?; - assert!(t2.batch_get(keys.clone()).await?.collect::>().len() == 0); + assert!(t2.batch_get(keys.clone()).await?.count() == 0); let res: HashMap<_, _> = t2 .batch_get_for_update(keys.clone()) .await? 
@@ -713,7 +712,7 @@ async fn txn_get_for_update() -> Result<()> { assert!(t3.get_for_update(key1).await?.is_none()); assert!(t3.commit().await.is_err()); - assert!(t4.batch_get_for_update(keys).await?.len() == 0); + assert!(t4.batch_get_for_update(keys).await?.is_empty()); assert!(t4.commit().await.is_err()); Ok(()) diff --git a/tikv-client-common/Cargo.toml b/tikv-client-common/Cargo.toml index 61f3244c..8abd30f2 100644 --- a/tikv-client-common/Cargo.toml +++ b/tikv-client-common/Cargo.toml @@ -15,6 +15,7 @@ lazy_static = "1" log = "0.4" regex = "1" tikv-client-proto = { version = "0.1.0", path = "../tikv-client-proto" } +tokio = "1" [dev-dependencies] clap = "2" @@ -22,4 +23,4 @@ fail = { version = "0.4", features = [ "failpoints" ] } proptest = "1" proptest-derive = "0.3" tempfile = "3" -tokio = "1.0" +tokio = "1" diff --git a/tikv-client-common/src/errors.rs b/tikv-client-common/src/errors.rs index 5a740aef..3e82a2e3 100644 --- a/tikv-client-common/src/errors.rs +++ b/tikv-client-common/src/errors.rs @@ -33,6 +33,10 @@ pub enum Error { "The operation is not supported in current mode, please consider using RawClient with or without atomic mode" )] UnsupportedMode, + #[error("There is no current_regions in the EpochNotMatch error")] + NoCurrentRegions, + #[error("The specified entry is not found in the region cache")] + EntryNotFoundInRegionCache, /// Wraps a `std::io::Error`. #[error("IO error: {0}")] Io(#[from] std::io::Error), @@ -51,18 +55,24 @@ pub enum Error { /// Wraps `tikv_client_proto::kvrpcpb::KeyError` #[error("{0:?}")] KeyError(tikv_client_proto::kvrpcpb::KeyError), - /// Multiple errors + /// Multiple errors generated from the ExtractError plan. #[error("Multiple errors: {0:?}")] - MultipleErrors(Vec), + ExtractedErrors(Vec), + /// Multiple key errors + #[error("Multiple key errors: {0:?}")] + MultipleKeyErrors(Vec), /// Invalid ColumnFamily #[error("Unsupported column family {}", _0)] ColumnFamilyError(String), + /// Can't join tokio tasks + #[error("Failed to join tokio tasks")] + JoinError(#[from] tokio::task::JoinError), /// No region is found for the given key. #[error("Region is not found for key: {:?}", key)] RegionForKeyNotFound { key: Vec }, - /// No region is found for the given id. - #[error("Region {} is not found", region_id)] - RegionNotFound { region_id: u64 }, + /// No region is found for the given id. note: distinguish it with the RegionNotFound error in errorpb. + #[error("Region {} is not found in the response", region_id)] + RegionNotFoundInResponse { region_id: u64 }, /// No leader is found for the given id. #[error("Leader of region {} is not found", region_id)] LeaderNotFound { region_id: u64 }, diff --git a/tikv-client-store/src/errors.rs b/tikv-client-store/src/errors.rs index 08f0c1d5..7337fa0e 100644 --- a/tikv-client-store/src/errors.rs +++ b/tikv-client-store/src/errors.rs @@ -4,18 +4,30 @@ use crate::Error; use std::fmt::Display; use tikv_client_proto::kvrpcpb; +// Those that can have a single region error pub trait HasRegionError { - fn region_error(&mut self) -> Option; + fn region_error(&mut self) -> Option; } -pub trait HasError: HasRegionError { - fn error(&mut self) -> Option; +// Those that can have multiple region errors +pub trait HasRegionErrors { + fn region_errors(&mut self) -> Option>; +} + +pub trait HasKeyErrors { + fn key_errors(&mut self) -> Option>; +} + +impl HasRegionErrors for T { + fn region_errors(&mut self) -> Option> { + self.region_error().map(|e| vec![e]) + } } macro_rules! 
has_region_error { ($type:ty) => { impl HasRegionError for $type { - fn region_error(&mut self) -> Option { + fn region_error(&mut self) -> Option { if self.has_region_error() { Some(self.take_region_error().into()) } else { @@ -56,10 +68,10 @@ has_region_error!(kvrpcpb::RawCasResponse); macro_rules! has_key_error { ($type:ty) => { - impl HasError for $type { - fn error(&mut self) -> Option { + impl HasKeyErrors for $type { + fn key_errors(&mut self) -> Option> { if self.has_error() { - Some(self.take_error().into()) + Some(vec![self.take_error().into()]) } else { None } @@ -81,14 +93,14 @@ has_key_error!(kvrpcpb::CheckSecondaryLocksResponse); macro_rules! has_str_error { ($type:ty) => { - impl HasError for $type { - fn error(&mut self) -> Option { + impl HasKeyErrors for $type { + fn key_errors(&mut self) -> Option> { if self.get_error().is_empty() { None } else { - Some(Error::KvError { + Some(vec![Error::KvError { message: self.take_error(), - }) + }]) } } } @@ -105,67 +117,67 @@ has_str_error!(kvrpcpb::RawCasResponse); has_str_error!(kvrpcpb::ImportResponse); has_str_error!(kvrpcpb::DeleteRangeResponse); -impl HasError for kvrpcpb::ScanResponse { - fn error(&mut self) -> Option { +impl HasKeyErrors for kvrpcpb::ScanResponse { + fn key_errors(&mut self) -> Option> { extract_errors(self.pairs.iter_mut().map(|pair| pair.error.take())) } } -impl HasError for kvrpcpb::BatchGetResponse { - fn error(&mut self) -> Option { +impl HasKeyErrors for kvrpcpb::BatchGetResponse { + fn key_errors(&mut self) -> Option> { extract_errors(self.pairs.iter_mut().map(|pair| pair.error.take())) } } -impl HasError for kvrpcpb::RawBatchGetResponse { - fn error(&mut self) -> Option { +impl HasKeyErrors for kvrpcpb::RawBatchGetResponse { + fn key_errors(&mut self) -> Option> { extract_errors(self.pairs.iter_mut().map(|pair| pair.error.take())) } } -impl HasError for kvrpcpb::RawScanResponse { - fn error(&mut self) -> Option { +impl HasKeyErrors for kvrpcpb::RawScanResponse { + fn key_errors(&mut self) -> Option> { extract_errors(self.kvs.iter_mut().map(|pair| pair.error.take())) } } -impl HasError for kvrpcpb::RawBatchScanResponse { - fn error(&mut self) -> Option { +impl HasKeyErrors for kvrpcpb::RawBatchScanResponse { + fn key_errors(&mut self) -> Option> { extract_errors(self.kvs.iter_mut().map(|pair| pair.error.take())) } } -impl HasError for kvrpcpb::PrewriteResponse { - fn error(&mut self) -> Option { +impl HasKeyErrors for kvrpcpb::PrewriteResponse { + fn key_errors(&mut self) -> Option> { extract_errors(self.take_errors().into_iter().map(Some)) } } -impl HasError for kvrpcpb::PessimisticLockResponse { - fn error(&mut self) -> Option { +impl HasKeyErrors for kvrpcpb::PessimisticLockResponse { + fn key_errors(&mut self) -> Option> { extract_errors(self.take_errors().into_iter().map(Some)) } } -impl HasError for kvrpcpb::PessimisticRollbackResponse { - fn error(&mut self) -> Option { +impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse { + fn key_errors(&mut self) -> Option> { extract_errors(self.take_errors().into_iter().map(Some)) } } -impl HasError for Result { - fn error(&mut self) -> Option { +impl HasKeyErrors for Result { + fn key_errors(&mut self) -> Option> { match self { - Ok(x) => x.error(), - Err(e) => Some(Error::StringError(e.to_string())), + Ok(x) => x.key_errors(), + Err(e) => Some(vec![Error::StringError(e.to_string())]), } } } -impl HasError for Vec { - fn error(&mut self) -> Option { +impl HasKeyErrors for Vec { + fn key_errors(&mut self) -> Option> { for t in self { - if let 
Some(e) = t.error() { + if let Some(e) = t.key_errors() { return Some(e); } } @@ -175,37 +187,36 @@ impl HasError for Vec { } impl HasRegionError for Result { - fn region_error(&mut self) -> Option { + fn region_error(&mut self) -> Option { self.as_mut().ok().and_then(|t| t.region_error()) } } -impl HasRegionError for Vec { - fn region_error(&mut self) -> Option { - for t in self { - if let Some(e) = t.region_error() { - return Some(e); - } +impl HasRegionErrors for Vec { + fn region_errors(&mut self) -> Option> { + let errors: Vec<_> = self.iter_mut().filter_map(|x| x.region_error()).collect(); + if errors.is_empty() { + None + } else { + Some(errors) } - - None } } -fn extract_errors(error_iter: impl Iterator>) -> Option { +fn extract_errors( + error_iter: impl Iterator>, +) -> Option> { let errors: Vec = error_iter.flatten().map(Into::into).collect(); if errors.is_empty() { None - } else if errors.len() == 1 { - Some(errors.into_iter().next().unwrap()) } else { - Some(Error::MultipleErrors(errors)) + Some(errors) } } #[cfg(test)] mod test { - use super::HasError; + use super::HasKeyErrors; use tikv_client_common::{internal_err, Error}; use tikv_client_proto::kvrpcpb; #[test] @@ -215,7 +226,7 @@ mod test { error: None, commit_version: 0, }); - assert!(resp.error().is_none()); + assert!(resp.key_errors().is_none()); let mut resp: Result<_, Error> = Ok(kvrpcpb::CommitResponse { region_error: None, @@ -232,9 +243,9 @@ mod test { }), commit_version: 0, }); - assert!(resp.error().is_some()); + assert!(resp.key_errors().is_some()); let mut resp: Result = Err(internal_err!("some error")); - assert!(resp.error().is_some()); + assert!(resp.key_errors().is_some()); } } diff --git a/tikv-client-store/src/lib.rs b/tikv-client-store/src/lib.rs index 714ea9b2..5df938ff 100644 --- a/tikv-client-store/src/lib.rs +++ b/tikv-client-store/src/lib.rs @@ -7,7 +7,7 @@ mod request; #[doc(inline)] pub use crate::{ client::{KvClient, KvConnect, TikvConnect}, - errors::{HasError, HasRegionError}, + errors::{HasKeyErrors, HasRegionError, HasRegionErrors}, request::Request, }; pub use tikv_client_common::{security::SecurityManager, Error, Result};