Skip to content

Commit

Permalink
feat(services/azblob): support multi write for azblob (#4181)
Browse files Browse the repository at this point in the history
* save work

* save work

* finish framework

* add some comments

* minor

* minor

* remove redudant clone

* add base64 encode

* minor fix

* put blocklist part should contain block_id

* minor fix

* fix base64 encode

* should do url encode after base64 encode

* use latest when complete put block list

* enhance ut

* put block list use put not post

* add content length

* empty commit for retry

* resolve some comments

* come bacl, to_string for block_id

* fmt

* resolve comments

* remove .simple() to see whether CI can pass

* do not percent_encode_path in xml body

* use BASE64_STANDARD
  • Loading branch information
wcy-fdu authored Feb 18, 2024
1 parent 93e5f65 commit e668413
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 38 deletions.
5 changes: 3 additions & 2 deletions core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,9 @@ impl Accessor for AzblobBackend {
read_with_override_content_disposition: true,

write: true,
write_can_empty: true,
write_can_append: true,
write_can_empty: true,
write_can_multi: true,
write_with_cache_control: true,
write_with_content_type: true,

Expand Down Expand Up @@ -631,7 +632,7 @@ impl Accessor for AzblobBackend {
let w = if args.append() {
AzblobWriters::Two(oio::AppendWriter::new(w))
} else {
AzblobWriters::One(oio::OneShotWriter::new(w))
AzblobWriters::One(oio::BlockWriter::new(w, args.concurrent()))
};

Ok((RpWrite::default(), w))
Expand Down
174 changes: 167 additions & 7 deletions core/src/services/azblob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
use std::time::Duration;

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use http::header::HeaderName;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
Expand All @@ -32,7 +29,13 @@ use http::Response;
use reqsign::AzureStorageCredential;
use reqsign::AzureStorageLoader;
use reqsign::AzureStorageSigner;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
use std::time::Duration;
use uuid::Uuid;

use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -370,6 +373,118 @@ impl AzblobCore {
Ok(req)
}

pub fn azblob_put_block_request(
&self,
path: &str,
block_id: Uuid,
size: Option<u64>,
args: &OpWrite,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
// To be written as part of a blob, a block must have been successfully written to the server in an earlier Put Block operation.
// refer to https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id
let p = build_abs_path(&self.root, path);

let encoded_block_id: String =
percent_encode_path(&BASE64_STANDARD.encode(block_id.as_bytes()));
let url = format!(
"{}/{}/{}?comp=block&blockid={}",
self.endpoint,
self.container,
percent_encode_path(&p),
encoded_block_id,
);
let mut req = Request::put(&url);
// Set SSE headers.
req = self.insert_sse_headers(req);

if let Some(cache_control) = args.cache_control() {
req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control);
}
if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size)
}

if let Some(ty) = args.content_type() {
req = req.header(CONTENT_TYPE, ty)
}
// Set body
let req = req.body(body).map_err(new_request_build_error)?;

Ok(req)
}

pub async fn azblob_put_block(
&self,
path: &str,
block_id: Uuid,
size: Option<u64>,
args: &OpWrite,
body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
let mut req = self.azblob_put_block_request(path, block_id, size, args, body)?;

self.sign(&mut req).await?;
self.send(req).await
}

pub async fn azblob_complete_put_block_list_request(
&self,
path: &str,
block_ids: Vec<Uuid>,
args: &OpWrite,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let url = format!(
"{}/{}/{}?comp=blocklist",
self.endpoint,
self.container,
percent_encode_path(&p),
);

let req = Request::put(&url);

// Set SSE headers.
let mut req = self.insert_sse_headers(req);
if let Some(cache_control) = args.cache_control() {
req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control);
}

let content = quick_xml::se::to_string(&PutBlockListRequest {
latest: block_ids
.into_iter()
.map(|block_id| {
let encoded_block_id: String = BASE64_STANDARD.encode(block_id.as_bytes());
encoded_block_id
})
.collect(),
})
.map_err(new_xml_deserialize_error)?;

req = req.header(CONTENT_LENGTH, content.len());

