Skip to content

Commit

Permalink
Merge #1791
Browse files Browse the repository at this point in the history
1791: feat(io-engine): add persistent store transaction API r=dsharma-dc a=dsharma-dc

This change adds API to do Compare-and-Swap operations to the persistent store via io-engine.

Co-authored-by: Diwakar Sharma <[email protected]>
  • Loading branch information
mayastor-bors and dsharma-dc committed Jan 15, 2025
2 parents c990c0e + 478fe11 commit ce061d6
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 8 deletions.
Binary file removed io-engine/local-randrw-0-verify.state
Binary file not shown.
25 changes: 24 additions & 1 deletion io-engine/src/persistent_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ use crate::{
core::Reactor,
store::{
etcd::Etcd,
store_defs::{DeleteWait, GetWait, PutWait, Store, StoreError, StoreKey, StoreValue},
store_defs::{
DeleteWait, GetWait, PutWait, Store, StoreError, StoreKey, StoreValue, TxnWait,
},
},
};
use etcd_client::{Compare, TxnOp, TxnResponse};
use futures::channel::oneshot;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
Expand Down Expand Up @@ -193,6 +196,26 @@ impl PersistentStore {
})?
}

/// Executes a transaction for the given key.
pub async fn txn(
key: &impl StoreKey,
cmps: Vec<Compare>,
ops_success: Vec<TxnOp>,
ops_failure: Option<Vec<TxnOp>>,
) -> Result<TxnResponse, StoreError> {
let key_string = key.to_string();
let rx = Self::execute_store_op(async move {
info!("Executing transaction for key {}.", key_string);
Self::backing_store()
.txn_kv(&key_string, cmps, ops_success, ops_failure)
.await
});

rx.await.context(TxnWait {
key: key.to_string(),
})?
}

/// Retrieves a value, with the given key, from the store.
pub async fn get(key: &impl StoreKey) -> Result<Value, StoreError> {
let key_string = key.to_string();
Expand Down
23 changes: 21 additions & 2 deletions io-engine/src/store/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
use crate::store::store_defs::{
Connect, Delete, DeserialiseValue, Get, Put, SerialiseValue, Store, StoreError,
StoreError::MissingEntry, StoreKey, StoreValue, ValueString,
StoreError::MissingEntry, StoreKey, StoreValue, Txn as TxnErr, ValueString,
};
use async_trait::async_trait;
use etcd_client::Client;
use etcd_client::{Client, Compare, Txn, TxnOp, TxnResponse};
use serde_json::Value;
use snafu::ResultExt;

Expand Down Expand Up @@ -49,6 +49,25 @@ impl Store for Etcd {
Ok(())
}

/// Executes a transaction for the given key. If the compares succeed, then
/// ops_success will be executed atomically, otherwise ops_failure will be
/// executed atomically.
async fn txn_kv<K: StoreKey>(
&mut self,
key: &K,
cmps: Vec<Compare>,
ops_success: Vec<TxnOp>,
ops_failure: Option<Vec<TxnOp>>,
) -> Result<TxnResponse, StoreError> {
let fops = ops_failure.map_or(vec![], |v| v);
self.0
.txn(Txn::new().when(cmps).and_then(ops_success).or_else(fops))
.await
.context(TxnErr {
key: key.to_string(),
})
}

/// 'Get' the value for the given key from etcd.
async fn get_kv<K: StoreKey>(&mut self, key: &K) -> Result<Value, StoreError> {
let resp = self.0.get(key.to_string(), None).await.context(Get {
Expand Down
22 changes: 21 additions & 1 deletion io-engine/src/store/store_defs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Definition of a trait for a key-value store together with its error codes.
use async_trait::async_trait;
use etcd_client::Error;
use etcd_client::{Compare, Error, TxnOp, TxnResponse};
use serde_json::{Error as SerdeError, Value};
use snafu::Snafu;

Expand Down Expand Up @@ -56,6 +56,18 @@ pub enum StoreError {
key: String,
source: futures::channel::oneshot::Canceled,
},
/// Failed to complete a 'transaction'.
#[snafu(display("Failed to do 'transaction' for key {}. Error {}", key, source))]
Txn { key: String, source: Error },
/// Failed to wait for 'transaction' operations.
#[snafu(display(
"Failed to wait for 'transaction' operations to complete for key {}.",
key,
))]
TxnWait {
key: String,
source: futures::channel::oneshot::Canceled,
},
/// Failed to 'watch' an entry in the store.
#[snafu(display("Failed to 'watch' entry with key {}. Error {}", key, source))]
Watch { key: String, source: Error },
Expand Down Expand Up @@ -93,6 +105,14 @@ pub trait Store: Sync + Send + Clone {
value: &V,
) -> Result<(), StoreError>;

async fn txn_kv<K: StoreKey>(
&mut self,
key: &K,
cmps: Vec<Compare>,
ops_success: Vec<TxnOp>,
ops_failure: Option<Vec<TxnOp>>,
) -> Result<TxnResponse, StoreError>;

/// Get an entry from the store.
async fn get_kv<K: StoreKey>(&mut self, key: &K) -> Result<Value, StoreError>;

Expand Down
106 changes: 102 additions & 4 deletions io-engine/tests/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@ use common::compose::{
},
GrpcConnect, RpcHandle,
},
Binary, Builder, ComposeTest, ContainerSpec,
Binary, Builder, ComposeTest, ContainerSpec, MayastorTest,
};
use etcd_client::Client;

