Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

WIP IML-1329 move mailbox tmpdir #1365

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions chroma_core/models/stratagem.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ def run(self, args):
unique_id = args["uuid"]
fs_name = args["fs_name"]

_, stratagem_result, mailbox_files = scan_result
parent_dir, stratagem_result, mailbox_files = scan_result

# Send stratagem_results to time series database
influx_entries = parse_stratagem_results_to_influx(temp_stratagem_measurement, fs_name, stratagem_result)
Expand All @@ -402,7 +402,7 @@ def run(self, args):
record_stratagem_point("\n".join(influx_entries))

mailbox_files = map(lambda xs: (xs[0], "{}-{}".format(unique_id, xs[1])), mailbox_files)
result = self.invoke_rust_agent_expect_result(host, "stream_fidlists_stratagem", mailbox_files)
result = self.invoke_rust_agent_expect_result(host, "stream_fidlists_stratagem", (parent_dir, mailbox_files))

return result

Expand Down Expand Up @@ -562,7 +562,7 @@ def run(self, args):
if duration is not None
]

action_list = filter(lambda (_, xs): path.exists("{}/{}".format(MAILBOX_PATH, xs[1])), action_list)
action_list = filter(lambda action: path.exists("{}/{}".format(MAILBOX_PATH, action[1][1])), action_list)

file_location = pipe(
action_list,
Expand Down
67 changes: 67 additions & 0 deletions iml-fs/src/remove_dir_all.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use futures::{future::Future, Async, Poll};
use std::{fs, io, path::Path, pin::Pin};
use tokio_threadpool::blocking;

/// tokio-fs.remove_dir_all

/// Removes a directory at this path, after removing all its contents. Use carefully!
///
/// This is an async version of [`std::fs::remove_dir_all`][std]
///
/// [std]: https://doc.rust-lang.org/std/fs/fn.remove_dir_all.html
pub fn remove_dir_all<P: AsRef<Path>>(path: P) -> RemoveDirAllFuture<P> {
RemoveDirAllFuture::new(path)
}

/// Future returned by `remove_dir_all`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RemoveDirAllFuture<P>
where
P: AsRef<Path>,
{
path: P,
}

impl<P> RemoveDirAllFuture<P>
where
P: AsRef<Path>,
{
fn new(path: P) -> RemoveDirAllFuture<P> {
RemoveDirAllFuture { path }
}
}

///our future FeaturesFuture
impl<P> Future for RemoveDirAllFuture<P>
where
P: AsRef<Path>,
{
type Item = ();
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
blocking_io(|| fs::remove_dir_all(&self.path))
}
}

/// tokio-fs.lib
fn blocking_io<F, T>(f: F) -> Poll<T, io::Error>
where
F: FnOnce() -> io::Result<T>,
{
match tokio_threadpool::blocking(f) {
Ok(Async::Ready(Ok(v))) => Ok(v.into()),
Ok(Async::Ready(Err(err))) => Err(err),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err(blocking_err()),
}
}

fn blocking_err() -> io::Error {
io::Error::new(
io::ErrorKind::Other,
"`blocking` annotated I/O must be called \
from the context of the Tokio runtime.",
)
}
75 changes: 75 additions & 0 deletions iml-fs/src/remove_dir_all_tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use futures::{
Async,
Poll as FeaturePoll,
};
use std::{
fs,
future::Future,
io,
path::Path,
pin::Pin,
task::{Context, Poll},
};
use tokio_threadpool::blocking;


/// tokio-fs.remove_dir_all

/// Removes a directory at this path, after removing all its contents. Use carefully!
///
/// This is an async version of [`std::fs::remove_dir_all`][std]
///
/// [std]: https://doc.rust-lang.org/std/fs/fn.remove_dir_all.html
pub fn remove_dir_all<P: AsRef<Path>>(path: P) -> RemoveDirAllFuture<P> {
RemoveDirAllFuture::new(path)
}

/// Future returned by `remove_dir_all`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RemoveDirAllFuture<P>
where
P: AsRef<Path>,
{
path: P,
}

impl<P> RemoveDirAllFuture<P>
where
P: AsRef<Path>,
{
fn new(path: P) -> RemoveDirAllFuture<P> {
RemoveDirAllFuture { path }
}
}

impl<P> Future for RemoveDirAllFuture<P>
where
P: AsRef<Path>,
{
type Output = io::Result<()>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
blocking_io(|| fs::remove_dir_all(&self.path))
}
}

/// tokio-fs.lib
fn blocking_io<F, T>(f: F) -> Poll<io::Result<T>>
where
F: FnOnce() -> io::Result<T>,
{
match tokio_threadpool::blocking(f) {
FeaturePoll::Ok(Async::Ready(v)) => Poll::Ready(v),
FeaturePoll::Ok(Async::NotReady) => Poll::Pending,
FeaturePoll::Err(_) => Poll::Ready(io::Result::Err(blocking_err())),
}
}

fn blocking_err() -> io::Error {
io::Error::new(
io::ErrorKind::Other,
"`blocking` annotated I/O must be called \
from the context of the Tokio runtime.",
)
}