Skip to content

Commit

Permalink
chore: minor refactor on etcd kvbackend (#3940)
Browse files Browse the repository at this point in the history
* chore: minor refactor on etcd kvbackend

* chore: avoid clone
  • Loading branch information
fengjiachun authored May 14, 2024
1 parent c04d024 commit 72897a2
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 40 deletions.
17 changes: 6 additions & 11 deletions src/common/meta/src/kv_backend/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;

fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue {
let (key, value) = kv.into_key_value();
KeyValue { key, value }
}

pub struct EtcdStore {
client: Client,
// Maximum number of operations permitted in a transaction.
Expand Down Expand Up @@ -123,7 +118,7 @@ impl KvBackend for EtcdStore {
let kvs = res
.take_kvs()
.into_iter()
.map(convert_key_value)
.map(KeyValue::from)
.collect::<Vec<_>>();

Ok(RangeResponse {
Expand All @@ -146,7 +141,7 @@ impl KvBackend for EtcdStore {
.await
.context(error::EtcdFailedSnafu)?;

let prev_kv = res.take_prev_key().map(convert_key_value);
let prev_kv = res.take_prev_key().map(KeyValue::from);
Ok(PutResponse { prev_kv })
}

Expand All @@ -165,7 +160,7 @@ impl KvBackend for EtcdStore {
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Put(mut put_res) => {
if let Some(prev_kv) = put_res.take_prev_key().map(convert_key_value) {
if let Some(prev_kv) = put_res.take_prev_key().map(KeyValue::from) {
prev_kvs.push(prev_kv);
}
}
Expand Down Expand Up @@ -194,7 +189,7 @@ impl KvBackend for EtcdStore {
TxnOpResponse::Get(get_res) => get_res,
_ => unreachable!(),
};
kvs.extend(get_res.take_kvs().into_iter().map(convert_key_value));
kvs.extend(get_res.take_kvs().into_iter().map(KeyValue::from));
}
}

Expand All @@ -214,7 +209,7 @@ impl KvBackend for EtcdStore {
let prev_kvs = res
.take_prev_kvs()
.into_iter()
.map(convert_key_value)
.map(KeyValue::from)
.collect::<Vec<_>>();

Ok(DeleteRangeResponse {
Expand Down Expand Up @@ -242,7 +237,7 @@ impl KvBackend for EtcdStore {
delete_res
.take_prev_kvs()
.into_iter()
.map(convert_key_value)
.map(KeyValue::from)
.for_each(|kv| {
prev_kvs.push(kv);
});
Expand Down
9 changes: 3 additions & 6 deletions src/common/meta/src/kv_backend/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,18 +268,15 @@ impl<T: ErrorExt + Send + Sync> TxnService for MemoryKvBackend<T> {

let do_txn = |txn_op| match txn_op {
TxnOp::Put(key, value) => {
kvs.insert(key.clone(), value);
kvs.insert(key, value);
TxnOpResponse::ResponsePut(PutResponse { prev_kv: None })
}

TxnOp::Get(key) => {
let value = kvs.get(&key);
let value = kvs.get(&key).cloned();
let kvs = value
.map(|value| KeyValue { key, value })
.into_iter()
.map(|value| KeyValue {
key: key.clone(),
value: value.clone(),
})
.collect();
TxnOpResponse::ResponseGet(RangeResponse { kvs, more: false })
}
Expand Down
37 changes: 19 additions & 18 deletions src/common/meta/src/kv_backend/txn/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use etcd_client::{
use super::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse, TxnResponse};
use crate::error::{self, Result};
use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
use crate::rpc::KeyValue;

impl From<Txn> for EtcdTxn {
fn from(txn: Txn) -> Self {
Expand Down Expand Up @@ -65,7 +66,7 @@ impl From<Compare> for EtcdCompare {
};
match cmp.target {
Some(target) => EtcdCompare::value(cmp.key, etcd_cmp, target),
// create revision 0 means key was not exist
// create revision 0 means key does not exist
None => EtcdCompare::create_revision(cmp.key, etcd_cmp, 0),
}
}
Expand All @@ -86,28 +87,28 @@ impl TryFrom<EtcdTxnOpResponse> for TxnOpResponse {

fn try_from(op_resp: EtcdTxnOpResponse) -> Result<Self> {
match op_resp {
EtcdTxnOpResponse::Put(res) => {
let prev_kv = res.prev_key().cloned().map(Into::into);
let put_res = PutResponse { prev_kv };
Ok(TxnOpResponse::ResponsePut(put_res))
EtcdTxnOpResponse::Put(mut res) => {
let prev_kv = res.take_prev_key().map(KeyValue::from);
Ok(TxnOpResponse::ResponsePut(PutResponse { prev_kv }))
}
EtcdTxnOpResponse::Get(res) => {
let kvs = res.kvs().iter().cloned().map(Into::into).collect();
let range_res = RangeResponse { kvs, more: false };
Ok(TxnOpResponse::ResponseGet(range_res))
EtcdTxnOpResponse::Get(mut res) => {
let kvs = res.take_kvs().into_iter().map(KeyValue::from).collect();
Ok(TxnOpResponse::ResponseGet(RangeResponse {
kvs,
more: false,
}))
}
EtcdTxnOpResponse::Delete(res) => {
EtcdTxnOpResponse::Delete(mut res) => {
let deleted = res.deleted();
let prev_kvs = res
.prev_kvs()
.iter()
.cloned()
.map(Into::into)
.take_prev_kvs()
.into_iter()
.map(KeyValue::from)
.collect::<Vec<_>>();
let delete_res = DeleteRangeResponse {
Ok(TxnOpResponse::ResponseDelete(DeleteRangeResponse {
deleted,
prev_kvs,
deleted: res.deleted(),
};
Ok(TxnOpResponse::ResponseDelete(delete_res))
}))
}
EtcdTxnOpResponse::Txn(_) => error::EtcdTxnOpResponseSnafu {
err_msg: "nested txn is not supported",
Expand Down
7 changes: 2 additions & 5 deletions src/log-store/src/raft_engine/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl TxnService for RaftEngineBackend {
let do_txn = |txn_op| match txn_op {
TxnOp::Put(key, value) => {
batch
.put(SYSTEM_NAMESPACE, key.clone(), value)
.put(SYSTEM_NAMESPACE, key, value)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Expand All @@ -113,11 +113,8 @@ impl TxnService for RaftEngineBackend {
TxnOp::Get(key) => {
let value = engine_get(&engine, &key)?.map(|kv| kv.value);
let kvs = value
.map(|value| KeyValue { key, value })
.into_iter()
.map(|value| KeyValue {
key: key.clone(),
value,
})
.collect();
Ok(TxnOpResponse::ResponseGet(RangeResponse {
kvs,
Expand Down

0 comments on commit 72897a2

Please sign in to comment.