Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
andylokandy committed Dec 18, 2023
1 parent c9f6cc5 commit f07fbc8
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 250 deletions.
93 changes: 46 additions & 47 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::metapb;
use crate::raw::lowering::*;
use crate::request::APIVersion;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::EncodeVersion;
use crate::request::EncodeKeyspace;
use crate::request::KeyMode;
use crate::request::Keyspace;
use crate::request::Plan;
use crate::request::TruncateVersion;
use crate::request::TruncateKeyspace;
use crate::Backoff;
use crate::BoundRange;
use crate::ColumnFamily;
Expand All @@ -45,7 +45,7 @@ pub struct Client<PdC: PdClient = PdRpcClient> {
backoff: Backoff,
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
atomic: bool,
api_version: APIVersion,
keyspace: Keyspace,
}

impl Clone for Client {
Expand All @@ -55,7 +55,7 @@ impl Clone for Client {
cf: self.cf.clone(),
backoff: self.backoff.clone(),
atomic: self.atomic,
api_version: self.api_version,
keyspace: self.keyspace,
}
}
}
Expand Down Expand Up @@ -109,19 +109,19 @@ impl Client<PdRpcClient> {
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let rpc =
Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), enable_codec).await?);
let api_version = match config.keyspace {
let keyspace = match config.keyspace {
Some(keyspace) => {
let keyspace_id = rpc.get_keyspace_id(&keyspace).await?;
APIVersion::V2 { keyspace_id }
Keyspace::Enable { keyspace_id }
}
None => APIVersion::V1,
None => Keyspace::Disable,
};
Ok(Client {
rpc,
cf: None,
backoff: DEFAULT_REGION_BACKOFF,
atomic: false,
api_version,
keyspace,
})
}

Expand Down Expand Up @@ -156,7 +156,7 @@ impl Client<PdRpcClient> {
cf: Some(cf),
backoff: self.backoff.clone(),
atomic: self.atomic,
api_version: self.api_version,
keyspace: self.keyspace,
}
}

Expand Down Expand Up @@ -185,7 +185,7 @@ impl Client<PdRpcClient> {
cf: self.cf.clone(),
backoff,
atomic: self.atomic,
api_version: self.api_version,
keyspace: self.keyspace,
}
}

