Skip to content

Commit

Permalink
--wip-- [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
andylokandy committed Dec 14, 2023
1 parent 4db9895 commit 80bbd88
Show file tree
Hide file tree
Showing 16 changed files with 567 additions and 128 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ regex = "1"
semver = "1.0"
serde = "1.0"
serde_derive = "1.0"
take_mut = "0.2.2"
thiserror = "1"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
tonic = { version = "0.10", features = ["tls"] }
Expand All @@ -54,6 +55,7 @@ proptest-derive = "0.3"
reqwest = { version = "0.11", default-features = false, features = [
"native-tls-vendored",
] }
rstest = "0.18.2"
serde_json = "1"
serial_test = "0.5.0"
simple_logger = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/kv/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct Key(
test,
proptest(strategy = "any_with::<Vec<u8>>((size_range(_PROPTEST_KEY_MAX), ()))")
)]
pub(super) Vec<u8>,
pub(crate) Vec<u8>,
);

impl AsRef<Key> for kvrpcpb::Mutation {
Expand Down
156 changes: 108 additions & 48 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::metapb;
use crate::raw::lowering::*;
use crate::request::APIVersion;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::EncodeVersion;
use crate::request::KeyMode;
use crate::request::Plan;
use crate::request::TruncateVersion;
use crate::Backoff;
use crate::BoundRange;
use crate::ColumnFamily;
Expand All @@ -41,6 +45,7 @@ pub struct Client<PdC: PdClient = PdRpcClient> {
backoff: Backoff,
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
atomic: bool,
api_version: APIVersion,
}

impl Clone for Client {
Expand All @@ -50,6 +55,7 @@ impl Clone for Client {
cf: self.cf.clone(),
backoff: self.backoff.clone(),
atomic: self.atomic,
api_version: self.api_version,
}
}
}
Expand Down Expand Up @@ -100,12 +106,13 @@ impl Client<PdRpcClient> {
config: Config,
) -> Result<Self> {
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config, false).await?);
let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), false).await?);
Ok(Client {
rpc,
cf: None,
backoff: DEFAULT_REGION_BACKOFF,
atomic: false,
api_version: APIVersion::V1,
})
}

Expand Down Expand Up @@ -140,6 +147,7 @@ impl Client<PdRpcClient> {
cf: Some(cf),
backoff: self.backoff.clone(),
atomic: self.atomic,
api_version: self.api_version,
}
}

Expand Down Expand Up @@ -168,6 +176,7 @@ impl Client<PdRpcClient> {
cf: self.cf.clone(),
backoff,
atomic: self.atomic,
api_version: self.api_version,
}
}

Expand All @@ -185,6 +194,20 @@ impl Client<PdRpcClient> {
cf: self.cf.clone(),
backoff: self.backoff.clone(),
atomic: true,
api_version: self.api_version,
}
}

