Skip to content

Commit

Permalink
add ttl for raw client
Browse files Browse the repository at this point in the history
Signed-off-by: andylokandy <[email protected]>
  • Loading branch information
andylokandy authored and Dillen Meijboom committed Oct 27, 2022
1 parent 027a7df commit 72d1520
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 20 deletions.
25 changes: 25 additions & 0 deletions src/kv/kvpair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,25 @@ use proptest_derive::Arbitrary;
use std::{fmt, str};
use tikv_client_proto::kvrpcpb;

/// A key/value pair with TTL (in seconds).
///
/// # Examples
/// ```rust
/// # use tikv_client::{Key, Value, KvPair};
/// let key = "key".to_owned();
/// let value = "value".to_owned();
/// let pair = KvPair::new(key, value);
/// let pair_with_ttl = pair.with_ttl(60);
/// ```
pub struct KvPairWithTTL(pub KvPair, pub u64);

/// Convert `Into<KvPair>` to a `KvPairWithTTL` with no TTL.
impl<K: Into<KvPair>> From<K> for KvPairWithTTL {
fn from(pair: K) -> Self {
Self(pair.into(), 0)
}
}

/// A key/value pair.
///
/// # Examples
Expand Down Expand Up @@ -78,6 +97,12 @@ impl KvPair {
pub fn set_value(&mut self, v: impl Into<Value>) {
self.1 = v.into();
}

/// Convert the `KvPair` into a `KvPairWithTTL` with the given TTL (in seconds).
#[inline]
pub fn with_ttl(self, ttl: u64) -> KvPairWithTTL {
KvPairWithTTL(self, ttl)
}
}

impl<K, V> From<(K, V)> for KvPair
Expand Down
2 changes: 1 addition & 1 deletion src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod value;

pub use bound_range::{BoundRange, IntoOwnedRange};
pub use key::Key;
pub use kvpair::KvPair;
pub use kvpair::{KvPair, KvPairWithTTL};
pub use value::Value;

struct HexRepr<'a>(pub &'a [u8]);
Expand Down
29 changes: 24 additions & 5 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tikv_client_proto::metapb;
use crate::{
backoff::DEFAULT_REGION_BACKOFF,
config::Config,
kv::KvPairWithTTL,
pd::{PdClient, PdRpcClient},
raw::lowering::*,
request::{Collect, CollectSingle, Plan},
Expand Down Expand Up @@ -245,6 +246,17 @@ impl<PdC: PdClient> Client<PdC> {
.map(|r| r.into_iter().map(Into::into).collect())
}

pub async fn get_key_ttl_secs(&self, key: impl Into<Key>) -> Result<Option<u64>> {
debug!(self.logger, "invoking raw get_key_ttl_secs request");
let request = new_raw_get_key_ttl_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.post_process_default()
.plan();
plan.execute().await
}

/// Create a new 'put' request.
///
/// Once resolved this request will result in the setting of the value associated with the given key.
Expand All @@ -262,18 +274,25 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.put_opt(key, value, DEFAULT_REGION_BACKOFF).await
self.put_opt(key, value, DEFAULT_REGION_BACKOFF, 0).await
}

