diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 11485cb57cbd..085e15c7bf05 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -515,156 +515,16 @@ impl Operator { /// /// # Options /// - /// ## `range` + /// Visit [`FutureRead`] for all available options. /// - /// Set `range` for this `read` request. - /// - /// If we have a file with size `n`. - /// - /// - `..` means read bytes in range `[0, n)` of file. - /// - `0..1024` and `..1024` means read bytes in range `[0, 1024)` of file - /// - `1024..` means read bytes in range `[1024, n)` of file - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::TryStreamExt; - /// # async fn test(op: Operator) -> Result<()> { - /// let bs = op.read_with("path/to/file").range(0..1024).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `concurrent` - /// - /// Set `concurrent` for the reader. - /// - /// OpenDAL by default to write file without concurrent. This is not efficient for cases when users - /// read large chunks of data. By setting `concurrent`, opendal will read files concurrently - /// on support storage services. - /// - /// By setting `concurrent`, opendal will fetch chunks concurrently with - /// the given chunk size. - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use opendal::Scheme; - /// # async fn test(op: Operator) -> Result<()> { - /// let r = op.read_with("path/to/file").concurrent(8).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `chunk` - /// - /// OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs. - /// - /// This following example will make opendal read data in 4MiB chunks: - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use opendal::Scheme; - /// # async fn test(op: Operator) -> Result<()> { - /// let r = op.read_with("path/to/file").chunk(4 * 1024 * 1024).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `version` - /// - /// Set `version` for this `read` request. - /// - /// This feature can be used to retrieve the data of a specified version of the given path. - /// - /// If the version doesn't exist, an error with kind [`ErrorKind::NotFound`] will be returned. - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// - /// # async fn test(op: Operator, version: &str) -> Result<()> { - /// let mut bs = op.read_with("path/to/file").version(version).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_match` - /// - /// Set `if_match` for this `read` request. - /// - /// This feature can be used to check if the file's `ETag` matches the given `ETag`. - /// - /// If file exists and it's etag doesn't match, an error with kind [`ErrorKind::ConditionNotMatch`] - /// will be returned. - /// - /// ``` - /// # use opendal::Result; - /// use opendal::Operator; - /// # async fn test(op: Operator, etag: &str) -> Result<()> { - /// let mut metadata = op.read_with("path/to/file").if_match(etag).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_none_match` - /// - /// Set `if_none_match` for this `read` request. - /// - /// This feature can be used to check if the file's `ETag` doesn't match the given `ETag`. - /// - /// If file exists and it's etag match, an error with kind [`ErrorKind::ConditionNotMatch`] - /// will be returned. - /// - /// ``` - /// # use opendal::Result; - /// use opendal::Operator; - /// # async fn test(op: Operator, etag: &str) -> Result<()> { - /// let mut metadata = op.read_with("path/to/file").if_none_match(etag).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_modified_since` - /// - /// Set `if_modified_since` for this `read` request. - /// - /// This feature can be used to check if the file has been modified since the given timestamp. - /// - /// If file exists and it hasn't been modified since the specified time, an error with kind - /// [`ErrorKind::ConditionNotMatch`] will be returned. - /// - /// ``` - /// # use opendal::Result; - /// use opendal::Operator; - /// use chrono::DateTime; - /// use chrono::Utc; - /// # async fn test(op: Operator, time: DateTime) -> Result<()> { - /// let mut metadata = op.read_with("path/to/file").if_modified_since(time).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_unmodified_since` - /// - /// Set `if_unmodified_since` for this `read` request. - /// - /// This feature can be used to check if the file hasn't been modified since the given timestamp. - /// - /// If file exists and it has been modified since the specified time, an error with kind - /// [`ErrorKind::ConditionNotMatch`] will be returned. - /// - /// ``` - /// # use opendal::Result; - /// use opendal::Operator; - /// use chrono::DateTime; - /// use chrono::Utc; - /// # async fn test(op: Operator, time: DateTime) -> Result<()> { - /// let mut metadata = op.read_with("path/to/file").if_unmodified_since(time).await?; - /// # Ok(()) - /// # } - /// ``` + /// - [`range`](./operator_futures/type.FutureRead.html#method.version): Set `range` for the read. + /// - [`concurrent`](./operator_futures/type.FutureRead.html#method.concurrent): Set `concurrent` for the read. + /// - [`chunk`](./operator_futures/type.FutureRead.html#method.chunk): Set `chunk` for the read. + /// - [`version`](./operator_futures/type.FutureRead.html#method.version): Set `version` for the read. + /// - [`if_match`](./operator_futures/type.FutureRead.html#method.if_match): Set `if-match` for the read. + /// - [`if_none_match`](./operator_futures/type.FutureRead.html#method.if_none_match): Set `if-none-match` for the read. + /// - [`if_modified_since`](./operator_futures/type.FutureRead.html#method.if_modified_since): Set `if-modified-since` for the read. + /// - [`if_unmodified_since`](./operator_futures/type.FutureRead.html#method.if_unmodified_since): Set `if-unmodified-since` for the read. /// /// # Examples /// @@ -745,169 +605,16 @@ impl Operator { /// /// # Options /// - /// ## `concurrent` - /// - /// Set `concurrent` for the reader. - /// - /// OpenDAL by default to write file without concurrent. This is not efficient for cases when users - /// read large chunks of data. By setting `concurrent`, opendal will reading files concurrently - /// on support storage services. - /// - /// By setting `concurrent``, opendal will fetch chunks concurrently with - /// the give chunk size. - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use opendal::Scheme; - /// # async fn test(op: Operator) -> Result<()> { - /// let r = op.reader_with("path/to/file").concurrent(8).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `chunk` - /// - /// OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs. - /// - /// This following example will make opendal read data in 4MiB chunks: - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use opendal::Scheme; - /// # async fn test(op: Operator) -> Result<()> { - /// let r = op - /// .reader_with("path/to/file") - /// .chunk(4 * 1024 * 1024) - /// .await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `gap` - /// - /// Controls the optimization strategy for range reads in [`Reader::fetch`]. - /// - /// When performing range reads, if the gap between two requested ranges is smaller than - /// the configured `gap` size, OpenDAL will merge these ranges into a single read request - /// and discard the unrequested data in between. This helps reduce the number of API calls - /// to remote storage services. - /// - /// This optimization is particularly useful when performing multiple small range reads - /// that are close to each other, as it reduces the overhead of multiple network requests - /// at the cost of transferring some additional data. - /// - /// In this example, if two requested ranges are separated by less than 1MiB, - /// they will be merged into a single read request: - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use opendal::Scheme; - /// # async fn test(op: Operator) -> Result<()> { - /// let r = op - /// .reader_with("path/to/file") - /// .chunk(4 * 1024 * 1024) - /// .gap(1024 * 1024) // 1MiB gap - /// .await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `version` - /// - /// Set `version` for this `reader`. - /// - /// This feature can be used to retrieve the data of a specified version of the given path. - /// - /// If the version doesn't exist, an error with kind [`ErrorKind::NotFound`] will be returned. - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// - /// # async fn test(op: Operator, version: &str) -> Result<()> { - /// let mut r = op.reader_with("path/to/file").version(version).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_match` - /// - /// Set `if-match` for this `read` request. - /// - /// This feature can be used to check if the file's `ETag` matches the given `ETag`. - /// - /// If file exists and it's etag doesn't match, an error with kind [`ErrorKind::ConditionNotMatch`] - /// will be returned. - /// - /// ``` - /// # use opendal::Result; - /// use opendal::Operator; - /// # async fn test(op: Operator, etag: &str) -> Result<()> { - /// let mut r = op.reader_with("path/to/file").if_match(etag).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_none_match` - /// - /// Set `if-none-match` for this `read` request. - /// - /// This feature can be used to check if the file's `ETag` doesn't match the given `ETag`. - /// - /// If file exists and it's etag match, an error with kind [`ErrorKind::ConditionNotMatch`] - /// will be returned. - /// - /// ``` - /// # use opendal::Result; - /// use opendal::Operator; - /// # async fn test(op: Operator, etag: &str) -> Result<()> { - /// let mut r = op.reader_with("path/to/file").if_none_match(etag).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_modified_since` - /// - /// Set `if-modified-since` for this `read` request. - /// - /// This feature can be used to check if the file has been modified since the given timestamp. - /// - /// If file exists and it hasn't been modified since the specified time, an error with kind - /// [`ErrorKind::ConditionNotMatch`] will be returned. - /// - /// ``` - /// # use opendal::Result; - /// use opendal::Operator; - /// use chrono::DateTime; - /// use chrono::Utc; - /// # async fn test(op: Operator, time: DateTime) -> Result<()> { - /// let mut r = op.reader_with("path/to/file").if_modified_since(time).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_unmodified_since` + /// Visit [`FutureReader`] for all available options. /// - /// Set `if-unmodified-since` for this `read` request. - /// - /// This feature can be used to check if the file hasn't been modified since the given timestamp. - /// - /// If file exists and it has been modified since the specified time, an error with kind - /// [`ErrorKind::ConditionNotMatch`] will be returned. - /// - /// ``` - /// # use opendal::Result; - /// use opendal::Operator; - /// use chrono::DateTime; - /// use chrono::Utc; - /// # async fn test(op: Operator, time: DateTime) -> Result<()> { - /// let mut r = op.reader_with("path/to/file").if_unmodified_since(time).await?; - /// # Ok(()) - /// # } - /// ``` + /// - [`version`](./operator_futures/type.FutureReader.html#method.version): Set `version` for the reader. + /// - [`concurrent`](./operator_futures/type.FutureReader.html#method.concurrent): Set `concurrent` for the reader. + /// - [`chunk`](./operator_futures/type.FutureReader.html#method.chunk): Set `chunk` for the reader. + /// - [`gap`](./operator_futures/type.FutureReader.html#method.gap): Set `gap` for the reader. + /// - [`if_match`](./operator_futures/type.FutureReader.html#method.if_match): Set `if-match` for the reader. + /// - [`if_none_match`](./operator_futures/type.FutureReader.html#method.if_none_match): Set `if-none-match` for the reader. + /// - [`if_modified_since`](./operator_futures/type.FutureReader.html#method.if_modified_since): Set `if-modified-since` for the reader. + /// - [`if_unmodified_since`](./operator_futures/type.FutureReader.html#method.if_unmodified_since): Set `if-unmodified-since` for the reader. /// /// # Examples /// @@ -1150,966 +857,112 @@ impl Operator { /// Create a writer for streaming data to the given path with more options. /// - /// # Usages - /// - /// ## `append` - /// - /// Sets append mode for this write request. - /// - /// ### Capability + /// ## Options /// - /// Check [`Capability::write_can_append`] before using this feature. + /// Visit [`FutureWriter`] for all available options. /// - /// ### Behavior + /// - [`append`](./operator_futures/type.FutureWriter.html#method.append): Sets append mode for this write request. + /// - [`chunk`](./operator_futures/type.FutureWriter.html#method.chunk): Sets chunk size for buffered writes. + /// - [`concurrent`](./operator_futures/type.FutureWriter.html#method.concurrent): Sets concurrent write operations for this writer. + /// - [`cache_control`](./operator_futures/type.FutureWriter.html#method.cache_control): Sets cache control for this write request. + /// - [`content_type`](./operator_futures/type.FutureWriter.html#method.content_type): Sets content type for this write request. + /// - [`content_disposition`](./operator_futures/type.FutureWriter.html#method.content_disposition): Sets content disposition for this write request. + /// - [`content_encoding`](./operator_futures/type.FutureWriter.html#method.content_encoding): Sets content encoding for this write request. + /// - [`if_match`](./operator_futures/type.FutureWriter.html#method.if_match): Sets if-match for this write request. + /// - [`if_none_match`](./operator_futures/type.FutureWriter.html#method.if_none_match): Sets if-none-match for this write request. + /// - [`if_not_exist`](./operator_futures/type.FutureWriter.html#method.if_not_exist): Sets if-not-exist for this write request. + /// - [`user_metadata`](./operator_futures/type.FutureWriter.html#method.user_metadata): Sets user metadata for this write request. /// - /// - By default, write operations overwrite existing files - /// - When append is set to true: - /// - New data will be appended to the end of existing file - /// - If file doesn't exist, it will be created - /// - If not supported, will return an error - /// - /// This operation allows adding data to existing files instead of overwriting them. - /// - /// ### Example + /// ## Examples /// /// ``` /// # use opendal::Result; /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; /// use bytes::Bytes; /// /// # async fn test(op: Operator) -> Result<()> { - /// let mut w = op.writer_with("path/to/file").append(true).await?; + /// let mut w = op.writer_with("path/to/file") + /// .chunk(4*1024*1024) + /// .concurrent(8) + /// .await?; /// w.write(vec![0; 4096]).await?; /// w.write(vec![1; 4096]).await?; /// w.close().await?; /// # Ok(()) /// # } /// ``` + pub fn writer_with(&self, path: &str) -> FutureWriter>> { + let path = normalize_path(path); + + OperatorFuture::new( + self.inner().clone(), + path, + ( + OpWrite::default().merge_executor(self.default_executor.clone()), + OpWriter::default(), + ), + |inner, path, (args, options)| async move { + if !validate_path(&path, EntryMode::FILE) { + return Err( + Error::new(ErrorKind::IsADirectory, "write path is a directory") + .with_operation("Operator::writer") + .with_context("service", inner.info().scheme().into_static()) + .with_context("path", &path), + ); + } + + let context = WriteContext::new(inner, path, args, options); + let w = Writer::new(context).await?; + Ok(w) + }, + ) + } + + /// Write data with extra options. /// - /// ## `chunk` - /// - /// Sets chunk size for buffered writes. - /// - /// ### Capability + /// # Notes /// - /// Check [`Capability::write_multi_min_size`] and [`Capability::write_multi_max_size`] for size limits. + /// ## Streaming Write /// - /// ### Behavior + /// This method performs a single bulk write operation for all bytes. For finer-grained + /// memory control or lazy writing, consider using [`Operator::writer_with`] instead. /// - /// - By default, OpenDAL sets optimal chunk size based on service capabilities - /// - When chunk size is set: - /// - Data will be buffered until reaching chunk size - /// - One API call will be made per chunk - /// - Last chunk may be smaller than chunk size - /// - Important considerations: - /// - Some services require minimum chunk sizes (e.g. S3's EntityTooSmall error) - /// - Smaller chunks increase API calls and costs - /// - Larger chunks increase memory usage, but improve performance and reduce costs - /// - /// ### Performance Impact - /// - /// Setting appropriate chunk size can: - /// - Reduce number of API calls - /// - Improve overall throughput - /// - Lower operation costs - /// - Better utilize network bandwidth - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// // Set 8MiB chunk size - data will be sent in one API call at close - /// let mut w = op - /// .writer_with("path/to/file") - /// .chunk(8 * 1024 * 1024) - /// .await?; - /// w.write(vec![0; 4096]).await?; - /// w.write(vec![1; 4096]).await?; - /// w.close().await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// # `concurrent` - /// - /// Sets concurrent write operations for this writer. - /// - /// ## Behavior - /// - /// - By default, OpenDAL writes files sequentially - /// - When concurrent is set: - /// - Multiple write operations can execute in parallel - /// - Write operations return immediately without waiting if tasks space are available - /// - Close operation ensures all writes complete in order - /// - Memory usage increases with concurrency level - /// - If not supported, falls back to sequential writes - /// - /// This feature significantly improves performance when: - /// - Writing large files - /// - Network latency is high - /// - Storage service supports concurrent uploads like multipart uploads - /// - /// ## Performance Impact - /// - /// Setting appropriate concurrency can: - /// - Increase write throughput - /// - Reduce total write time - /// - Better utilize available bandwidth - /// - Trade memory for performance - /// - /// ## Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// // Enable concurrent writes with 8 parallel operations - /// let mut w = op.writer_with("path/to/file").concurrent(8).await?; - /// - /// // First write starts immediately - /// w.write(vec![0; 4096]).await?; - /// - /// // Second write runs concurrently with first - /// w.write(vec![1; 4096]).await?; - /// - /// // Ensures all writes complete successfully and in order - /// w.close().await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `cache_control` - /// - /// Sets Cache-Control header for this write operation. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_cache_control`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, sets Cache-Control as system metadata on the target file - /// - The value should follow HTTP Cache-Control header format - /// - If not supported, the value will be ignored - /// - /// This operation allows controlling caching behavior for the written content. - /// - /// ### Use Cases - /// - /// - Setting browser cache duration - /// - Configuring CDN behavior - /// - Optimizing content delivery - /// - Managing cache invalidation - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// // Cache content for 7 days (604800 seconds) - /// let mut w = op - /// .writer_with("path/to/file") - /// .cache_control("max-age=604800") - /// .await?; - /// w.write(vec![0; 4096]).await?; - /// w.write(vec![1; 4096]).await?; - /// w.close().await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ### References - /// - /// - [MDN Cache-Control](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control) - /// - [RFC 7234 Section 5.2](https://tools.ietf.org/html/rfc7234#section-5.2) - /// - /// ## `content_type` - /// - /// Sets `Content-Type` header for this write operation. - /// - /// ## Capability - /// - /// Check [`Capability::write_with_content_type`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, sets Content-Type as system metadata on the target file - /// - The value should follow MIME type format (e.g. "text/plain", "image/jpeg") - /// - If not supported, the value will be ignored - /// - /// This operation allows specifying the media type of the content being written. - /// - /// ## Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// // Set content type for plain text file - /// let mut w = op - /// .writer_with("path/to/file") - /// .content_type("text/plain") - /// .await?; - /// w.write(vec![0; 4096]).await?; - /// w.write(vec![1; 4096]).await?; - /// w.close().await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `content_disposition` - /// - /// Sets Content-Disposition header for this write request. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_content_disposition`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, sets Content-Disposition as system metadata on the target file - /// - The value should follow HTTP Content-Disposition header format - /// - Common values include: - /// - `inline` - Content displayed within browser - /// - `attachment` - Content downloaded as file - /// - `attachment; filename="example.jpg"` - Downloaded with specified filename - /// - If not supported, the value will be ignored - /// - /// This operation allows controlling how the content should be displayed or downloaded. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let mut w = op - /// .writer_with("path/to/file") - /// .content_disposition("attachment; filename=\"filename.jpg\"") - /// .await?; - /// w.write(vec![0; 4096]).await?; - /// w.write(vec![1; 4096]).await?; - /// w.close().await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `content_encoding` - /// - /// Sets Content-Encoding header for this write request. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_content_encoding`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, sets Content-Encoding as system metadata on the target file - /// - The value should follow HTTP Content-Encoding header format - /// - Common values include: - /// - `gzip` - Content encoded using gzip compression - /// - `deflate` - Content encoded using deflate compression - /// - `br` - Content encoded using Brotli compression - /// - `identity` - No encoding applied (default value) - /// - If not supported, the value will be ignored - /// - /// This operation allows specifying the encoding applied to the content being written. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let mut w = op - /// .writer_with("path/to/file") - /// .content_encoding("gzip") - /// .await?; - /// w.write(vec![0; 4096]).await?; - /// w.write(vec![1; 4096]).await?; - /// w.close().await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_match` - /// - /// Sets If-Match header for this write request. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_if_match`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, the write operation will only succeed if the target's ETag matches the specified value - /// - The value should be a valid ETag string - /// - Common values include: - /// - A specific ETag value like `"686897696a7c876b7e"` - /// - `*` - Matches any existing resource - /// - If not supported, the value will be ignored - /// - /// This operation provides conditional write functionality based on ETag matching, - /// helping prevent unintended overwrites in concurrent scenarios. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let mut w = op - /// .writer_with("path/to/file") - /// .if_match("\"686897696a7c876b7e\"") - /// .await?; - /// w.write(vec![0; 4096]).await?; - /// w.write(vec![1; 4096]).await?; - /// w.close().await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_none_match` - /// - /// Sets If-None-Match header for this write request. - /// - /// Note: Certain services, like `s3`, support `if_not_exists` but not `if_none_match`. - /// Use `if_not_exists` if you only want to check whether a file exists. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_if_none_match`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, the write operation will only succeed if the target's ETag does not match the specified value - /// - The value should be a valid ETag string - /// - Common values include: - /// - A specific ETag value like `"686897696a7c876b7e"` - /// - `*` - Matches if the resource does not exist - /// - If not supported, the value will be ignored - /// - /// This operation provides conditional write functionality based on ETag non-matching, - /// useful for preventing overwriting existing resources or ensuring unique writes. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let mut w = op - /// .writer_with("path/to/file") - /// .if_none_match("\"686897696a7c876b7e\"") - /// .await?; - /// w.write(vec![0; 4096]).await?; - /// w.write(vec![1; 4096]).await?; - /// w.close().await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_not_exists` - /// - /// Sets the condition that write operation will succeed only if target does not exist. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_if_not_exists`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, the write operation will only succeed if the target path does not exist - /// - Equivalent to setting `if_none_match("*")` if available - /// - Will return error if target already exists - /// - If not supported, the value will be ignored - /// - /// This operation provides a way to ensure write operations only create new resources - /// without overwriting existing ones, useful for implementing "create if not exists" logic. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let mut w = op - /// .writer_with("path/to/file") - /// .if_not_exists(true) - /// .await?; - /// w.write(vec![0; 4096]).await?; - /// w.write(vec![1; 4096]).await?; - /// w.close().await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `user_metadata` - /// - /// Sets user metadata for this write request. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_user_metadata`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, the user metadata will be attached to the object during write - /// - Accepts key-value pairs where both key and value are strings - /// - Keys are case-insensitive in most services - /// - Services may have limitations for user metadata, for example: - /// - Key length is typically limited (e.g., 1024 bytes) - /// - Value length is typically limited (e.g., 4096 bytes) - /// - Total metadata size might be limited - /// - Some characters might be forbidden in keys - /// - If not supported, the metadata will be ignored - /// - /// User metadata provides a way to attach custom metadata to objects during write operations. - /// This metadata can be retrieved later when reading the object. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let mut w = op - /// .writer_with("path/to/file") - /// .user_metadata([ - /// ("language".to_string(), "rust".to_string()), - /// ("author".to_string(), "OpenDAL".to_string()), - /// ]) - /// .await?; - /// w.write(vec![0; 4096]).await?; - /// w.close().await?; - /// # Ok(()) - /// # } - /// ``` - pub fn writer_with(&self, path: &str) -> FutureWriter>> { - let path = normalize_path(path); - - OperatorFuture::new( - self.inner().clone(), - path, - ( - OpWrite::default().merge_executor(self.default_executor.clone()), - OpWriter::default(), - ), - |inner, path, (args, options)| async move { - if !validate_path(&path, EntryMode::FILE) { - return Err( - Error::new(ErrorKind::IsADirectory, "write path is a directory") - .with_operation("Operator::writer") - .with_context("service", inner.info().scheme().into_static()) - .with_context("path", &path), - ); - } - - let context = WriteContext::new(inner, path, args, options); - let w = Writer::new(context).await?; - Ok(w) - }, - ) - } - - /// Write data with extra options. - /// - /// # Notes - /// - /// ## Streaming Write - /// - /// This method performs a single bulk write operation for all bytes. For finer-grained - /// memory control or lazy writing, consider using [`Operator::writer_with`] instead. - /// - /// ## Multipart Uploads + /// ## Multipart Uploads /// /// OpenDAL handles multipart uploads through the [`Writer`] abstraction, managing all /// the upload details automatically. You can customize the upload behavior by configuring /// `chunk` size and `concurrent` operations via [`Operator::writer_with`]. /// - /// # Usages - /// - /// ## `append` - /// - /// Sets append mode for this write request. - /// - /// ### Capability - /// - /// Check [`Capability::write_can_append`] before using this feature. - /// - /// ### Behavior - /// - /// - By default, write operations overwrite existing files - /// - When append is set to true: - /// - New data will be appended to the end of existing file - /// - If file doesn't exist, it will be created - /// - If not supported, will return an error - /// - /// This operation allows adding data to existing files instead of overwriting them. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let _ = op.write_with("path/to/file", vec![0; 4096]).append(true).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `chunk` - /// - /// Sets chunk size for buffered writes. - /// - /// ### Capability - /// - /// Check [`Capability::write_multi_min_size`] and [`Capability::write_multi_max_size`] for size limits. - /// - /// ### Behavior - /// - /// - By default, OpenDAL sets optimal chunk size based on service capabilities - /// - When chunk size is set: - /// - Data will be buffered until reaching chunk size - /// - One API call will be made per chunk - /// - Last chunk may be smaller than chunk size - /// - Important considerations: - /// - Some services require minimum chunk sizes (e.g. S3's EntityTooSmall error) - /// - Smaller chunks increase API calls and costs - /// - Larger chunks increase memory usage, but improve performance and reduce costs - /// - /// ### Performance Impact - /// - /// Setting appropriate chunk size can: - /// - Reduce number of API calls - /// - Improve overall throughput - /// - Lower operation costs - /// - Better utilize network bandwidth - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// // Set 8MiB chunk size - data will be sent in one API call at close - /// let _ = op - /// .write_with("path/to/file", vec![0; 4096]) - /// .chunk(8 * 1024 * 1024) - /// .await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// # `concurrent` - /// - /// Sets concurrent write operations for this writer. - /// - /// ## Behavior - /// - /// - By default, OpenDAL writes files sequentially - /// - When concurrent is set: - /// - Multiple write operations can execute in parallel - /// - Write operations return immediately without waiting if tasks space are available - /// - Close operation ensures all writes complete in order - /// - Memory usage increases with concurrency level - /// - If not supported, falls back to sequential writes - /// - /// This feature significantly improves performance when: - /// - Writing large files - /// - Network latency is high - /// - Storage service supports concurrent uploads like multipart uploads - /// - /// ## Performance Impact - /// - /// Setting appropriate concurrency can: - /// - Increase write throughput - /// - Reduce total write time - /// - Better utilize available bandwidth - /// - Trade memory for performance - /// - /// ## Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// // Enable concurrent writes with 8 parallel operations at 128B chunk. - /// let _ = op.write_with("path/to/file", vec![0; 4096]).chunk(128).concurrent(8).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `cache_control` - /// - /// Sets Cache-Control header for this write operation. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_cache_control`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, sets Cache-Control as system metadata on the target file - /// - The value should follow HTTP Cache-Control header format - /// - If not supported, the value will be ignored - /// - /// This operation allows controlling caching behavior for the written content. - /// - /// ### Use Cases - /// - /// - Setting browser cache duration - /// - Configuring CDN behavior - /// - Optimizing content delivery - /// - Managing cache invalidation - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// // Cache content for 7 days (604800 seconds) - /// let _ = op - /// .write_with("path/to/file", vec![0; 4096]) - /// .cache_control("max-age=604800") - /// .await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ### References - /// - /// - [MDN Cache-Control](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control) - /// - [RFC 7234 Section 5.2](https://tools.ietf.org/html/rfc7234#section-5.2) - /// - /// ## `content_type` - /// - /// Sets `Content-Type` header for this write operation. - /// - /// ## Capability - /// - /// Check [`Capability::write_with_content_type`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, sets Content-Type as system metadata on the target file - /// - The value should follow MIME type format (e.g. "text/plain", "image/jpeg") - /// - If not supported, the value will be ignored - /// - /// This operation allows specifying the media type of the content being written. - /// - /// ## Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// // Set content type for plain text file - /// let _ = op - /// .write_with("path/to/file", vec![0; 4096]) - /// .content_type("text/plain") - /// .await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `content_disposition` - /// - /// Sets Content-Disposition header for this write request. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_content_disposition`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, sets Content-Disposition as system metadata on the target file - /// - The value should follow HTTP Content-Disposition header format - /// - Common values include: - /// - `inline` - Content displayed within browser - /// - `attachment` - Content downloaded as file - /// - `attachment; filename="example.jpg"` - Downloaded with specified filename - /// - If not supported, the value will be ignored - /// - /// This operation allows controlling how the content should be displayed or downloaded. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let _ = op - /// .write_with("path/to/file", vec![0; 4096]) - /// .content_disposition("attachment; filename=\"filename.jpg\"") - /// .await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `content_encoding` - /// - /// Sets Content-Encoding header for this write request. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_content_encoding`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, sets Content-Encoding as system metadata on the target file - /// - The value should follow HTTP Content-Encoding header format - /// - Common values include: - /// - `gzip` - Content encoded using gzip compression - /// - `deflate` - Content encoded using deflate compression - /// - `br` - Content encoded using Brotli compression - /// - `identity` - No encoding applied (default value) - /// - If not supported, the value will be ignored - /// - /// This operation allows specifying the encoding applied to the content being written. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let _ = op - /// .write_with("path/to/file", vec![0; 4096]) - /// .content_encoding("gzip") - /// .await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_match` - /// - /// Sets If-Match header for this write request. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_if_match`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, the write operation will only succeed if the target's ETag matches the specified value - /// - The value should be a valid ETag string - /// - Common values include: - /// - A specific ETag value like `"686897696a7c876b7e"` - /// - `*` - Matches any existing resource - /// - If not supported, the value will be ignored - /// - /// This operation provides conditional write functionality based on ETag matching, - /// helping prevent unintended overwrites in concurrent scenarios. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let _ = op - /// .write_with("path/to/file", vec![0; 4096]) - /// .if_match("\"686897696a7c876b7e\"") - /// .await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_none_match` - /// - /// Sets If-None-Match header for this write request. - /// - /// Note: Certain services, like `s3`, support `if_not_exists` but not `if_none_match`. - /// Use `if_not_exists` if you only want to check whether a file exists. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_if_none_match`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, the write operation will only succeed if the target's ETag does not match the specified value - /// - The value should be a valid ETag string - /// - Common values include: - /// - A specific ETag value like `"686897696a7c876b7e"` - /// - `*` - Matches if the resource does not exist - /// - If not supported, the value will be ignored - /// - /// This operation provides conditional write functionality based on ETag non-matching, - /// useful for preventing overwriting existing resources or ensuring unique writes. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let _ = op - /// .write_with("path/to/file", vec![0; 4096]) - /// .if_none_match("\"686897696a7c876b7e\"") - /// .await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// ## `if_not_exists` - /// - /// Sets the condition that write operation will succeed only if target does not exist. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_if_not_exists`] before using this feature. - /// - /// ### Behavior + /// # Options /// - /// - If supported, the write operation will only succeed if the target path does not exist - /// - Equivalent to setting `if_none_match("*")` if available - /// - Will return error if target already exists - /// - If not supported, the value will be ignored + /// Visit [`FutureWrite`] for all available options. /// - /// This operation provides a way to ensure write operations only create new resources - /// without overwriting existing ones, useful for implementing "create if not exists" logic. + /// - [`append`](./operator_futures/type.FutureWrite.html#method.append): Sets append mode for this write request. + /// - [`chunk`](./operator_futures/type.FutureWrite.html#method.chunk): Sets chunk size for buffered writes. + /// - [`concurrent`](./operator_futures/type.FutureWrite.html#method.concurrent): Sets concurrent write operations for this writer. + /// - [`cache_control`](./operator_futures/type.FutureWrite.html#method.cache_control): Sets cache control for this write request. + /// - [`content_type`](./operator_futures/type.FutureWrite.html#method.content_type): Sets content type for this write request. + /// - [`content_disposition`](./operator_futures/type.FutureWrite.html#method.content_disposition): Sets content disposition for this write request. + /// - [`content_encoding`](./operator_futures/type.FutureWrite.html#method.content_encoding): Sets content encoding for this write request. + /// - [`if_match`](./operator_futures/type.FutureWrite.html#method.if_match): Sets if-match for this write request. + /// - [`if_none_match`](./operator_futures/type.FutureWrite.html#method.if_none_match): Sets if-none-match for this write request. + /// - [`if_not_exist`](./operator_futures/type.FutureWrite.html#method.if_not_exist): Sets if-not-exist for this write request. + /// - [`user_metadata`](./operator_futures/type.FutureWrite.html#method.user_metadata): Sets user metadata for this write request. /// - /// ### Example + /// # Examples /// /// ``` /// # use opendal::Result; /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; /// use bytes::Bytes; /// /// # async fn test(op: Operator) -> Result<()> { - /// let _ = op - /// .write_with("path/to/file", vec![0; 4096]) + /// let _ = op.write_with("path/to/file", vec![0; 4096]) /// .if_not_exists(true) /// .await?; /// # Ok(()) /// # } /// ``` - /// - /// ## `user_metadata` - /// - /// Sets user metadata for this write request. - /// - /// ### Capability - /// - /// Check [`Capability::write_with_user_metadata`] before using this feature. - /// - /// ### Behavior - /// - /// - If supported, the user metadata will be attached to the object during write - /// - Accepts key-value pairs where both key and value are strings - /// - Keys are case-insensitive in most services - /// - Services may have limitations for user metadata, for example: - /// - Key length is typically limited (e.g., 1024 bytes) - /// - Value length is typically limited (e.g., 4096 bytes) - /// - Total metadata size might be limited - /// - Some characters might be forbidden in keys - /// - If not supported, the metadata will be ignored - /// - /// User metadata provides a way to attach custom metadata to objects during write operations. - /// This metadata can be retrieved later when reading the object. - /// - /// ### Example - /// - /// ``` - /// # use opendal::Result; - /// # use opendal::Operator; - /// # use futures::StreamExt; - /// # use futures::SinkExt; - /// use bytes::Bytes; - /// - /// # async fn test(op: Operator) -> Result<()> { - /// let _ = op - /// .write_with("path/to/file", vec![0; 4096]) - /// .user_metadata([ - /// ("language".to_string(), "rust".to_string()), - /// ("author".to_string(), "OpenDAL".to_string()), - /// ]) - /// .await?; - /// # Ok(()) - /// # } - /// ``` pub fn write_with( &self, path: &str, diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index bbfa02657db7..c847b53745e2 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -196,7 +196,7 @@ impl>> FuturePresignWrite { } } -/// Future that generated by [`Operator::read_with`] or [`Operator::reader_with`]. +/// Future that generated by [`Operator::read_with`]. /// /// Users can add more options by public functions provided by this struct. pub type FutureRead = OperatorFuture<(OpRead, OpReader), Buffer, F>; @@ -404,7 +404,7 @@ impl>> FutureReader { /// read large chunks of data. By setting `concurrent`, opendal will reading files concurrently /// on support storage services. /// - /// By setting `concurrent``, opendal will fetch chunks concurrently with + /// By setting `concurrent`, opendal will fetch chunks concurrently with /// the give chunk size. /// /// ```