From e23effe008556b6965882d99e1bda8d9b0b90d73 Mon Sep 17 00:00:00 2001 From: Xynnn007 Date: Thu, 15 Aug 2024 11:32:02 +0800 Subject: [PATCH] ocicrypt-rs/image-rs: make ttrpc keyprovider client sync The original ttrpc keyprovider client is marked `async`, but we use it as sync version by fork a new thread and create a new tokio runtime. This commit changes the client into sync version, this will match the original usage. Why we need this commit is the original fork-thread-and-tokio-runtime way would create a dead lock for CDH's image-pull API when an encrypted image is pulled. When we bring in a tokio::spawn_blocking in image-rs, the tokio runtime would have possibility to switch context thus a dead lock can be avoided. Signed-off-by: Xynnn007 --- image-rs/src/pull.rs | 12 +++-- ocicrypt-rs/build.rs | 2 +- ocicrypt-rs/src/keywrap/keyprovider/mod.rs | 48 ++++------------- .../src/utils/ttrpc/keyprovider_ttrpc.rs | 53 +++++++++---------- 4 files changed, 44 insertions(+), 71 deletions(-) diff --git a/image-rs/src/pull.rs b/image-rs/src/pull.rs index 61afe516c..bed95304e 100644 --- a/image-rs/src/pull.rs +++ b/image-rs/src/pull.rs @@ -142,10 +142,16 @@ impl<'a> PullClient<'a> { }; let decryptor = Decryptor::from_media_type(&layer.media_type); + let decryptor_i = decryptor.clone(); + let layer_i = layer.clone(); + let decrypt_config_i = decrypt_config.as_ref().map(|inner| inner.to_string()); if decryptor.is_encrypted() { - let decrypt_key = decryptor - .get_decrypt_key(&layer, decrypt_config) - .map_err(|e| anyhow!("failed to get decrypt key {}", e.to_string()))?; + let decrypt_key = tokio::task::spawn_blocking(move || { + decryptor_i + .get_decrypt_key(&layer_i, &decrypt_config_i.as_deref()) + .map_err(|e| anyhow!("failed to get decrypt key {}", e.to_string())) + }) + .await??; let plaintext_layer = decryptor .async_get_plaintext_layer(layer_reader, &layer, &decrypt_key) .map_err(|e| anyhow!("failed to async_get_plaintext_layer: {:?}", e))?; diff --git a/ocicrypt-rs/build.rs b/ocicrypt-rs/build.rs index 836510bf7..477a4dbe4 100644 --- a/ocicrypt-rs/build.rs +++ b/ocicrypt-rs/build.rs @@ -37,7 +37,7 @@ fn main() -> Result<(), Box> { .include("src/utils") .rust_protobuf() .customize(ttrpc_codegen::Customize { - async_all: true, + async_all: false, ..Default::default() }) .rust_protobuf_customize(ttrpc_codegen::ProtobufCustomize::default().gen_mod_rs(false)) diff --git a/ocicrypt-rs/src/keywrap/keyprovider/mod.rs b/ocicrypt-rs/src/keywrap/keyprovider/mod.rs index 7bebda453..a680f631d 100644 --- a/ocicrypt-rs/src/keywrap/keyprovider/mod.rs +++ b/ocicrypt-rs/src/keywrap/keyprovider/mod.rs @@ -324,29 +324,12 @@ impl KeyProviderKeyWrapper { } #[cfg(feature = "keywrap-keyprovider-ttrpc")] { + use anyhow::Context; + let ttrpc = ttrpc.to_string(); - let handler = std::thread::spawn(move || { - create_async_runtime()?.block_on(async { - KeyProviderKeyWrapProtocolOutput::from_ttrpc(_input, &ttrpc, OpKey::Wrap) - .map_err(|e| format!("{e}")) - }) - }); - let protocol_output = match handler.join() { - Ok(Ok(v)) => v, - Ok(Err(e)) => { - return Err(anyhow!( - "keyprovider: ttrpc provider failed to execute {} operation: {}", - OpKey::Wrap, - e - )); - } - Err(e) => { - return Err(anyhow!( - "keyprovider: ttrpc provider failed to execute {} operation: {e:?}", - OpKey::Wrap, - )); - } - }; + let protocol_output = + KeyProviderKeyWrapProtocolOutput::from_ttrpc(_input, &ttrpc, OpKey::Wrap) + .context("keyprovider: ttrpc provider failed to execute Wrap")?; if let Some(result) = protocol_output.key_wrap_results { Ok(result.annotation) } else { @@ -422,23 +405,11 @@ impl KeyProviderKeyWrapper { )); #[cfg(feature = "keywrap-keyprovider-ttrpc")] { + use anyhow::Context; let ttrpc = ttrpc.to_string(); - let handler = std::thread::spawn(move || { - create_async_runtime()?.block_on(async { - KeyProviderKeyWrapProtocolOutput::from_ttrpc(_input, &ttrpc, OpKey::Unwrap) - .map_err(|e| { - format!( - "keyprovider: ttrpc provider failed to execute {} operation: {e}", - OpKey::Wrap, - ) - }) - }) - }); - match handler.join() { - Ok(Ok(v)) => Ok(v), - Ok(Err(e)) => bail!("failed to unwrap key by ttrpc, {e}"), - Err(e) => bail!("failed to unwrap key by ttrpc, {e:?}"), - } + + KeyProviderKeyWrapProtocolOutput::from_ttrpc(_input, &ttrpc, OpKey::Unwrap) + .context("keyprovider: failed to unwrap key by ttrpc") } } @@ -563,7 +534,6 @@ impl KeyWrapper for KeyProviderKeyWrapper { #[cfg(any( feature = "keywrap-keyprovider-grpc", - feature = "keywrap-keyprovider-ttrpc", feature = "keywrap-keyprovider-native" ))] fn create_async_runtime() -> std::result::Result { diff --git a/ocicrypt-rs/src/utils/ttrpc/keyprovider_ttrpc.rs b/ocicrypt-rs/src/utils/ttrpc/keyprovider_ttrpc.rs index 77a6fdc6c..e485751fd 100644 --- a/ocicrypt-rs/src/utils/ttrpc/keyprovider_ttrpc.rs +++ b/ocicrypt-rs/src/utils/ttrpc/keyprovider_ttrpc.rs @@ -18,28 +18,29 @@ use protobuf::{CodedInputStream, CodedOutputStream, Message}; use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; #[derive(Clone)] pub struct KeyProviderServiceClient { - client: ::ttrpc::r#async::Client, + client: ::ttrpc::Client, } impl KeyProviderServiceClient { - pub fn new(client: ::ttrpc::r#async::Client) -> Self { + pub fn new(client: ::ttrpc::Client) -> Self { KeyProviderServiceClient { client, } } - pub async fn wrap_key(&self, ctx: ttrpc::context::Context, req: &super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result { + pub fn wrap_key(&self, ctx: ttrpc::context::Context, req: &super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result { let mut cres = super::keyprovider::KeyProviderKeyWrapProtocolOutput::new(); - ::ttrpc::async_client_request!(self, ctx, req, "keyprovider.KeyProviderService", "WrapKey", cres); + ::ttrpc::client_request!(self, ctx, req, "keyprovider.KeyProviderService", "WrapKey", cres); + Ok(cres) } - pub async fn un_wrap_key(&self, ctx: ttrpc::context::Context, req: &super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result { + pub fn un_wrap_key(&self, ctx: ttrpc::context::Context, req: &super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result { let mut cres = super::keyprovider::KeyProviderKeyWrapProtocolOutput::new(); - ::ttrpc::async_client_request!(self, ctx, req, "keyprovider.KeyProviderService", "UnWrapKey", cres); + ::ttrpc::client_request!(self, ctx, req, "keyprovider.KeyProviderService", "UnWrapKey", cres); + Ok(cres) } } @@ -47,10 +48,10 @@ struct WrapKeyMethod { service: Arc>, } -#[async_trait] -impl ::ttrpc::r#async::MethodHandler for WrapKeyMethod { - async fn handler(&self, ctx: ::ttrpc::r#async::TtrpcContext, req: ::ttrpc::Request) -> ::ttrpc::Result<::ttrpc::Response> { - ::ttrpc::async_request_handler!(self, ctx, req, keyprovider, KeyProviderKeyWrapProtocolInput, wrap_key); +impl ::ttrpc::MethodHandler for WrapKeyMethod { + fn handler(&self, ctx: ::ttrpc::TtrpcContext, req: ::ttrpc::Request) -> ::ttrpc::Result<()> { + ::ttrpc::request_handler!(self, ctx, req, keyprovider, KeyProviderKeyWrapProtocolInput, wrap_key); + Ok(()) } } @@ -58,34 +59,30 @@ struct UnWrapKeyMethod { service: Arc>, } -#[async_trait] -impl ::ttrpc::r#async::MethodHandler for UnWrapKeyMethod { - async fn handler(&self, ctx: ::ttrpc::r#async::TtrpcContext, req: ::ttrpc::Request) -> ::ttrpc::Result<::ttrpc::Response> { - ::ttrpc::async_request_handler!(self, ctx, req, keyprovider, KeyProviderKeyWrapProtocolInput, un_wrap_key); +impl ::ttrpc::MethodHandler for UnWrapKeyMethod { + fn handler(&self, ctx: ::ttrpc::TtrpcContext, req: ::ttrpc::Request) -> ::ttrpc::Result<()> { + ::ttrpc::request_handler!(self, ctx, req, keyprovider, KeyProviderKeyWrapProtocolInput, un_wrap_key); + Ok(()) } } -#[async_trait] -pub trait KeyProviderService: Sync { - async fn wrap_key(&self, _ctx: &::ttrpc::r#async::TtrpcContext, _: super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result { +pub trait KeyProviderService { + fn wrap_key(&self, _ctx: &::ttrpc::TtrpcContext, _: super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result { Err(::ttrpc::Error::RpcStatus(::ttrpc::get_status(::ttrpc::Code::NOT_FOUND, "/keyprovider.KeyProviderService/WrapKey is not supported".to_string()))) } - async fn un_wrap_key(&self, _ctx: &::ttrpc::r#async::TtrpcContext, _: super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result { + fn un_wrap_key(&self, _ctx: &::ttrpc::TtrpcContext, _: super::keyprovider::KeyProviderKeyWrapProtocolInput) -> ::ttrpc::Result { Err(::ttrpc::Error::RpcStatus(::ttrpc::get_status(::ttrpc::Code::NOT_FOUND, "/keyprovider.KeyProviderService/UnWrapKey is not supported".to_string()))) } } -pub fn create_key_provider_service(service: Arc>) -> HashMap { - let mut ret = HashMap::new(); +pub fn create_key_provider_service(service: Arc>) -> HashMap> { let mut methods = HashMap::new(); - let streams = HashMap::new(); - methods.insert("WrapKey".to_string(), - Box::new(WrapKeyMethod{service: service.clone()}) as Box); + methods.insert("/keyprovider.KeyProviderService/WrapKey".to_string(), + Box::new(WrapKeyMethod{service: service.clone()}) as Box); - methods.insert("UnWrapKey".to_string(), - Box::new(UnWrapKeyMethod{service: service.clone()}) as Box); + methods.insert("/keyprovider.KeyProviderService/UnWrapKey".to_string(), + Box::new(UnWrapKeyMethod{service: service.clone()}) as Box); - ret.insert("keyprovider.KeyProviderService".to_string(), ::ttrpc::r#async::Service{ methods, streams }); - ret + methods }