/// Same as [`put`](Client::put) but with custom [`backoff`](crate::Backoff) strategy.
/// Same as [`put`](Client::put) but with custom [`backoff`](crate::Backoff) strategy and ttl.
pub async fn put_opt(
&self,
key: impl Into<Key>,
value: impl Into<Value>,
backoff: Backoff,
ttl_secs: u64,
) -> Result<()> {
debug!(self.logger, "invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let request = new_raw_put_request(
key.into(),
value.into(),
self.cf.clone(),
ttl_secs,
self.atomic,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(backoff)
.merge(CollectSingle)
Expand Down Expand Up @@ -307,10 +326,10 @@ impl<PdC: PdClient> Client<PdC> {
self.batch_put_opt(pairs, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`batch_put`](Client::batch_put) but with custom [`backoff`](crate::Backoff) strategy.
/// Same as [`batch_put`](Client::batch_put) but with custom [`backoff`](crate::Backoff) strategy and the optionally add a TTL to the key value pairs.
pub async fn batch_put_opt(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
pairs: impl IntoIterator<Item = impl Into<KvPairWithTTL>>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_put request");
Expand Down
20 changes: 16 additions & 4 deletions src/raw/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{iter::Iterator, ops::Range, sync::Arc};

use tikv_client_proto::{kvrpcpb, metapb};

use crate::{raw::requests, BoundRange, ColumnFamily, Key, KvPair, Value};
use crate::{kv::KvPairWithTTL, raw::requests, BoundRange, ColumnFamily, Key, Value};

pub fn new_raw_get_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
requests::new_raw_get_request(key.into(), cf)
Expand All @@ -21,21 +21,33 @@ pub fn new_raw_batch_get_request(
requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf)
}

pub fn new_raw_get_key_ttl_request(
key: Key,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawGetKeyTtlRequest {
requests::new_raw_get_key_ttl_request(key.into(), cf)
}

pub fn new_raw_put_request(
key: Key,
value: Value,
cf: Option<ColumnFamily>,
ttl: u64,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
requests::new_raw_put_request(key.into(), value, cf, atomic)
requests::new_raw_put_request(key.into(), value, cf, ttl, atomic)
}

pub fn new_raw_batch_put_request(
pairs: impl Iterator<Item = KvPair>,
pairs_with_ttl: impl Iterator<Item = KvPairWithTTL>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic)
let (pairs, ttls) = pairs_with_ttl
.into_iter()
.map(|pair_with_ttl| (pair_with_ttl.0.into(), pair_with_ttl.1))
.unzip();
requests::new_raw_batch_put_request(pairs, cf, ttls, atomic)
}

pub fn new_raw_delete_request(
Expand Down
53 changes: 48 additions & 5 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tikv_client_store::Request;

use super::RawRpcRequest;
use crate::{
collect_first,
collect_single,
pd::PdClient,
request::{
plan::ResponseWithShard, Collect, CollectSingle, DefaultProcessor, KvRequest, Merge,
Expand All @@ -35,7 +35,7 @@ impl KvRequest for kvrpcpb::RawGetRequest {
}

shardable_key!(kvrpcpb::RawGetRequest);
collect_first!(kvrpcpb::RawGetResponse);
collect_single!(kvrpcpb::RawGetResponse);

impl SingleKey for kvrpcpb::RawGetRequest {
fn key(&self) -> &Vec<u8> {
Expand Down Expand Up @@ -84,16 +84,55 @@ impl Merge<kvrpcpb::RawBatchGetResponse> for Collect {
}
}

pub fn new_raw_get_key_ttl_request(
key: Vec<u8>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawGetKeyTtlRequest {
let mut req = kvrpcpb::RawGetKeyTtlRequest::default();
req.set_key(key);
req.maybe_set_cf(cf);

req
}

impl KvRequest for kvrpcpb::RawGetKeyTtlRequest {
type Response = kvrpcpb::RawGetKeyTtlResponse;
}

shardable_key!(kvrpcpb::RawGetKeyTtlRequest);
collect_single!(kvrpcpb::RawGetKeyTtlResponse);

impl SingleKey for kvrpcpb::RawGetKeyTtlRequest {
fn key(&self) -> &Vec<u8> {
&self.key
}
}

impl Process<kvrpcpb::RawGetKeyTtlResponse> for DefaultProcessor {
type Out = Option<u64>;

fn process(&self, input: Result<kvrpcpb::RawGetKeyTtlResponse>) -> Result<Self::Out> {
let input = input?;
Ok(if input.not_found {
None
} else {
Some(input.ttl)
})
}
}

pub fn new_raw_put_request(
key: Vec<u8>,
value: Vec<u8>,
cf: Option<ColumnFamily>,
ttl: u64,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
let mut req = kvrpcpb::RawPutRequest::default();
req.set_key(key);
req.set_value(value);
req.maybe_set_cf(cf);
req.set_ttl(ttl);
req.set_for_cas(atomic);

req
Expand All @@ -104,7 +143,7 @@ impl KvRequest for kvrpcpb::RawPutRequest {
}

shardable_key!(kvrpcpb::RawPutRequest);
collect_first!(kvrpcpb::RawPutResponse);
collect_single!(kvrpcpb::RawPutResponse);
impl SingleKey for kvrpcpb::RawPutRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand All @@ -114,11 +153,13 @@ impl SingleKey for kvrpcpb::RawPutRequest {
pub fn new_raw_batch_put_request(
pairs: Vec<kvrpcpb::KvPair>,
cf: Option<ColumnFamily>,
ttls: Vec<u64>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
let mut req = kvrpcpb::RawBatchPutRequest::default();
req.set_pairs(pairs);
req.maybe_set_cf(cf);
req.set_ttls(ttls);
req.set_for_cas(atomic);

req
Expand Down Expand Up @@ -168,7 +209,7 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
}

shardable_key!(kvrpcpb::RawDeleteRequest);
collect_first!(kvrpcpb::RawDeleteResponse);
collect_single!(kvrpcpb::RawDeleteResponse);
impl SingleKey for kvrpcpb::RawDeleteRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -314,7 +355,7 @@ impl KvRequest for kvrpcpb::RawCasRequest {
}

shardable_key!(kvrpcpb::RawCasRequest);
collect_first!(kvrpcpb::RawCasResponse);
collect_single!(kvrpcpb::RawCasResponse);
impl SingleKey for kvrpcpb::RawCasRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -445,6 +486,7 @@ macro_rules! impl_raw_rpc_request {

impl_raw_rpc_request!(RawGetRequest);
impl_raw_rpc_request!(RawBatchGetRequest);
impl_raw_rpc_request!(RawGetKeyTtlRequest);
impl_raw_rpc_request!(RawPutRequest);
impl_raw_rpc_request!(RawBatchPutRequest);
impl_raw_rpc_request!(RawDeleteRequest);
Expand All @@ -456,6 +498,7 @@ impl_raw_rpc_request!(RawCasRequest);

impl HasLocks for kvrpcpb::RawGetResponse {}
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {}
impl HasLocks for kvrpcpb::RawPutResponse {}
impl HasLocks for kvrpcpb::RawBatchPutResponse {}
impl HasLocks for kvrpcpb::RawDeleteResponse {}
Expand Down
2 changes: 1 addition & 1 deletion src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ pub struct Collect;
pub struct CollectSingle;

#[macro_export]
macro_rules! collect_first {
macro_rules! collect_single {
($type_: ty) => {
impl Merge<$type_> for CollectSingle {
type Out = $type_;
Expand Down
8 changes: 4 additions & 4 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{
collect_first,
collect_single,
pd::PdClient,
request::{
Collect, CollectSingle, CollectWithShard, DefaultProcessor, KvRequest, Merge, Process,
Expand Down Expand Up @@ -75,7 +75,7 @@ impl KvRequest for kvrpcpb::GetRequest {
}

shardable_key!(kvrpcpb::GetRequest);
collect_first!(kvrpcpb::GetResponse);
collect_single!(kvrpcpb::GetResponse);
impl SingleKey for kvrpcpb::GetRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -191,7 +191,7 @@ impl KvRequest for kvrpcpb::CleanupRequest {
}

shardable_key!(kvrpcpb::CleanupRequest);
collect_first!(kvrpcpb::CleanupResponse);
collect_single!(kvrpcpb::CleanupResponse);
impl SingleKey for kvrpcpb::CleanupRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -515,7 +515,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
}
}

collect_first!(TxnHeartBeatResponse);
collect_single!(TxnHeartBeatResponse);

impl SingleKey for kvrpcpb::TxnHeartBeatRequest {
fn key(&self) -> &Vec<u8> {
Expand Down
2 changes: 2 additions & 0 deletions tikv-client-store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ has_region_error!(kvrpcpb::DeleteRangeResponse);
has_region_error!(kvrpcpb::GcResponse);
has_region_error!(kvrpcpb::RawGetResponse);
has_region_error!(kvrpcpb::RawBatchGetResponse);
has_region_error!(kvrpcpb::RawGetKeyTtlResponse);
has_region_error!(kvrpcpb::RawPutResponse);
has_region_error!(kvrpcpb::RawBatchPutResponse);
has_region_error!(kvrpcpb::RawDeleteResponse);
Expand Down Expand Up @@ -109,6 +110,7 @@ macro_rules! has_str_error {
}

has_str_error!(kvrpcpb::RawGetResponse);
has_str_error!(kvrpcpb::RawGetKeyTtlResponse);
has_str_error!(kvrpcpb::RawPutResponse);
has_str_error!(kvrpcpb::RawBatchPutResponse);
has_str_error!(kvrpcpb::RawDeleteResponse);
Expand Down
5 changes: 5 additions & 0 deletions tikv-client-store/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ macro_rules! impl_request {

impl_request!(RawGetRequest, raw_get_async_opt, "raw_get");
impl_request!(RawBatchGetRequest, raw_batch_get_async_opt, "raw_batch_get");
impl_request!(
RawGetKeyTtlRequest,
raw_get_key_ttl_async_opt,
"raw_get_key_ttl"
);
impl_request!(RawPutRequest, raw_put_async_opt, "raw_put");
impl_request!(RawBatchPutRequest, raw_batch_put_async_opt, "raw_batch_put");
impl_request!(RawDeleteRequest, raw_delete_async_opt, "raw_delete");
Expand Down

0 comments on commit 72d1520

Please sign in to comment.