use io_engine::bdev::nexus::{ChildInfo, NexusInfo};
use etcd_client::{Client, Compare, CompareOp, PutOptions, TxnOp, TxnOpResponse};

use io_engine::{
bdev::nexus::{ChildInfo, NexusInfo},
core::MayastorCliArgs,
persistent_store::{PersistentStore, PersistentStoreBuilder},
};
use std::{convert::TryFrom, thread::sleep, time::Duration};
use url::Url;

pub mod common;
use once_cell::sync::OnceCell;

static MAYASTOR: OnceCell<MayastorTest> = OnceCell::new();
static ETCD_ENDPOINT: &str = "0.0.0.0:2379";
static CHILD1_UUID: &str = "d61b2fdf-1be8-457a-a481-70a42d0a2223";
static CHILD2_UUID: &str = "094ae8c6-46aa-4139-b4f2-550d39645db3";
Expand Down Expand Up @@ -519,3 +524,96 @@ pub(crate) fn uuid(uri: &str) -> String {
}
panic!("URI does not contain a uuid query parameter.");
}

// This test does a successful etcd transaction that upon successful CompareOp, modifies
// an existing key and adds a new key. Then does another transaction which is
// fails the CompareOp and hence that transaction is expected to fail.
#[tokio::test]
async fn pstor_txn_api() {
let dummy_key1 = "dummy_key_1".to_string();
let dummy_key2 = "dummy_key_2".to_string();
let pre_txn_value_k1 = "pre_txn_value_key1".to_string();
let post_txn_value_k1 = "post_txn_value_key1".to_string();
let post_txn_value_k2 = "post_txn_value_key2".to_string();
common::composer_init();

let _test = Builder::new()
.name("etcd_txn_test")
.add_container_spec(
ContainerSpec::from_binary(
"etcd",
Binary::from_path(env!("ETCD_BIN")).with_args(vec![
"--data-dir",
"/tmp/etcd-data",
"--advertise-client-urls",
"http://0.0.0.0:2379",
"--listen-client-urls",
"http://0.0.0.0:2379",
]),
)
.with_portmap("2379", "2379")
.with_portmap("2380", "2380"),
)
.build()
.await
.unwrap();

PersistentStoreBuilder::new()
.with_endpoint(ETCD_ENDPOINT)
.connect()
.await;

// create the mayastor test instance
let ms = MayastorTest::new(MayastorCliArgs {
log_components: vec!["all".into()],
reactor_mask: "0x3".to_string(),
no_pci: true,
..Default::default()
});

let _ms = MAYASTOR.get_or_init(|| ms);
MAYASTOR
.get()
.unwrap()
.spawn(async move {
let _ = PersistentStore::put(&dummy_key1, &pre_txn_value_k1).await;
let expect = serde_json::to_vec(&pre_txn_value_k1).unwrap();

// Transaction - expected to succeed.
let cmp = vec![Compare::value(
dummy_key1.to_string(),
CompareOp::Equal,
expect,
)];
let ops = vec![
// Test to validate prev_kv for one of the key modified by transaction.
TxnOp::put(
dummy_key1.to_string(),
post_txn_value_k1,
Some(PutOptions::new().with_prev_key()),
),
TxnOp::put(dummy_key2.to_string(), post_txn_value_k2, None),
];
let txn_resp = PersistentStore::txn(&dummy_key1, cmp.clone(), ops, None)
.await
.unwrap();
assert!(txn_resp.succeeded());

match &txn_resp.op_responses()[0] {
TxnOpResponse::Put(p) => {
let prev_val = p.prev_key().unwrap().value_str().unwrap().trim_matches('"');
assert_eq!(prev_val, pre_txn_value_k1);
}
_ => unreachable!("Unexpected txnop response"),
}

// A new transaction - expected to fail. Execute fops upon compare failure.
let ops = vec![TxnOp::delete(dummy_key1.to_string(), None)];
let fops = vec![TxnOp::delete(dummy_key2.to_string(), None)];
let txn_resp = PersistentStore::txn(&dummy_key1, cmp, ops, Some(fops))
.await
.unwrap();
assert!(!txn_resp.succeeded());
})
.await;
}

0 comments on commit ce061d6

Please sign in to comment.