diff --git a/bin/oli/README.md b/bin/oli/README.md index d1ea13d1fa06..0a44e5a0ab5c 100644 --- a/bin/oli/README.md +++ b/bin/oli/README.md @@ -32,7 +32,7 @@ cargo install oli --all-features - `~/Library/Application Support/oli/config.toml` on macOS - `C:\Users\\AppData\Roaming\oli\config.toml` on Windows -The content of `config.toml` should be follow these pattern: +The content of `config.toml` should follow these pattern: ```toml [profiles.] diff --git a/bin/oli/src/bin/oli.rs b/bin/oli/src/bin/oli.rs index 99ba3d276307..8ae115602f4a 100644 --- a/bin/oli/src/bin/oli.rs +++ b/bin/oli/src/bin/oli.rs @@ -74,6 +74,10 @@ async fn main() -> Result<()> { let cmd: oli::commands::stat::StatCmd = clap::Parser::parse(); cmd.run().await?; } + Some("omv") => { + let cmd: oli::commands::mv::MoveCmd = clap::Parser::parse(); + cmd.run().await?; + } Some(v) => { println!("{v} is not supported") } diff --git a/bin/oli/src/commands/mod.rs b/bin/oli/src/commands/mod.rs index e70d0c101a69..0ab829edb647 100644 --- a/bin/oli/src/commands/mod.rs +++ b/bin/oli/src/commands/mod.rs @@ -20,6 +20,7 @@ pub mod cat; pub mod cp; pub mod ls; +pub mod mv; pub mod rm; pub mod stat; @@ -30,6 +31,7 @@ pub enum OliSubcommand { Ls(ls::LsCmd), Rm(rm::RmCmd), Stat(stat::StatCmd), + Mv(mv::MoveCmd), } impl OliSubcommand { @@ -40,6 +42,7 @@ impl OliSubcommand { Self::Ls(cmd) => cmd.run().await, Self::Rm(cmd) => cmd.run().await, Self::Stat(cmd) => cmd.run().await, + Self::Mv(cmd) => cmd.run().await, } } } diff --git a/bin/oli/src/commands/mv.rs b/bin/oli/src/commands/mv.rs new file mode 100644 index 000000000000..3f5fa093a010 --- /dev/null +++ b/bin/oli/src/commands/mv.rs @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::config::Config; +use crate::params::config::ConfigParams; +use anyhow::{Error, Result}; +use futures::{AsyncWriteExt, TryStreamExt}; +use opendal::Operator; +use std::path::Path; + +#[derive(Debug, clap::Parser)] +#[command(name = "mv", about = "Move object", disable_version_flag = true)] +pub struct MoveCmd { + #[command(flatten)] + pub config_params: ConfigParams, + #[arg()] + pub source: String, + #[arg()] + pub destination: String, + /// Move objects recursively. + #[arg(short = 'r', long)] + pub recursive: bool, +} + +impl MoveCmd { + pub async fn run(&self) -> Result<()> { + let cfg = Config::load(&self.config_params.config)?; + + let (src_op, src_path) = cfg.parse_location(&self.source)?; + let (dst_op, dst_path) = cfg.parse_location(&self.destination)?; + + let src_meta = src_op.stat(&src_path).await?; + if !self.recursive || src_meta.is_file() { + if src_meta.is_dir() { + return Err(Error::msg("can not move a directory in non-recursive mode")); + } + + let mut actual_dst_path = dst_path.clone(); + if let Ok(meta) = dst_op.stat(&dst_path).await { + if meta.is_dir() && !dst_path.ends_with("/") { + actual_dst_path.push('/'); + } + } + if actual_dst_path.is_empty() || actual_dst_path.ends_with("/") { + let file_name = src_path.rsplit_once("/").unwrap_or(("", &src_path)).1; + actual_dst_path.push_str(file_name); + } + + println!("Moving: {}", src_path); + self.cp_file( + &src_op, + &src_path, + &dst_op, + &actual_dst_path, + src_meta.content_length(), + ) + .await?; + src_op.delete(&src_path).await?; + + return Ok(()); + } + + let dst_root = Path::new(&dst_path); + let prefix = src_path.strip_prefix('/').unwrap_or(src_path.as_str()); + let mut lst = src_op.lister_with(&src_path).recursive(true).await?; + while let Some(entry) = lst.try_next().await? { + let path = entry.path(); + if path == src_path { + continue; + } + + let suffix = path.strip_prefix(prefix).expect("invalid path"); + let depath = dst_root.join(suffix); + + println!("Moving: {}", path); + let meta = entry.metadata(); + if meta.is_dir() { + dst_op.create_dir(&depath.to_string_lossy()).await?; + src_op.delete(path).await?; + continue; + } + + let path_metadata = src_op.stat(path).await?; + self.cp_file( + &src_op, + path, + &dst_op, + &depath.to_string_lossy(), + path_metadata.content_length(), + ) + .await?; + + src_op.delete(path).await?; + } + + Ok(()) + } + + async fn cp_file( + &self, + src_op: &Operator, + src_path: &str, + dst_op: &Operator, + dst_path: &str, + length: u64, + ) -> Result<()> { + let src_reader = src_op + .reader_with(src_path) + .chunk(8 * 1024 * 1024) + .await? + .into_futures_async_read(0..length) + .await?; + + let mut dst_writer = dst_op.writer(dst_path).await?.into_futures_async_write(); + + futures::io::copy_buf(src_reader, &mut dst_writer).await?; + dst_writer.close().await?; + + Ok(()) + } +} diff --git a/bin/oli/src/config/mod.rs b/bin/oli/src/config/mod.rs index ec401f367488..3ed44e0246cc 100644 --- a/bin/oli/src/config/mod.rs +++ b/bin/oli/src/config/mod.rs @@ -37,7 +37,7 @@ pub struct Config { profiles: HashMap>, } -/// resolve_relative_path turns a relative path to a absolute path. +/// resolve_relative_path turns a relative path to an absolute path. /// /// The reason why we don't use `fs::canonicalize` here is `fs::canonicalize` /// will return an error if the path does not exist, which is unwanted. diff --git a/bin/oli/tests/mv.rs b/bin/oli/tests/mv.rs new file mode 100644 index 000000000000..926a937a684e --- /dev/null +++ b/bin/oli/tests/mv.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use anyhow::Result; +use assert_cmd::Command; +use std::fs; + +#[tokio::test] +async fn test_basic_mv() -> Result<()> { + let dir = tempfile::tempdir()?; + let src_path = dir.path().join("src.txt"); + let dst_path = dir.path().join("dst.txt"); + let expect = "hello"; + fs::write(&src_path, expect)?; + + let mut cmd = Command::cargo_bin("oli")?; + cmd.arg("mv") + .arg(src_path.as_os_str()) + .arg(dst_path.as_os_str()); + cmd.assert().success(); + + let actual = fs::read_to_string(&dst_path)?; + assert_eq!(actual, expect); + + assert!(!fs::exists(&src_path)?); + + Ok(()) +} + +#[tokio::test] +async fn test_move_a_file_to_a_dir() -> Result<()> { + let src_dir = tempfile::tempdir()?; + let src_path = src_dir.path().join("src.txt"); + let expect = "hello"; + fs::write(&src_path, expect)?; + + let dst_dir = tempfile::tempdir()?; + let dst_path = dst_dir.path().join("dir/"); + + let mut cmd = Command::cargo_bin("oli")?; + cmd.arg("mv") + .arg(src_path.as_os_str()) + .arg(dst_path.as_os_str()); + cmd.assert().success(); + + let dst_path = dst_path.join("src.txt"); + let actual = fs::read_to_string(&dst_path)?; + assert_eq!(actual, expect); + + assert!(!fs::exists(&src_path)?); + + Ok(()) +} + +#[tokio::test] +async fn test_mv_with_recursive() -> Result<()> { + let src_root = tempfile::tempdir()?; + let src_path = src_root.path().join("src/"); + fs::create_dir(&src_path)?; + + let src_file1 = src_path.as_path().join("file1.txt"); + let file1_content = "file1"; + fs::write(&src_file1, file1_content).expect("write file1 error"); + + let src_dir = src_path.join("dir/"); + fs::create_dir(&src_dir)?; + let src_file2 = src_dir.as_path().join("file2.txt"); + let file2_content = "file2"; + fs::write(&src_file2, file2_content).expect("write file2 error"); + + let src_empty_dir = src_path.join("empty_dir/"); + fs::create_dir(&src_empty_dir)?; + + let dst_path = tempfile::tempdir()?; + + let mut cmd = Command::cargo_bin("oli")?; + cmd.arg("mv") + .arg(src_path.as_os_str()) + .arg(dst_path.path().as_os_str()) + .arg("-r"); + cmd.assert().success(); + + let dst_file1_content = + fs::read_to_string(dst_path.path().join("file1.txt")).expect("read file1 error"); + assert_eq!(dst_file1_content, file1_content); + let dst_file2_content = + fs::read_to_string(dst_path.path().join("dir/file2.txt")).expect("read dir/file2 error"); + assert_eq!(dst_file2_content, file2_content); + assert!(fs::exists(dst_path.path().join("empty_dir/"))?); + + // src_path is empty now + let mut src_data = fs::read_dir(&src_path)?; + assert!(src_data.next().is_none()); + + Ok(()) +} diff --git a/core/src/types/entry.rs b/core/src/types/entry.rs index 9fe4510806fe..cdb70e91b6f3 100644 --- a/core/src/types/entry.rs +++ b/core/src/types/entry.rs @@ -62,7 +62,7 @@ impl Entry { &self.metadata } - /// Consume this entry to get it's path and metadata. + /// Consume this entry to get its path and metadata. pub fn into_parts(self) -> (String, Metadata) { (self.path, self.metadata) }