Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement the complete keyspace feature #439

Merged
merged 1 commit into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ prometheus = { version = "0.13", default-features = false }
prost = "0.12"
rand = "0.8"
regex = "1"
reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
semver = "1.0"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1"
take_mut = "0.2.2"
thiserror = "1"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
tonic = { version = "0.10", features = ["tls"] }
Expand All @@ -51,9 +54,7 @@ env_logger = "0.10"
fail = { version = "0.4", features = ["failpoints"] }
proptest = "1"
proptest-derive = "0.3"
reqwest = { version = "0.11", default-features = false, features = [
"native-tls-vendored",
] }
rstest = "0.18.2"
serde_json = "1"
serial_test = "0.5.0"
simple_logger = "1"
Expand Down
4 changes: 4 additions & 0 deletions config/tikv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ max-open-files = 10000

[raftdb]
max-open-files = 10000

[storage]
andylokandy marked this conversation as resolved.
Show resolved Hide resolved
api-version = 2
enable-ttl = true
4 changes: 3 additions & 1 deletion examples/pessimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ async fn main() {
Config::default().with_security(ca, cert, key)
} else {
Config::default()
};
}
// This example uses the default keyspace, so api-v2 must be enabled on the server.
.with_default_keyspace();
andylokandy marked this conversation as resolved.
Show resolved Hide resolved

// init
let client = Client::new_with_config(args.pd, config)
Expand Down
8 changes: 6 additions & 2 deletions examples/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ async fn main() -> Result<()> {
Config::default().with_security(ca, cert, key)
} else {
Config::default()
};
}
// This example uses the default keyspace, so api-v2 must be enabled on the server.
.with_default_keyspace();

// When we first create a client we receive a `Connect` structure which must be resolved before
// the client is actually connected and usable.
Expand Down Expand Up @@ -136,6 +138,8 @@ async fn main() -> Result<()> {
);
println!("Scanning batch scan from {batch_scan_keys:?} gives: {vals:?}");

// Cleanly exit.
// Delete all keys in the whole range.
client.delete_range("".to_owned().."".to_owned()).await?;
andylokandy marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
4 changes: 3 additions & 1 deletion examples/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ async fn main() {
Config::default().with_security(ca, cert, key)
} else {
Config::default()
};
}
// This example uses the default keyspace, so api-v2 must be enabled on the server.
.with_default_keyspace();

