Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tough: migrate to async #687

Merged
merged 1 commit into from
Nov 3, 2023
Merged

tough: migrate to async #687

merged 1 commit into from
Nov 3, 2023

Conversation

phu-cinemo
Copy link
Contributor

Depend on tokio for all IO access and convert all related interfaces to async.
Replace std::io::Read with futures_core::stream::Stream for hopefully seamless migration once std::async_iter stabilized. Use async-compatible locks from tokio and coincidentally fix a locking issue in datastore.

This also affects the crates tough-kms, tough-ssm and tuftool.

Issue #, if available:
#213
#602
Description of changes:

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Copy link
Contributor

@webern webern left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an amazing PR. I've gotten through 33 / 54 file 😅!

{
root: R,
pub struct RepositoryLoader<'a> {
root: &'a [u8],
Copy link
Contributor

@webern webern Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this is the right interface.

What I'm thinking about here is whether there should be a Root object (maybe one of the existing ones, need to looks at those types) that deserializes itself from a Transport.

pub struct RepositoryLoader {
    root: DeserializedRoot,
...
impl DeserializedRoot {
    pub async fn from_https(url: Url) -> Result<Self> {}
    pub async fn from_file(path: impl AsRef<Path>) -> Result<Self> {}
    pub async fn from_bytes(path: &[u8]) -> Result<Self> {}
}

Basically a convenience to the user. Just a thought...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the root is supposed to be "shipped with your software", I figured having it in a file or in memory would be the most common case. Having Read/Stream-like access to a file doesn't make much sense as there are no async interfaces for that in serde_json. So it has to be completely in memory anyway.

You can read a file into memory with a single call already, so that would stay the same with your approach in terms of convenience.
If users keep the root in memory already (because it's linked in the binary or because they get it by means we don't support directly), that would require an extra step now.

But maybe from_https is a common thing to do?
Also, your proposal adds the slight benefit of an early deserialization error.

tough/src/lib.rs Show resolved Hide resolved
Comment on lines 45 to 46
/// TODO: [provide a thread safe interface](https://github.com/awslabs/tough/issues/602)
///
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks to me like it's thread safe now! @cbgbt do you agree?

Suggested change
/// TODO: [provide a thread safe interface](https://github.com/awslabs/tough/issues/602)
///

pub(crate) fn reader(&self, file: &str) -> Result<Option<impl Read>> {
let path = self.read().path().join(file);
match File::open(&path) {
pub(crate) async fn bytes(&self, file: &str) -> Result<Option<Vec<u8>>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stating the obvious: returning the bytes here instead of a reader or stream solves our thread safety problems.

tough/src/http.rs Show resolved Hide resolved
tough/src/http.rs Outdated Show resolved Hide resolved
@phu-cinemo
Copy link
Contributor Author

updated according to suggestions

Copy link
Contributor

@webern webern left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again: amazing work.

I have gotten through all of the files. Now I need to test it in Bottlerocket and pubsys.

Ok(entry) => {
if entry.file_type().is_file() {
let future = async move { process_target(entry.path()).await };
Some(tokio::task::spawn(future).await.unwrap())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you have an unwrap here? We generally try to avoid panicking and favor errors.
Maybe a failure to spawn is catastrophic, but maybe there's a way to produce a SpawnErrSnafu.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'd only panic if the task happened to panic, that's why I opted for unwrap here.
but I'm sure I can turn that into an error, no problem.

.collect()
}
})
.try_collect()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to myself: double check parallelization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good instincts! it wasn't really parallel at all.. see my other comment

Comment on lines 408 to 409
enum HttpResult {
/// We got a response with an HTTP code that indicates success.
Ok(reqwest::blocking::Response),
Ok(reqwest::Response),
Err(ErrorClass),
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be:

type HttpResult = std::result::Result<reqwest::Response, ErrorClass>;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The orphan rule would keep us from implementing external traits like From for it

Comment on lines 405 to 407
/// Much of the complexity in the `fetch_with_retries` function is in deciphering the `Result`
/// we get from `reqwest::Client::execute`. Using this enum we categorize the states of the
/// `Result` into the categories that we need to understand.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This documentation is more applicable to enum ErrorClass now.

self.digest.is_some(),
"DigestAdapter::read called after end of file"
);
impl Stream for DigestAdapter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I trust that your Stream implementations are correct as I have never implemented Stream myself.

tough/src/io.rs Outdated
use ring::digest::{Context, SHA256};
use std::io::{self, Read};
use std::{convert::TryInto, path::Path, task::Poll};
use tokio::fs::metadata;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
use tokio::fs::metadata;
use tokio::fs;

tough/src/io.rs Outdated
}
/// Async analogue of `std::path::Path::is_file`
pub async fn is_file(path: impl AsRef<Path>) -> bool {
metadata(path).await.map(|m| m.is_file()).unwrap_or(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: slightly clearer that this is a tokio function.

Suggested change
metadata(path).await.map(|m| m.is_file()).unwrap_or(false)
fs::metadata(path).await.map(|m| m.is_file()).unwrap_or(false)

tough/src/io.rs Outdated
}
/// Async analogue of `std::path::Path::is_dir`
pub async fn is_dir(path: impl AsRef<Path>) -> bool {
metadata(path).await.map(|m| m.is_dir()).unwrap_or(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
metadata(path).await.map(|m| m.is_dir()).unwrap_or(false)
fs::metadata(path).await.map(|m| m.is_dir()).unwrap_or(false)

None
let indir = indir.as_ref().to_owned();

let (tx, rx) = tokio::sync::mpsc::channel(10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 10 the max parallelization? Should it be num_cpu?

    let (tx, rx) = tokio::sync::mpsc::channel(10);

Copy link
Contributor Author

@phu-cinemo phu-cinemo Oct 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just the buffer depth of the producer. WalkDir will run on a special thread reserved for blocking tasks. I chose some value >1 to avoid starvation of the consumers while this thread isn't scheduled.
tokio with rt-multi-thread enabled will automatically scale the number of threads according to the number of cores. As the consuming futures are spawned as independent tasks, they will evenly distribute across those threads.

Edit: Writing this I realized that I failed to spawn all tasks before awaiting them. This is fixed now.

tough/src/lib.rs Outdated
pub use crate::transport::{
DefaultTransport, FilesystemTransport, Transport, TransportError, TransportErrorKind,
};
pub use crate::urlpath::SafeUrlPath;
use async_recursion::async_recursion;
pub use async_trait::async_trait;
use bytes::Bytes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in the interface:

Suggested change
use bytes::Bytes;
pub use bytes::Bytes;

tough/src/lib.rs Outdated
use crate::schema::{
DelegatedRole, Delegations, Role, RoleType, Root, Signed, Snapshot, Timestamp,
};
pub use crate::target_name::TargetName;
use crate::transport::IntoVec;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we implement some convenience so that download_target("sometarget").to_bytes() or to_vec() is possible?

Copy link
Contributor Author

@phu-cinemo phu-cinemo Oct 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I generalized the implementation of IntoVec to deal with any error type and made the trait public. It now can be used on streams returned by the public interface, e.g. read_target.
I hope that's what you had in mind!

@webern
Copy link
Contributor

webern commented Oct 6, 2023

I have started the testing and integration process here bottlerocket-os/twoliter#98

@phu-cinemo
Copy link
Contributor Author

I incorporated another round of suggestions

@matze
Copy link

matze commented Oct 26, 2023

@webern is there anything blocking from further review?

@webern
Copy link
Contributor

webern commented Oct 26, 2023

@webern is there anything blocking from further review?

I have been out on vacation and will resume the work next week. The blocker is that we want to make sure Bottlerocket can be built, run and update using the new code. I have done about 75% of the work thus far.

@webern
Copy link
Contributor

webern commented Oct 31, 2023

Update. I've run into a problem integrating this downstream in Bottlerocket's update system.

The LZ4 decompression library we use in updog takes a blocking Read object. I am holding a Stream of the target, which in this case is the very large bottlerocket-image.lz4 file.

I am not seeing an async-compatible (ideally stream compatible) LZ4 library at the moment.

The best I can think of is to write the LZ4 file to disk then open that file with std::fs to get a blocking Read that I can feed to the lz4 decompression lib. But this is going to be slower than what we have now (feeding the tough http transport Read straight to decompression). The file is also too large to store it in memory. Another options might be somehow spawning a runtime and blocking on each next of the Stream to convert to a Read?

@bcressey and @ecpullen 👀

https://github.com/bottlerocket-os/bottlerocket/blob/d74bdb9a3758b2d177921c9a26f8fbd4afedc100/sources/updater/updog/src/main.rs#L229

https://github.com/10XGenomics/lz4-rs/blob/418855f339fe229083f6f8cce77a61d8632ab4c6/src/decoder.rs#L23

@webern
Copy link
Contributor

webern commented Oct 31, 2023

Update. I've run into a problem integrating this downstream in Bottlerocket's update system.

Hmmm, I wonder if I can create a Read object that somehow holds a handle to the tokio runtime and uses block_on.

@webern
Copy link
Contributor

webern commented Oct 31, 2023

Update. I've run into a problem integrating this downstream in Bottlerocket's update system.

Hmmm, I wonder if I can create a Read object that somehow holds a handle to the tokio runtime and uses block_on.

Untested but this compiles

struct StreamRead<S>
where
    S: Stream<Item = std::result::Result<Bytes, tough::error::Error>> + Send + Unpin,
{
    runtime: tokio::runtime::Handle,
    stream: S,
}

impl<S> StreamRead<S>
where
    S: Stream<Item = std::result::Result<Bytes, tough::error::Error>> + Send + Unpin,
{
    fn new(stream: S) -> Self {
        let runtime = tokio::runtime::Handle::current();
        Self { runtime, stream }
    }
}

impl<S> Read for StreamRead<S>
where
    S: Stream<Item = std::result::Result<Bytes, tough::error::Error>> + Send + Unpin,
{
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let maybe_next = self.runtime.block_on(self.stream.next());
        let next = match maybe_next {
            None => return Ok(0),
            Some(next) => next,
        };
        let bytes = next.map_err(|e| std::io::Error::new(ErrorKind::Other, e))?;
        let buffer_length = buf.len();
        let bytes_len = bytes.len();
        if bytes_len > buffer_length {
            // TODO - handle this painful case
            return Err(std::io::Error::new(ErrorKind::Other, "Bug"));
        }
        let bytes = bytes.iter().as_slice();
        // TODO - this seems stupid
        for i in 0..bytes_len {
            buf[i] = bytes[i];
        }
        Ok(bytes_len)
    }
}

@phu-cinemo
Copy link
Contributor Author

@webern I think you can find the necessary adapters for this in tokio_util.
I didn't try it myself, but you should be able to get from Stream to AsyncRead using tokio_util::io::StreamReader. Then, you'd use tokio_util::io::SyncIoBridge to get a blocking Read.

Copy link
Contributor

@webern webern left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested this in Bottlerocket and Bottlerocket's release tooling. Nice work!

@phu-cinemo Can you rebase to resolve the Cargo.toml conflict?

@webern
Copy link
Contributor

webern commented Nov 2, 2023

Here is the corresponding Bottlerocket PR bottlerocket-os/bottlerocket#3566

Depend on tokio for all IO access and convert all related interfaces to
async.
Replace `std::io::Read` with `futures_core::stream::Stream` for
hopefully seamless migration once `std::async_iter` stabilized.
Use async-compatible locks from tokio and coincidentally fix a locking issue in
datastore.

This also affects the crates tough-kms, tough-ssm and tuftool.
@phu-cinemo
Copy link
Contributor Author

@webern Thanks! I squashed the fixups and rebased.

Copy link
Contributor

@zmrow zmrow left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work!

tough/Cargo.toml Show resolved Hide resolved
tuftool/Cargo.toml Show resolved Hide resolved
tuftool/Cargo.toml Show resolved Hide resolved
where
T: serde::Serialize,
{
// Use `tempfile::NamedTempFile::persist` to perform an atomic file write.
let parent = path.parent().context(error::PathParentSnafu { path })?;
let mut writer =
let file =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need the same tokio::task::spawn_blocking handling since NamedTempFile can block?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do this after merging. #697

#[derive(Debug)]
pub struct RetryRead {
pub(crate) struct RetryStream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be pub(crate)? Not sure why RetryRead was pub before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will check this after merging #698

@webern
Copy link
Contributor

webern commented Nov 3, 2023

@phu-cinemo thank you for this! Merging!

@webern webern merged commit f5fbea3 into awslabs:develop Nov 3, 2023
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants