-
Notifications
You must be signed in to change notification settings - Fork 131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix batch_put ttl issue #457
Merged
pingyu
merged 7 commits into
tikv:master
from
limbooverlambda:limbooverlambda/fix-batch-put-ttl-issue
Jul 1, 2024
Merged
Changes from 6 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
767e930
fixing the shard issue with batch_put
limbooverlambda d6bd8a4
PR feedback
limbooverlambda 463024c
more make check fixes
limbooverlambda 390e7c7
removing redundant map
limbooverlambda 9c907c9
Merge branch 'master' into limbooverlambda/fix-batch-put-ttl-issue
pingyu 9d76994
more PR feedback
limbooverlambda 74133f9
slight formatting change and remove another redundant clone
limbooverlambda File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -7,10 +7,12 @@ use std::time::Duration; | |||||
|
||||||
use async_trait::async_trait; | ||||||
use futures::stream::BoxStream; | ||||||
|
||||||
use tonic::transport::Channel; | ||||||
|
||||||
use super::RawRpcRequest; | ||||||
use crate::collect_single; | ||||||
use crate::kv::KvPairTTL; | ||||||
use crate::pd::PdClient; | ||||||
use crate::proto::kvrpcpb; | ||||||
use crate::proto::metapb; | ||||||
|
@@ -190,23 +192,28 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest { | |||||
} | ||||||
|
||||||
impl Shardable for kvrpcpb::RawBatchPutRequest { | ||||||
type Shard = Vec<kvrpcpb::KvPair>; | ||||||
type Shard = Vec<(kvrpcpb::KvPair, u64)>; | ||||||
|
||||||
fn shards( | ||||||
&self, | ||||||
pd_client: &Arc<impl PdClient>, | ||||||
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { | ||||||
let mut pairs = self.pairs.clone(); | ||||||
pairs.sort_by(|a, b| a.key.cmp(&b.key)); | ||||||
store_stream_for_keys( | ||||||
pairs.into_iter().map(Into::<KvPair>::into), | ||||||
pd_client.clone(), | ||||||
) | ||||||
let kvs = self.pairs.clone(); | ||||||
let ttls = self.ttls.clone(); | ||||||
let mut kv_ttl: Vec<KvPairTTL> = kvs | ||||||
.into_iter() | ||||||
.zip(ttls) | ||||||
.map(|(kv, ttl)| KvPairTTL(kv, ttl)) | ||||||
.collect(); | ||||||
kv_ttl.sort_by(|a, b| a.0.key.clone().cmp(&b.0.key)); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. |
||||||
store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) | ||||||
} | ||||||
|
||||||
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { | ||||||
let (pairs, ttls) = shard.into_iter().unzip(); | ||||||
self.set_leader(&store.region_with_leader)?; | ||||||
self.pairs = shard; | ||||||
self.pairs = pairs; | ||||||
self.ttls = ttls; | ||||||
Ok(()) | ||||||
} | ||||||
} | ||||||
|
@@ -531,21 +538,35 @@ impl_raw_rpc_request!(RawDeleteRangeRequest); | |||||
impl_raw_rpc_request!(RawCasRequest); | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawGetResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawBatchGetResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawPutResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawBatchPutResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawDeleteResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawBatchDeleteResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawScanResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawBatchScanResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawDeleteRangeResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawCasResponse {} | ||||||
|
||||||
impl HasLocks for kvrpcpb::RawCoprocessorResponse {} | ||||||
|
||||||
#[cfg(test)] | ||||||
mod test { | ||||||
use std::any::Any; | ||||||
use std::collections::HashMap; | ||||||
use std::ops::Deref; | ||||||
use std::sync::Mutex; | ||||||
|
||||||
use super::*; | ||||||
use crate::backoff::DEFAULT_REGION_BACKOFF; | ||||||
|
@@ -555,7 +576,6 @@ mod test { | |||||
use crate::proto::kvrpcpb; | ||||||
use crate::request::Keyspace; | ||||||
use crate::request::Plan; | ||||||
use crate::Key; | ||||||
|
||||||
#[rstest::rstest] | ||||||
#[case(Keyspace::Disable)] | ||||||
|
@@ -600,4 +620,58 @@ mod test { | |||||
assert_eq!(scan.len(), 49); | ||||||
// FIXME test the keys returned. | ||||||
} | ||||||
|
||||||
#[tokio::test] | ||||||
async fn test_raw_batch_put() -> Result<()> { | ||||||
let region1_kvs = vec![KvPair(vec![9].into(), vec![12])]; | ||||||
let region1_ttls = vec![0]; | ||||||
let region2_kvs = vec![ | ||||||
KvPair(vec![11].into(), vec![12]), | ||||||
KvPair("FFF".to_string().as_bytes().to_vec().into(), vec![12]), | ||||||
]; | ||||||
let region2_ttls = vec![0, 1]; | ||||||
|
||||||
let expected_map = HashMap::from([ | ||||||
(region1_kvs.clone(), region1_ttls.clone()), | ||||||
(region2_kvs.clone(), region2_ttls.clone()), | ||||||
]); | ||||||
|
||||||
let pairs: Vec<kvrpcpb::KvPair> = [region1_kvs, region2_kvs] | ||||||
.concat() | ||||||
.into_iter() | ||||||
.map(|kv| kv.into()) | ||||||
.collect(); | ||||||
let ttls = [region1_ttls, region2_ttls].concat(); | ||||||
let cf = ColumnFamily::Default; | ||||||
|
||||||
let actual_map: Arc<Mutex<HashMap<Vec<KvPair>, Vec<u64>>>> = | ||||||
Arc::new(Mutex::new(HashMap::new())); | ||||||
let fut_actual_map = actual_map.clone(); | ||||||
let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( | ||||||
move |req: &dyn Any| { | ||||||
let req: &kvrpcpb::RawBatchPutRequest = req.downcast_ref().unwrap(); | ||||||
let kv_pair = req | ||||||
.pairs | ||||||
.clone() | ||||||
.into_iter() | ||||||
.map(|p| p.into()) | ||||||
.collect::<Vec<KvPair>>(); | ||||||
let ttls = req.ttls.clone(); | ||||||
fut_actual_map.lock().unwrap().insert(kv_pair, ttls); | ||||||
let resp = kvrpcpb::RawBatchPutResponse::default(); | ||||||
Ok(Box::new(resp) as Box<dyn Any>) | ||||||
}, | ||||||
))); | ||||||
|
||||||
let batch_put_request = | ||||||
new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false); | ||||||
let keyspace = Keyspace::Enable { keyspace_id: 0 }; | ||||||
let plan = crate::request::PlanBuilder::new(client, keyspace, batch_put_request) | ||||||
.resolve_lock(OPTIMISTIC_BACKOFF, keyspace) | ||||||
.retry_multi_region(DEFAULT_REGION_BACKOFF) | ||||||
.plan(); | ||||||
let _ = plan.execute().await; | ||||||
assert_eq!(actual_map.lock().unwrap().deref(), &expected_map); | ||||||
Ok(()) | ||||||
} | ||||||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.