diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 74b582ff592c..9bc3afb64497 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -137,7 +137,7 @@ jobs: - name: Setup LocalStack (AWS emulation) run: | - echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.0.2)" >> $GITHUB_ENV + echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack@sha256:a0b79cb2430f1818de2c66ce89d41bba40f5a1823410f5a7eaf3494b692eed97)" >> $GITHUB_ENV echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index 4331ae2a340a..2390187e7f72 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -242,40 +242,6 @@ impl DynamoCommit { } } - /// Retrieve a lock, returning an error if it doesn't exist - async fn get_lock(&self, s3: &S3Client, path: &str, etag: Option<&str>) -> Result { - let key_attributes = [ - ("path", AttributeValue::from(path)), - ("etag", AttributeValue::from(etag.unwrap_or("*"))), - ]; - let req = GetItem { - table_name: &self.table_name, - key: Map(&key_attributes), - }; - let credential = s3.config.get_credential().await?; - - let resp = self - .request(s3, credential.as_deref(), "DynamoDB_20120810.GetItem", req) - .await - .map_err(|e| e.error(STORE, path.to_string()))?; - - let body = resp.bytes().await.map_err(|e| Error::Generic { - store: STORE, - source: Box::new(e), - })?; - - let response: GetItemResponse<'_> = - serde_json::from_slice(body.as_ref()).map_err(|e| Error::Generic { - store: STORE, - source: Box::new(e), - })?; - - extract_lease(&response.item).ok_or_else(|| Error::NotFound { - path: path.into(), - source: "DynamoDB GetItem returned no items".to_string().into(), - }) - } - /// Attempt to acquire a lock, reclaiming an existing lease if provided async fn try_lock( &self, @@ -332,22 +298,10 @@ impl DynamoCommit { Err(e) => match parse_error_response(&e) { Some(e) if e.error.ends_with(CONFLICT) => match extract_lease(&e.item) { Some(lease) => Ok(TryLockResult::Conflict(lease)), - // ReturnValuesOnConditionCheckFailure is a relatively recent addition - // to DynamoDB and is not supported by dynamodb-local, which is used - // by localstack. In such cases the conflict error will not contain - // the conflicting item, and we must instead perform a get request - // - // There is a potential race here if the conflicting record is removed - // before we retrieve it. We could retry the transaction in such a scenario, - // but as this only occurs for emulators, we simply abort with a - // not found error - // - // - // - // - None => Ok(TryLockResult::Conflict( - self.get_lock(s3, path, etag).await?, - )), + None => Err(Error::Generic { + store: STORE, + source: "Failed to extract lease from conflict ReturnValuesOnConditionCheckFailure response".into() + }), }, _ => Err(Error::Generic { store: STORE, @@ -509,12 +463,6 @@ struct GetItem<'a> { key: Map<'a, &'a str, AttributeValue<'a>>, } -#[derive(Deserialize)] -struct GetItemResponse<'a> { - #[serde(borrow, default, rename = "Item")] - item: HashMap<&'a str, AttributeValue<'a>>, -} - #[derive(Deserialize)] struct ErrorResponse<'a> { #[serde(rename = "__type")]