Skip to content

Commit

Permalink
fix API
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Apr 1, 2024
1 parent 043ade0 commit 767b68d
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ rumqttc = { version = "0.24.0", features = ["url"] }
rust_decimal = "1"
rustls-native-certs = "0.7"
rustls-pemfile = "2"
rustls-pki-types = "1"
rw_futures_util = { workspace = true }
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
Expand All @@ -143,7 +144,6 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
] }
tokio-postgres = { version = "0.7", features = ["with-uuid-1"] }
tokio-retry = "0.3"
tokio-rustls = "0.24"
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["codec", "io"] }
tonic = { workspace = true }
Expand Down
10 changes: 4 additions & 6 deletions src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,21 +687,21 @@ impl NatsCommon {

pub(crate) fn load_certs(
certificates: &str,
) -> ConnectorResult<Vec<tokio_rustls::rustls::Certificate>> {
) -> ConnectorResult<Vec<rustls_pki_types::CertificateDer<'static>>> {
let cert_bytes = if let Some(path) = certificates.strip_prefix("fs://") {
std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())?
} else {
certificates.as_bytes().to_owned()
};

rustls_pemfile::certs(&mut cert_bytes.as_slice())
.map(|cert| Ok(tokio_rustls::rustls::Certificate(cert?.to_vec())))
.map(|cert| Ok(cert?))
.collect()
}

pub(crate) fn load_private_key(
certificate: &str,
) -> ConnectorResult<tokio_rustls::rustls::PrivateKey> {
) -> ConnectorResult<rustls_pki_types::PrivateKeyDer<'static>> {
let cert_bytes = if let Some(path) = certificate.strip_prefix("fs://") {
std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())?
} else {
Expand All @@ -711,7 +711,5 @@ pub(crate) fn load_private_key(
let cert = rustls_pemfile::pkcs8_private_keys(&mut cert_bytes.as_slice())
.next()
.ok_or_else(|| anyhow!("No private key found"))?;
Ok(tokio_rustls::rustls::PrivateKey(
cert?.secret_pkcs8_der().to_vec(),
))
Ok(cert?.into())
}
15 changes: 6 additions & 9 deletions src/connector/src/connector_common/mqtt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use rumqttc::tokio_rustls::rustls;
use rumqttc::v5::mqttbytes::QoS;
use rumqttc::v5::{AsyncClient, EventLoop, MqttOptions};
use serde_derive::Deserialize;
Expand Down Expand Up @@ -141,26 +142,22 @@ impl MqttCommon {
.unwrap_or(QoS::AtMostOnce)
}

fn get_tls_config(&self) -> ConnectorResult<tokio_rustls::rustls::ClientConfig> {
let mut root_cert_store = tokio_rustls::rustls::RootCertStore::empty();
fn get_tls_config(&self) -> ConnectorResult<rustls::ClientConfig> {
let mut root_cert_store = rustls::RootCertStore::empty();
if let Some(ca) = &self.ca {
let certificates = load_certs(ca)?;
for cert in certificates {
root_cert_store.add(&cert).unwrap();
root_cert_store.add(cert).unwrap();
}
} else {
for cert in
rustls_native_certs::load_native_certs().expect("could not load platform certs")
{
root_cert_store
.add(&tokio_rustls::rustls::Certificate(cert.to_vec()))
.unwrap();
root_cert_store.add(cert).unwrap();
}
}

let builder = tokio_rustls::rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_cert_store);
let builder = rustls::ClientConfig::builder().with_root_certificates(root_cert_store);

let tls_config = if let (Some(client_cert), Some(client_key)) =
(self.client_cert.as_ref(), self.client_key.as_ref())
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def_anyhow_newtype! {
redis::RedisError => "Redis error",
arrow_schema::ArrowError => "Arrow error",
google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error",
tokio_rustls::rustls::Error => "TLS error",
rumqttc::tokio_rustls::rustls::Error => "TLS error",
rumqttc::v5::ClientError => "MQTT error",
rumqttc::v5::OptionError => "MQTT error",

Expand Down

0 comments on commit 767b68d

Please sign in to comment.