Skip to content

Commit

Permalink
--wip-- [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
andylokandy committed Dec 16, 2023
1 parent 4db9895 commit 99e366e
Show file tree
Hide file tree
Showing 33 changed files with 918 additions and 305 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ prometheus = { version = "0.13", default-features = false }
prost = "0.12"
rand = "0.8"
regex = "1"
reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
semver = "1.0"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1"
take_mut = "0.2.2"
thiserror = "1"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
tonic = { version = "0.10", features = ["tls"] }
Expand All @@ -51,9 +54,7 @@ env_logger = "0.10"
fail = { version = "0.4", features = ["failpoints"] }
proptest = "1"
proptest-derive = "0.3"
reqwest = { version = "0.11", default-features = false, features = [
"native-tls-vendored",
] }
rstest = "0.18.2"
serde_json = "1"
serial_test = "0.5.0"
simple_logger = "1"
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ doc:
cargo doc --workspace --exclude tikv-client-proto --document-private-items --no-deps

tiup:
tiup playground nightly --mode tikv-slim --kv 3 --without-monitor --kv.config $(shell pwd)/config/tikv.toml --pd.config $(shell pwd)/config/pd.toml &
tiup playground nightly --mode tikv-slim --kv 1 --without-monitor --kv.config ./config/tikv.toml --pd.config ./config/pd.toml --kv.binpath ../tikv/target/debug/tikv-server &

all: generate check doc test

Expand Down
4 changes: 4 additions & 0 deletions config/tikv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ max-open-files = 10000

[raftdb]
max-open-files = 10000

[storage]
api-version = 2
enable-ttl = true
3 changes: 2 additions & 1 deletion examples/pessimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ async fn main() {
Config::default().with_security(ca, cert, key)
} else {
Config::default()
};
}
.with_default_keyspace();

// init
let client = Client::new_with_config(args.pd, config)
Expand Down
6 changes: 4 additions & 2 deletions examples/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ async fn main() -> Result<()> {
Config::default().with_security(ca, cert, key)
} else {
Config::default()
};
}
.with_default_keyspace();

// When we first create a client we receive a `Connect` structure which must be resolved before
// the client is actually connected and usable.
Expand Down Expand Up @@ -136,6 +137,7 @@ async fn main() -> Result<()> {
);
println!("Scanning batch scan from {batch_scan_keys:?} gives: {vals:?}");

// Cleanly exit.
client.delete_range("".to_owned().."".to_owned()).await?;

Ok(())
}
3 changes: 2 additions & 1 deletion examples/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ async fn main() {
Config::default().with_security(ca, cert, key)
} else {
Config::default()
};
}
.with_default_keyspace();

let txn = Client::new_with_config(args.pd, config)
.await
Expand Down
5 changes: 5 additions & 0 deletions src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ pub enum Error {
/// Wraps a `grpcio::Error`.
#[error("gRPC error: {0}")]
Grpc(#[from] tonic::transport::Error),
/// Wraps a `reqwest::Error`.
#[error("http error: {0}")]
Http(#[from] reqwest::Error),
/// Wraps a `grpcio::Error`.
#[error("gRPC api error: {0}")]
GrpcAPI(#[from] tonic::Status),
#[error("Http request failed: unknown respond {0}")]
UnknownHttpRespond(String),
/// Wraps a `grpcio::Error`.
#[error("url error: {0}")]
Url(#[from] tonic::codegen::http::uri::InvalidUri),
Expand Down
19 changes: 19 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Config {
pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>,
pub timeout: Duration,
pub keyspace: Option<String>,
}

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
Expand All @@ -30,6 +31,7 @@ impl Default for Config {
cert_path: None,
key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT,
keyspace: None,
}
}
}
Expand Down Expand Up @@ -83,4 +85,21 @@ impl Config {
self.timeout = timeout;
self
}

/// Set to use default keyspace.
///
/// Server should enable `api-version = 2` to use this feature.
#[must_use]
pub fn with_default_keyspace(self) -> Self {
self.with_keyspace("DEFAULT")
}

/// Set the use keyspace for the client.
///
/// Server should enable `api-version = 2` to use this feature.
#[must_use]
pub fn with_keyspace(mut self, keyspace: &str) -> Self {
self.keyspace = Some(keyspace.to_owned());
self
}
}
10 changes: 2 additions & 8 deletions src/kv/bound_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,11 @@ impl BoundRange {
pub fn into_keys(self) -> (Key, Option<Key>) {
let start = match self.from {
Bound::Included(v) => v,
Bound::Excluded(mut v) => {
v.push_zero();
v
}
Bound::Excluded(v) => v.next_key(),
Bound::Unbounded => Key::EMPTY,
};
let end = match self.to {
Bound::Included(mut v) => {
v.push_zero();
Some(v)
}
Bound::Included(v) => Some(v.next_key()),
Bound::Excluded(v) => Some(v),
Bound::Unbounded => None,
};
Expand Down
9 changes: 5 additions & 4 deletions src/kv/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct Key(
test,
proptest(strategy = "any_with::<Vec<u8>>((size_range(_PROPTEST_KEY_MAX), ()))")
)]
pub(super) Vec<u8>,
pub(crate) Vec<u8>,
);

impl AsRef<Key> for kvrpcpb::Mutation {
Expand All @@ -98,10 +98,11 @@ impl Key {

/// Push a zero to the end of the key.
///
/// Extending a zero makes the new key the smallest key that is greater than than the original one, i.e. the succeeder.
/// Extending a zero makes the new key the smallest key that is greater than than the original one.
#[inline]
pub(super) fn push_zero(&mut self) {
self.0.push(0)
pub(crate) fn next_key(mut self) -> Self {
self.0.push(0);
self
}

/// Convert the key to a lower bound. The key is treated as inclusive.
Expand Down
4 changes: 4 additions & 0 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,8 @@ impl PdClient for MockPdClient {
}

async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}

async fn get_keyspace_id(&self, _keyspace: &str) -> Result<u32> {
unimplemented!()
}
}
6 changes: 6 additions & 0 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub trait PdClient: Send + Sync + 'static {

async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;

async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32>;

/// In transactional API, `key` is in raw format
async fn store_for_key(self: Arc<Self>, key: &Key) -> Result<RegionStore> {
let region = self.region_for_key(key).await?;
Expand Down Expand Up @@ -267,6 +269,10 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
self.region_cache.invalidate_region_cache(ver_id).await
}

async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32> {
self.pd.get_keyspace_id(keyspace).await
}
}

impl PdRpcClient<TikvConnect, Cluster> {
Expand Down
34 changes: 28 additions & 6 deletions src/pd/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tonic::Request;
use super::timestamp::TimestampOracle;
use crate::internal_err;
use crate::proto::pdpb;
use crate::Error;
use crate::Result;
use crate::SecurityManager;
use crate::Timestamp;
Expand All @@ -24,6 +25,7 @@ use crate::Timestamp;
pub struct Cluster {
id: u64,
client: pdpb::pd_client::PdClient<Channel>,
endpoint: String,
members: pdpb::GetMembersResponse,
tso: TimestampOracle,
}
Expand Down Expand Up @@ -91,6 +93,18 @@ impl Cluster {
req.safe_point = safepoint;
req.send(&mut self.client, timeout).await
}

pub async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32> {
let resp =
reqwest::get(format!("{}/pd/api/v2/keyspaces/{keyspace}", self.endpoint)).await?;
let body = resp.json::<serde_json::Value>().await?;
let keyspace_id = body
.get("id")
.ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))?
.as_u64()
.ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))?;
Ok(keyspace_id as u32)
}
}

