From 61f44259465219323dac3eeb86d37c5f37731900 Mon Sep 17 00:00:00 2001 From: andylokandy Date: Wed, 8 Dec 2021 00:16:22 +0800 Subject: [PATCH] add ttl for raw client Signed-off-by: andylokandy --- src/kv/kvpair.rs | 25 +++++++++++++++ src/kv/mod.rs | 2 +- src/raw/client.rs | 29 ++++++++++++++--- src/raw/lowering.rs | 20 +++++++++--- src/raw/requests.rs | 53 +++++++++++++++++++++++++++++--- src/request/plan.rs | 2 +- src/transaction/requests.rs | 8 ++--- tikv-client-store/src/errors.rs | 2 ++ tikv-client-store/src/request.rs | 5 +++ 9 files changed, 126 insertions(+), 20 deletions(-) diff --git a/src/kv/kvpair.rs b/src/kv/kvpair.rs index c4ce4443..eb51a205 100644 --- a/src/kv/kvpair.rs +++ b/src/kv/kvpair.rs @@ -6,6 +6,25 @@ use proptest_derive::Arbitrary; use std::{fmt, str}; use tikv_client_proto::kvrpcpb; +/// A key/value pair with TTL (in seconds). +/// +/// # Examples +/// ```rust +/// # use tikv_client::{Key, Value, KvPair}; +/// let key = "key".to_owned(); +/// let value = "value".to_owned(); +/// let pair = KvPair::new(key, value); +/// let pair_with_ttl = pair.with_ttl(60); +/// ``` +pub struct KvPairWithTTL(pub KvPair, pub u64); + +/// Convert `Into` to a `KvPairWithTTL` with no TTL. +impl> From for KvPairWithTTL { + fn from(pair: K) -> Self { + Self(pair.into(), 0) + } +} + /// A key/value pair. /// /// # Examples @@ -78,6 +97,12 @@ impl KvPair { pub fn set_value(&mut self, v: impl Into) { self.1 = v.into(); } + + /// Convert the `KvPair` into a `KvPairWithTTL` with the given TTL (in seconds). + #[inline] + pub fn with_ttl(self, ttl: u64) -> KvPairWithTTL { + KvPairWithTTL(self, ttl) + } } impl From<(K, V)> for KvPair diff --git a/src/kv/mod.rs b/src/kv/mod.rs index e3942164..81903f3d 100644 --- a/src/kv/mod.rs +++ b/src/kv/mod.rs @@ -9,7 +9,7 @@ mod value; pub use bound_range::{BoundRange, IntoOwnedRange}; pub use key::Key; -pub use kvpair::KvPair; +pub use kvpair::{KvPair, KvPairWithTTL}; pub use value::Value; struct HexRepr<'a>(pub &'a [u8]); diff --git a/src/raw/client.rs b/src/raw/client.rs index b9ac740d..4926440b 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -10,6 +10,7 @@ use tikv_client_proto::metapb; use crate::{ backoff::DEFAULT_REGION_BACKOFF, config::Config, + kv::KvPairWithTTL, pd::{PdClient, PdRpcClient}, raw::lowering::*, request::{Collect, CollectSingle, Plan}, @@ -245,6 +246,17 @@ impl Client { .map(|r| r.into_iter().map(Into::into).collect()) } + pub async fn get_key_ttl_secs(&self, key: impl Into) -> Result> { + debug!(self.logger, "invoking raw get_key_ttl_secs request"); + let request = new_raw_get_key_ttl_request(key.into(), self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(CollectSingle) + .post_process_default() + .plan(); + plan.execute().await + } + /// Create a new 'put' request. /// /// Once resolved this request will result in the setting of the value associated with the given key. @@ -262,18 +274,25 @@ impl Client { /// # }); /// ``` pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { - self.put_opt(key, value, DEFAULT_REGION_BACKOFF).await + self.put_opt(key, value, DEFAULT_REGION_BACKOFF, 0).await } - /// Same as [`put`](Client::put) but with custom [`backoff`](crate::Backoff) strategy. + /// Same as [`put`](Client::put) but with custom [`backoff`](crate::Backoff) strategy and ttl. pub async fn put_opt( &self, key: impl Into, value: impl Into, backoff: Backoff, + ttl_secs: u64, ) -> Result<()> { debug!(self.logger, "invoking raw put request"); - let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic); + let request = new_raw_put_request( + key.into(), + value.into(), + self.cf.clone(), + ttl_secs, + self.atomic, + ); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(backoff) .merge(CollectSingle) @@ -307,10 +326,10 @@ impl Client { self.batch_put_opt(pairs, DEFAULT_REGION_BACKOFF).await } - /// Same as [`batch_put`](Client::batch_put) but with custom [`backoff`](crate::Backoff) strategy. + /// Same as [`batch_put`](Client::batch_put) but with custom [`backoff`](crate::Backoff) strategy and the optionally add a TTL to the key value pairs. pub async fn batch_put_opt( &self, - pairs: impl IntoIterator>, + pairs: impl IntoIterator>, backoff: Backoff, ) -> Result<()> { debug!(self.logger, "invoking raw batch_put request"); diff --git a/src/raw/lowering.rs b/src/raw/lowering.rs index 32327a80..ced86de4 100644 --- a/src/raw/lowering.rs +++ b/src/raw/lowering.rs @@ -8,7 +8,7 @@ use std::{iter::Iterator, ops::Range, sync::Arc}; use tikv_client_proto::{kvrpcpb, metapb}; -use crate::{raw::requests, BoundRange, ColumnFamily, Key, KvPair, Value}; +use crate::{kv::KvPairWithTTL, raw::requests, BoundRange, ColumnFamily, Key, Value}; pub fn new_raw_get_request(key: Key, cf: Option) -> kvrpcpb::RawGetRequest { requests::new_raw_get_request(key.into(), cf) @@ -21,21 +21,33 @@ pub fn new_raw_batch_get_request( requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf) } +pub fn new_raw_get_key_ttl_request( + key: Key, + cf: Option, +) -> kvrpcpb::RawGetKeyTtlRequest { + requests::new_raw_get_key_ttl_request(key.into(), cf) +} + pub fn new_raw_put_request( key: Key, value: Value, cf: Option, + ttl: u64, atomic: bool, ) -> kvrpcpb::RawPutRequest { - requests::new_raw_put_request(key.into(), value, cf, atomic) + requests::new_raw_put_request(key.into(), value, cf, ttl, atomic) } pub fn new_raw_batch_put_request( - pairs: impl Iterator, + pairs_with_ttl: impl Iterator, cf: Option, atomic: bool, ) -> kvrpcpb::RawBatchPutRequest { - requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic) + let (pairs, ttls) = pairs_with_ttl + .into_iter() + .map(|pair_with_ttl| (pair_with_ttl.0.into(), pair_with_ttl.1)) + .unzip(); + requests::new_raw_batch_put_request(pairs, cf, ttls, atomic) } pub fn new_raw_delete_request( diff --git a/src/raw/requests.rs b/src/raw/requests.rs index bd678aaf..111e70d7 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -10,7 +10,7 @@ use tikv_client_store::Request; use super::RawRpcRequest; use crate::{ - collect_first, + collect_single, pd::PdClient, request::{ plan::ResponseWithShard, Collect, CollectSingle, DefaultProcessor, KvRequest, Merge, @@ -35,7 +35,7 @@ impl KvRequest for kvrpcpb::RawGetRequest { } shardable_key!(kvrpcpb::RawGetRequest); -collect_first!(kvrpcpb::RawGetResponse); +collect_single!(kvrpcpb::RawGetResponse); impl SingleKey for kvrpcpb::RawGetRequest { fn key(&self) -> &Vec { @@ -84,16 +84,55 @@ impl Merge for Collect { } } +pub fn new_raw_get_key_ttl_request( + key: Vec, + cf: Option, +) -> kvrpcpb::RawGetKeyTtlRequest { + let mut req = kvrpcpb::RawGetKeyTtlRequest::default(); + req.set_key(key); + req.maybe_set_cf(cf); + + req +} + +impl KvRequest for kvrpcpb::RawGetKeyTtlRequest { + type Response = kvrpcpb::RawGetKeyTtlResponse; +} + +shardable_key!(kvrpcpb::RawGetKeyTtlRequest); +collect_single!(kvrpcpb::RawGetKeyTtlResponse); + +impl SingleKey for kvrpcpb::RawGetKeyTtlRequest { + fn key(&self) -> &Vec { + &self.key + } +} + +impl Process for DefaultProcessor { + type Out = Option; + + fn process(&self, input: Result) -> Result { + let input = input?; + Ok(if input.not_found { + None + } else { + Some(input.ttl) + }) + } +} + pub fn new_raw_put_request( key: Vec, value: Vec, cf: Option, + ttl: u64, atomic: bool, ) -> kvrpcpb::RawPutRequest { let mut req = kvrpcpb::RawPutRequest::default(); req.set_key(key); req.set_value(value); req.maybe_set_cf(cf); + req.set_ttl(ttl); req.set_for_cas(atomic); req @@ -104,7 +143,7 @@ impl KvRequest for kvrpcpb::RawPutRequest { } shardable_key!(kvrpcpb::RawPutRequest); -collect_first!(kvrpcpb::RawPutResponse); +collect_single!(kvrpcpb::RawPutResponse); impl SingleKey for kvrpcpb::RawPutRequest { fn key(&self) -> &Vec { &self.key @@ -114,11 +153,13 @@ impl SingleKey for kvrpcpb::RawPutRequest { pub fn new_raw_batch_put_request( pairs: Vec, cf: Option, + ttls: Vec, atomic: bool, ) -> kvrpcpb::RawBatchPutRequest { let mut req = kvrpcpb::RawBatchPutRequest::default(); req.set_pairs(pairs); req.maybe_set_cf(cf); + req.set_ttls(ttls); req.set_for_cas(atomic); req @@ -168,7 +209,7 @@ impl KvRequest for kvrpcpb::RawDeleteRequest { } shardable_key!(kvrpcpb::RawDeleteRequest); -collect_first!(kvrpcpb::RawDeleteResponse); +collect_single!(kvrpcpb::RawDeleteResponse); impl SingleKey for kvrpcpb::RawDeleteRequest { fn key(&self) -> &Vec { &self.key @@ -314,7 +355,7 @@ impl KvRequest for kvrpcpb::RawCasRequest { } shardable_key!(kvrpcpb::RawCasRequest); -collect_first!(kvrpcpb::RawCasResponse); +collect_single!(kvrpcpb::RawCasResponse); impl SingleKey for kvrpcpb::RawCasRequest { fn key(&self) -> &Vec { &self.key @@ -445,6 +486,7 @@ macro_rules! impl_raw_rpc_request { impl_raw_rpc_request!(RawGetRequest); impl_raw_rpc_request!(RawBatchGetRequest); +impl_raw_rpc_request!(RawGetKeyTtlRequest); impl_raw_rpc_request!(RawPutRequest); impl_raw_rpc_request!(RawBatchPutRequest); impl_raw_rpc_request!(RawDeleteRequest); @@ -456,6 +498,7 @@ impl_raw_rpc_request!(RawCasRequest); impl HasLocks for kvrpcpb::RawGetResponse {} impl HasLocks for kvrpcpb::RawBatchGetResponse {} +impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {} impl HasLocks for kvrpcpb::RawPutResponse {} impl HasLocks for kvrpcpb::RawBatchPutResponse {} impl HasLocks for kvrpcpb::RawDeleteResponse {} diff --git a/src/request/plan.rs b/src/request/plan.rs index ce785262..53ef909a 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -333,7 +333,7 @@ pub struct Collect; pub struct CollectSingle; #[macro_export] -macro_rules! collect_first { +macro_rules! collect_single { ($type_: ty) => { impl Merge<$type_> for CollectSingle { type Out = $type_; diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 16243d83..05e697e3 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -1,7 +1,7 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. use crate::{ - collect_first, + collect_single, pd::PdClient, request::{ Collect, CollectSingle, CollectWithShard, DefaultProcessor, KvRequest, Merge, Process, @@ -75,7 +75,7 @@ impl KvRequest for kvrpcpb::GetRequest { } shardable_key!(kvrpcpb::GetRequest); -collect_first!(kvrpcpb::GetResponse); +collect_single!(kvrpcpb::GetResponse); impl SingleKey for kvrpcpb::GetRequest { fn key(&self) -> &Vec { &self.key @@ -191,7 +191,7 @@ impl KvRequest for kvrpcpb::CleanupRequest { } shardable_key!(kvrpcpb::CleanupRequest); -collect_first!(kvrpcpb::CleanupResponse); +collect_single!(kvrpcpb::CleanupResponse); impl SingleKey for kvrpcpb::CleanupRequest { fn key(&self) -> &Vec { &self.key @@ -515,7 +515,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest { } } -collect_first!(TxnHeartBeatResponse); +collect_single!(TxnHeartBeatResponse); impl SingleKey for kvrpcpb::TxnHeartBeatRequest { fn key(&self) -> &Vec { diff --git a/tikv-client-store/src/errors.rs b/tikv-client-store/src/errors.rs index c1e915ac..bc3da0d8 100644 --- a/tikv-client-store/src/errors.rs +++ b/tikv-client-store/src/errors.rs @@ -57,6 +57,7 @@ has_region_error!(kvrpcpb::DeleteRangeResponse); has_region_error!(kvrpcpb::GcResponse); has_region_error!(kvrpcpb::RawGetResponse); has_region_error!(kvrpcpb::RawBatchGetResponse); +has_region_error!(kvrpcpb::RawGetKeyTtlResponse); has_region_error!(kvrpcpb::RawPutResponse); has_region_error!(kvrpcpb::RawBatchPutResponse); has_region_error!(kvrpcpb::RawDeleteResponse); @@ -109,6 +110,7 @@ macro_rules! has_str_error { } has_str_error!(kvrpcpb::RawGetResponse); +has_str_error!(kvrpcpb::RawGetKeyTtlResponse); has_str_error!(kvrpcpb::RawPutResponse); has_str_error!(kvrpcpb::RawBatchPutResponse); has_str_error!(kvrpcpb::RawDeleteResponse); diff --git a/tikv-client-store/src/request.rs b/tikv-client-store/src/request.rs index 290f142a..fa70f017 100644 --- a/tikv-client-store/src/request.rs +++ b/tikv-client-store/src/request.rs @@ -47,6 +47,11 @@ macro_rules! impl_request { impl_request!(RawGetRequest, raw_get_async_opt, "raw_get"); impl_request!(RawBatchGetRequest, raw_batch_get_async_opt, "raw_batch_get"); +impl_request!( + RawGetKeyTtlRequest, + raw_get_key_ttl_async_opt, + "raw_get_key_ttl" +); impl_request!(RawPutRequest, raw_put_async_opt, "raw_put"); impl_request!(RawBatchPutRequest, raw_batch_put_async_opt, "raw_batch_put"); impl_request!(RawDeleteRequest, raw_delete_async_opt, "raw_delete");