Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use txn to impl cas #3936

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions src/catalog/src/kvbackend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,8 @@ mod tests {
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest,
RangeResponse,
BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use dashmap::DashMap;
Expand Down Expand Up @@ -519,13 +518,6 @@ mod tests {
unimplemented!()
}

async fn compare_and_put(
&self,
_req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
unimplemented!()
}

async fn delete_range(
&self,
_req: DeleteRangeRequest,
Expand Down
33 changes: 28 additions & 5 deletions src/common/meta/src/kv_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_error::ext::ErrorExt;
pub use txn::TxnService;

use crate::error::Error;
use crate::kv_backend::txn::{Txn, TxnOpResponse};
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
Expand Down Expand Up @@ -52,11 +53,6 @@ where

async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;

async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error>;

async fn delete_range(
&self,
req: DeleteRangeRequest,
Expand All @@ -80,6 +76,33 @@ where
})
}

/// CAS: Compares the value at the key with the given value, and if they are
/// equal, puts the new value at the key.
async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
let CompareAndPutRequest { key, expect, value } = req;
let txn = if expect.is_empty() {
Txn::put_if_not_exists(key, value)
} else {
Txn::compare_and_put(key, expect, value)
};
let txn_res = self.txn(txn).await?;

let success = txn_res.succeeded;
// The response is guaranteed to have at most one element.
let op_res = txn_res.responses.into_iter().next();
let prev_kv = match op_res {
Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
None => None,
};

Ok(CompareAndPutResponse { success, prev_kv })
}

/// Puts a value at a key. If `if_not_exists` is `true`, the operation
/// ensures the key does not exist before applying the PUT operation.
/// Otherwise, it simply applies the PUT operation without checking for
Expand Down
94 changes: 4 additions & 90 deletions src/common/meta/src/kv_backend/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ use std::any::Any;
use std::sync::Arc;

use etcd_client::{
Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse,
TxnResponse,
Client, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, TxnResponse,
};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};

use super::KvBackendRef;
use crate::error::{self, Error, Result};
Expand All @@ -28,8 +27,8 @@ use crate::kv_backend::{KvBackend, TxnService};
use crate::metrics::METRIC_META_TXN_REQUEST;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;

Expand Down Expand Up @@ -202,53 +201,6 @@ impl KvBackend for EtcdStore {
Ok(BatchGetResponse { kvs })
}

async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let CompareAndPut {
key,
expect,
value,
put_options,
} = req.try_into()?;

let compare = if expect.is_empty() {
// create if absent
// revision 0 means key was not exist
Compare::create_revision(key.clone(), CompareOp::Equal, 0)
} else {
// compare and put
Compare::value(key.clone(), CompareOp::Equal, expect)
};
let put = TxnOp::put(key.clone(), value, put_options);
let get = TxnOp::get(key, None);
let txn = Txn::new()
.when(vec![compare])
.and_then(vec![put])
.or_else(vec![get]);

let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;

let success = txn_res.succeeded();
let op_res = txn_res
.op_responses()
.pop()
.context(error::InvalidTxnResultSnafu {
err_msg: "empty response",
})?;

let prev_kv = match op_res {
TxnOpResponse::Put(mut res) => res.take_prev_key().map(convert_key_value),
TxnOpResponse::Get(mut res) => res.take_kvs().into_iter().next().map(convert_key_value),
_ => unreachable!(),
};

Ok(CompareAndPutResponse { success, prev_kv })
}

async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let Delete { key, options } = req.try_into()?;

Expand Down Expand Up @@ -461,28 +413,6 @@ impl TryFrom<BatchDeleteRequest> for BatchDelete {
}
}

struct CompareAndPut {
key: Vec<u8>,
expect: Vec<u8>,
value: Vec<u8>,
put_options: Option<PutOptions>,
}

impl TryFrom<CompareAndPutRequest> for CompareAndPut {
type Error = Error;

fn try_from(req: CompareAndPutRequest) -> Result<Self> {
let CompareAndPutRequest { key, expect, value } = req;

Ok(CompareAndPut {
key,
expect,
value,
put_options: Some(PutOptions::default().with_prev_key()),
})
}
}