let req = req
.body(AsyncBody::Bytes(Bytes::from(content)))
.map_err(new_request_build_error)?;

Ok(req)
}

pub async fn azblob_complete_put_block_list(
&self,
path: &str,
block_ids: Vec<Uuid>,
args: &OpWrite,
) -> Result<Response<IncomingAsyncBody>> {
let mut req = self
.azblob_complete_put_block_list_request(path, block_ids, args)
.await?;

self.sign(&mut req).await?;

self.send(req).await
}

pub fn azblob_head_blob_request(
&self,
path: &str,
Expand Down Expand Up @@ -533,6 +648,13 @@ impl AzblobCore {
}
}

/// Request of PutBlockListRequest
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(default, rename = "BlockList", rename_all = "PascalCase")]
pub struct PutBlockListRequest {
pub latest: Vec<String>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListBlobsOutput {
Expand Down Expand Up @@ -761,4 +883,42 @@ mod tests {

de::from_reader(Bytes::from(bs).reader()).expect("must success")
}

/// This example is from https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id
#[test]
fn test_serialize_put_block_list_request() {
let req = PutBlockListRequest {
latest: vec!["1".to_string(), "2".to_string(), "3".to_string()],
};

let actual = quick_xml::se::to_string(&req).expect("must succeed");

pretty_assertions::assert_eq!(
actual,
r#"
<BlockList>
<Latest>1</Latest>
<Latest>2</Latest>
<Latest>3</Latest>
</BlockList>"#
// Cleanup space and new line
.replace([' ', '\n'], "")
// Escape `"` by hand to address <https://github.com/tafia/quick-xml/issues/362>
.replace('"', "&quot;")
);

let bs = "<?xml version=\"1.0\" encoding=\"utf-8\"?>
<BlockList>
<Latest>1</Latest>
<Latest>2</Latest>
<Latest>3</Latest>
</BlockList>";

let out: PutBlockListRequest =
de::from_reader(Bytes::from(bs).reader()).expect("must success");
assert_eq!(
out.latest,
vec!["1".to_string(), "2".to_string(), "3".to_string()]
);
}
}
94 changes: 65 additions & 29 deletions core/src/services/azblob/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use http::StatusCode;
use uuid::Uuid;

use super::core::AzblobCore;
use super::error::parse_error;
Expand All @@ -27,7 +28,7 @@ use crate::*;

const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";

pub type AzblobWriters = TwoWays<oio::OneShotWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;
pub type AzblobWriters = TwoWays<oio::BlockWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;

pub struct AzblobWriter {
core: Arc<AzblobCore>,
Expand All @@ -42,34 +43,6 @@ impl AzblobWriter {
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl oio::OneShotWrite for AzblobWriter {
async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
let mut req = self.core.azblob_put_blob_request(
&self.path,
Some(bs.len() as u64),
&self.op,
AsyncBody::ChunkedBytes(bs),
)?;

self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();

match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl oio::AppendWrite for AzblobWriter {
Expand Down Expand Up @@ -137,3 +110,66 @@ impl oio::AppendWrite for AzblobWriter {
}
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[async_trait]
impl oio::BlockWrite for AzblobWriter {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req: http::Request<AsyncBody> =
self.core
.azblob_put_blob_request(&self.path, Some(size), &self.op, body)?;
self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();

match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}

async fn write_block(&self, block_id: Uuid, size: u64, body: AsyncBody) -> Result<()> {
let resp = self
.core
.azblob_put_block(&self.path, block_id, Some(size), &self.op, body)
.await?;

let status = resp.status();
match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}

async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<()> {
let resp = self
.core
.azblob_complete_put_block_list(&self.path, block_ids, &self.op)
.await?;

let status = resp.status();
match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}

async fn abort_block(&self, _block_ids: Vec<Uuid>) -> Result<()> {
// refer to https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id
// Any uncommitted blocks are garbage collected if there are no successful calls to Put Block or Put Block List on the blob within a week.
// If Put Blob is called on the blob, any uncommitted blocks are garbage collected.
Ok(())
}
}

0 comments on commit e668413

Please sign in to comment.