diff --git a/chroma_core/models/stratagem.py b/chroma_core/models/stratagem.py index a0bc204aec..878620d06a 100644 --- a/chroma_core/models/stratagem.py +++ b/chroma_core/models/stratagem.py @@ -393,7 +393,7 @@ def run(self, args): unique_id = args["uuid"] fs_name = args["fs_name"] - _, stratagem_result, mailbox_files = scan_result + parent_dir, stratagem_result, mailbox_files = scan_result # Send stratagem_results to time series database influx_entries = parse_stratagem_results_to_influx(temp_stratagem_measurement, fs_name, stratagem_result) @@ -402,7 +402,7 @@ def run(self, args): record_stratagem_point("\n".join(influx_entries)) mailbox_files = map(lambda xs: (xs[0], "{}-{}".format(unique_id, xs[1])), mailbox_files) - result = self.invoke_rust_agent_expect_result(host, "stream_fidlists_stratagem", mailbox_files) + result = self.invoke_rust_agent_expect_result(host, "stream_fidlists_stratagem", (parent_dir, mailbox_files)) return result @@ -562,7 +562,7 @@ def run(self, args): if duration is not None ] - action_list = filter(lambda (_, xs): path.exists("{}/{}".format(MAILBOX_PATH, xs[1])), action_list) + action_list = filter(lambda action: path.exists("{}/{}".format(MAILBOX_PATH, action[1][1])), action_list) file_location = pipe( action_list, diff --git a/iml-fs/src/remove_dir_all.rs b/iml-fs/src/remove_dir_all.rs new file mode 100644 index 0000000000..45912ae590 --- /dev/null +++ b/iml-fs/src/remove_dir_all.rs @@ -0,0 +1,67 @@ +use futures::{future::Future, Async, Poll}; +use std::{fs, io, path::Path, pin::Pin}; +use tokio_threadpool::blocking; + +/// tokio-fs.remove_dir_all + +/// Removes a directory at this path, after removing all its contents. Use carefully! +/// +/// This is an async version of [`std::fs::remove_dir_all`][std] +/// +/// [std]: https://doc.rust-lang.org/std/fs/fn.remove_dir_all.html +pub fn remove_dir_all>(path: P) -> RemoveDirAllFuture

{ + RemoveDirAllFuture::new(path) +} + +/// Future returned by `remove_dir_all`. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct RemoveDirAllFuture

+where + P: AsRef, +{ + path: P, +} + +impl

RemoveDirAllFuture

+where + P: AsRef, +{ + fn new(path: P) -> RemoveDirAllFuture

{ + RemoveDirAllFuture { path } + } +} + +///our future FeaturesFuture +impl

Future for RemoveDirAllFuture

+where + P: AsRef, +{ + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll { + blocking_io(|| fs::remove_dir_all(&self.path)) + } +} + +/// tokio-fs.lib +fn blocking_io(f: F) -> Poll +where + F: FnOnce() -> io::Result, +{ + match tokio_threadpool::blocking(f) { + Ok(Async::Ready(Ok(v))) => Ok(v.into()), + Ok(Async::Ready(Err(err))) => Err(err), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => Err(blocking_err()), + } +} + +fn blocking_err() -> io::Error { + io::Error::new( + io::ErrorKind::Other, + "`blocking` annotated I/O must be called \ + from the context of the Tokio runtime.", + ) +} diff --git a/iml-fs/src/remove_dir_all_tokio.rs b/iml-fs/src/remove_dir_all_tokio.rs new file mode 100644 index 0000000000..4326982c18 --- /dev/null +++ b/iml-fs/src/remove_dir_all_tokio.rs @@ -0,0 +1,75 @@ +use futures::{ + Async, + Poll as FeaturePoll, +}; +use std::{ + fs, + future::Future, + io, + path::Path, + pin::Pin, + task::{Context, Poll}, +}; +use tokio_threadpool::blocking; + + +/// tokio-fs.remove_dir_all + +/// Removes a directory at this path, after removing all its contents. Use carefully! +/// +/// This is an async version of [`std::fs::remove_dir_all`][std] +/// +/// [std]: https://doc.rust-lang.org/std/fs/fn.remove_dir_all.html +pub fn remove_dir_all>(path: P) -> RemoveDirAllFuture

{ + RemoveDirAllFuture::new(path) +} + +/// Future returned by `remove_dir_all`. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct RemoveDirAllFuture

+where + P: AsRef, +{ + path: P, +} + +impl

RemoveDirAllFuture

+where + P: AsRef, +{ + fn new(path: P) -> RemoveDirAllFuture

{ + RemoveDirAllFuture { path } + } +} + +impl

Future for RemoveDirAllFuture

+where + P: AsRef, +{ + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + blocking_io(|| fs::remove_dir_all(&self.path)) + } +} + +/// tokio-fs.lib +fn blocking_io(f: F) -> Poll> +where + F: FnOnce() -> io::Result, +{ + match tokio_threadpool::blocking(f) { + FeaturePoll::Ok(Async::Ready(v)) => Poll::Ready(v), + FeaturePoll::Ok(Async::NotReady) => Poll::Pending, + FeaturePoll::Err(_) => Poll::Ready(io::Result::Err(blocking_err())), + } +} + +fn blocking_err() -> io::Error { + io::Error::new( + io::ErrorKind::Other, + "`blocking` annotated I/O must be called \ + from the context of the Tokio runtime.", + ) +}