add config to resolve via custom DNS server
Smityz committed Aug 9, 2023
1 parent abf22ba commit 5c8e062
Showing 12 changed files with 133 additions and 46 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -43,6 +43,8 @@ serde_derive = "1.0"
thiserror = "1"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
tonic = { version = "0.9", features = ["tls"] }
+ trust-dns-resolver = "0.19.4"
+ url = "2.4"

[dev-dependencies]
clap = "2"
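
The two new crates carry the feature: trust-dns-resolver performs lookups against a user-supplied DNS server, and url parses and rebuilds endpoint addresses. A minimal sketch of the resolver half (illustrative code, not part of this commit; signatures are those of the 0.19 line pinned above, where the tokio constructor is still async):

use std::net::{IpAddr, SocketAddr};

use trust_dns_resolver::config::{NameServerConfigGroup, ResolverConfig, ResolverOpts};
use trust_dns_resolver::TokioAsyncResolver;

// Resolve `host` against one specific DNS server (plain UDP/TCP),
// bypassing the system resolver configuration entirely.
async fn resolve_via(
    dns_server: SocketAddr,
    host: &str,
) -> Result<IpAddr, Box<dyn std::error::Error>> {
    let servers = NameServerConfigGroup::from_ips_clear(&[dns_server.ip()], dns_server.port());
    let config = ResolverConfig::from_parts(None, vec![], servers);
    // In trust-dns-resolver 0.19 this constructor is async; later versions made it sync.
    let resolver = TokioAsyncResolver::tokio(config, ResolverOpts::default()).await?;
    // Take the first A/AAAA record returned for the host.
    let ip = resolver
        .lookup_ip(host)
        .await?
        .iter()
        .next()
        .ok_or("no records returned")?;
    Ok(ip)
}
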
19 changes: 16 additions & 3 deletions src/common/security.rs
@@ -4,7 +4,6 @@ use std::fs::File
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
- use std::time::Duration;

use log::info;
use regex::Regex;
@@ -15,6 +14,7 @@ use tonic::transport::Identity;

use crate::internal_err;
use crate::Result;
+ use crate::{util, Config};

lazy_static::lazy_static! {
static ref SCHEME_REG: Regex = Regex::new(r"^\s*(https?://)").unwrap();
@@ -73,17 +73,30 @@ impl SecurityManager {
// env: Arc<Environment>,
addr: &str,
factory: Factory,
+ config: &Config,
) -> Result<Client>
where
Factory: FnOnce(Channel) -> Client,
{
let addr = "http://".to_string() + &SCHEME_REG.replace(addr, "");

+ let addr = match config.dns_server_addr {
+     Some(ref dns_server_addr) => {
+         util::dns::custom_dns(
+             addr,
+             dns_server_addr.clone(),
+             config.dns_search_domain.clone(),
+         )
+         .await?
+     }
+     None => addr,
+ };

info!("connect to rpc server at endpoint: {:?}", addr);

let mut builder = Channel::from_shared(addr)?
- .tcp_keepalive(Some(Duration::from_secs(10)))
- .keep_alive_timeout(Duration::from_secs(3));
+ .tcp_keepalive(config.tcp_keepalive)
+ .keep_alive_timeout(config.keep_alive_timeout);

if !self.ca.is_empty() {
let tls = ClientTlsConfig::new()
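
The util::dns::custom_dns helper called above lives in src/util/dns.rs, one of the two changed files not rendered on this page. A plausible sketch of what such a helper does, assuming it parses the endpoint URL, resolves the host against dns_server_addr with the given search domains, and splices the resolved IP back into the URL (names, signatures, and error handling here are guesses, not the commit's code):

use std::net::SocketAddr;
use std::str::FromStr;

use trust_dns_resolver::config::{NameServerConfigGroup, ResolverConfig, ResolverOpts};
use trust_dns_resolver::{Name, TokioAsyncResolver};
use url::Url;

type BoxError = Box<dyn std::error::Error>;

// Rewrite `addr` (e.g. "http://pd0:2379") so that its host part is an IP
// resolved through `dns_server_addr` instead of the system resolver.
pub async fn custom_dns(
    addr: String,
    dns_server_addr: String,
    dns_search_domain: Vec<String>,
) -> Result<String, BoxError> {
    let mut url = Url::parse(&addr)?;
    let host = url.host_str().ok_or("endpoint URL has no host")?.to_string();

    let dns_server: SocketAddr = dns_server_addr.parse()?;
    let search: Vec<Name> = dns_search_domain
        .iter()
        .map(|d| Name::from_str(d))
        .collect::<Result<_, _>>()?;
    let servers = NameServerConfigGroup::from_ips_clear(&[dns_server.ip()], dns_server.port());
    let resolver = TokioAsyncResolver::tokio(
        ResolverConfig::from_parts(None, search, servers),
        ResolverOpts::default(),
    )
    .await?;

    // Take the first A/AAAA record and substitute it for the hostname.
    let ip = resolver
        .lookup_ip(host.as_str())
        .await?
        .iter()
        .next()
        .ok_or("no DNS records returned for host")?;
    url.set_ip_host(ip).map_err(|_| "URL cannot carry an IP host")?;
    Ok(url.to_string())
}
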
10 changes: 10 additions & 0 deletions src/config.rs
@@ -19,9 +19,15 @@ pub struct Config {
pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>,
pub timeout: Duration,
+ pub tcp_keepalive: Option<Duration>,
+ pub keep_alive_timeout: Duration,
+ pub dns_server_addr: Option<String>,
+ pub dns_search_domain: Vec<String>,
}

const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
+ const DEFAULT_TCP_KEEPALIVE: Duration = Duration::from_secs(10);
+ const DEFAULT_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(3);

impl Default for Config {
fn default() -> Self {
@@ -30,6 +36,10 @@ impl Default for Config {
cert_path: None,
key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT,
+ tcp_keepalive: Some(DEFAULT_TCP_KEEPALIVE),
+ keep_alive_timeout: DEFAULT_KEEP_ALIVE_TIMEOUT,
+ dns_server_addr: None,
+ dns_search_domain: vec![],
}
}
}
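
All of these fields are public, so a caller can opt in to custom DNS by filling them directly and taking Default for the rest. A usage sketch (the DNS server address and search domain are placeholders):

use std::time::Duration;

use tikv_client::Config;

fn config_with_custom_dns() -> Config {
    Config {
        // Resolve PD/TiKV hostnames through this DNS server (placeholder address).
        dns_server_addr: Some("10.0.0.2:53".to_string()),
        // Suffixes tried for unqualified names, e.g. "pd0" -> "pd0.svc.cluster.local".
        dns_search_domain: vec!["svc.cluster.local".to_string()],
        // Keepalive knobs this commit also makes configurable.
        tcp_keepalive: Some(Duration::from_secs(10)),
        keep_alive_timeout: Duration::from_secs(3),
        ..Config::default()
    }
}
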
4 changes: 2 additions & 2 deletions src/mock.rs
@@ -33,12 +33,12 @@ use crate::Timestamp;
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
let config = Config::default();
PdRpcClient::new(
- config.clone(),
+ &config,
|_| MockKvConnect,
|sm| {
futures::future::ok(RetryClient::new_with_cluster(
sm,
- config.timeout,
+ config.clone(),
MockCluster,
))
},
10 changes: 5 additions & 5 deletions src/pd/client.rs
@@ -260,13 +260,13 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
impl PdRpcClient<TikvConnect, Cluster> {
pub async fn connect(
pd_endpoints: &[String],
- config: Config,
+ config: &Config,
enable_codec: bool,
) -> Result<PdRpcClient> {
PdRpcClient::new(
- config.clone(),
- |security_mgr| TikvConnect::new(security_mgr, config.timeout),
- |security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
+ config,
+ |security_mgr| TikvConnect::new(security_mgr, config.clone()),
+ |security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config),
enable_codec,
)
.await
@@ -275,7 +275,7 @@ impl PdRpcClient<TikvConnect, Cluster> {

impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
pub async fn new<PdFut, MakeKvC, MakePd>(
- config: Config,
+ config: &Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_codec: bool,
33 changes: 15 additions & 18 deletions src/pd/cluster.rs
@@ -16,6 +16,7 @@ use tonic::Request;
use super::timestamp::TimestampOracle;
use crate::internal_err;
use crate::proto::pdpb;
+ use crate::Config;
use crate::Result;
use crate::SecurityManager;
use crate::Timestamp;
@@ -103,13 +104,9 @@ impl Connection {
Connection { security_mgr }
}

- pub async fn connect_cluster(
-     &self,
-     endpoints: &[String],
-     timeout: Duration,
- ) -> Result<Cluster> {
-     let members = self.validate_endpoints(endpoints, timeout).await?;
-     let (client, members) = self.try_connect_leader(&members, timeout).await?;
+ pub async fn connect_cluster(&self, endpoints: &[String], config: &Config) -> Result<Cluster> {
+     let members = self.validate_endpoints(endpoints, config).await?;
+     let (client, members) = self.try_connect_leader(&members, config).await?;
let id = members.header.as_ref().unwrap().cluster_id;
let tso = TimestampOracle::new(id, &client)?;
let cluster = Cluster {
@@ -122,10 +119,10 @@
}

// Re-establish connection with PD leader in asynchronous fashion.
- pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
+ pub async fn reconnect(&self, cluster: &mut Cluster, config: &Config) -> Result<()> {
warn!("updating pd client");
let start = Instant::now();
- let (client, members) = self.try_connect_leader(&cluster.members, timeout).await?;
+ let (client, members) = self.try_connect_leader(&cluster.members, config).await?;
let tso = TimestampOracle::new(cluster.id, &client)?;
*cluster = Cluster {
id: cluster.id,
@@ -141,7 +138,7 @@
async fn validate_endpoints(
&self,
endpoints: &[String],
- timeout: Duration,
+ config: &Config,
) -> Result<pdpb::GetMembersResponse> {
let mut endpoints_set = HashSet::with_capacity(endpoints.len());

@@ -152,7 +149,7 @@
return Err(internal_err!("duplicated PD endpoint {}", ep));
}

- let (_, resp) = match self.connect(ep, timeout).await {
+ let (_, resp) = match self.connect(ep, config).await {
Ok(resp) => resp,
// Ignore failed PD node.
Err(e) => {
@@ -193,11 +190,11 @@ impl Connection {
async fn connect(
&self,
addr: &str,
- _timeout: Duration,
+ config: &Config,
) -> Result<(pdpb::pd_client::PdClient<Channel>, pdpb::GetMembersResponse)> {
let mut client = self
.security_mgr
- .connect(addr, pdpb::pd_client::PdClient::<Channel>::new)
+ .connect(addr, pdpb::pd_client::PdClient::<Channel>::new, config)
.await?;
let resp: pdpb::GetMembersResponse = client
.get_members(pdpb::GetMembersRequest::default())
@@ -210,9 +207,9 @@
&self,
addr: &str,
cluster_id: u64,
- timeout: Duration,
+ config: &Config,
) -> Result<(pdpb::pd_client::PdClient<Channel>, pdpb::GetMembersResponse)> {
- let (client, r) = self.connect(addr, timeout).await?;
+ let (client, r) = self.connect(addr, config).await?;
Connection::validate_cluster_id(addr, &r, cluster_id)?;
Ok((client, r))
}
@@ -238,7 +235,7 @@ impl Connection {
async fn try_connect_leader(
&self,
previous: &pdpb::GetMembersResponse,
- timeout: Duration,
+ config: &Config,
) -> Result<(pdpb::pd_client::PdClient<Channel>, pdpb::GetMembersResponse)> {
let previous_leader = previous.leader.as_ref().unwrap();
let members = &previous.members;
@@ -252,7 +249,7 @@
.chain(Some(previous_leader))
{
for ep in &m.client_urls {
- match self.try_connect(ep.as_str(), cluster_id, timeout).await {
+ match self.try_connect(ep.as_str(), cluster_id, config).await {
Ok((_, r)) => {
resp = Some(r);
break 'outer;
@@ -269,7 +266,7 @@
if let Some(resp) = resp {
let leader = resp.leader.as_ref().unwrap();
for ep in &leader.client_urls {
- let r = self.try_connect(ep.as_str(), cluster_id, timeout).await;
+ let r = self.try_connect(ep.as_str(), cluster_id, config).await;
if r.is_ok() {
return r;
}
27 changes: 14 additions & 13 deletions src/pd/retry.rs
@@ -20,6 +20,7 @@ use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::region::StoreId;
use crate::stats::pd_stats;
+ use crate::Config;
use crate::Error;
use crate::Result;
use crate::SecurityManager;
@@ -51,21 +52,21 @@ pub struct RetryClient<Cl = Cluster> {
// Tuple is the cluster and the time of the cluster's last reconnect.
cluster: RwLock<(Cl, Instant)>,
connection: Connection,
- timeout: Duration,
+ config: Config,
}

#[cfg(test)]
impl<Cl> RetryClient<Cl> {
pub fn new_with_cluster(
security_mgr: Arc<SecurityManager>,
- timeout: Duration,
+ config: Config,
cluster: Cl,
) -> RetryClient<Cl> {
let connection = Connection::new(security_mgr);
RetryClient {
cluster: RwLock::new((cluster, Instant::now())),
connection,
- timeout,
+ config,
}
}
}
@@ -107,17 +108,17 @@ impl RetryClient<Cluster> {
pub async fn connect(
endpoints: &[String],
security_mgr: Arc<SecurityManager>,
- timeout: Duration,
+ config: &Config,
) -> Result<RetryClient> {
let connection = Connection::new(security_mgr);
let cluster = RwLock::new((
- connection.connect_cluster(endpoints, timeout).await?,
+ connection.connect_cluster(endpoints, config).await?,
Instant::now(),
));
Ok(RetryClient {
cluster,
connection,
- timeout,
+ config: config.clone(),
})
}
}
@@ -131,7 +132,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
let key = key.clone();
async {
cluster
- .get_region(key.clone(), self.timeout)
+ .get_region(key.clone(), self.config.timeout)
.await
.and_then(|resp| {
region_from_response(resp, || Error::RegionForKeyNotFound { key })
@@ -143,7 +144,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader> {
retry!(self, "get_region_by_id", |cluster| async {
cluster
- .get_region_by_id(region_id, self.timeout)
+ .get_region_by_id(region_id, self.config.timeout)
.await
.and_then(|resp| {
region_from_response(resp, || Error::RegionNotFoundInResponse { region_id })
@@ -154,7 +155,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
retry!(self, "get_store", |cluster| async {
cluster
- .get_store(id, self.timeout)
+ .get_store(id, self.config.timeout)
.await
.map(|resp| resp.store.unwrap())
})
@@ -164,7 +165,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
retry!(self, "get_all_stores", |cluster| async {
cluster
- .get_all_stores(self.timeout)
+ .get_all_stores(self.config.timeout)
.await
.map(|resp| resp.stores.into_iter().map(Into::into).collect())
})
@@ -177,7 +178,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
retry!(self, "update_gc_safepoint", |cluster| async {
cluster
- .update_safepoint(safepoint, self.timeout)
+ .update_safepoint(safepoint, self.config.timeout)
.await
.map(|resp| resp.new_safe_point == safepoint)
})
@@ -187,7 +188,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
impl fmt::Debug for RetryClient {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("pd::RetryClient")
.field("timeout", &self.timeout)
.field("timeout", &self.config.timeout)
.finish()
}
}
@@ -219,7 +220,7 @@ impl Reconnect for RetryClient<Cluster> {
// a concurrent reconnect may have just succeeded while this thread was waiting for the write lock
let should_connect = reconnect_begin > *last_connected + Duration::from_secs(interval_sec);
if should_connect {
- self.connection.reconnect(cluster, self.timeout).await?;
+ self.connection.reconnect(cluster, &self.config).await?;
*last_connected = Instant::now();
}
Ok(())
2 changes: 1 addition & 1 deletion src/raw/client.rs
@@ -100,7 +100,7 @@ impl Client<PdRpcClient> {
config: Config,
) -> Result<Self> {
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
- let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config, false).await?);
+ let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, false).await?);
Ok(Client {
rpc,
cf: None,
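
Putting the pieces together at the public API, assuming the new_with_config constructor this crate exposes for raw clients (the endpoint and DNS addresses are placeholders):

use tikv_client::{Config, RawClient};

async fn connect_through_custom_dns() -> tikv_client::Result<RawClient> {
    let config = Config {
        dns_server_addr: Some("10.0.0.2:53".to_string()),
        dns_search_domain: vec!["svc.cluster.local".to_string()],
        ..Config::default()
    };
    // The PD endpoint stays a hostname; it is resolved via the configured DNS server.
    RawClient::new_with_config(vec!["pd0:2379"], config).await
}
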
7 changes: 4 additions & 3 deletions src/store/client.rs
@@ -10,6 +10,7 @@ use tonic::transport::Channel;

use super::Request;
use crate::proto::tikvpb::tikv_client::TikvClient;
+ use crate::Config;
use crate::Result;
use crate::SecurityManager;

@@ -24,7 +25,7 @@ pub trait KvConnect: Sized + Send + Sync + 'static {
#[derive(new, Clone)]
pub struct TikvConnect {
security_mgr: Arc<SecurityManager>,
- timeout: Duration,
+ config: Config,
}

#[async_trait]
Expand All @@ -33,9 +34,9 @@ impl KvConnect for TikvConnect {

async fn connect(&self, address: &str) -> Result<KvRpcClient> {
self.security_mgr
- .connect(address, TikvClient::new)
+ .connect(address, TikvClient::new, &self.config)
.await
- .map(|c| KvRpcClient::new(c, self.timeout))
+ .map(|c| KvRpcClient::new(c, self.config.timeout))
}
}

2 changes: 1 addition & 1 deletion src/transaction/client.rs
@@ -102,7 +102,7 @@ impl Client {
) -> Result<Client> {
debug!("creating new transactional client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
- let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true).await?);
+ let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, true).await?);
Ok(Client { pd })
}

(The remaining 2 changed files are not shown on this page.)