Expand All @@ -203,7 +203,7 @@ impl Client<PdRpcClient> {
cf: self.cf.clone(),
backoff: self.backoff.clone(),
atomic: true,
api_version: self.api_version,
keyspace: self.keyspace,
}
}
}
Expand All @@ -229,9 +229,9 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking raw get request");
let key = key.into().encode_version(self.api_version, KeyMode::Raw);
let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let request = new_raw_get_request(key, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.post_process_default()
Expand Down Expand Up @@ -264,15 +264,15 @@ impl<PdC: PdClient> Client<PdC> {
debug!("invoking raw batch_get request");
let keys = keys
.into_iter()
.map(|k| k.into().encode_version(self.api_version, KeyMode::Raw));
.map(|k| k.into().encode_keyspace(self.keyspace, KeyMode::Raw));
let request = new_raw_batch_get_request(keys, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.retry_multi_region(self.backoff.clone())
.merge(Collect)
.plan();
plan.execute().await.map(|r| {
r.into_iter()
.map(|pair| pair.truncate_version(self.api_version))
.map(|pair| pair.truncate_keyspace(self.keyspace))
.collect()
})
}
Expand All @@ -295,9 +295,9 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!("invoking raw put request");
let key = key.into().encode_version(self.api_version, KeyMode::Raw);
let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let request = new_raw_put_request(key, value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.extract_error()
Expand Down Expand Up @@ -330,9 +330,9 @@ impl<PdC: PdClient> Client<PdC> {
debug!("invoking raw batch_put request");
let pairs = pairs
.into_iter()
.map(|pair| pair.into().encode_version(self.api_version, KeyMode::Raw));
.map(|pair| pair.into().encode_keyspace(self.keyspace, KeyMode::Raw));
let request = new_raw_batch_put_request(pairs, self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.plan();
Expand All @@ -359,9 +359,9 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
debug!("invoking raw delete request");
let key = key.into().encode_version(self.api_version, KeyMode::Raw);
let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let request = new_raw_delete_request(key, self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.extract_error()
Expand Down Expand Up @@ -392,9 +392,9 @@ impl<PdC: PdClient> Client<PdC> {
self.assert_non_atomic()?;
let keys = keys
.into_iter()
.map(|k| k.into().encode_version(self.api_version, KeyMode::Raw));
.map(|k| k.into().encode_keyspace(self.keyspace, KeyMode::Raw));
let request = new_raw_batch_delete_request(keys, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.plan();
Expand All @@ -420,9 +420,9 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
debug!("invoking raw delete_range request");
self.assert_non_atomic()?;
let range = range.into().encode_version(self.api_version, KeyMode::Raw);
let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let request = new_raw_delete_range_request(range, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.plan();
Expand Down Expand Up @@ -572,14 +572,14 @@ impl<PdC: PdClient> Client<PdC> {
) -> Result<(Option<Value>, bool)> {
debug!("invoking raw compare_and_swap request");
self.assert_atomic()?;
let key = key.into().encode_version(self.api_version, KeyMode::Raw);
let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let req = new_cas_request(
key,
new_value.into(),
previous_value.into(),
self.cf.clone(),
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.post_process_default()
Expand All @@ -598,14 +598,14 @@ impl<PdC: PdClient> Client<PdC> {
semver::VersionReq::from_str(&copr_version_req)?;
let ranges = ranges
.into_iter()
.map(|range| range.into().encode_version(self.api_version, KeyMode::Raw));
let api_version = self.api_version;
.map(|range| range.into().encode_keyspace(self.keyspace, KeyMode::Raw));
let keyspace = self.keyspace;
let request_builder = move |region, ranges: Vec<Range<Key>>| {
request_builder(
region,
ranges
.into_iter()
.map(|range| range.truncate_version(api_version))
.map(|range| range.truncate_keyspace(keyspace))
.collect(),
)
};
Expand All @@ -615,7 +615,7 @@ impl<PdC: PdClient> Client<PdC> {
ranges,
request_builder,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
.preserve_shard()
.retry_multi_region(self.backoff.clone())
.post_process_default()
Expand All @@ -624,7 +624,7 @@ impl<PdC: PdClient> Client<PdC> {
.execute()
.await?
.into_iter()
.map(|(ranges, data)| (ranges.truncate_version(api_version), data))
.map(|(ranges, data)| (ranges.truncate_keyspace(keyspace), data))
.collect())
}

Expand All @@ -641,7 +641,7 @@ impl<PdC: PdClient> Client<PdC> {
});
}

let mut cur_range = range.into().encode_version(self.api_version, KeyMode::Raw);
let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let mut result = Vec::new();
let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
let mut region_store =
Expand All @@ -656,13 +656,12 @@ impl<PdC: PdClient> Client<PdC> {
while cur_limit > 0 {
let request =
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
let resp =
crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.single_region_with_store(region_store.clone())
.await?
.plan()
.execute()
.await?;
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.single_region_with_store(region_store.clone())
.await?
.plan()
.execute()
.await?;
let mut region_scan_res = resp
.kvs
.into_iter()
Expand Down Expand Up @@ -694,7 +693,7 @@ impl<PdC: PdClient> Client<PdC> {
result.truncate(limit as usize);

// truncate the version of keys
let result = result.truncate_version(self.api_version);
let result = result.truncate_keyspace(self.keyspace);

Ok(result)
}
Expand All @@ -714,16 +713,16 @@ impl<PdC: PdClient> Client<PdC> {

let ranges = ranges
.into_iter()
.map(|range| range.into().encode_version(self.api_version, KeyMode::Raw));
.map(|range| range.into().encode_keyspace(self.keyspace, KeyMode::Raw));

let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.retry_multi_region(self.backoff.clone())
.merge(Collect)
.plan();
plan.execute().await.map(|r| {
r.into_iter()
.map(|pair| pair.truncate_version(self.api_version))
.map(|pair| pair.truncate_keyspace(self.keyspace))
.collect()
})
}
Expand Down Expand Up @@ -778,7 +777,7 @@ mod tests {
cf: Some(ColumnFamily::Default),
backoff: DEFAULT_REGION_BACKOFF,
atomic: false,
api_version: APIVersion::V2 { keyspace_id: 0 },
keyspace: Keyspace::Enable { keyspace_id: 0 },
};
let resps = client
.coprocessor(
Expand Down
12 changes: 6 additions & 6 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,15 +503,15 @@ mod test {
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb;
use crate::request::APIVersion;
use crate::request::Keyspace;
use crate::request::Plan;
use crate::Key;

#[rstest::rstest]
#[case(APIVersion::V1)]
#[case(APIVersion::V2 { keyspace_id: 0 })]
#[case(Keyspace::Disable)]
#[case(Keyspace::Enable { keyspace_id: 0 })]
#[tokio::test]
async fn test_raw_scan(#[case] api_version: APIVersion) {
async fn test_raw_scan(#[case] keyspace: Keyspace) {
let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|req: &dyn Any| {
let req: &kvrpcpb::RawScanRequest = req.downcast_ref().unwrap();
Expand Down Expand Up @@ -540,8 +540,8 @@ mod test {
key_only: true,
..Default::default()
};
let plan = crate::request::PlanBuilder::new(client, api_version, scan)
.resolve_lock(OPTIMISTIC_BACKOFF, api_version)
let plan = crate::request::PlanBuilder::new(client, keyspace, scan)
.resolve_lock(OPTIMISTIC_BACKOFF, keyspace)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
.plan();
Expand Down
Loading

0 comments on commit f07fbc8

Please sign in to comment.