/// Set the API version to use.
#[must_use]
pub fn with_keyspace(&self, keyspace: String) -> Self {
// FIXME
Client {
rpc: self.rpc.clone(),
cf: self.cf.clone(),
backoff: self.backoff.clone(),
atomic: self.atomic,
api_version: APIVersion::V2 { keyspace_id: 0 },
}
}
}
Expand All @@ -210,8 +233,9 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking raw get request");
let request = new_raw_get_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
let key = key.into().encode_version(self.api_version, KeyMode::Raw);
let request = new_raw_get_request(key, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.post_process_default()
Expand Down Expand Up @@ -242,14 +266,19 @@ impl<PdC: PdClient> Client<PdC> {
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
debug!("invoking raw batch_get request");
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
let keys = keys
.into_iter()
.map(|k| k.into().encode_version(self.api_version, KeyMode::Raw));
let request = new_raw_batch_get_request(keys, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.merge(Collect)
.plan();
plan.execute()
.await
.map(|r| r.into_iter().map(Into::into).collect())
plan.execute().await.map(|r| {
r.into_iter()
.map(|pair| pair.truncate_version(self.api_version))
.collect()
})
}

/// Create a new 'put' request.
Expand All @@ -270,8 +299,9 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!("invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
let key = key.into().encode_version(self.api_version, KeyMode::Raw);
let request = new_raw_put_request(key, value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.extract_error()
Expand Down Expand Up @@ -302,12 +332,11 @@ impl<PdC: PdClient> Client<PdC> {
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
debug!("invoking raw batch_put request");
let request = new_raw_batch_put_request(
pairs.into_iter().map(Into::into),
self.cf.clone(),
self.atomic,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
let pairs = pairs
.into_iter()
.map(|pair| pair.into().encode_version(self.api_version, KeyMode::Raw));
let request = new_raw_batch_put_request(pairs, self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.plan();
Expand All @@ -334,8 +363,9 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
debug!("invoking raw delete request");
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
let key = key.into().encode_version(self.api_version, KeyMode::Raw);
let request = new_raw_delete_request(key, self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.extract_error()
Expand Down Expand Up @@ -364,9 +394,11 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
debug!("invoking raw batch_delete request");
self.assert_non_atomic()?;
let request =
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
let keys = keys
.into_iter()
.map(|k| k.into().encode_version(self.api_version, KeyMode::Raw));
let request = new_raw_batch_delete_request(keys, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.plan();
Expand All @@ -392,8 +424,9 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
debug!("invoking raw delete_range request");
self.assert_non_atomic()?;
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
let range = range.into().encode_version(self.api_version, KeyMode::Raw);
let request = new_raw_delete_range_request(range, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.plan();
Expand Down Expand Up @@ -543,13 +576,14 @@ impl<PdC: PdClient> Client<PdC> {
) -> Result<(Option<Value>, bool)> {
debug!("invoking raw compare_and_swap request");
self.assert_atomic()?;
let key = key.into().encode_version(self.api_version, KeyMode::Raw);
let req = new_cas_request(
key.into(),
key,
new_value.into(),
previous_value.into(),
self.cf.clone(),
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.post_process_default()
Expand All @@ -563,16 +597,29 @@ impl<PdC: PdClient> Client<PdC> {
copr_version_req: impl Into<String>,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
) -> Result<Vec<(Vec<Range<Key>>, Vec<u8>)>> {
let copr_version_req = copr_version_req.into();
semver::VersionReq::from_str(&copr_version_req)?;
let ranges = ranges
.into_iter()
.map(|range| range.into().encode_version(self.api_version, KeyMode::Raw));
let api_version = self.api_version;
let request_builder = move |region, ranges: Vec<Range<Key>>| {
request_builder(
region,
ranges
.into_iter()
.map(|range| range.truncate_version(api_version))
.collect(),
)
};
let req = new_raw_coprocessor_request(
copr_name.into(),
copr_version_req,
ranges.into_iter().map(Into::into),
ranges,
request_builder,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req)
.preserve_shard()
.retry_multi_region(self.backoff.clone())
.post_process_default()
Expand All @@ -592,8 +639,9 @@ impl<PdC: PdClient> Client<PdC> {
max_limit: MAX_RAW_KV_SCAN_LIMIT,
});
}

let mut cur_range = range.into().encode_version(self.api_version, KeyMode::Raw);
let mut result = Vec::new();
let mut cur_range = range.into();
let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
let mut region_store =
scan_regions
Expand All @@ -603,22 +651,25 @@ impl<PdC: PdClient> Client<PdC> {
range: (cur_range.clone()),
})??;
let mut cur_limit = limit;

while cur_limit > 0 {
let request =
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region_with_store(region_store.clone())
.await?
.plan()
.execute()
.await?;
let resp =
crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.single_region_with_store(region_store.clone())
.await?
.plan()
.execute()
.await?;
let mut region_scan_res = resp
.kvs
.into_iter()
.map(Into::into)
.collect::<Vec<KvPair>>();
let res_len = region_scan_res.len();
result.append(&mut region_scan_res);

// if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
if res_len < cur_limit as usize {
region_store = match scan_regions.next().await {
Expand All @@ -637,8 +688,13 @@ impl<PdC: PdClient> Client<PdC> {
break;
}
}

// limit is a soft limit, so we need check the number of results
result.truncate(limit as usize);

// truncate the version of keys
let result = result.truncate_version(self.api_version);

Ok(result)
}

Expand All @@ -655,17 +711,20 @@ impl<PdC: PdClient> Client<PdC> {
});
}

let request = new_raw_batch_scan_request(
ranges.into_iter().map(Into::into),
each_limit,
key_only,
self.cf.clone(),
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
let ranges = ranges
.into_iter()
.map(|range| range.into().encode_version(self.api_version, KeyMode::Raw));

let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.merge(Collect)
.plan();
plan.execute().await
plan.execute().await.map(|r| {
r.into_iter()
.map(|pair| pair.truncate_version(self.api_version))
.collect()
})
}

fn assert_non_atomic(&self) -> Result<()> {
Expand Down Expand Up @@ -718,6 +777,7 @@ mod tests {
cf: Some(ColumnFamily::Default),
backoff: DEFAULT_REGION_BACKOFF,
atomic: false,
api_version: APIVersion::V1,
};
let resps = client
.coprocessor(
Expand All @@ -729,25 +789,25 @@ mod tests {
.await?;
let resps: Vec<_> = resps
.into_iter()
.map(|(data, ranges)| (String::from_utf8(data).unwrap(), ranges))
.map(|(ranges, data)| (ranges, String::from_utf8(data).unwrap()))
.collect();
assert_eq!(
resps,
vec![
(
vec![Key::from(vec![5])..Key::from(vec![10])],
"1:[Key(05)..Key(0A)]".to_string(),
vec![Key::from(vec![5])..Key::from(vec![10])]
),
(
"2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]".to_string(),
vec![
Key::from(vec![10])..Key::from(vec![15]),
Key::from(vec![20])..Key::from(vec![250, 250])
]
],
"2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]".to_string(),
),
(
vec![Key::from(vec![250, 250])..Key::from(vec![])],
"3:[Key(FAFA)..Key()]".to_string(),
vec![Key::from(vec![250, 250])..Key::from(vec![])]
)
]
);
Expand Down
Loading

0 comments on commit 80bbd88

Please sign in to comment.