From 2c77e1d06cf312265c25da1759a793fb0671004c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 10 Jan 2024 17:21:07 +0800 Subject: [PATCH] refactor: Polish internal types and remove not needed deps (#3964) * Save Signed-off-by: Xuanwo * Remove deps Signed-off-by: Xuanwo * Remove dirs Signed-off-by: Xuanwo * Update cargo lock Signed-off-by: Xuanwo * Format Signed-off-by: Xuanwo * Fix Signed-off-by: Xuanwo * Fix build Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- Cargo.lock | 3 - bindings/c/Cargo.lock | 41 ------ core/Cargo.toml | 131 ++++++++++++------ core/src/raw/enum_utils.rs | 4 +- core/src/raw/oio/list/api.rs | 9 +- core/src/raw/oio/read/api.rs | 24 +--- core/src/raw/oio/stream/api.rs | 8 +- core/src/raw/oio/write/api.rs | 30 ++-- ...append_object_write.rs => append_write.rs} | 37 +++-- core/src/raw/oio/write/block_write.rs | 9 ++ core/src/raw/oio/write/exact_buf_write.rs | 14 +- core/src/raw/oio/write/mod.rs | 14 +- ...art_upload_write.rs => multipart_write.rs} | 107 +++++++------- core/src/raw/oio/write/range_write.rs | 10 +- core/src/services/alluxio/writer.rs | 4 +- core/src/services/azblob/backend.rs | 2 +- core/src/services/azblob/writer.rs | 5 +- core/src/services/azdls/backend.rs | 2 +- core/src/services/azdls/writer.rs | 5 +- core/src/services/azfile/backend.rs | 2 +- core/src/services/azfile/writer.rs | 5 +- core/src/services/b2/backend.rs | 2 +- core/src/services/b2/writer.rs | 14 +- core/src/services/cos/backend.rs | 4 +- core/src/services/cos/writer.rs | 17 +-- core/src/services/dbfs/writer.rs | 2 +- core/src/services/memory/backend.rs | 10 +- core/src/services/obs/backend.rs | 4 +- core/src/services/obs/writer.rs | 15 +- core/src/services/oss/backend.rs | 4 +- core/src/services/oss/writer.rs | 17 +-- core/src/services/s3/backend.rs | 2 +- core/src/services/s3/writer.rs | 14 +- core/src/services/sftp/backend.rs | 11 -- core/src/services/upyun/backend.rs | 2 +- core/src/services/upyun/writer.rs | 14 +- core/src/services/webhdfs/backend.rs | 2 +- core/src/services/webhdfs/writer.rs | 4 +- 38 files changed, 282 insertions(+), 322 deletions(-) rename core/src/raw/oio/write/{append_object_write.rs => append_write.rs} (80%) rename core/src/raw/oio/write/{multipart_upload_write.rs => multipart_write.rs} (85%) diff --git a/Cargo.lock b/Cargo.lock index 0be9fc19a384..cd56abe92a40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4540,7 +4540,6 @@ dependencies = [ "chrono", "criterion", "dashmap", - "dirs", "dotenvy", "etcd-client", "flagset", @@ -4566,10 +4565,8 @@ dependencies = [ "openssh", "openssh-sftp-client", "opentelemetry", - "parking_lot 0.12.1", "percent-encoding", "persy", - "pin-project", "pretty_assertions", "prometheus", "prometheus-client", diff --git a/bindings/c/Cargo.lock b/bindings/c/Cargo.lock index 628b1e95f0c2..d7455a07cdbd 100644 --- a/bindings/c/Cargo.lock +++ b/bindings/c/Cargo.lock @@ -764,16 +764,6 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" -[[package]] -name = "lock_api" -version = "0.4.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" -dependencies = [ - "autocfg", - "scopeguard", -] - [[package]] name = "log" version = "0.4.20" @@ -923,9 +913,7 @@ dependencies = [ "log", "md-5", "once_cell", - "parking_lot", "percent-encoding", - "pin-project", "quick-xml 0.30.0", "reqsign", "reqwest", @@ -969,29 +957,6 @@ version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" -[[package]] -name = "parking_lot" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-targets 0.48.5", -] - [[package]] name = "pem" version = "3.0.3" @@ -1354,12 +1319,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "scopeguard" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" - [[package]] name = "sct" version = "0.7.1" diff --git a/core/Cargo.toml b/core/Cargo.toml index 1ca4ca41e453..5fbe2a7369ba 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -181,7 +181,7 @@ services-s3 = [ "reqsign?/reqwest_request", ] services-seafile = [] -services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:dirs"] +services-sftp = ["dep:openssh", "dep:openssh-sftp-client"] services-sled = ["dep:sled"] services-sqlite = ["dep:rusqlite", "dep:r2d2"] services-supabase = [] @@ -217,94 +217,139 @@ path = "tests/behavior/main.rs" required-features = ["tests"] [dependencies] -anyhow = { version = "1.0.30", features = ["std"] } -async-backtrace = { version = "0.2.6", optional = true } async-tls = { version = "0.12.0", optional = true } + +# Required dependencies +anyhow = { version = "1.0.30", features = ["std"] } async-trait = "0.1.68" -atomic_lib = { version = "0.34.5", optional = true } -await-tree = { version = "0.1.1", optional = true } backon = "0.4.1" base64 = "0.21" +bytes = "1.4" +chrono = { version = "0.4.28", default-features = false, features = [ + "clock", + "std", +] } +flagset = "0.4" +futures = { version = "0.3", default-features = false, features = ["std"] } +http = "0.2.9" +log = "0.4" +md-5 = "0.10" +# TODO: remove once_cell when lazy_lock is stable: https://doc.rust-lang.org/std/cell/struct.LazyCell.html +once_cell = "1" +percent-encoding = "2" +quick-xml = { version = "0.30", features = ["serialize", "overlapped-lists"] } +reqwest = { version = "0.11.18", features = [ + "stream", +], default-features = false } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +# TODO: remove `rt` after https://github.com/apache/incubator-opendal/issues/3948 +tokio = { version = "1.27", features = ["sync", "rt"] } +uuid = { version = "1", features = ["serde", "v4"] } + +# Test only dependencies +dotenvy = { version = "0.15", optional = true } +rand = { version = "0.8", optional = true } + +# Optional dependencies + +# Services +# general dependencies. bb8 = { version = "0.8", optional = true } +prost = { version = "0.11", optional = true } +r2d2 = { version = "0.8", optional = true } +sha1 = { version = "0.10.6", optional = true } +sha2 = { version = "0.10", optional = true } + +# For http based services. +reqsign = { version = "0.14.6", default-features = false, optional = true } + +# for services-atomic-server +atomic_lib = { version = "0.34.5", optional = true } +# for services-postgres bb8-postgres = { version = "0.8.1", optional = true } -bytes = "1.4" +tokio-postgres = { version = "0.7.8", optional = true } +# for services-cacache cacache = { version = "12.0", default-features = false, features = [ "tokio-runtime", "mmap", ], optional = true } -chrono = { version = "0.4.28", default-features = false, features = [ - "clock", - "std", -] } +# for services-dashmap dashmap = { version = "5.4", optional = true } -dirs = { version = "5.0.1", optional = true } -dotenvy = { version = "0.15", optional = true } +# for services-etcd etcd-client = { version = "0.12", optional = true, features = ["tls"] } -flagset = "0.4" +# for services-foundationdb foundationdb = { version = "0.8.0", features = [ "embedded-fdb-include", ], optional = true } -futures = { version = "0.3", default-features = false, features = ["std"] } -governor = { version = "0.6.0", optional = true, features = ["std"] } +# for services-hdfs hdrs = { version = "0.3.0", optional = true, features = ["async_file"] } +# for services-upyun hmac = { version = "0.12.1", optional = true } +# for services-libsql hrana-client-proto = { version = "0.2.1", optional = true } -http = "0.2.9" -log = "0.4" -madsim = { version = "0.2.21", optional = true } -md-5 = "0.10" -metrics = { version = "0.20", optional = true } +# for services-mini-moka mini-moka = { version = "0.10", optional = true } -minitrace = { version = "0.6", optional = true } +# for services-moka moka = { version = "0.12", optional = true, features = ["future", "sync"] } +# for services-mongodb mongodb = { version = "2.7.0", optional = true, features = ["tokio-runtime"] } +# for services-mysql mysql_async = { version = "0.32.2", default-features = false, features = [ "default-rustls", ], optional = true } -once_cell = "1" +# for services-sftp openssh = { version = "0.10.0", optional = true } openssh-sftp-client = { version = "0.14.0", optional = true, features = [ "openssh", "tracing", ] } -opentelemetry = { version = "0.21.0", optional = true } -parking_lot = "0.12" -percent-encoding = "2" +# for services-persy persy = { version = "1.4.6", optional = true } -pin-project = "1" -prometheus = { version = "0.13", features = ["process"], optional = true } -prometheus-client = { version = "0.22.0", optional = true } -prost = { version = "0.11", optional = true } -quick-xml = { version = "0.30", features = ["serialize", "overlapped-lists"] } -r2d2 = { version = "0.8", optional = true } -rand = { version = "0.8", optional = true } +# for services-redb redb = { version = "1.1.0", optional = true } +# for services-redis redis = { version = "0.23.1", features = [ "cluster-async", "tokio-comp", "connection-manager", ], optional = true } -reqsign = { version = "0.14.6", default-features = false, optional = true } -reqwest = { version = "0.11.18", features = [ - "stream", -], default-features = false } +# for services-rocksdb rocksdb = { version = "0.21.0", default-features = false, optional = true } +# for services-sqlite rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] } -serde = { version = "1", features = ["derive"] } -serde_json = "1" -sha1 = { version = "0.10.6", optional = true } -sha2 = { version = "0.10", optional = true } +# for services-sled sled = { version = "0.34.7", optional = true } +# for services-ftp suppaftp = { version = "5.2", default-features = false, features = [ "async-secure", "rustls", "async-rustls", ], optional = true } +# for services-tikv tikv-client = { version = "0.3.0", optional = true, default-features = false } -tokio = { version = "1.27", features = ["sync", "rt"] } -tokio-postgres = { version = "0.7.8", optional = true } + +# Layers +# for layers-async-backtrace +async-backtrace = { version = "0.2.6", optional = true } +# for layers-await-tree +await-tree = { version = "0.1.1", optional = true } +# for layers-throttle +governor = { version = "0.6.0", optional = true, features = ["std"] } +# for layers-madsim +madsim = { version = "0.2.21", optional = true } +# for layers-metrics +metrics = { version = "0.20", optional = true } +# for layers-minitrace +minitrace = { version = "0.6", optional = true } +# for layers-opentelemetry +opentelemetry = { version = "0.21.0", optional = true } +# for layers-prometheus +prometheus = { version = "0.13", features = ["process"], optional = true } +# for layers-prometheus-client +prometheus-client = { version = "0.22.0", optional = true } +# for layers-tracing tracing = { version = "0.1", optional = true } -uuid = { version = "1", features = ["serde", "v4"] } [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2", features = ["js"] } diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs index 56b32a9de070..518ccc674087 100644 --- a/core/src/raw/enum_utils.rs +++ b/core/src/raw/enum_utils.rs @@ -29,8 +29,8 @@ //! ```txt //! impl Accessor for OssBackend { //! type Writer = raw::TwoWays< -//! oio::MultipartUploadWriter, -//! oio::AppendObjectWriter, +//! oio::MultipartWriter, +//! oio::AppendWriter, //! >; //! } //! ``` diff --git a/core/src/raw/oio/list/api.rs b/core/src/raw/oio/list/api.rs index 6130aa914f00..07dbdf774bb4 100644 --- a/core/src/raw/oio/list/api.rs +++ b/core/src/raw/oio/list/api.rs @@ -22,8 +22,6 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -use pin_project::pin_project; - use crate::raw::oio::Entry; use crate::*; @@ -105,8 +103,6 @@ pub trait ListExt: List { } } -/// Make this future `!Unpin` for compatibility with async trait methods. -#[pin_project(!Unpin)] pub struct NextFuture<'a, L: List + Unpin + ?Sized> { lister: &'a mut L, } @@ -117,9 +113,8 @@ where { type Output = Result>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let this = self.project(); - Pin::new(this.lister).poll_next(cx) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + self.lister.poll_next(cx) } } diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs index c2c96811223b..f6175f436045 100644 --- a/core/src/raw/oio/read/api.rs +++ b/core/src/raw/oio/read/api.rs @@ -25,7 +25,6 @@ use std::task::Poll; use bytes::Bytes; use futures::Future; -use pin_project::pin_project; use tokio::io::ReadBuf; use crate::*; @@ -207,8 +206,6 @@ pub trait ReadExt: Read { } } -/// Make this future `!Unpin` for compatibility with async trait methods. -#[pin_project(!Unpin)] pub struct ReadFuture<'a, R: Read + Unpin + ?Sized> { reader: &'a mut R, buf: &'a mut [u8], @@ -221,13 +218,11 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - Pin::new(this.reader).poll_read(cx, this.buf) + let this = self.get_mut(); + this.reader.poll_read(cx, this.buf) } } -/// Make this future `!Unpin` for compatibility with async trait methods. -#[pin_project(!Unpin)] pub struct SeekFuture<'a, R: Read + Unpin + ?Sized> { reader: &'a mut R, pos: io::SeekFrom, @@ -240,13 +235,11 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - Pin::new(this.reader).poll_seek(cx, *this.pos) + let this = self.get_mut(); + this.reader.poll_seek(cx, this.pos) } } -/// Make this future `!Unpin` for compatibility with async trait methods. -#[pin_project(!Unpin)] pub struct NextFuture<'a, R: Read + Unpin + ?Sized> { reader: &'a mut R, } @@ -257,14 +250,11 @@ where { type Output = Option>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let this = self.project(); - Pin::new(this.reader).poll_next(cx) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + self.reader.poll_next(cx) } } -/// Make this future `!Unpin` for compatibility with async trait methods. -#[pin_project(!Unpin)] pub struct ReadToEndFuture<'a, R: Read + Unpin + ?Sized> { reader: &'a mut R, buf: &'a mut Vec, @@ -277,7 +267,7 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); + let this = self.get_mut(); let start_len = this.buf.len(); loop { diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs index 6dcbbb5a565f..fe95bb656ac9 100644 --- a/core/src/raw/oio/stream/api.rs +++ b/core/src/raw/oio/stream/api.rs @@ -23,7 +23,6 @@ use std::task::Poll; use bytes::Bytes; use bytes::BytesMut; -use pin_project::pin_project; use crate::*; @@ -105,8 +104,6 @@ pub trait StreamExt: Stream { } } -/// Make this future `!Unpin` for compatibility with async trait methods. -#[pin_project(!Unpin)] pub struct NextFuture<'a, T: Stream + Unpin + ?Sized> { inner: &'a mut T, } @@ -117,9 +114,8 @@ where { type Output = Option>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let this = self.project(); - Pin::new(this.inner).poll_next(cx) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + self.inner.poll_next(cx) } } diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index 13612565744e..637fd11dc007 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -22,8 +22,6 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -use pin_project::pin_project; - use crate::raw::*; use crate::*; @@ -33,10 +31,11 @@ use crate::*; pub enum WriteOperation { /// Operation for [`Write::write`] Write, - /// Operation for [`Write::abort`] - Abort, /// Operation for [`Write::close`] Close, + /// Operation for [`Write::abort`] + Abort, + /// Operation for [`BlockingWrite::write`] BlockingWrite, /// Operation for [`BlockingWrite::close`] @@ -62,8 +61,9 @@ impl From for &'static str { match v { Write => "Writer::write", - Abort => "Writer::abort", Close => "Writer::close", + Abort => "Writer::abort", + BlockingWrite => "BlockingWriter::write", BlockingClose => "BlockingWriter::close", } @@ -151,8 +151,6 @@ pub trait WriteExt: Write { } } -/// Make this future `!Unpin` for compatibility with async trait methods. -#[pin_project(!Unpin)] pub struct WriteFuture<'a, W: Write + Unpin + ?Sized> { writer: &'a mut W, buf: &'a dyn oio::WriteBuf, @@ -165,13 +163,11 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - Pin::new(this.writer).poll_write(cx, *this.buf) + let this = self.get_mut(); + this.writer.poll_write(cx, this.buf) } } -/// Make this future `!Unpin` for compatibility with async trait methods. -#[pin_project(!Unpin)] pub struct AbortFuture<'a, W: Write + Unpin + ?Sized> { writer: &'a mut W, } @@ -182,14 +178,11 @@ where { type Output = Result<()>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - Pin::new(this.writer).poll_abort(cx) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.writer.poll_abort(cx) } } -/// Make this future `!Unpin` for compatibility with async trait methods. -#[pin_project(!Unpin)] pub struct CloseFuture<'a, W: Write + Unpin + ?Sized> { writer: &'a mut W, } @@ -200,9 +193,8 @@ where { type Output = Result<()>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - Pin::new(this.writer).poll_close(cx) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.writer.poll_close(cx) } } diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_write.rs similarity index 80% rename from core/src/raw/oio/write/append_object_write.rs rename to core/src/raw/oio/write/append_write.rs index e0f591c8adce..caa8fa677575 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_write.rs @@ -24,18 +24,25 @@ use async_trait::async_trait; use crate::raw::*; use crate::*; -/// AppendObjectWrite is used to implement [`Write`] based on append -/// object. By implementing AppendObjectWrite, services don't need to +/// AppendWrite is used to implement [`Write`] based on append +/// object. By implementing AppendWrite, services don't need to /// care about the details of buffering and uploading parts. /// -/// The layout after adopting [`AppendObjectWrite`]: +/// The layout after adopting [`AppendWrite`]: /// -/// - Services impl `AppendObjectWrite` -/// - `AppendObjectWriter` impl `Write` -/// - Expose `AppendObjectWriter` as `Accessor::Writer` +/// - Services impl `AppendWrite` +/// - `AppendWriter` impl `Write` +/// - Expose `AppendWriter` as `Accessor::Writer` +/// +/// ## Requirements +/// +/// Services that implement `AppendWrite` must fulfill the following requirements: +/// +/// - Must be a http service that could accept `AsyncBody`. +/// - Provide a way to get the current offset of the append object. #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -pub trait AppendObjectWrite: Send + Sync + Unpin + 'static { +pub trait AppendWrite: Send + Sync + Unpin + 'static { /// Get the current offset of the append object. /// /// Returns `0` if the object is not exist. @@ -45,12 +52,12 @@ pub trait AppendObjectWrite: Send + Sync + Unpin + 'static { async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()>; } -/// AppendObjectWriter will implements [`Write`] based on append object. +/// AppendWriter will implements [`Write`] based on append object. /// /// ## TODO /// /// - Allow users to switch to un-buffered mode if users write 16MiB every time. -pub struct AppendObjectWriter { +pub struct AppendWriter { state: State, offset: Option, @@ -65,15 +72,15 @@ enum State { /// # Safety /// /// wasm32 is a special target that we only have one event-loop for this state. -unsafe impl Send for State {} +unsafe impl Send for State {} /// # Safety /// /// We will only take `&mut Self` reference for State. -unsafe impl Sync for State {} +unsafe impl Sync for State {} -impl AppendObjectWriter { - /// Create a new AppendObjectWriter. +impl AppendWriter { + /// Create a new AppendWriter. pub fn new(inner: W) -> Self { Self { state: State::Idle(Some(inner)), @@ -82,9 +89,9 @@ impl AppendObjectWriter { } } -impl oio::Write for AppendObjectWriter +impl oio::Write for AppendWriter where - W: AppendObjectWrite, + W: AppendWrite, { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { loop { diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs index 9e50f2271033..fb021335bb67 100644 --- a/core/src/raw/oio/write/block_write.rs +++ b/core/src/raw/oio/write/block_write.rs @@ -51,6 +51,15 @@ use crate::*; /// ``` /// /// We will use `write_once` instead of starting a new block upload. +/// +/// # Requirements +/// +/// Services that implement `BlockWrite` must fulfill the following requirements: +/// +/// - Must be a http service that could accept `AsyncBody`. +/// - Don't need initialization before writing. +/// - Block ID is generated by caller `BlockWrite` instead of services. +/// - Complete block by an ordered block id list. #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait BlockWrite: Send + Sync + Unpin + 'static { diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 3f4dd7adebc8..bb149edcfbbf 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -65,11 +65,6 @@ impl oio::Write for ExactBufWriter { Poll::Ready(Ok(written)) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { - self.buffer.clear(); - self.inner.poll_abort(cx) - } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { while !self.buffer.is_empty() { let n = ready!(self.inner.poll_write(cx, &self.buffer))?; @@ -78,6 +73,11 @@ impl oio::Write for ExactBufWriter { self.inner.poll_close(cx) } + + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { + self.buffer.clear(); + self.inner.poll_abort(cx) + } } #[cfg(test)] @@ -110,11 +110,11 @@ mod tests { Poll::Ready(Ok(bs.chunk().len())) } - fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll> { + fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { + fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs index e2fdf70e47d2..5f437e2525fa 100644 --- a/core/src/raw/oio/write/mod.rs +++ b/core/src/raw/oio/write/mod.rs @@ -23,14 +23,14 @@ pub use api::WriteExt; pub use api::WriteOperation; pub use api::Writer; -mod multipart_upload_write; -pub use multipart_upload_write::MultipartUploadPart; -pub use multipart_upload_write::MultipartUploadWrite; -pub use multipart_upload_write::MultipartUploadWriter; +mod multipart_write; +pub use multipart_write::MultipartPart; +pub use multipart_write::MultipartWrite; +pub use multipart_write::MultipartWriter; -mod append_object_write; -pub use append_object_write::AppendObjectWrite; -pub use append_object_write::AppendObjectWriter; +mod append_write; +pub use append_write::AppendWrite; +pub use append_write::AppendWriter; mod one_shot_write; pub use one_shot_write::OneShotWrite; diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_write.rs similarity index 85% rename from core/src/raw/oio/write/multipart_upload_write.rs rename to core/src/raw/oio/write/multipart_write.rs index 3749dcb3f5dd..3ca119144000 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_write.rs @@ -29,21 +29,21 @@ use futures::StreamExt; use crate::raw::*; use crate::*; -/// MultipartUploadWrite is used to implement [`Write`] based on multipart -/// uploads. By implementing MultipartUploadWrite, services don't need to +/// MultipartWrite is used to implement [`Write`] based on multipart +/// uploads. By implementing MultipartWrite, services don't need to /// care about the details of uploading parts. /// /// # Architecture /// -/// The architecture after adopting [`MultipartUploadWrite`]: +/// The architecture after adopting [`MultipartWrite`]: /// -/// - Services impl `MultipartUploadWrite` -/// - `MultipartUploadWriter` impl `Write` -/// - Expose `MultipartUploadWriter` as `Accessor::Writer` +/// - Services impl `MultipartWrite` +/// - `MultipartWriter` impl `Write` +/// - Expose `MultipartWriter` as `Accessor::Writer` /// /// # Notes /// -/// `MultipartUploadWrite` has an oneshot optimization when `write` has been called only once: +/// `MultipartWrite` has an oneshot optimization when `write` has been called only once: /// /// ```no_build /// w.write(bs).await?; @@ -51,19 +51,28 @@ use crate::*; /// ``` /// /// We will use `write_once` instead of starting a new multipart upload. +/// +/// # Requirements +/// +/// Services that implement `BlockWrite` must fulfill the following requirements: +/// +/// - Must be a http service that could accept `AsyncBody`. +/// - Don't need initialization before writing. +/// - Block ID is generated by caller `BlockWrite` instead of services. +/// - Complete block by an ordered block id list. #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -pub trait MultipartUploadWrite: Send + Sync + Unpin + 'static { +pub trait MultipartWrite: Send + Sync + Unpin + 'static { /// write_once is used to write the data to underlying storage at once. /// - /// MultipartUploadWriter will call this API when: + /// MultipartWriter will call this API when: /// /// - All the data has been written to the buffer and we can perform the upload at once. async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>; /// initiate_part will call start a multipart upload and return the upload id. /// - /// MultipartUploadWriter will call this when: + /// MultipartWriter will call this when: /// /// - the total size of data is unknown. /// - the total size of data is known, but the size of current write @@ -71,9 +80,9 @@ pub trait MultipartUploadWrite: Send + Sync + Unpin + 'static { async fn initiate_part(&self) -> Result; /// write_part will write a part of the data and returns the result - /// [`MultipartUploadPart`]. + /// [`MultipartPart`]. /// - /// MultipartUploadWriter will call this API and stores the result in + /// MultipartWriter will call this API and stores the result in /// order. /// /// - part_number is the index of the part, starting from 0. @@ -83,24 +92,24 @@ pub trait MultipartUploadWrite: Send + Sync + Unpin + 'static { part_number: usize, size: u64, body: AsyncBody, - ) -> Result; + ) -> Result; /// complete_part will complete the multipart upload to build the final /// file. - async fn complete_part(&self, upload_id: &str, parts: &[MultipartUploadPart]) -> Result<()>; + async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result<()>; /// abort_part will cancel the multipart upload and purge all data. async fn abort_part(&self, upload_id: &str) -> Result<()>; } -/// The result of [`MultipartUploadWrite::write_part`]. +/// The result of [`MultipartWrite::write_part`]. /// -/// services implement should convert MultipartUploadPart to their own represents. +/// services implement should convert MultipartPart to their own represents. /// /// - `part_number` is the index of the part, starting from 0. /// - `etag` is the `ETag` of the part. #[derive(Clone)] -pub struct MultipartUploadPart { +pub struct MultipartPart { /// The number of the part, starting from 0. pub part_number: usize, /// The etag of the part. @@ -110,7 +119,7 @@ pub struct MultipartUploadPart { /// WritePartResult is the result returned by [`WritePartFuture`]. /// /// The error part will carries input `(part_number, bytes, err)` so caller can retry them. -type WritePartResult = std::result::Result; +type WritePartResult = std::result::Result; struct WritePartFuture(BoxedFuture); @@ -132,7 +141,7 @@ impl Future for WritePartFuture { } impl WritePartFuture { - pub fn new( + pub fn new( w: Arc, upload_id: Arc, part_number: usize, @@ -153,14 +162,14 @@ impl WritePartFuture { } } -/// MultipartUploadWriter will implements [`Write`] based on multipart +/// MultipartWriter will implements [`Write`] based on multipart /// uploads. -pub struct MultipartUploadWriter { +pub struct MultipartWriter { state: State, w: Arc, upload_id: Option>, - parts: Vec, + parts: Vec, cache: Option, futures: ConcurrentFutures, next_part_number: usize, @@ -182,8 +191,8 @@ unsafe impl Send for State {} /// We will only take `&mut Self` reference for State. unsafe impl Sync for State {} -impl MultipartUploadWriter { - /// Create a new MultipartUploadWriter. +impl MultipartWriter { + /// Create a new MultipartWriter. pub fn new(inner: W, concurrent: usize) -> Self { Self { state: State::Idle, @@ -206,9 +215,9 @@ impl MultipartUploadWriter { } } -impl oio::Write for MultipartUploadWriter +impl oio::Write for MultipartWriter where - W: MultipartUploadWrite, + W: MultipartWrite, { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { loop { @@ -267,14 +276,10 @@ where self.state = State::Idle; } State::Close(_) => { - unreachable!( - "MultipartUploadWriter must not go into State::Close during poll_write" - ) + unreachable!("MultipartWriter must not go into State::Close during poll_write") } State::Abort(_) => { - unreachable!( - "MultipartUploadWriter must not go into State::Abort during poll_write" - ) + unreachable!("MultipartWriter must not go into State::Abort during poll_write") } } } @@ -352,12 +357,12 @@ where return Poll::Ready(Ok(())); } - State::Init(_) => unreachable!( - "MultipartUploadWriter must not go into State::Init during poll_close" - ), - State::Abort(_) => unreachable!( - "MultipartUploadWriter must not go into State::Abort during poll_close" - ), + State::Init(_) => { + unreachable!("MultipartWriter must not go into State::Init during poll_close") + } + State::Abort(_) => { + unreachable!("MultipartWriter must not go into State::Abort during poll_close") + } } } } @@ -385,12 +390,12 @@ where self.state = State::Idle; return Poll::Ready(res); } - State::Init(_) => unreachable!( - "MultipartUploadWriter must not go into State::Init during poll_abort" - ), - State::Close(_) => unreachable!( - "MultipartUploadWriter must not go into State::Close during poll_abort" - ), + State::Init(_) => { + unreachable!("MultipartWriter must not go into State::Init during poll_abort") + } + State::Close(_) => { + unreachable!("MultipartWriter must not go into State::Close during poll_abort") + } } } } @@ -424,7 +429,7 @@ mod tests { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] - impl MultipartUploadWrite for Arc> { + impl MultipartWrite for Arc> { async fn write_once(&self, size: u64, _: AsyncBody) -> Result<()> { self.lock().unwrap().length += size; Ok(()) @@ -441,7 +446,7 @@ mod tests { part_number: usize, size: u64, _: AsyncBody, - ) -> Result { + ) -> Result { let mut test = self.lock().unwrap(); assert_eq!(upload_id, test.upload_id); @@ -453,17 +458,13 @@ mod tests { test.part_numbers.push(part_number); test.length += size; - Ok(MultipartUploadPart { + Ok(MultipartPart { part_number, etag: "etag".to_string(), }) } - async fn complete_part( - &self, - upload_id: &str, - parts: &[MultipartUploadPart], - ) -> Result<()> { + async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result<()> { let test = self.lock().unwrap(); assert_eq!(upload_id, test.upload_id); assert_eq!(parts.len(), test.part_numbers.len()); @@ -483,7 +484,7 @@ mod tests { async fn test_multipart_upload_writer_with_concurrent_errors() { let mut rng = thread_rng(); - let mut w = MultipartUploadWriter::new(TestWrite::new(), 8); + let mut w = MultipartWriter::new(TestWrite::new(), 8); let mut total_size = 0u64; for _ in 0..1000 { diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 13a7b2fb3e9a..d876962bb67f 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -51,6 +51,14 @@ use crate::*; /// - Services impl `RangeWrite` /// - `RangeWriter` impl `Write` /// - Expose `RangeWriter` as `Accessor::Writer` +/// +/// # Requirements +/// +/// Services that implement `RangeWrite` must fulfill the following requirements: +/// +/// - Must be a http service that could accept `AsyncBody`. +/// - Need initialization before writing. +/// - Writing data based on range: `offset`, `size`. #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait RangeWrite: Send + Sync + Unpin + 'static { @@ -161,7 +169,7 @@ unsafe impl Send for State {} unsafe impl Sync for State {} impl RangeWriter { - /// Create a new MultipartUploadWriter. + /// Create a new MultipartWriter. pub fn new(inner: W, concurrent: usize) -> Self { Self { state: State::Idle, diff --git a/core/src/services/alluxio/writer.rs b/core/src/services/alluxio/writer.rs index a45fdbdf7984..fce9fe347605 100644 --- a/core/src/services/alluxio/writer.rs +++ b/core/src/services/alluxio/writer.rs @@ -101,9 +101,7 @@ impl oio::Write for AlluxioWriter { return Poll::Ready(Ok(part?)); } State::Close(_) => { - unreachable!( - "MultipartUploadWriter must not go into State::Close during poll_write" - ) + unreachable!("MultipartWriter must not go into State::Close during poll_write") } } } diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 3a38c9977ca9..1a24335da846 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -629,7 +629,7 @@ impl Accessor for AzblobBackend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let w = AzblobWriter::new(self.core.clone(), args.clone(), path.to_string()); let w = if args.append() { - AzblobWriters::Two(oio::AppendObjectWriter::new(w)) + AzblobWriters::Two(oio::AppendWriter::new(w)) } else { AzblobWriters::One(oio::OneShotWriter::new(w)) }; diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 895bce1bf5ca..6e8b415dd987 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -27,8 +27,7 @@ use crate::*; const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; -pub type AzblobWriters = - TwoWays, oio::AppendObjectWriter>; +pub type AzblobWriters = TwoWays, oio::AppendWriter>; pub struct AzblobWriter { core: Arc, @@ -73,7 +72,7 @@ impl oio::OneShotWrite for AzblobWriter { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl oio::AppendObjectWrite for AzblobWriter { +impl oio::AppendWrite for AzblobWriter { async fn offset(&self) -> Result { let resp = self .core diff --git a/core/src/services/azdls/backend.rs b/core/src/services/azdls/backend.rs index 53fa53d496e2..e7243ab45aab 100644 --- a/core/src/services/azdls/backend.rs +++ b/core/src/services/azdls/backend.rs @@ -356,7 +356,7 @@ impl Accessor for AzdlsBackend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string()); let w = if args.append() { - AzdlsWriters::Two(oio::AppendObjectWriter::new(w)) + AzdlsWriters::Two(oio::AppendWriter::new(w)) } else { AzdlsWriters::One(oio::OneShotWriter::new(w)) }; diff --git a/core/src/services/azdls/writer.rs b/core/src/services/azdls/writer.rs index e17cd1c28afc..6add8edd1a21 100644 --- a/core/src/services/azdls/writer.rs +++ b/core/src/services/azdls/writer.rs @@ -26,8 +26,7 @@ use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; -pub type AzdlsWriters = - TwoWays, oio::AppendObjectWriter>; +pub type AzdlsWriters = TwoWays, oio::AppendWriter>; pub struct AzdlsWriter { core: Arc, @@ -91,7 +90,7 @@ impl oio::OneShotWrite for AzdlsWriter { } #[async_trait] -impl oio::AppendObjectWrite for AzdlsWriter { +impl oio::AppendWrite for AzdlsWriter { async fn offset(&self) -> Result { let resp = self.core.azdls_get_properties(&self.path).await?; diff --git a/core/src/services/azfile/backend.rs b/core/src/services/azfile/backend.rs index dea84c73aaa2..84f02c88bf2e 100644 --- a/core/src/services/azfile/backend.rs +++ b/core/src/services/azfile/backend.rs @@ -351,7 +351,7 @@ impl Accessor for AzfileBackend { self.core.ensure_parent_dir_exists(path).await?; let w = AzfileWriter::new(self.core.clone(), args.clone(), path.to_string()); let w = if args.append() { - AzfileWriters::Two(oio::AppendObjectWriter::new(w)) + AzfileWriters::Two(oio::AppendWriter::new(w)) } else { AzfileWriters::One(oio::OneShotWriter::new(w)) }; diff --git a/core/src/services/azfile/writer.rs b/core/src/services/azfile/writer.rs index fbd8daff10c6..7ccacf8847fb 100644 --- a/core/src/services/azfile/writer.rs +++ b/core/src/services/azfile/writer.rs @@ -25,8 +25,7 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -pub type AzfileWriters = - TwoWays, oio::AppendObjectWriter>; +pub type AzfileWriters = TwoWays, oio::AppendWriter>; pub struct AzfileWriter { core: Arc, @@ -80,7 +79,7 @@ impl oio::OneShotWrite for AzfileWriter { } #[async_trait] -impl oio::AppendObjectWrite for AzfileWriter { +impl oio::AppendWrite for AzfileWriter { async fn offset(&self) -> Result { let resp = self.core.azfile_get_file_properties(&self.path).await?; diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs index a6f29055fddb..5d1ec9b5313a 100644 --- a/core/src/services/b2/backend.rs +++ b/core/src/services/b2/backend.rs @@ -379,7 +379,7 @@ impl Accessor for B2Backend { let concurrent = args.concurrent(); let writer = B2Writer::new(self.core.clone(), path, args); - let w = oio::MultipartUploadWriter::new(writer, concurrent); + let w = oio::MultipartWriter::new(writer, concurrent); Ok((RpWrite::default(), w)) } diff --git a/core/src/services/b2/writer.rs b/core/src/services/b2/writer.rs index 59f4367fb59f..abbdc4dc36b7 100644 --- a/core/src/services/b2/writer.rs +++ b/core/src/services/b2/writer.rs @@ -27,7 +27,7 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -pub type B2Writers = oio::MultipartUploadWriter; +pub type B2Writers = oio::MultipartWriter; pub struct B2Writer { core: Arc, @@ -47,7 +47,7 @@ impl B2Writer { } #[async_trait] -impl oio::MultipartUploadWrite for B2Writer { +impl oio::MultipartWrite for B2Writer { async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let resp = self .core @@ -89,7 +89,7 @@ impl oio::MultipartUploadWrite for B2Writer { part_number: usize, size: u64, body: AsyncBody, - ) -> Result { + ) -> Result { // B2 requires part number must between [1..=10000] let part_number = part_number + 1; @@ -107,7 +107,7 @@ impl oio::MultipartUploadWrite for B2Writer { let result: UploadPartResponse = serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; - Ok(oio::MultipartUploadPart { + Ok(oio::MultipartPart { etag: result.content_sha1, part_number, }) @@ -116,11 +116,7 @@ impl oio::MultipartUploadWrite for B2Writer { } } - async fn complete_part( - &self, - upload_id: &str, - parts: &[oio::MultipartUploadPart], - ) -> Result<()> { + async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { let part_sha1_array = parts .iter() .map(|p| { diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index e0e58701fd68..decb37bd1da2 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -338,9 +338,9 @@ impl Accessor for CosBackend { let writer = CosWriter::new(self.core.clone(), path, args.clone()); let w = if args.append() { - CosWriters::Two(oio::AppendObjectWriter::new(writer)) + CosWriters::Two(oio::AppendWriter::new(writer)) } else { - CosWriters::One(oio::MultipartUploadWriter::new(writer, args.concurrent())) + CosWriters::One(oio::MultipartWriter::new(writer, args.concurrent())) }; Ok((RpWrite::default(), w)) diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index b1d1dbac505a..a05472aa8aae 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -25,8 +25,7 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -pub type CosWriters = - TwoWays, oio::AppendObjectWriter>; +pub type CosWriters = TwoWays, oio::AppendWriter>; pub struct CosWriter { core: Arc, @@ -46,7 +45,7 @@ impl CosWriter { } #[async_trait] -impl oio::MultipartUploadWrite for CosWriter { +impl oio::MultipartWrite for CosWriter { async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self .core @@ -95,7 +94,7 @@ impl oio::MultipartUploadWrite for CosWriter { part_number: usize, size: u64, body: AsyncBody, - ) -> Result { + ) -> Result { // COS requires part number must between [1..=10000] let part_number = part_number + 1; @@ -119,17 +118,13 @@ impl oio::MultipartUploadWrite for CosWriter { resp.into_body().consume().await?; - Ok(oio::MultipartUploadPart { part_number, etag }) + Ok(oio::MultipartPart { part_number, etag }) } _ => Err(parse_error(resp).await?), } } - async fn complete_part( - &self, - upload_id: &str, - parts: &[oio::MultipartUploadPart], - ) -> Result<()> { + async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { let parts = parts .iter() .map(|p| CompleteMultipartUploadRequestPart { @@ -173,7 +168,7 @@ impl oio::MultipartUploadWrite for CosWriter { } #[async_trait] -impl oio::AppendObjectWrite for CosWriter { +impl oio::AppendWrite for CosWriter { async fn offset(&self) -> Result { let resp = self .core diff --git a/core/src/services/dbfs/writer.rs b/core/src/services/dbfs/writer.rs index f2db1124f4ff..735f883d3b9c 100644 --- a/core/src/services/dbfs/writer.rs +++ b/core/src/services/dbfs/writer.rs @@ -49,7 +49,7 @@ impl oio::OneShotWrite for DbfsWriter { if size >= Self::MAX_SIMPLE_SIZE { return Err(Error::new( ErrorKind::Unsupported, - "AppendObjectWrite has not been implemented for Dbfs", + "AppendWrite has not been implemented for Dbfs", )); } diff --git a/core/src/services/memory/backend.rs b/core/src/services/memory/backend.rs index 3057d0e46e1d..6549d0dbe880 100644 --- a/core/src/services/memory/backend.rs +++ b/core/src/services/memory/backend.rs @@ -19,9 +19,9 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; +use std::sync::Mutex; use async_trait::async_trait; -use parking_lot::Mutex; use crate::raw::adapters::typed_kv; use crate::*; @@ -96,7 +96,7 @@ impl typed_kv::Adapter for Adapter { } fn blocking_get(&self, path: &str) -> Result> { - match self.inner.lock().get(path) { + match self.inner.lock().unwrap().get(path) { None => Ok(None), Some(bs) => Ok(Some(bs.to_owned())), } @@ -107,7 +107,7 @@ impl typed_kv::Adapter for Adapter { } fn blocking_set(&self, path: &str, value: typed_kv::Value) -> Result<()> { - self.inner.lock().insert(path.to_string(), value); + self.inner.lock().unwrap().insert(path.to_string(), value); Ok(()) } @@ -117,7 +117,7 @@ impl typed_kv::Adapter for Adapter { } fn blocking_delete(&self, path: &str) -> Result<()> { - self.inner.lock().remove(path); + self.inner.lock().unwrap().remove(path); Ok(()) } @@ -127,7 +127,7 @@ impl typed_kv::Adapter for Adapter { } fn blocking_scan(&self, path: &str) -> Result> { - let inner = self.inner.lock(); + let inner = self.inner.lock().unwrap(); let keys: Vec<_> = if path.is_empty() { inner.keys().cloned().collect() } else { diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index ad3b9c98fc89..91bfa8893de3 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -348,9 +348,9 @@ impl Accessor for ObsBackend { let writer = ObsWriter::new(self.core.clone(), path, args.clone()); let w = if args.append() { - ObsWriters::Two(oio::AppendObjectWriter::new(writer)) + ObsWriters::Two(oio::AppendWriter::new(writer)) } else { - ObsWriters::One(oio::MultipartUploadWriter::new(writer, args.concurrent())) + ObsWriters::One(oio::MultipartWriter::new(writer, args.concurrent())) }; Ok((RpWrite::default(), w)) diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index 1dff95eda12d..63d7287169ae 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -22,12 +22,11 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; -use crate::raw::oio::MultipartUploadPart; +use crate::raw::oio::MultipartPart; use crate::raw::*; use crate::*; -pub type ObsWriters = - TwoWays, oio::AppendObjectWriter>; +pub type ObsWriters = TwoWays, oio::AppendWriter>; pub struct ObsWriter { core: Arc, @@ -47,7 +46,7 @@ impl ObsWriter { } #[async_trait] -impl oio::MultipartUploadWrite for ObsWriter { +impl oio::MultipartWrite for ObsWriter { async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self .core @@ -96,7 +95,7 @@ impl oio::MultipartUploadWrite for ObsWriter { part_number: usize, size: u64, body: AsyncBody, - ) -> Result { + ) -> Result { // Obs service requires part number must between [1..=10000] let part_number = part_number + 1; @@ -120,13 +119,13 @@ impl oio::MultipartUploadWrite for ObsWriter { resp.into_body().consume().await?; - Ok(MultipartUploadPart { part_number, etag }) + Ok(MultipartPart { part_number, etag }) } _ => Err(parse_error(resp).await?), } } - async fn complete_part(&self, upload_id: &str, parts: &[MultipartUploadPart]) -> Result<()> { + async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result<()> { let parts = parts .iter() .map(|p| CompleteMultipartUploadRequestPart { @@ -170,7 +169,7 @@ impl oio::MultipartUploadWrite for ObsWriter { } #[async_trait] -impl oio::AppendObjectWrite for ObsWriter { +impl oio::AppendWrite for ObsWriter { async fn offset(&self) -> Result { let resp = self .core diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 79624a7dded8..2b5583d34550 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -490,9 +490,9 @@ impl Accessor for OssBackend { let writer = OssWriter::new(self.core.clone(), path, args.clone()); let w = if args.append() { - OssWriters::Two(oio::AppendObjectWriter::new(writer)) + OssWriters::Two(oio::AppendWriter::new(writer)) } else { - OssWriters::One(oio::MultipartUploadWriter::new(writer, args.concurrent())) + OssWriters::One(oio::MultipartWriter::new(writer, args.concurrent())) }; Ok((RpWrite::default(), w)) diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index 1770ffeb64d2..c8015788d32c 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -25,8 +25,7 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -pub type OssWriters = - TwoWays, oio::AppendObjectWriter>; +pub type OssWriters = TwoWays, oio::AppendWriter>; pub struct OssWriter { core: Arc, @@ -46,7 +45,7 @@ impl OssWriter { } #[async_trait] -impl oio::MultipartUploadWrite for OssWriter { +impl oio::MultipartWrite for OssWriter { async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self.core @@ -101,7 +100,7 @@ impl oio::MultipartUploadWrite for OssWriter { part_number: usize, size: u64, body: AsyncBody, - ) -> Result { + ) -> Result { // OSS requires part number must between [1..=10000] let part_number = part_number + 1; @@ -125,17 +124,13 @@ impl oio::MultipartUploadWrite for OssWriter { resp.into_body().consume().await?; - Ok(oio::MultipartUploadPart { part_number, etag }) + Ok(oio::MultipartPart { part_number, etag }) } _ => Err(parse_error(resp).await?), } } - async fn complete_part( - &self, - upload_id: &str, - parts: &[oio::MultipartUploadPart], - ) -> Result<()> { + async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { let parts = parts .iter() .map(|p| MultipartUploadPart { @@ -178,7 +173,7 @@ impl oio::MultipartUploadWrite for OssWriter { } #[async_trait] -impl oio::AppendObjectWrite for OssWriter { +impl oio::AppendWrite for OssWriter { async fn offset(&self) -> Result { let resp = self.core.oss_head_object(&self.path, None, None).await?; diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 2094fde0d93c..35d1eb2b4cba 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -1088,7 +1088,7 @@ impl Accessor for S3Backend { let concurrent = args.concurrent(); let writer = S3Writer::new(self.core.clone(), path, args); - let w = oio::MultipartUploadWriter::new(writer, concurrent); + let w = oio::MultipartWriter::new(writer, concurrent); Ok((RpWrite::default(), w)) } diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 22ecc041abdd..9b6b1bb6cc25 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -25,7 +25,7 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -pub type S3Writers = oio::MultipartUploadWriter; +pub type S3Writers = oio::MultipartWriter; pub struct S3Writer { core: Arc, @@ -46,7 +46,7 @@ impl S3Writer { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl oio::MultipartUploadWrite for S3Writer { +impl oio::MultipartWrite for S3Writer { async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self .core @@ -95,7 +95,7 @@ impl oio::MultipartUploadWrite for S3Writer { part_number: usize, size: u64, body: AsyncBody, - ) -> Result { + ) -> Result { // AWS S3 requires part number must between [1..=10000] let part_number = part_number + 1; @@ -122,17 +122,13 @@ impl oio::MultipartUploadWrite for S3Writer { resp.into_body().consume().await?; - Ok(oio::MultipartUploadPart { part_number, etag }) + Ok(oio::MultipartPart { part_number, etag }) } _ => Err(parse_error(resp).await?), } } - async fn complete_part( - &self, - upload_id: &str, - parts: &[oio::MultipartUploadPart], - ) -> Result<()> { + async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { let parts = parts .iter() .map(|p| CompleteMultipartUploadRequestPart { diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 5be1f3bbe781..7ab1943754d4 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -512,17 +512,6 @@ async fn connect_sftp( session.keyfile(key); } - // set control directory to avoid temp files in root directory when panic - if let Some(dir) = dirs::runtime_dir() { - session.control_directory(dir); - } - - #[cfg(target_os = "macos")] - { - let _ = std::fs::create_dir("/private/tmp/.opendal/"); - session.control_directory("/private/tmp/.opendal/"); - } - session.known_hosts_check(known_hosts_strategy); let session = session.connect(&endpoint).await.map_err(parse_ssh_error)?; diff --git a/core/src/services/upyun/backend.rs b/core/src/services/upyun/backend.rs index a13764b945cc..c198bb32b50c 100644 --- a/core/src/services/upyun/backend.rs +++ b/core/src/services/upyun/backend.rs @@ -319,7 +319,7 @@ impl Accessor for UpyunBackend { let concurrent = args.concurrent(); let writer = UpyunWriter::new(self.core.clone(), args, path.to_string()); - let w = oio::MultipartUploadWriter::new(writer, concurrent); + let w = oio::MultipartWriter::new(writer, concurrent); Ok((RpWrite::default(), w)) } diff --git a/core/src/services/upyun/writer.rs b/core/src/services/upyun/writer.rs index b4f508212202..973585f1fe8b 100644 --- a/core/src/services/upyun/writer.rs +++ b/core/src/services/upyun/writer.rs @@ -26,7 +26,7 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -pub type UpyunWriters = oio::MultipartUploadWriter; +pub type UpyunWriters = oio::MultipartWriter; pub struct UpyunWriter { core: Arc, @@ -41,7 +41,7 @@ impl UpyunWriter { } #[async_trait] -impl oio::MultipartUploadWrite for UpyunWriter { +impl oio::MultipartWrite for UpyunWriter { async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let req = self .core @@ -89,7 +89,7 @@ impl oio::MultipartUploadWrite for UpyunWriter { part_number: usize, size: u64, body: AsyncBody, - ) -> Result { + ) -> Result { let req = self .core .upload_part(&self.path, upload_id, part_number, size, body) @@ -103,7 +103,7 @@ impl oio::MultipartUploadWrite for UpyunWriter { StatusCode::NO_CONTENT | StatusCode::CREATED => { resp.into_body().consume().await?; - Ok(oio::MultipartUploadPart { + Ok(oio::MultipartPart { part_number, etag: "".to_string(), }) @@ -112,11 +112,7 @@ impl oio::MultipartUploadWrite for UpyunWriter { } } - async fn complete_part( - &self, - upload_id: &str, - _parts: &[oio::MultipartUploadPart], - ) -> Result<()> { + async fn complete_part(&self, upload_id: &str, _parts: &[oio::MultipartPart]) -> Result<()> { let resp = self .core .complete_multipart_upload(&self.path, upload_id) diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index 80b9e3e72ed1..7944b03e3c2f 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -684,7 +684,7 @@ impl Accessor for WebhdfsBackend { let w = WebhdfsWriter::new(self.clone(), args.clone(), path.to_string()); let w = if args.append() { - WebhdfsWriters::Two(oio::AppendObjectWriter::new(w)) + WebhdfsWriters::Two(oio::AppendWriter::new(w)) } else { WebhdfsWriters::One(oio::BlockWriter::new(w, args.concurrent())) }; diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 52a955b1aed6..f405708b742b 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -24,7 +24,7 @@ use crate::raw::*; use crate::*; pub type WebhdfsWriters = - TwoWays, oio::AppendObjectWriter>; + TwoWays, oio::AppendWriter>; pub struct WebhdfsWriter { backend: WebhdfsBackend, @@ -153,7 +153,7 @@ impl oio::BlockWrite for WebhdfsWriter { } #[async_trait] -impl oio::AppendObjectWrite for WebhdfsWriter { +impl oio::AppendWrite for WebhdfsWriter { async fn offset(&self) -> Result { Ok(0) }