let txn = Client::new_with_config(args.pd, config)
.await
Expand Down
5 changes: 5 additions & 0 deletions src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ pub enum Error {
/// Wraps a `grpcio::Error`.
#[error("gRPC error: {0}")]
Grpc(#[from] tonic::transport::Error),
/// Wraps a `reqwest::Error`.
#[error("http error: {0}")]
Http(#[from] reqwest::Error),
/// Wraps a `grpcio::Error`.
#[error("gRPC api error: {0}")]
GrpcAPI(#[from] tonic::Status),
#[error("Http request failed: unknown respond {0}")]
UnknownHttpRespond(String),
/// Wraps a `grpcio::Error`.
#[error("url error: {0}")]
Url(#[from] tonic::codegen::http::uri::InvalidUri),
Expand Down
19 changes: 19 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Config {
pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>,
pub timeout: Duration,
pub keyspace: Option<String>,
}

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
Expand All @@ -30,6 +31,7 @@ impl Default for Config {
cert_path: None,
key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT,
keyspace: None,
}
}
}
Expand Down Expand Up @@ -83,4 +85,21 @@ impl Config {
self.timeout = timeout;
self
}

/// Set to use default keyspace.
///
/// Server should enable `storage.api-version = 2` to use this feature.
#[must_use]
pub fn with_default_keyspace(self) -> Self {
self.with_keyspace("DEFAULT")
}

/// Set the use keyspace for the client.
///
/// Server should enable `storage.api-version = 2` to use this feature.
#[must_use]
pub fn with_keyspace(mut self, keyspace: &str) -> Self {
self.keyspace = Some(keyspace.to_owned());
self
}
}
10 changes: 2 additions & 8 deletions src/kv/bound_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,11 @@ impl BoundRange {
pub fn into_keys(self) -> (Key, Option<Key>) {
let start = match self.from {
Bound::Included(v) => v,
Bound::Excluded(mut v) => {
v.push_zero();
v
}
Bound::Excluded(v) => v.next_key(),
Bound::Unbounded => Key::EMPTY,
};
let end = match self.to {
Bound::Included(mut v) => {
v.push_zero();
Some(v)
}
Bound::Included(v) => Some(v.next_key()),
Bound::Excluded(v) => Some(v),
Bound::Unbounded => None,
};
Expand Down
9 changes: 5 additions & 4 deletions src/kv/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct Key(
test,
proptest(strategy = "any_with::<Vec<u8>>((size_range(_PROPTEST_KEY_MAX), ()))")
)]
pub(super) Vec<u8>,
pub(crate) Vec<u8>,
);

impl AsRef<Key> for kvrpcpb::Mutation {
Expand All @@ -98,10 +98,11 @@ impl Key {

/// Push a zero to the end of the key.
///
/// Extending a zero makes the new key the smallest key that is greater than than the original one, i.e. the succeeder.
/// Extending a zero makes the new key the smallest key that is greater than than the original one.
#[inline]
pub(super) fn push_zero(&mut self) {
self.0.push(0)
pub(crate) fn next_key(mut self) -> Self {
self.0.push(0);
self
}

/// Convert the key to a lower bound. The key is treated as inclusive.
Expand Down
5 changes: 1 addition & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@

pub mod backoff;
#[doc(hidden)]
pub mod proto; // export `proto` to enable user customized codec
#[doc(hidden)]
pub mod raw;
pub mod request;
#[doc(hidden)]
Expand All @@ -106,6 +104,7 @@ mod compat;
mod config;
mod kv;
mod pd;
mod proto;
mod region;
mod region_cache;
mod stats;
Expand Down Expand Up @@ -146,8 +145,6 @@ pub use crate::raw::Client as RawClient;
#[doc(inline)]
pub use crate::raw::ColumnFamily;
#[doc(inline)]
pub use crate::request::codec;
#[doc(inline)]
pub use crate::request::RetryOptions;
#[doc(inline)]
pub use crate::timestamp::Timestamp;
Expand Down
21 changes: 4 additions & 17 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::proto::metapb::RegionEpoch;
use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::request::codec::ApiV1TxnCodec;
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::Request;
Expand All @@ -31,7 +30,7 @@ use crate::Timestamp;

/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
/// client can be tested without doing any RPC calls.
pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCluster> {
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
let config = Config::default();
PdRpcClient::new(
config.clone(),
Expand All @@ -44,7 +43,6 @@ pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCl
))
},
false,
Some(ApiV1TxnCodec::default()),
)
.await
.unwrap()
Expand Down Expand Up @@ -73,18 +71,9 @@ pub struct MockKvConnect;

pub struct MockCluster;

#[derive(new)]
pub struct MockPdClient {
client: MockKvClient,
codec: ApiV1TxnCodec,
}

impl MockPdClient {
pub fn new(client: MockKvClient) -> MockPdClient {
MockPdClient {
client,
codec: ApiV1TxnCodec::default(),
}
}
}

#[async_trait]
Expand Down Expand Up @@ -113,7 +102,6 @@ impl MockPdClient {
pub fn default() -> MockPdClient {
MockPdClient {
client: MockKvClient::default(),
codec: ApiV1TxnCodec::default(),
}
}

Expand Down Expand Up @@ -177,7 +165,6 @@ impl MockPdClient {

#[async_trait]
impl PdClient for MockPdClient {
type Codec = ApiV1TxnCodec;
type KvClient = MockKvClient;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
Expand Down Expand Up @@ -228,7 +215,7 @@ impl PdClient for MockPdClient {

async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}

fn get_codec(&self) -> &Self::Codec {
&self.codec
async fn get_keyspace_id(&self, _keyspace: &str) -> Result<u32> {
unimplemented!()
}
}
Loading