struct Delete {
key: Vec<u8>,
options: Option<DeleteOptions>,
Expand Down Expand Up @@ -597,22 +527,6 @@ mod tests {
let _ = batch_delete.options.unwrap();
}

#[test]
fn test_parse_compare_and_put() {
let req = CompareAndPutRequest {
key: b"test_key".to_vec(),
expect: b"test_expect".to_vec(),
value: b"test_value".to_vec(),
};

let compare_and_put: CompareAndPut = req.try_into().unwrap();

assert_eq!(b"test_key".to_vec(), compare_and_put.key);
assert_eq!(b"test_expect".to_vec(), compare_and_put.expect);
assert_eq!(b"test_value".to_vec(), compare_and_put.value);
let _ = compare_and_put.put_options.unwrap();
}

#[test]
fn test_parse_delete() {
let req = DeleteRangeRequest {
Expand Down
40 changes: 2 additions & 38 deletions src/common/meta/src/kv_backend/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::any::Any;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::marker::PhantomData;
Expand All @@ -29,8 +28,8 @@ use crate::kv_backend::{KvBackend, TxnService};
use crate::metrics::METRIC_META_TXN_REQUEST;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;

Expand Down Expand Up @@ -190,41 +189,6 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
Ok(BatchGetResponse { kvs })
}

async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
let CompareAndPutRequest { key, expect, value } = req;

let mut kvs = self.kvs.write().unwrap();

let existed = kvs.entry(key);
let (success, prev_kv) = match existed {
Entry::Vacant(e) => {
let expected = expect.is_empty();
if expected {
let _ = e.insert(value);
}
(expected, None)
}
Entry::Occupied(mut existed) => {
let expected = existed.get() == &expect;
let prev_kv = if expected {
let _ = existed.insert(value);
None
} else {
Some(KeyValue {
key: existed.key().clone(),
value: existed.get().clone(),
})
};
(expected, prev_kv)
}
};

Ok(CompareAndPutResponse { success, prev_kv })
}

async fn delete_range(
&self,
req: DeleteRangeRequest,
Expand Down
6 changes: 3 additions & 3 deletions src/common/meta/src/kv_backend/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ impl Txn {
}

/// Builds a transaction that puts a value at a key if the key exists and the value
/// is equal to `old_value`.
pub fn compare_and_put(key: Vec<u8>, old_value: Vec<u8>, value: Vec<u8>) -> Self {
/// is equal to `expect`.
pub fn compare_and_put(key: Vec<u8>, expect: Vec<u8>, value: Vec<u8>) -> Self {
Self::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
old_value,
expect,
)])
.and_then(vec![TxnOp::Put(key.clone(), value)])
.or_else(vec![TxnOp::Get(key)])
Expand Down
41 changes: 3 additions & 38 deletions src/log-store/src/raft_engine/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnRes
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::util::get_next_prefix_key;
Expand Down Expand Up @@ -277,42 +277,6 @@ impl KvBackend for RaftEngineBackend {
Ok(response)
}

async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
let CompareAndPutRequest { key, expect, value } = req;

let mut batch = LogBatch::with_capacity(1);
let engine = self.engine.write().unwrap();
let existing = engine_get(&engine, &key)?;
let eq = existing
.as_ref()
.map(|kv| kv.value == expect)
.unwrap_or_else(|| {
// if the associated value of key does not exist and expect is empty,
// then we still consider them as equal.
expect.is_empty()
});

if eq {
batch
.put(SYSTEM_NAMESPACE, key, value)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
engine
.write(&mut batch, false)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
}
Ok(CompareAndPutResponse {
success: eq,
prev_kv: existing,
})
}

async fn delete_range(
&self,
req: DeleteRangeRequest,
Expand Down Expand Up @@ -436,6 +400,7 @@ mod tests {
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
};
use common_meta::rpc::store::{CompareAndPutRequest, CompareAndPutResponse};
use common_test_util::temp_dir::create_temp_dir;
use raft_engine::{Config, ReadableSize, RecoveryMode};

Expand Down
Loading