Skip to content

Commit

Permalink
DynamoDB lock (apache#4880)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 11, 2023
1 parent 556c5ff commit 35149f2
Show file tree
Hide file tree
Showing 5 changed files with 434 additions and 56 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ jobs:
AWS_SECRET_ACCESS_KEY: test
AWS_ENDPOINT: http://localhost:4566
AWS_ALLOW_HTTP: true
AWS_COPY_IF_NOT_EXISTS: dynamo:test-table
HTTP_URL: "http://localhost:8080"
GOOGLE_BUCKET: test-bucket
GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
Expand All @@ -136,6 +137,7 @@ jobs:
docker run -d -p 4566:4566 localstack/localstack:2.0
docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=key,KeyType=HASH --attribute-definitions AttributeName=key,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
- name: Configure Azurite (Azure emulation)
# the magical connection string is from
Expand Down
43 changes: 13 additions & 30 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{
AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
};
use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET};
use crate::client::get::GetClient;
use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
Expand Down Expand Up @@ -200,15 +198,14 @@ impl From<DeleteError> for Error {
#[derive(Debug)]
pub struct S3Config {
pub region: String,
pub endpoint: String,
pub endpoint: Option<String>,
pub bucket: String,
pub bucket_endpoint: String,
pub credentials: AwsCredentialProvider,
pub retry_config: RetryConfig,
pub client_options: ClientOptions,
pub sign_payload: bool,
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
}

impl S3Config {
Expand All @@ -219,8 +216,8 @@ impl S3Config {

#[derive(Debug)]
pub(crate) struct S3Client {
config: S3Config,
client: ReqwestClient,
pub config: S3Config,
pub client: ReqwestClient,
}

impl S3Client {
Expand All @@ -229,12 +226,7 @@ impl S3Client {
Ok(Self { config, client })
}

/// Returns the config
pub fn config(&self) -> &S3Config {
&self.config
}

async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
pub async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
self.config.credentials.get_credential().await
}

Expand All @@ -250,7 +242,7 @@ impl S3Client {
let mut builder = self.client.request(Method::PUT, url);
let mut payload_sha256 = None;

if let Some(checksum) = self.config().checksum {
if let Some(checksum) = self.config.checksum {
let digest = checksum.digest(&bytes);
builder =
builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
Expand All @@ -264,7 +256,7 @@ impl S3Client {
false => builder.body(bytes),
};

if let Some(value) = self.config().client_options.get_content_type(path) {
if let Some(value) = self.config.client_options.get_content_type(path) {
builder = builder.header(CONTENT_TYPE, value);
}

Expand Down Expand Up @@ -377,7 +369,7 @@ impl S3Client {

// Compute checksum - S3 *requires* this for DeleteObjects requests, so we default to
// their algorithm if the user hasn't specified one.
let checksum = self.config().checksum.unwrap_or(Checksum::SHA256);
let checksum = self.config.checksum.unwrap_or(Checksum::SHA256);
let digest = checksum.digest(&body);
builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
let payload_sha256 = if checksum == Checksum::SHA256 {
Expand Down Expand Up @@ -427,11 +419,13 @@ impl S3Client {
}

/// Make an S3 Copy request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
///
/// `header` is an optional header-value pair to include in the request
pub async fn copy_request(
&self,
from: &Path,
to: &Path,
overwrite: bool,
header: Option<(&str, &str)>,
) -> Result<()> {
let credential = self.get_credential().await?;
let url = self.config.path_url(to);
Expand All @@ -442,19 +436,8 @@ impl S3Client {
.request(Method::PUT, url)
.header("x-amz-copy-source", source);

if !overwrite {
match &self.config.copy_if_not_exists {
Some(S3CopyIfNotExists::Header(k, v)) => {
builder = builder.header(k, v);
}
None => {
return Err(crate::Error::NotSupported {
source: "S3 does not support copy-if-not-exists"
.to_string()
.into(),
})
}
}
if let Some((k, v)) = header {
builder = builder.header(k, v);
}

builder
Expand Down
53 changes: 53 additions & 0 deletions object_store/src/aws/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,64 @@ pub enum S3CopyIfNotExists {
///
/// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
Header(String, String),
/// The name of a DynamoDB table to use for coordination
///
/// Encoded as `dynamodb:<TABLE_NAME>` ignoring whitespace
///
/// This will use the same region, credentials and endpoint as configured for S3
///
/// ## Limitations
///
/// Only conditional operations, e.g. `copy_if_not_exists` will be synchronized, and can
/// therefore race with non-conditional operations, e.g. `put`, `copy`, or conditional
/// operations performed by writers not configured to synchronize with DynamoDB.
///
/// Workloads making use of this mechanism **must** ensure:
///
/// * Conditional and non-conditional operations are not performed on the same paths
/// * Conditional operations are only performed via similarly configured clients
///
/// Additionally as the locking mechanism relies on timeouts to detect stale locks,
/// performance will be poor for systems that frequently rewrite the same path, instead
/// being optimised for systems that primarily create files with paths never used before.
///
/// ## Locking Protocol
///
/// The DynamoDB schema is as follows:
///
/// * A string hash key named `"key"`
/// * A numeric [TTL] attribute named `"ttl"`
/// * A numeric attribute named `"generation"`
///
/// The lock procedure is as follows:
///
/// * Error if file exists in S3
/// * Create a corresponding record in DynamoDB with the path as the `"key"`
/// * On Success: Create object in S3
/// * On Conflict:
/// * Periodically check if file exists in S3
/// * After a 60 second timeout attempt to "claim" the lock by incrementing `"generation"`
/// * GOTO start
///
/// This is inspired by the [DynamoDB Lock Client] but simplified for the more limited
/// requirements of synchronizing object storage.
///
/// The major changes are:
///
/// * Uses a monotonic generation count instead of a UUID rvn
/// * Relies on [TTL] to eventually clean up old locks
/// * Uses a hard-coded lease duration of 20 seconds
///
/// [TTL]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
/// [DynamoDB Lock Client]: https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
Dynamo(String),
}

impl std::fmt::Display for S3CopyIfNotExists {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Header(k, v) => write!(f, "header: {}: {}", k, v),
Self::Dynamo(table) => write!(f, "dynamo: {table}"),
}
}
}
Expand All @@ -57,6 +109,7 @@ impl S3CopyIfNotExists {
let (k, v) = value.split_once(':')?;
Some(Self::Header(k.trim().to_string(), v.trim().to_string()))
}
"dynamo" => Some(Self::Dynamo(value.trim().to_string())),
_ => None,
}
}
Expand Down
Loading

0 comments on commit 35149f2

Please sign in to comment.