Skip to content

Commit

Permalink
Merge branch 'master' into limbooverlambda/fix-batch-put-ttl-issue
Browse files Browse the repository at this point in the history
  • Loading branch information
pingyu authored Jun 27, 2024
2 parents 390e7c7 + ec8dbcc commit 9c907c9
Showing 1 changed file with 30 additions and 17 deletions.
47 changes: 30 additions & 17 deletions src/common/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use std::time::Duration;

use log::info;
use regex::Regex;
use tonic::transport::Certificate;
use tonic::transport::Channel;
use tonic::transport::ClientTlsConfig;
use tonic::transport::Identity;
use tonic::transport::{Certificate, Endpoint};

use crate::internal_err;
use crate::Result;
Expand Down Expand Up @@ -77,27 +77,40 @@ impl SecurityManager {
where
Factory: FnOnce(Channel) -> Client,
{
let addr = "http://".to_string() + &SCHEME_REG.replace(addr, "");

info!("connect to rpc server at endpoint: {:?}", addr);
let channel = if !self.ca.is_empty() {
self.tls_channel(addr).await?
} else {
self.default_channel(addr).await?
};
let ch = channel.connect().await?;

let mut builder = Channel::from_shared(addr)?
.tcp_keepalive(Some(Duration::from_secs(10)))
.keep_alive_timeout(Duration::from_secs(3));
Ok(factory(ch))
}

if !self.ca.is_empty() {
let tls = ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(&self.ca))
.identity(Identity::from_pem(
&self.cert,
load_pem_file("private key", &self.key)?,
));
builder = builder.tls_config(tls)?;
};
async fn tls_channel(&self, addr: &str) -> Result<Endpoint> {
let addr = "https://".to_string() + &SCHEME_REG.replace(addr, "");
let builder = self.endpoint(addr.to_string())?;
let tls = ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(&self.ca))
.identity(Identity::from_pem(
&self.cert,
load_pem_file("private key", &self.key)?,
));
let builder = builder.tls_config(tls)?;
Ok(builder)
}

let ch = builder.connect().await?;
async fn default_channel(&self, addr: &str) -> Result<Endpoint> {
let addr = "http://".to_string() + &SCHEME_REG.replace(addr, "");
self.endpoint(addr)
}

Ok(factory(ch))
fn endpoint(&self, addr: String) -> Result<Endpoint> {
let endpoint = Channel::from_shared(addr)?
.tcp_keepalive(Some(Duration::from_secs(10)))
.keep_alive_timeout(Duration::from_secs(3));
Ok(endpoint)
}
}

Expand Down

0 comments on commit 9c907c9

Please sign in to comment.