From 35149f2b5eb9721e491e29f973bbdc366e0244c0 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 10 Oct 2023 14:49:02 +0100
Subject: [PATCH] DynamoDB lock (#4880)

---
 .github/workflows/object_store.yml |   2 +
 object_store/src/aws/client.rs     |  43 ++--
 object_store/src/aws/copy.rs       |  53 +++++
 object_store/src/aws/dynamo.rs     | 333 +++++++++++++++++++++++++++++
 object_store/src/aws/mod.rs        |  59 ++---
 5 files changed, 434 insertions(+), 56 deletions(-)
 create mode 100644 object_store/src/aws/dynamo.rs

diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index c28f8037a307..97d5ddac7edc 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -112,6 +112,7 @@ jobs:
       AWS_SECRET_ACCESS_KEY: test
       AWS_ENDPOINT: http://localhost:4566
       AWS_ALLOW_HTTP: true
+      AWS_COPY_IF_NOT_EXISTS: dynamo:test-table
       HTTP_URL: "http://localhost:8080"
       GOOGLE_BUCKET: test-bucket
       GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
@@ -136,6 +137,7 @@ jobs:
           docker run -d -p 4566:4566 localstack/localstack:2.0
           docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
           aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
+          aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=key,KeyType=HASH --attribute-definitions AttributeName=key,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

       - name: Configure Azurite (Azure emulation)
         # the magical connection string is from
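The localstack table created above is sufficient for CI. Against real DynamoDB, the stale-lock cleanup described in `S3CopyIfNotExists::Dynamo` below additionally relies on the table's TTL feature, which is disabled by default; a hypothetical setup step (not part of this patch) would be:

```shell
aws dynamodb update-time-to-live --table-name test-table \
  --time-to-live-specification "Enabled=true, AttributeName=ttl"
```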
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 1c35586f8bc9..6c5c96fc33c4 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -17,9 +17,7 @@
 use crate::aws::checksum::Checksum;
 use crate::aws::credential::{AwsCredential, CredentialExt};
-use crate::aws::{
-    AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
-};
+use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET};
 use crate::client::get::GetClient;
 use crate::client::list::ListClient;
 use crate::client::list_response::ListResponse;
@@ -200,7 +198,7 @@ impl From<Error> for crate::Error {
 #[derive(Debug)]
 pub struct S3Config {
     pub region: String,
-    pub endpoint: String,
+    pub endpoint: Option<String>,
     pub bucket: String,
     pub bucket_endpoint: String,
     pub credentials: AwsCredentialProvider,
@@ -208,7 +206,6 @@ pub struct S3Config {
     pub client_options: ClientOptions,
     pub sign_payload: bool,
     pub checksum: Option<Checksum>,
-    pub copy_if_not_exists: Option<S3CopyIfNotExists>,
 }

 impl S3Config {
@@ -219,8 +216,8 @@ impl S3Config {
 #[derive(Debug)]
 pub(crate) struct S3Client {
-    config: S3Config,
-    client: ReqwestClient,
+    pub config: S3Config,
+    pub client: ReqwestClient,
 }

 impl S3Client {
@@ -229,12 +226,7 @@ impl S3Client {
         Ok(Self { config, client })
     }

-    /// Returns the config
-    pub fn config(&self) -> &S3Config {
-        &self.config
-    }
-
-    async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
+    pub async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
         self.config.credentials.get_credential().await
     }
@@ -250,7 +242,7 @@ impl S3Client {
         let mut builder = self.client.request(Method::PUT, url);
         let mut payload_sha256 = None;

-        if let Some(checksum) = self.config().checksum {
+        if let Some(checksum) = self.config.checksum {
             let digest = checksum.digest(&bytes);
             builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
@@ -264,7 +256,7 @@
             false => builder.body(bytes),
         };

-        if let Some(value) = self.config().client_options.get_content_type(path) {
+        if let Some(value) = self.config.client_options.get_content_type(path) {
             builder = builder.header(CONTENT_TYPE, value);
         }
@@ -377,7 +369,7 @@ impl S3Client {
         // Compute checksum - S3 *requires* this for DeleteObjects requests, so we default to
         // their algorithm if the user hasn't specified one.
-        let checksum = self.config().checksum.unwrap_or(Checksum::SHA256);
+        let checksum = self.config.checksum.unwrap_or(Checksum::SHA256);
         let digest = checksum.digest(&body);
         builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
         let payload_sha256 = if checksum == Checksum::SHA256 {
@@ -427,11 +419,13 @@ impl S3Client {
     }

     /// Make an S3 Copy request
+    ///
+    /// `header` is an optional header name/value pair to include in the request
     pub async fn copy_request(
         &self,
         from: &Path,
         to: &Path,
-        overwrite: bool,
+        header: Option<(&str, &str)>,
     ) -> Result<()> {
         let credential = self.get_credential().await?;
         let url = self.config.path_url(to);
@@ -442,19 +436,8 @@
             .request(Method::PUT, url)
             .header("x-amz-copy-source", source);

-        if !overwrite {
-            match &self.config.copy_if_not_exists {
-                Some(S3CopyIfNotExists::Header(k, v)) => {
-                    builder = builder.header(k, v);
-                }
-                None => {
-                    return Err(crate::Error::NotSupported {
-                        source: "S3 does not support copy-if-not-exists"
-                            .to_string()
-                            .into(),
-                    })
-                }
-            }
+        if let Some((k, v)) = header {
+            builder = builder.header(k, v);
         }

         builder
diff --git a/object_store/src/aws/copy.rs b/object_store/src/aws/copy.rs
index da4e2809be1a..25d967c6a2ff 100644
--- a/object_store/src/aws/copy.rs
+++ b/object_store/src/aws/copy.rs
@@ -39,12 +39,64 @@
     ///
     /// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
     Header(String, String),
+    /// The name of a DynamoDB table to use for coordination
+    ///
+    /// Encoded as `dynamo:<table_name>`, ignoring whitespace
+    ///
+    /// This will use the same region, credentials and endpoint as configured for S3
+    ///
+    /// ## Limitations
+    ///
+    /// Only conditional operations, e.g. `copy_if_not_exists`, will be synchronized; they can
+    /// therefore race with non-conditional operations, e.g. `put` or `copy`, and with conditional
+    /// operations performed by writers not configured to synchronize with DynamoDB.
+    ///
+    /// Workloads making use of this mechanism **must** ensure:
+    ///
+    /// * Conditional and non-conditional operations are not performed on the same paths
+    /// * Conditional operations are only performed via similarly configured clients
+    ///
+    /// Additionally, as the locking mechanism relies on timeouts to detect stale locks,
+    /// performance will be poor for systems that frequently rewrite the same path; it is
+    /// instead optimised for systems that primarily create files at paths never used before.
+    ///
+    /// ## Locking Protocol
+    ///
+    /// The DynamoDB schema is as follows:
+    ///
+    /// * A string hash key named `"key"`
+    /// * A numeric [TTL] attribute named `"ttl"`
+    /// * A numeric attribute named `"generation"`
+    ///
+    /// The lock procedure is as follows:
+    ///
+    /// * Error if the file already exists in S3
+    /// * Create a corresponding record in DynamoDB with the path as the `"key"`
+    /// * On Success: create the object in S3
+    /// * On Conflict:
+    ///     * Periodically check if the file now exists in S3
+    ///     * After a 60 second timeout, attempt to "claim" the lock by incrementing `"generation"`
+    ///     * GOTO start
+    ///
+    /// This is inspired by the [DynamoDB Lock Client] but simplified for the more limited
+    /// requirements of synchronizing object storage.
+    ///
+    /// The major changes are:
+    ///
+    /// * Uses a monotonic generation count instead of a UUID rvn
+    /// * Relies on [TTL] to eventually clean up old locks
+    /// * Uses a hard-coded lease duration of 20 seconds
+    ///
+    /// [TTL]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
+    /// [DynamoDB Lock Client]: https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
+    Dynamo(String),
 }

 impl std::fmt::Display for S3CopyIfNotExists {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::Header(k, v) => write!(f, "header: {}: {}", k, v),
+            Self::Dynamo(table) => write!(f, "dynamo: {table}"),
         }
     }
 }
@@ -57,6 +109,7 @@ impl S3CopyIfNotExists {
                 let (k, v) = value.split_once(':')?;
                 Some(Self::Header(k.trim().to_string(), v.trim().to_string()))
             }
+            "dynamo" => Some(Self::Dynamo(value.trim().to_string())),
             _ => None,
         }
     }
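The new variant is reachable through the `aws_copy_if_not_exists` key added to `AmazonS3ConfigKey` later in this patch, or the equivalent `AWS_COPY_IF_NOT_EXISTS` environment variable used by the CI job above. A minimal sketch (bucket and table names are placeholders):

```rust
use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};

let s3 = AmazonS3Builder::from_env()
    .with_bucket_name("my-bucket")
    .with_config(AmazonS3ConfigKey::CopyIfNotExists, "dynamo:my-lock-table")
    .build()
    .expect("valid configuration");
```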
diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs
new file mode 100644
index 000000000000..bb4327d8e537
--- /dev/null
+++ b/object_store/src/aws/dynamo.rs
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A DynamoDB based lock system
+
+use crate::aws::client::S3Client;
+use crate::aws::credential::CredentialExt;
+use crate::client::get::GetClientExt;
+use crate::client::retry::Error as RetryError;
+use crate::client::retry::RetryExt;
+use crate::path::Path;
+use crate::{Error, Result};
+use chrono::Utc;
+use reqwest::StatusCode;
+use serde::ser::SerializeMap;
+use serde::{Deserialize, Serialize, Serializer};
+use std::collections::HashMap;
+use std::time::{Duration, Instant};
+
+/// The timeout for a lease operation
+const LEASE_TIMEOUT: Duration = Duration::from_secs(20);
+
+/// The length of time a lease is valid for
+///
+/// This should be a multiple of [`LEASE_TIMEOUT`], where the multiple determines the
+/// maximum clock skew rate tolerated by the system
+const LEASE_EXPIRY: Duration = Duration::from_secs(60);
+
+/// The TTL offset to encode in DynamoDB
+///
+/// This should be significantly larger than [`LEASE_EXPIRY`] to allow for clock skew
+const LEASE_TTL: Duration = Duration::from_secs(60 * 60);
+
+/// The interval to check for creation whilst waiting for a lease to expire
+const TEST_INTERVAL: Duration = Duration::from_secs(1);
+
+/// The exception returned by DynamoDB on conflict
+const CONFLICT: &str = "com.amazonaws.dynamodb.v20120810#ConditionalCheckFailedException";
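The relationship between these constants can be made explicit. A hypothetical compile-time sanity check (not part of the patch) illustrates the intended invariants:

```rust
// Hypothetical checks, not in the patch: a holder gets 20 seconds to complete
// its copy, while waiters only reclaim after 60 seconds, so clocks may run at
// rates differing by up to a factor of three before safety is lost.
const _: () = assert!(LEASE_EXPIRY.as_secs() >= 3 * LEASE_TIMEOUT.as_secs());
// DynamoDB's TTL sweep is best-effort and may lag, so the record TTL is kept
// far larger again than the lease expiry.
const _: () = assert!(LEASE_TTL.as_secs() >= 60 * LEASE_EXPIRY.as_secs());
```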
+pub(crate) async fn copy_if_not_exists(
+    client: &S3Client,
+    table_name: &str,
+    from: &Path,
+    to: &Path,
+) -> Result<()> {
+    check_not_exists(client, to).await?;
+
+    let mut previous_lease = None;
+
+    loop {
+        match try_lock(client, table_name, to.as_ref(), previous_lease.as_ref()).await? {
+            TryLockResult::Ok(lease) => {
+                let fut = client.copy_request(from, to, None);
+                return match tokio::time::timeout_at(lease.timeout().into(), fut).await {
+                    Ok(r) => r,
+                    Err(_) => Err(Error::Generic {
+                        store: "DynamoDB",
+                        source: format!(
+                            "Failed to perform copy operation in {} seconds",
+                            LEASE_TIMEOUT.as_secs()
+                        )
+                        .into(),
+                    }),
+                };
+            }
+            TryLockResult::Conflict(conflict) => {
+                let mut interval = tokio::time::interval(TEST_INTERVAL);
+                loop {
+                    interval.tick().await;
+                    check_not_exists(client, to).await?;
+                    if conflict.expired() {
+                        previous_lease = Some(conflict);
+                        break;
+                    }
+                }
+            }
+        }
+    }
+}
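The user-visible contract of the loop above is that concurrent conditional writers to the same path are serialized. A hypothetical sketch (`store`, paths and an async runtime are assumed; not part of the patch):

```rust
use object_store::{path::Path, ObjectStore};

// Sketch only: `store` is an AmazonS3 configured with `dynamo:<table>`, and
// both source objects already exist. Exactly one of the two writers wins.
async fn race(store: &dyn ObjectStore, src_a: &Path, src_b: &Path, dst: &Path) {
    let (a, b) = futures::join!(
        store.copy_if_not_exists(src_a, dst),
        store.copy_if_not_exists(src_b, dst)
    );
    assert!(a.is_ok() ^ b.is_ok()); // the loser observes Error::AlreadyExists
}
```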
+/// Returns an [`Error::AlreadyExists`] if `path` exists
+async fn check_not_exists(client: &S3Client, path: &Path) -> Result<()> {
+    match client.head(path).await {
+        Ok(_) => Err(Error::AlreadyExists {
+            path: path.to_string(),
+            source: "Already Exists".to_string().into(),
+        }),
+        Err(Error::NotFound { .. }) => Ok(()),
+        Err(e) => Err(e),
+    }
+}
+
+#[derive(Debug)]
+enum TryLockResult {
+    /// Successfully acquired a lease
+    Ok(Lease),
+    /// An existing lease was found
+    Conflict(Lease),
+}
+
+async fn try_lock(
+    s3: &S3Client,
+    table_name: &str,
+    key: &str,
+    existing: Option<&Lease>,
+) -> Result<TryLockResult> {
+    let attributes;
+    let (next_gen, condition_expression, expression_attribute_values) = match existing {
+        None => (0_usize, "attribute_not_exists(#pk)", Map(&[])),
+        Some(existing) => {
+            attributes = [(":g", AttributeValue::Number(existing.generation))];
+            (
+                existing.generation.checked_add(1).unwrap(),
+                "attribute_exists(#pk) AND generation = :g",
+                Map(attributes.as_slice()),
+            )
+        }
+    };
+
+    let ttl = (Utc::now() + LEASE_TTL).timestamp();
+    let items = [
+        ("key", AttributeValue::String(key)),
+        ("generation", AttributeValue::Number(next_gen)),
+        ("ttl", AttributeValue::Number(ttl as _)),
+    ];
+    let names = [("#pk", "key")];
+
+    let req = PutItem {
+        table_name,
+        condition_expression,
+        expression_attribute_values,
+        expression_attribute_names: Map(&names),
+        item: Map(&items),
+        return_values: None,
+        return_values_on_condition_check_failure: Some(ReturnValues::AllOld),
+    };
+
+    let credential = s3.get_credential().await?;
+
+    let acquire = Instant::now();
+
+    let builder = match &s3.config.endpoint {
+        None => s3.client.post(&format!(
+            "https://dynamodb.{}.amazonaws.com",
+            s3.config.region
+        )),
+        Some(e) => s3.client.post(e),
+    };
+
+    let response = builder
+        .json(&req)
+        .header("X-Amz-Target", "DynamoDB_20120810.PutItem")
+        .with_aws_sigv4(&credential, &s3.config.region, "dynamodb", true, None)
+        .send_retry(&s3.config.retry_config)
+        .await;
+
+    match response {
+        Ok(_) => Ok(TryLockResult::Ok(Lease {
+            acquire,
+            generation: next_gen,
+        })),
+        Err(e) => match try_extract_lease(&e) {
+            Some(lease) => Ok(TryLockResult::Conflict(lease)),
+            None => Err(Error::Generic {
+                store: "DynamoDB",
+                source: Box::new(e),
+            }),
+        },
+    }
+}
+
+/// If [`RetryError`] corresponds to [`CONFLICT`], extracts the pre-existing [`Lease`]
+fn try_extract_lease(e: &RetryError) -> Option<Lease> {
+    match e {
+        RetryError::Client {
+            status: StatusCode::BAD_REQUEST,
+            body: Some(b),
+        } => {
+            let resp: ErrorResponse<'_> = serde_json::from_str(b).ok()?;
+            if resp.error != CONFLICT {
+                return None;
+            }
+
+            match resp.item.get("generation") {
+                Some(AttributeValue::Number(generation)) => Some(Lease {
+                    acquire: Instant::now(),
+                    generation: *generation,
+                }),
+                _ => None,
+            }
+        }
+        _ => None,
+    }
+}
+
+/// A lock lease
+#[derive(Debug, Clone)]
+pub struct Lease {
+    acquire: Instant,
+    generation: usize,
+}
+
+impl Lease {
+    /// Returns true if the lease has expired and can be reclaimed
+    pub fn expired(&self) -> bool {
+        self.acquire.elapsed() > LEASE_EXPIRY
+    }
+
+    /// The timeout for this lease
+    pub fn timeout(&self) -> Instant {
+        self.acquire + LEASE_TIMEOUT
+    }
+}
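For reference, a sketch of the conflict body `try_extract_lease` expects, written as a hypothetical test (field values invented; the shape follows `ErrorResponse` and the `ReturnValuesOnConditionCheckFailure: ALL_OLD` request parameter defined below):

```rust
#[test]
fn test_extract_generation_sketch() {
    // Illustrative response shape for a failed conditional PutItem.
    let body = r#"{
        "__type": "com.amazonaws.dynamodb.v20120810#ConditionalCheckFailedException",
        "Item": {"key": {"S": "a/b/c"}, "generation": {"N": "3"}, "ttl": {"N": "1700000000"}}
    }"#;
    let resp: ErrorResponse<'_> = serde_json::from_str(body).unwrap();
    assert_eq!(resp.error, CONFLICT);
    assert!(matches!(
        resp.item.get("generation"),
        Some(AttributeValue::Number(3))
    ));
}
```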
+/// A DynamoDB [PutItem] payload
+///
+/// [PutItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
+#[derive(Serialize)]
+#[serde(rename_all = "PascalCase")]
+struct PutItem<'a> {
+    /// The table name
+    table_name: &'a str,
+
+    /// A condition that must be satisfied in order for a conditional PutItem operation to succeed
+    condition_expression: &'a str,
+
+    /// One or more substitution tokens for attribute names in an expression
+    expression_attribute_names: Map<'a, &'a str, &'a str>,
+
+    /// One or more values that can be substituted in an expression
+    expression_attribute_values: Map<'a, &'a str, AttributeValue<'a>>,
+
+    /// A map of attribute name/value pairs, one for each attribute
+    item: Map<'a, &'a str, AttributeValue<'a>>,
+
+    /// Use ReturnValues if you want to get the item attributes as they appeared
+    /// before they were updated with the PutItem request.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    return_values: Option<ReturnValues>,
+
+    /// An optional parameter that returns the item attributes for a PutItem operation
+    /// that failed a condition check.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    return_values_on_condition_check_failure: Option<ReturnValues>,
+}
+
+#[derive(Deserialize)]
+struct ErrorResponse<'a> {
+    #[serde(rename = "__type")]
+    error: &'a str,
+
+    #[serde(borrow, default, rename = "Item")]
+    item: HashMap<&'a str, AttributeValue<'a>>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+enum ReturnValues {
+    AllOld,
+}
+
+/// A collection of key value pairs
+///
+/// This provides cheap, ordered serialization of maps
+struct Map<'a, K, V>(&'a [(K, V)]);
+
+impl<'a, K: Serialize, V: Serialize> Serialize for Map<'a, K, V> {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        if self.0.is_empty() {
+            return serializer.serialize_none();
+        }
+        let mut map = serializer.serialize_map(Some(self.0.len()))?;
+        for (k, v) in self.0 {
+            map.serialize_entry(k, v)?
+        }
+        map.end()
+    }
+}
+
+/// A DynamoDB [AttributeValue]
+///
+/// [AttributeValue]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html
+#[derive(Debug, Serialize, Deserialize)]
+enum AttributeValue<'a> {
+    #[serde(rename = "S")]
+    String(&'a str),
+    #[serde(rename = "N", with = "number")]
+    Number(usize),
+}
+
+/// Numbers are serialized as strings
+mod number {
+    use serde::{Deserialize, Deserializer, Serializer};
+
+    pub fn serialize<S: Serializer>(v: &usize, s: S) -> Result<S::Ok, S::Error> {
+        s.serialize_str(&v.to_string())
+    }
+
+    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<usize, D::Error> {
+        let v: &str = Deserialize::deserialize(d)?;
+        v.parse().map_err(serde::de::Error::custom)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_attribute_serde() {
+        let serde = serde_json::to_string(&AttributeValue::Number(23)).unwrap();
+        assert_eq!(serde, "{\"N\":\"23\"}");
+        let back: AttributeValue<'_> = serde_json::from_str(&serde).unwrap();
+        assert!(matches!(back, AttributeValue::Number(23)));
+    }
+}
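To make the wire format concrete, here is a hypothetical initial-acquisition request built from the types above, together with the JSON it would serialize to (table name, path and ttl invented):

```rust
let items = [
    ("key", AttributeValue::String("a/b/c")),
    ("generation", AttributeValue::Number(0)),
    ("ttl", AttributeValue::Number(1700003600)),
];
let names = [("#pk", "key")];
let req = PutItem {
    table_name: "my-lock-table",
    condition_expression: "attribute_not_exists(#pk)",
    expression_attribute_names: Map(&names),
    expression_attribute_values: Map(&[]), // empty => serialized as null
    item: Map(&items),
    return_values: None, // skipped entirely by skip_serializing_if
    return_values_on_condition_check_failure: Some(ReturnValues::AllOld),
};
// serde_json::to_string(&req) yields (whitespace added):
// {
//   "TableName": "my-lock-table",
//   "ConditionExpression": "attribute_not_exists(#pk)",
//   "ExpressionAttributeNames": {"#pk": "key"},
//   "ExpressionAttributeValues": null,
//   "Item": {"key": {"S": "a/b/c"}, "generation": {"N": "0"}, "ttl": {"N": "1700003600"}},
//   "ReturnValuesOnConditionCheckFailure": "ALL_OLD"
// }
```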
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index db3e1b9a4bbe..ded022b599f1 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -65,6 +65,7 @@
 mod checksum;
 mod client;
 mod copy;
 mod credential;
+mod dynamo;

 pub use checksum::Checksum;
 pub use copy::S3CopyIfNotExists;
@@ -196,18 +197,19 @@
 #[derive(Debug)]
 pub struct AmazonS3 {
     client: Arc<S3Client>,
+    copy_if_not_exists: Option<S3CopyIfNotExists>,
 }

 impl std::fmt::Display for AmazonS3 {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "AmazonS3({})", self.client.config().bucket)
+        write!(f, "AmazonS3({})", self.client.config.bucket)
     }
 }

 impl AmazonS3 {
     /// Returns the [`AwsCredentialProvider`] used by [`AmazonS3`]
     pub fn credentials(&self) -> &AwsCredentialProvider {
-        &self.client.config().credentials
+        &self.client.config.credentials
     }
 }

@@ -295,11 +297,21 @@ impl ObjectStore for AmazonS3 {
     }

     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy_request(from, to, true).await
+        self.client.copy_request(from, to, None).await
     }

     async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy_request(from, to, false).await
+        match &self.copy_if_not_exists {
+            Some(S3CopyIfNotExists::Header(k, v)) => {
+                self.client.copy_request(from, to, Some((k, v))).await
+            }
+            Some(S3CopyIfNotExists::Dynamo(table_name)) => {
+                dynamo::copy_if_not_exists(&self.client, table_name, from, to).await
+            }
+            None => Err(crate::Error::NotSupported {
+                source: "S3 does not support copy-if-not-exists".to_string().into(),
+            }),
+        }
     }
 }

@@ -552,7 +564,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
             Self::ContainerCredentialsRelativeUri => {
                 "aws_container_credentials_relative_uri"
             }
-            Self::CopyIfNotExists => "copy_if_not_exists",
+            Self::CopyIfNotExists => "aws_copy_if_not_exists",
             Self::Client(opt) => opt.as_ref(),
         }
     }
@@ -586,7 +598,7 @@ impl FromStr for AmazonS3ConfigKey {
             "aws_container_credentials_relative_uri" => {
                 Ok(Self::ContainerCredentialsRelativeUri)
             }
-            "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
+            "aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
             // Backwards compatibility
             "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
             _ => match s.parse() {
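For reference, the rewritten derivation below reduces to four cases; with bucket `test-bucket` and region `us-east-1` (illustrative values) the resulting `bucket_endpoint` is:

    endpoint unset, virtual_hosted_style_request=false: https://s3.us-east-1.amazonaws.com/test-bucket
    endpoint unset, virtual_hosted_style_request=true:  https://test-bucket.s3.us-east-1.amazonaws.com
    endpoint=e,     virtual_hosted_style_request=false: {e}/test-bucket
    endpoint=e,     virtual_hosted_style_request=true:  e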
@@ -1063,40 +1075,38 @@ impl AmazonS3Builder {
             )) as _
         };

-        let endpoint: String;
-        let bucket_endpoint: String;
-
         // If `endpoint` is provided then it's assumed to be consistent with
         // `virtual_hosted_style_request`, i.e. if `virtual_hosted_style_request` is true
         // then `endpoint` should have the bucket name included.
-        if self.virtual_hosted_style_request.get()? {
-            endpoint = self
-                .endpoint
-                .unwrap_or_else(|| format!("https://{bucket}.s3.{region}.amazonaws.com"));
-            bucket_endpoint = endpoint.clone();
+        let bucket_endpoint = if self.virtual_hosted_style_request.get()? {
+            self.endpoint
+                .clone()
+                .unwrap_or_else(|| format!("https://{bucket}.s3.{region}.amazonaws.com"))
         } else {
-            endpoint = self
-                .endpoint
-                .unwrap_or_else(|| format!("https://s3.{region}.amazonaws.com"));
-            bucket_endpoint = format!("{endpoint}/{bucket}");
-        }
+            match &self.endpoint {
+                None => format!("https://s3.{region}.amazonaws.com/{bucket}"),
+                Some(endpoint) => format!("{endpoint}/{bucket}"),
+            }
+        };

         let config = S3Config {
             region,
-            endpoint,
             bucket,
             bucket_endpoint,
             credentials,
+            endpoint: self.endpoint,
             retry_config: self.retry_config,
             client_options: self.client_options,
             sign_payload: !self.unsigned_payload.get()?,
             checksum,
-            copy_if_not_exists,
         };

         let client = Arc::new(S3Client::new(config)?);

-        Ok(AmazonS3 { client })
+        Ok(AmazonS3 {
+            client,
+            copy_if_not_exists,
+        })
     }
 }

@@ -1207,7 +1217,6 @@ mod tests {
         let config = AmazonS3Builder::from_env();

         let is_local = matches!(&config.endpoint, Some(e) if e.starts_with("http://"));
-        let test_not_exists = config.copy_if_not_exists.is_some();
         let integration = config.build().unwrap();

         // Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328
@@ -1217,9 +1226,7 @@
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
         stream_get(&integration).await;
-        if test_not_exists {
-            copy_if_not_exists(&integration).await;
-        }
+        copy_if_not_exists(&integration).await;

         // run integration test with unsigned payload enabled
         let config = AmazonS3Builder::from_env().with_unsigned_payload(true);
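Taken together, an end-to-end sketch of the feature from the user's side (not part of the patch; bucket, paths and environment mirror the CI job above):

```rust
use bytes::Bytes;
use object_store::aws::AmazonS3Builder;
use object_store::{path::Path, ObjectStore};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumes AWS_COPY_IF_NOT_EXISTS=dynamo:test-table and the usual AWS_*
    // variables are set, as in the CI environment; names are placeholders.
    let store = AmazonS3Builder::from_env()
        .with_bucket_name("test-bucket")
        .build()?;

    let src = Path::from("data/source");
    let dst = Path::from("data/dest");
    store.put(&src, Bytes::from("hello")).await?;

    store.copy_if_not_exists(&src, &dst).await?; // first writer wins
    // A second attempt fails fast now that the destination exists.
    let err = store.copy_if_not_exists(&src, &dst).await.unwrap_err();
    assert!(matches!(err, object_store::Error::AlreadyExists { .. }));
    Ok(())
}
```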