DynamoDB ConditionalPut (#5247)
* Parse Dynamo ConditionalPut

* Add etag sort key

* Conditional Put

* Speedup repeated test runs

* Clippy
tustvold authored Jan 4, 2024
1 parent 2f383e7 commit cf61bb8
Showing 5 changed files with 187 additions and 47 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/object_store.yml
@@ -113,6 +113,7 @@ jobs:
AWS_ENDPOINT: http://localhost:4566
AWS_ALLOW_HTTP: true
AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000
AWS_CONDITIONAL_PUT: dynamo:test-table:2000
HTTP_URL: "http://localhost:8080"
GOOGLE_BUCKET: test-bucket
GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
@@ -137,7 +138,7 @@ jobs:
docker run -d -p 4566:4566 localstack/localstack:3.0.1
docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=key,KeyType=HASH --attribute-definitions AttributeName=key,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
- name: Configure Azurite (Azure emulation)
# the magical connection string is from
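For reference, a minimal sketch (not part of this commit) of opting in to the same behaviour from application code rather than the environment. The builder calls mirror the `AWS_COPY_IF_NOT_EXISTS` and `AWS_CONDITIONAL_PUT` variables above; the bucket and table names are illustrative, and the `AmazonS3ConfigKey` variants are assumed to match the environment-variable names.

```rust
use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey};

// Sketch only: "dynamo:<table>[:<timeout_ms>]" selects the DynamoDB commit
// protocol for both copy-if-not-exists and conditional put.
fn build_store() -> object_store::Result<AmazonS3> {
    AmazonS3Builder::from_env() // also honours AWS_* variables such as those set above
        .with_bucket_name("test-bucket")
        .with_config(AmazonS3ConfigKey::CopyIfNotExists, "dynamo:test-table:2000")
        .with_config(AmazonS3ConfigKey::ConditionalPut, "dynamo:test-table:2000")
        .build()
}
```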
155 changes: 122 additions & 33 deletions object_store/src/aws/dynamo.rs
@@ -17,7 +17,9 @@

//! A DynamoDB based lock system
use std::borrow::Cow;
use std::collections::HashMap;
use std::future::Future;
use std::time::{Duration, Instant};

use chrono::Utc;
@@ -61,16 +63,24 @@ const STORE: &str = "DynamoDB";
///
/// The DynamoDB schema is as follows:
///
/// * A string hash key named `"key"`
/// * A string partition key named `"path"`
/// * A string sort key named `"etag"`
/// * A numeric [TTL] attribute named `"ttl"`
/// * A numeric attribute named `"generation"`
/// * A numeric attribute named `"timeout"`
///
/// To perform a conditional operation on an object with a given `path` and `etag` (if exists),
/// An appropriate DynamoDB table can be created with the CLI as follows:
///
/// ```bash
/// $ aws dynamodb create-table --table-name <TABLE_NAME> --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S
/// $ aws dynamodb update-time-to-live --table-name <TABLE_NAME> --time-to-live-specification Enabled=true,AttributeName=ttl
/// ```
///
/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating),
/// the commit protocol is as follows:
///
/// 1. Perform HEAD request on `path` and error on precondition mismatch
/// 2. Create record in DynamoDB with key `{path}#{etag}` with the configured timeout
/// 2. Create record in DynamoDB with given `path` and `etag` with the configured timeout
/// 1. On Success: Perform operation with the configured timeout
/// 2. On Conflict:
/// 1. Periodically re-perform HEAD request on `path` and error on precondition mismatch
@@ -154,6 +164,16 @@ impl DynamoCommit {
self
}

/// Parse [`DynamoCommit`] from a string
pub(crate) fn from_str(value: &str) -> Option<Self> {
Some(match value.split_once(':') {
Some((table_name, timeout)) => {
Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?)
}
None => Self::new(value.trim().to_string()),
})
}
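The accepted string format is worth spelling out. Below is a small standalone sketch (illustrative names, not the crate's API) of the same `table[:timeout_ms]` rule, which presumably sees the value after the caller has stripped the leading `dynamo:` scheme:

```rust
/// Standalone sketch of the parsing rule in `DynamoCommit::from_str` above.
fn parse_dynamo_config(value: &str) -> Option<(String, Option<u64>)> {
    match value.split_once(':') {
        // "table:2000" -> table name plus a lock timeout in milliseconds
        Some((table, timeout)) => Some((table.trim().to_string(), Some(timeout.parse().ok()?))),
        // "table" -> table name only, leaving the default timeout in place
        None => Some((value.trim().to_string(), None)),
    }
}

fn main() {
    assert_eq!(
        parse_dynamo_config("test-table:2000"),
        Some(("test-table".to_string(), Some(2000)))
    );
    assert_eq!(parse_dynamo_config("test-table"), Some(("test-table".to_string(), None)));
    assert_eq!(parse_dynamo_config("test-table:abc"), None); // non-numeric timeout is rejected
}
```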

/// Returns the name of the DynamoDB table.
pub(crate) fn table_name(&self) -> &str {
&self.table_name
@@ -165,23 +185,41 @@ impl DynamoCommit {
from: &Path,
to: &Path,
) -> Result<()> {
check_not_exists(client, to).await?;
self.conditional_op(client, to, None, || async {
client.copy_request(from, to).send().await?;
Ok(())
})
.await
}

#[allow(clippy::future_not_send)] // Generics confound this lint
pub(crate) async fn conditional_op<F, Fut, T>(
&self,
client: &S3Client,
to: &Path,
etag: Option<&str>,
op: F,
) -> Result<T>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<T, Error>>,
{
check_precondition(client, to, etag).await?;

let mut previous_lease = None;

loop {
let existing = previous_lease.as_ref();
match self.try_lock(client, to.as_ref(), existing).await? {
match self.try_lock(client, to.as_ref(), etag, existing).await? {
TryLockResult::Ok(lease) => {
let fut = client.copy_request(from, to).send();
let expiry = lease.acquire + lease.timeout;
return match tokio::time::timeout_at(expiry.into(), fut).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(e.into()),
return match tokio::time::timeout_at(expiry.into(), op()).await {
Ok(Ok(v)) => Ok(v),
Ok(Err(e)) => Err(e),
Err(_) => Err(Error::Generic {
store: "DynamoDB",
source: format!(
"Failed to perform copy operation in {} milliseconds",
"Failed to perform conditional operation in {} milliseconds",
self.timeout
)
.into(),
@@ -193,7 +231,7 @@ impl DynamoCommit {
let expiry = conflict.timeout * self.max_clock_skew_rate;
loop {
interval.tick().await;
check_not_exists(client, to).await?;
check_precondition(client, to, etag).await?;
if conflict.acquire.elapsed() > expiry {
previous_lease = Some(conflict);
break;
@@ -205,8 +243,11 @@ impl DynamoCommit {
}

/// Retrieve a lock, returning an error if it doesn't exist
async fn get_lock(&self, s3: &S3Client, key: &str) -> Result<Lease> {
let key_attributes = [("key", AttributeValue::String(key))];
async fn get_lock(&self, s3: &S3Client, path: &str, etag: Option<&str>) -> Result<Lease> {
let key_attributes = [
("path", AttributeValue::from(path)),
("etag", AttributeValue::from(etag.unwrap_or("*"))),
];
let req = GetItem {
table_name: &self.table_name,
key: Map(&key_attributes),
@@ -216,7 +257,7 @@
let resp = self
.request(s3, credential.as_deref(), "DynamoDB_20120810.GetItem", req)
.await
.map_err(|e| e.error(STORE, key.to_string()))?;
.map_err(|e| e.error(STORE, path.to_string()))?;

let body = resp.bytes().await.map_err(|e| Error::Generic {
store: STORE,
@@ -230,7 +271,7 @@
})?;

extract_lease(&response.item).ok_or_else(|| Error::NotFound {
path: key.into(),
path: path.into(),
source: "DynamoDB GetItem returned no items".to_string().into(),
})
}
@@ -239,7 +280,8 @@ impl DynamoCommit {
async fn try_lock(
&self,
s3: &S3Client,
key: &str,
path: &str,
etag: Option<&str>,
existing: Option<&Lease>,
) -> Result<TryLockResult> {
let attributes;
@@ -257,12 +299,13 @@

let ttl = (Utc::now() + self.ttl).timestamp();
let items = [
("key", AttributeValue::String(key)),
("path", AttributeValue::from(path)),
("etag", AttributeValue::from(etag.unwrap_or("*"))),
("generation", AttributeValue::Number(next_gen)),
("timeout", AttributeValue::Number(self.timeout)),
("ttl", AttributeValue::Number(ttl as _)),
];
let names = [("#pk", "key")];
let names = [("#pk", "path")];

let req = PutItem {
table_name: &self.table_name,
Expand Down Expand Up @@ -302,7 +345,9 @@ impl DynamoCommit {
// <https://aws.amazon.com/about-aws/whats-new/2023/06/amazon-dynamodb-cost-failed-conditional-writes/>
// <https://repost.aws/questions/QUNfADrK4RT6WHe61RzTK8aw/dynamodblocal-support-for-returnvaluesonconditioncheckfailure-for-single-write-operations>
// <https://github.com/localstack/localstack/issues/9040>
None => Ok(TryLockResult::Conflict(self.get_lock(s3, key).await?)),
None => Ok(TryLockResult::Conflict(
self.get_lock(s3, path, etag).await?,
)),
},
_ => Err(Error::Generic {
store: STORE,
@@ -347,19 +392,37 @@ enum TryLockResult {
Conflict(Lease),
}

/// Returns an [`Error::AlreadyExists`] if `path` exists
async fn check_not_exists(client: &S3Client, path: &Path) -> Result<()> {
/// Validates that `path` has the given `etag` or doesn't exist if `None`
async fn check_precondition(client: &S3Client, path: &Path, etag: Option<&str>) -> Result<()> {
let options = GetOptions {
head: true,
..Default::default()
};
match client.get_opts(path, options).await {
Ok(_) => Err(Error::AlreadyExists {
path: path.to_string(),
source: "Already Exists".to_string().into(),
}),
Err(Error::NotFound { .. }) => Ok(()),
Err(e) => Err(e),

match etag {
Some(expected) => match client.get_opts(path, options).await {
Ok(r) => match r.meta.e_tag {
Some(actual) if expected == actual => Ok(()),
actual => Err(Error::Precondition {
path: path.to_string(),
source: format!("{} does not match {expected}", actual.unwrap_or_default())
.into(),
}),
},
Err(Error::NotFound { .. }) => Err(Error::Precondition {
path: path.to_string(),
source: format!("Object at location {path} not found").into(),
}),
Err(e) => Err(e),
},
None => match client.get_opts(path, options).await {
Ok(_) => Err(Error::AlreadyExists {
path: path.to_string(),
source: "Already Exists".to_string().into(),
}),
Err(Error::NotFound { .. }) => Ok(()),
Err(e) => Err(e),
},
}
}

@@ -493,11 +556,17 @@ impl<'a, K: Serialize, V: Serialize> Serialize for Map<'a, K, V> {
#[derive(Debug, Serialize, Deserialize)]
enum AttributeValue<'a> {
#[serde(rename = "S")]
String(&'a str),
String(Cow<'a, str>),
#[serde(rename = "N", with = "number")]
Number(u64),
}

impl<'a> From<&'a str> for AttributeValue<'a> {
fn from(value: &'a str) -> Self {
Self::String(Cow::Borrowed(value))
}
}
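Since `AttributeValue` now owns a `Cow`, a quick reminder of the wire shapes it models may help. This is a standalone sketch (its own minimal enum, not the crate's type) of the DynamoDB JSON forms: strings travel as `{"S": ...}` and numbers as `{"N": ...}` with the number encoded as a string, which is what the `number` module below handles.

```rust
use serde::Serialize;
use std::borrow::Cow;

// Minimal stand-in for the enum above, enough to show the serialized shapes.
#[derive(Serialize)]
enum AttributeValue<'a> {
    #[serde(rename = "S")]
    String(Cow<'a, str>),
    #[serde(rename = "N")]
    Number(String), // the real code converts u64 <-> string via its `number` module
}

fn main() {
    let s = AttributeValue::String(Cow::Borrowed("path/to/object"));
    let n = AttributeValue::Number(20_000.to_string());
    assert_eq!(serde_json::to_string(&s).unwrap(), r#"{"S":"path/to/object"}"#);
    assert_eq!(serde_json::to_string(&n).unwrap(), r#"{"N":"20000"}"#);
}
```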

/// Numbers are serialized as strings
mod number {
use serde::{Deserialize, Deserializer, Serializer};
@@ -518,10 +587,11 @@ pub(crate) use tests::integration_test;

#[cfg(test)]
mod tests {

use super::*;
use crate::aws::AmazonS3;
use crate::ObjectStore;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};

#[test]
fn test_attribute_serde() {
@@ -544,24 +614,43 @@ mod tests {
let _ = integration.delete(&dst).await; // Delete if present

// Create a lock if not already exists
let existing = match d.try_lock(client, dst.as_ref(), None).await.unwrap() {
let existing = match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
TryLockResult::Conflict(l) => l,
TryLockResult::Ok(l) => l,
};

// Should not be able to acquire a lock again
let r = d.try_lock(client, dst.as_ref(), None).await;
let r = d.try_lock(client, dst.as_ref(), None, None).await;
assert!(matches!(r, Ok(TryLockResult::Conflict(_))));

// But should still be able to reclaim lock and perform copy
d.copy_if_not_exists(client, &src, &dst).await.unwrap();

match d.try_lock(client, dst.as_ref(), None).await.unwrap() {
match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
TryLockResult::Conflict(new) => {
// Should have incremented generation to do so
assert_eq!(new.generation, existing.generation + 1);
}
_ => panic!("Should conflict"),
}

let rng = thread_rng();
let etag = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
let t = Some(etag.as_str());

let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
TryLockResult::Ok(l) => l,
_ => panic!("should not conflict"),
};

match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation),
_ => panic!("should conflict"),
}

match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() {
TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1),
_ => panic!("should not conflict"),
}
}
}
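Before moving on to the `ObjectStore` wiring, here is a hedged sketch of the lock record the protocol above stores per `(path, etag)` pair, matching the documented schema: `path` as partition key, `etag` as sort key (`"*"` when the object is being created), plus the `generation`, `timeout` and `ttl` attributes. All values are illustrative.

```rust
use serde_json::{json, Value};

// Illustrative DynamoDB item shape only; the real request is assembled by the
// PutItem/AttributeValue types in this file.
fn example_lock_item() -> Value {
    json!({
        "path":       { "S": "data/config.json" },
        "etag":       { "S": "*" },          // "*" when no etag, i.e. a create-style operation
        "generation": { "N": "1" },          // incremented each time a lease is (re)claimed
        "timeout":    { "N": "20000" },      // lease timeout in milliseconds
        "ttl":        { "N": "1704412800" }  // epoch seconds used by DynamoDB TTL cleanup
    })
}
```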
18 changes: 16 additions & 2 deletions object_store/src/aws/mod.rs
@@ -187,12 +187,26 @@ impl ObjectStore for AmazonS3 {
r => r,
}
}
(PutMode::Update(v), Some(S3ConditionalPut::ETagMatch)) => {
(PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => {
d.conditional_op(&self.client, location, None, move || request.do_put())
.await
}
(PutMode::Update(v), Some(put)) => {
let etag = v.e_tag.ok_or_else(|| Error::Generic {
store: STORE,
source: "ETag required for conditional put".to_string().into(),
})?;
request.header(&IF_MATCH, etag.as_str()).do_put().await
match put {
S3ConditionalPut::ETagMatch => {
request.header(&IF_MATCH, etag.as_str()).do_put().await
}
S3ConditionalPut::Dynamo(d) => {
d.conditional_op(&self.client, location, Some(&etag), move || {
request.do_put()
})
.await
}
}
}
}
}
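For completeness, a hedged sketch of the caller-side API this match serves: `PutMode::Create` maps to create-if-absent (failing with `AlreadyExists`), while `PutMode::Update` carries the etag that the DynamoDB commit protocol checks (failing with `Precondition` on a mismatch). The path and payloads are illustrative, and the exact `put_opts` signature is assumed from the crate's public API at this point in time.

```rust
use bytes::Bytes;
use object_store::{path::Path, ObjectStore, PutMode, PutOptions, UpdateVersion};

// Sketch: create an object, then update it only if it is still the version we wrote.
async fn demo(store: &dyn ObjectStore) -> object_store::Result<()> {
    let path = Path::from("data/config.json");

    // Fails with Error::AlreadyExists if another writer got there first
    let create = PutOptions { mode: PutMode::Create, ..Default::default() };
    let created = store.put_opts(&path, Bytes::from_static(b"v1"), create).await?;

    // Fails with Error::Precondition if the object changed since `created`
    let update = PutOptions {
        mode: PutMode::Update(UpdateVersion { e_tag: created.e_tag, version: created.version }),
        ..Default::default()
    };
    store.put_opts(&path, Bytes::from_static(b"v2"), update).await?;
    Ok(())
}
```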