/// An object for connecting and reconnecting to a PD cluster.
Expand All @@ -109,12 +123,13 @@ impl Connection {
timeout: Duration,
) -> Result<Cluster> {
let members = self.validate_endpoints(endpoints, timeout).await?;
let (client, members) = self.try_connect_leader(&members, timeout).await?;
let (client, endpoint, members) = self.try_connect_leader(&members, timeout).await?;
let id = members.header.as_ref().unwrap().cluster_id;
let tso = TimestampOracle::new(id, &client)?;
let cluster = Cluster {
id,
client,
endpoint,
members,
tso,
};
Expand All @@ -125,11 +140,13 @@ impl Connection {
pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
warn!("updating pd client");
let start = Instant::now();
let (client, members) = self.try_connect_leader(&cluster.members, timeout).await?;
let (client, endpoint, members) =
self.try_connect_leader(&cluster.members, timeout).await?;
let tso = TimestampOracle::new(cluster.id, &client)?;
*cluster = Cluster {
id: cluster.id,
client,
endpoint,
members,
tso,
};
Expand Down Expand Up @@ -239,7 +256,11 @@ impl Connection {
&self,
previous: &pdpb::GetMembersResponse,
timeout: Duration,
) -> Result<(pdpb::pd_client::PdClient<Channel>, pdpb::GetMembersResponse)> {
) -> Result<(
pdpb::pd_client::PdClient<Channel>,
String,
pdpb::GetMembersResponse,
)> {
let previous_leader = previous.leader.as_ref().unwrap();
let members = &previous.members;
let cluster_id = previous.header.as_ref().unwrap().cluster_id;
Expand Down Expand Up @@ -269,9 +290,10 @@ impl Connection {
if let Some(resp) = resp {
let leader = resp.leader.as_ref().unwrap();
for ep in &leader.client_urls {
let r = self.try_connect(ep.as_str(), cluster_id, timeout).await;
if r.is_ok() {
return r;
if let Ok((client, members)) =
self.try_connect(ep.as_str(), cluster_id, timeout).await
{
return Ok((client, ep.to_string(), members));
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/pd/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub trait RetryClientTrait {
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp>;

async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;

async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32>;
}
/// Client for communication with a PD cluster. Has the facility to reconnect to the cluster.
pub struct RetryClient<Cl = Cluster> {
Expand Down Expand Up @@ -197,6 +199,12 @@ impl RetryClientTrait for RetryClient<Cluster> {
.map(|resp| resp.new_safe_point == safepoint)
})
}

async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32> {
retry_mut!(self, "get_keyspace_id", |cluster| async {
cluster.get_keyspace_id(keyspace).await
})
}
}

impl fmt::Debug for RetryClient {
Expand Down
Loading

0 comments on commit 99e366e

Please sign in to comment.