diff --git a/README.md b/README.md index f2463a9..46f0940 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ $HOME/.config/batch_resolve.toml /etc/batch_resolve.toml ``` -Configuration includes DNS servers, maximum simultaneously running resolve tasks and retries on failure count +Configuration includes DNS servers, queries per second amount and retries on failure count ```toml # DNS servers are only accepted as socket addresses # If port is not specified default DNS :53 port will be used @@ -64,8 +64,8 @@ dns = [ "8.8.8.8" ] -# Simultaneous resolve tasks -tasks = 5000 +# How many queries to perform per second +queries_per_second = 2000 # Times to retry on connection timeout retry = 5 diff --git a/README_RUS.md b/README_RUS.md index bdaefd1..3900989 100644 --- a/README_RUS.md +++ b/README_RUS.md @@ -54,7 +54,7 @@ $HOME/.config/batch_resolve.toml /etc/batch_resolve.toml ``` -Конфигурация включает DNS сервера, количество одновременно выполняемых задач резолва и количество повторов по таймауту. +Конфигурация включает DNS сервера, количество запросов в секунду и количество повторов по таймауту. ```toml # Адреса DNS серверов # Если порт не указан -- по умолчанию будет использован полт 53 @@ -62,8 +62,8 @@ dns = [ "8.8.8.8" ] -# Количество одновременных задач -tasks = 5000 +# Количество запросов в секунду +queries_per_second = 2000 # Количество повторов запроса по таймауту retry = 5 diff --git a/batch_resolve.toml b/batch_resolve.toml index 592bd06..962e038 100644 --- a/batch_resolve.toml +++ b/batch_resolve.toml @@ -7,8 +7,8 @@ dns = [ #"77.88.8.1:53" ] -# Simultaneous resolve tasks -tasks = 5000 +# How many queries to perform per second +queries_per_second = 2000 # Times to retry on connection timeout retry = 5 \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 3817797..98ec64a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,7 +15,7 @@ lazy_static! { ]; static ref DEFAULT_TIMEOUT_RETRIES: u32 = 10; - static ref DEFAULT_TASKS: u32 = 5000; + static ref DEFAULT_QPS: u32 = 5000; pub static ref CONFIG: Arc<RwLock<Config>> = Arc::new(RwLock::new(Config::new())); } @@ -23,7 +23,7 @@ lazy_static! { #[derive(Debug)] pub struct Config { dns_list: Vec<SocketAddr>, - tasks: u32, + qps: u32, timeout_retries: u32, } @@ -31,7 +31,7 @@ impl Default for Config { fn default() -> Self { Config { dns_list: DEFAULT_DNS_SERVERS.clone(), - tasks: *DEFAULT_TASKS, + qps: *DEFAULT_QPS, timeout_retries: *DEFAULT_TIMEOUT_RETRIES, } } @@ -46,8 +46,8 @@ impl Config { self.timeout_retries } - pub fn tasks(&self) -> u32 { - self.tasks + pub fn qps(&self) -> u32 { + self.qps } pub fn dns_list(&self) -> &[SocketAddr] { @@ -59,7 +59,7 @@ impl Config { struct Config { dns: Option<Vec<String>>, retry: Option<u32>, - tasks: Option<u32>, + queries_per_second: Option<u32>, } let mut cfg_fmt: Config = toml::from_str(string)?; @@ -85,8 +85,8 @@ impl Config { self.timeout_retries = retry; } - if let Some(tasks) = cfg_fmt.tasks { - self.tasks = tasks; + if let Some(qps) = cfg_fmt.queries_per_second { + self.qps = qps; } Ok(()) diff --git a/src/resolve/batch.rs b/src/resolve/batch.rs index b1a392e..134eda6 100644 --- a/src/resolve/batch.rs +++ b/src/resolve/batch.rs @@ -14,6 +14,8 @@ pub struct Status { pub running: u64, } +pub type StatusTx = mpsc::Sender<ResolveStatus>; + #[derive(Copy, Clone, Debug)] pub enum ResolveStatus { Started, diff --git a/src/resolve/resolver.rs b/src/resolve/resolver.rs index bf6f138..840df9d 100644 --- a/src/resolve/resolver.rs +++ b/src/resolve/resolver.rs @@ -3,7 +3,6 @@ use std::cell::Cell; use std::collections::HashSet; use std::net::SocketAddr; use std::borrow::Borrow; -use std::sync::mpsc; use futures::Future; use futures::future; @@ -21,7 +20,7 @@ use trust_dns::op::message::Message; use trust_dns::error::ClientErrorKind; use resolve::error::*; -use resolve::batch::{ResolveStatus, QueryType}; +use resolve::batch::{StatusTx, ResolveStatus, QueryType}; use config::CONFIG; fn make_client(loop_handle: Handle, name_server: SocketAddr) -> BasicClientHandle { @@ -62,14 +61,14 @@ impl ClientFactory { pub struct TrustDNSResolver { loop_handle: Handle, - done_tx: mpsc::Sender<ResolveStatus>, + status_tx: StatusTx, } impl TrustDNSResolver { - pub fn new(loop_handle: Handle, done_tx: mpsc::Sender<ResolveStatus>) -> Self { + pub fn new(loop_handle: Handle, status_tx: StatusTx) -> Self { TrustDNSResolver { loop_handle: loop_handle.clone(), - done_tx: done_tx + status_tx: status_tx } } } @@ -80,8 +79,8 @@ impl TrustDNSResolver { { let client_factory = ClientFactory::new(self.loop_handle.clone(), dns); - self.done_tx.send(ResolveStatus::Started).unwrap(); - let done_tx = self.done_tx.clone(); + self.status_tx.send(ResolveStatus::Started).unwrap(); + let status_tx = self.status_tx.clone(); let future = match query_type { QueryType::PTR => self.reverse_resolve(client_factory, name), @@ -90,7 +89,7 @@ impl TrustDNSResolver { let name = name.to_owned(); let future = future.map(move |msg| msg.extract_answer(query_type)) - .then(move |rv| rv.report_status(&name, done_tx)) + .then(move |rv| rv.report_status(&name, status_tx)) .then(move |rv| rv.partial_ok()); Box::new(future) @@ -385,27 +384,27 @@ impl ExtractAnswer for Message { } trait ReportStatus { - fn report_status(self, name: &str, done_tx: mpsc::Sender<ResolveStatus>) -> Self; + fn report_status(self, name: &str, status_tx: StatusTx) -> Self; } impl<T> ReportStatus for Result<Vec<T>, ResolverError> { - fn report_status(self, name: &str, done_tx: mpsc::Sender<ResolveStatus>) -> Self { + fn report_status(self, name: &str, status_tx: StatusTx) -> Self { match self.as_ref() { Ok(vec) => if vec.is_empty() { - done_tx.send(ResolveStatus::Failure).unwrap(); + status_tx.send(ResolveStatus::Failure).unwrap(); } else { - done_tx.send(ResolveStatus::Success).unwrap(); + status_tx.send(ResolveStatus::Success).unwrap(); }, Err(error) => { match *error { ResolverError::ConnectionTimeout | ResolverError::NameServerNotResolved => { debug!("failed to resolve {:?}: {}", name, error); - done_tx.send(ResolveStatus::Failure).unwrap(); + status_tx.send(ResolveStatus::Failure).unwrap(); } _ => { error!("failed to resolve {:?}: {}", name, error); - done_tx.send(ResolveStatus::Error).unwrap(); + status_tx.send(ResolveStatus::Error).unwrap(); } } } diff --git a/src/resolve/resolver_threadpool.rs b/src/resolve/resolver_threadpool.rs index 935e4d2..4cf541d 100644 --- a/src/resolve/resolver_threadpool.rs +++ b/src/resolve/resolver_threadpool.rs @@ -1,18 +1,21 @@ use std::sync::mpsc; use std::thread; use std::net::SocketAddr; +use std::time::{Instant, Duration}; use tokio_core::reactor::Core; use futures::Stream; +use futures::Sink; use futures::stream; use futures::Future; +use futures::sync::mpsc as future_mpsc; use crossbeam; use num_cpus; -use batch::QueryType; +use resolve::batch::QueryType; +use resolve::batch::StatusTx; use resolve::resolver::TrustDNSResolver; -use resolve::batch::ResolveStatus; use resolve::error::ResolverError; use config::CONFIG; @@ -37,20 +40,20 @@ impl ResolverThreadPool { self.tasks.push(task) } - pub fn start(self, status: mpsc::Sender<ResolveStatus>) { + pub fn start(self, status: StatusTx) { let tasks_cnt = self.tasks.len(); let chunk_size = tasks_cnt / self.workers_cnt + 1; - let sim_tasks = CONFIG.read().unwrap().tasks() as usize / self.workers_cnt ; + let qps = CONFIG.read().unwrap().qps() as usize; crossbeam::scope(|scope| { scope.defer(|| debug!("Exiting crosspbeam scope")); + let mut trigger = TriggerTimer::new(qps, tasks_cnt); + for chunk in self.tasks.chunks(chunk_size) .map(|chunk| chunk.to_vec()) { + let trigger_handle = trigger.get_handle(); let status = status.clone(); - let dns = CONFIG.read().unwrap() - .dns_list() - .to_vec(); scope.spawn(move || { let thread = thread::current(); @@ -58,46 +61,92 @@ impl ResolverThreadPool { .unwrap_or("Unknown"); debug!("Started worker thread ({})", tname); - ResolverThread::thread_fn(chunk, status, dns, sim_tasks); + ResolverThread::thread_main(chunk, status, trigger_handle); debug!("Terminated worker thread: ({})", tname); }); } + + let dns = CONFIG.read().unwrap().dns_list().to_vec(); + scope.spawn(move || { + trigger.thread_main(dns) + }); }) } } -struct ResolverThread; +type TriggerTx = future_mpsc::Sender<SocketAddr>; +type TriggerRx = future_mpsc::Receiver<SocketAddr>; + +struct TriggerTimer { + handles: Vec<TriggerTx>, + qps: usize, + triggered: usize, + tasks_cnt: usize, +} + +impl TriggerTimer { + pub fn new(qps: usize, tasks_cnt: usize) -> Self { + TriggerTimer { + handles: vec![], + qps: qps, + triggered: 0, + tasks_cnt: tasks_cnt + } + } + + pub fn get_handle(&mut self) -> TriggerRx { + let (tx, rx) = future_mpsc::channel(self.qps as usize); + self.handles.push(tx); + rx + } + + pub fn thread_main(mut self, dns_list: Vec<SocketAddr>) { + let duration_second = Duration::from_secs(1); + + while self.triggered < self.tasks_cnt { + let start = Instant::now(); + self.trigger_qps(&dns_list); + let end = Instant::now(); + + let diff = end - start; + if diff < duration_second { + thread::sleep(duration_second - diff); + } + } + } + + fn trigger_qps(&mut self, dns_list: &[SocketAddr]) { + let qps_per_handle = (self.qps as f32 / self.handles.len() as f32).ceil() as u32; + debug!("Triggering {} requests per thread", qps_per_handle); + for handle in &mut self.handles { + for i in 0..qps_per_handle { + let dns = dns_list[i as usize % dns_list.len()]; + handle.send(dns).wait().unwrap(); + self.triggered += 1; + } + } + } +} + +struct ResolverThread; impl ResolverThread { - fn thread_fn(tasks: Vec<ResolveTask>, - status: mpsc::Sender<ResolveStatus>, - dns: Vec<SocketAddr>, - sim_tasks: usize) + fn thread_main(tasks: Vec<ResolveTask>, + status: StatusTx, + task_trigger: TriggerRx) { - debug!("Simultaneous tasks: {}", sim_tasks); let mut core = Core::new().unwrap(); let handle = core.handle(); + let qps = CONFIG.read().unwrap().qps(); - let future = { + let future = { let resolver = TrustDNSResolver::new(handle.clone(), status); - let futures_stream = { - let fs = tasks.into_iter() - .enumerate() - .map(move |(idx, task)| { - let idx = idx % dns.len(); - let dns = dns[idx]; - task.resolve(&resolver, dns) - }); - stream::iter::<_, _, _>(fs.map(|x| Ok(x))) - }; - - let future = futures_stream - .buffer_unordered(sim_tasks) - .collect(); - - future.map(|_| ()) - .map_err(|_| ()) + stream::iter::<_,_,_>(tasks.into_iter().map(|x| Ok(x))) + .zip(task_trigger).map(move |(task, dns)| + task.resolve(&resolver, dns).map_err(|_| ())) + .buffer_unordered(qps as usize) + .collect() }; core.run(future).unwrap();