Skip to content

Commit

Permalink
QPS is back! Also a bit of refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mersinvald committed Apr 30, 2017
1 parent c56ba8f commit 64eb696
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 62 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ $HOME/.config/batch_resolve.toml
/etc/batch_resolve.toml
```

Configuration includes DNS servers, maximum simultaneously running resolve tasks and retries on failure count
Configuration includes DNS servers, queries per second amount and retries on failure count
```toml
# DNS servers are only accepted as socket addresses
# If port is not specified default DNS :53 port will be used
dns = [
"8.8.8.8"
]

# Simultaneous resolve tasks
tasks = 5000
# How many queries to perform per second
queries_per_second = 2000

# Times to retry on connection timeout
retry = 5
Expand Down
6 changes: 3 additions & 3 deletions README_RUS.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ $HOME/.config/batch_resolve.toml
/etc/batch_resolve.toml
```

Конфигурация включает DNS сервера, количество одновременно выполняемых задач резолва и количество повторов по таймауту.
Конфигурация включает DNS сервера, количество запросов в секунду и количество повторов по таймауту.
```toml
# Адреса DNS серверов
# Если порт не указан -- по умолчанию будет использован полт 53
dns = [
"8.8.8.8"
]

# Количество одновременных задач
tasks = 5000
# Количество запросов в секунду
queries_per_second = 2000

# Количество повторов запроса по таймауту
retry = 5
Expand Down
4 changes: 2 additions & 2 deletions batch_resolve.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ dns = [
#"77.88.8.1:53"
]

# Simultaneous resolve tasks
tasks = 5000
# How many queries to perform per second
queries_per_second = 2000

# Times to retry on connection timeout
retry = 5
16 changes: 8 additions & 8 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@ lazy_static! {
];

static ref DEFAULT_TIMEOUT_RETRIES: u32 = 10;
static ref DEFAULT_TASKS: u32 = 5000;
static ref DEFAULT_QPS: u32 = 5000;

pub static ref CONFIG: Arc<RwLock<Config>> = Arc::new(RwLock::new(Config::new()));
}

#[derive(Debug)]
pub struct Config {
dns_list: Vec<SocketAddr>,
tasks: u32,
qps: u32,
timeout_retries: u32,
}

impl Default for Config {
fn default() -> Self {
Config {
dns_list: DEFAULT_DNS_SERVERS.clone(),
tasks: *DEFAULT_TASKS,
qps: *DEFAULT_QPS,
timeout_retries: *DEFAULT_TIMEOUT_RETRIES,
}
}
Expand All @@ -46,8 +46,8 @@ impl Config {
self.timeout_retries
}

pub fn tasks(&self) -> u32 {
self.tasks
pub fn qps(&self) -> u32 {
self.qps
}

pub fn dns_list(&self) -> &[SocketAddr] {
Expand All @@ -59,7 +59,7 @@ impl Config {
struct Config {
dns: Option<Vec<String>>,
retry: Option<u32>,
tasks: Option<u32>,
queries_per_second: Option<u32>,
}

let mut cfg_fmt: Config = toml::from_str(string)?;
Expand All @@ -85,8 +85,8 @@ impl Config {
self.timeout_retries = retry;
}

if let Some(tasks) = cfg_fmt.tasks {
self.tasks = tasks;
if let Some(qps) = cfg_fmt.queries_per_second {
self.qps = qps;
}

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions src/resolve/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub struct Status {
pub running: u64,
}

pub type StatusTx = mpsc::Sender<ResolveStatus>;

#[derive(Copy, Clone, Debug)]
pub enum ResolveStatus {
Started,
Expand Down
27 changes: 13 additions & 14 deletions src/resolve/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::cell::Cell;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::borrow::Borrow;
use std::sync::mpsc;

use futures::Future;
use futures::future;
Expand All @@ -21,7 +20,7 @@ use trust_dns::op::message::Message;
use trust_dns::error::ClientErrorKind;

use resolve::error::*;
use resolve::batch::{ResolveStatus, QueryType};
use resolve::batch::{StatusTx, ResolveStatus, QueryType};
use config::CONFIG;

fn make_client(loop_handle: Handle, name_server: SocketAddr) -> BasicClientHandle {
Expand Down Expand Up @@ -62,14 +61,14 @@ impl ClientFactory {

pub struct TrustDNSResolver {
loop_handle: Handle,
done_tx: mpsc::Sender<ResolveStatus>,
status_tx: StatusTx,
}

impl TrustDNSResolver {
pub fn new(loop_handle: Handle, done_tx: mpsc::Sender<ResolveStatus>) -> Self {
pub fn new(loop_handle: Handle, status_tx: StatusTx) -> Self {
TrustDNSResolver {
loop_handle: loop_handle.clone(),
done_tx: done_tx
status_tx: status_tx
}
}
}
Expand All @@ -80,8 +79,8 @@ impl TrustDNSResolver {
{
let client_factory = ClientFactory::new(self.loop_handle.clone(), dns);

self.done_tx.send(ResolveStatus::Started).unwrap();
let done_tx = self.done_tx.clone();
self.status_tx.send(ResolveStatus::Started).unwrap();
let status_tx = self.status_tx.clone();

let future = match query_type {
QueryType::PTR => self.reverse_resolve(client_factory, name),
Expand All @@ -90,7 +89,7 @@ impl TrustDNSResolver {

let name = name.to_owned();
let future = future.map(move |msg| msg.extract_answer(query_type))
.then(move |rv| rv.report_status(&name, done_tx))
.then(move |rv| rv.report_status(&name, status_tx))
.then(move |rv| rv.partial_ok());

Box::new(future)
Expand Down Expand Up @@ -385,27 +384,27 @@ impl ExtractAnswer for Message {
}

trait ReportStatus {
fn report_status(self, name: &str, done_tx: mpsc::Sender<ResolveStatus>) -> Self;
fn report_status(self, name: &str, status_tx: StatusTx) -> Self;
}

impl<T> ReportStatus for Result<Vec<T>, ResolverError> {
fn report_status(self, name: &str, done_tx: mpsc::Sender<ResolveStatus>) -> Self {
fn report_status(self, name: &str, status_tx: StatusTx) -> Self {
match self.as_ref() {
Ok(vec) => if vec.is_empty() {
done_tx.send(ResolveStatus::Failure).unwrap();
status_tx.send(ResolveStatus::Failure).unwrap();
} else {
done_tx.send(ResolveStatus::Success).unwrap();
status_tx.send(ResolveStatus::Success).unwrap();
},
Err(error) => {
match *error {
ResolverError::ConnectionTimeout |
ResolverError::NameServerNotResolved => {
debug!("failed to resolve {:?}: {}", name, error);
done_tx.send(ResolveStatus::Failure).unwrap();
status_tx.send(ResolveStatus::Failure).unwrap();
}
_ => {
error!("failed to resolve {:?}: {}", name, error);
done_tx.send(ResolveStatus::Error).unwrap();
status_tx.send(ResolveStatus::Error).unwrap();
}
}
}
Expand Down
113 changes: 81 additions & 32 deletions src/resolve/resolver_threadpool.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
use std::sync::mpsc;
use std::thread;
use std::net::SocketAddr;
use std::time::{Instant, Duration};

use tokio_core::reactor::Core;
use futures::Stream;
use futures::Sink;
use futures::stream;
use futures::Future;
use futures::sync::mpsc as future_mpsc;

use crossbeam;
use num_cpus;

use batch::QueryType;
use resolve::batch::QueryType;
use resolve::batch::StatusTx;
use resolve::resolver::TrustDNSResolver;
use resolve::batch::ResolveStatus;
use resolve::error::ResolverError;
use config::CONFIG;

Expand All @@ -37,67 +40,113 @@ impl ResolverThreadPool {
self.tasks.push(task)
}

pub fn start(self, status: mpsc::Sender<ResolveStatus>) {
pub fn start(self, status: StatusTx) {
let tasks_cnt = self.tasks.len();
let chunk_size = tasks_cnt / self.workers_cnt + 1;
let sim_tasks = CONFIG.read().unwrap().tasks() as usize / self.workers_cnt ;
let qps = CONFIG.read().unwrap().qps() as usize;

crossbeam::scope(|scope| {
scope.defer(|| debug!("Exiting crosspbeam scope"));
let mut trigger = TriggerTimer::new(qps, tasks_cnt);

for chunk in self.tasks.chunks(chunk_size)
.map(|chunk| chunk.to_vec())
{
let trigger_handle = trigger.get_handle();
let status = status.clone();
let dns = CONFIG.read().unwrap()
.dns_list()
.to_vec();

scope.spawn(move || {
let thread = thread::current();
let tname = thread.name()
.unwrap_or("Unknown");

debug!("Started worker thread ({})", tname);
ResolverThread::thread_fn(chunk, status, dns, sim_tasks);
ResolverThread::thread_main(chunk, status, trigger_handle);
debug!("Terminated worker thread: ({})", tname);
});
}

let dns = CONFIG.read().unwrap().dns_list().to_vec();
scope.spawn(move || {
trigger.thread_main(dns)
});
})
}
}

struct ResolverThread;

type TriggerTx = future_mpsc::Sender<SocketAddr>;
type TriggerRx = future_mpsc::Receiver<SocketAddr>;

struct TriggerTimer {
handles: Vec<TriggerTx>,
qps: usize,
triggered: usize,
tasks_cnt: usize,
}

impl TriggerTimer {
pub fn new(qps: usize, tasks_cnt: usize) -> Self {
TriggerTimer {
handles: vec![],
qps: qps,
triggered: 0,
tasks_cnt: tasks_cnt
}
}

pub fn get_handle(&mut self) -> TriggerRx {
let (tx, rx) = future_mpsc::channel(self.qps as usize);
self.handles.push(tx);
rx
}

pub fn thread_main(mut self, dns_list: Vec<SocketAddr>) {
let duration_second = Duration::from_secs(1);

while self.triggered < self.tasks_cnt {
let start = Instant::now();
self.trigger_qps(&dns_list);
let end = Instant::now();

let diff = end - start;
if diff < duration_second {
thread::sleep(duration_second - diff);
}
}
}

fn trigger_qps(&mut self, dns_list: &[SocketAddr]) {
let qps_per_handle = (self.qps as f32 / self.handles.len() as f32).ceil() as u32;
debug!("Triggering {} requests per thread", qps_per_handle);
for handle in &mut self.handles {
for i in 0..qps_per_handle {
let dns = dns_list[i as usize % dns_list.len()];
handle.send(dns).wait().unwrap();
self.triggered += 1;
}
}
}
}

struct ResolverThread;
impl ResolverThread {
fn thread_fn(tasks: Vec<ResolveTask>,
status: mpsc::Sender<ResolveStatus>,
dns: Vec<SocketAddr>,
sim_tasks: usize)
fn thread_main(tasks: Vec<ResolveTask>,
status: StatusTx,
task_trigger: TriggerRx)
{
debug!("Simultaneous tasks: {}", sim_tasks);
let mut core = Core::new().unwrap();
let handle = core.handle();
let qps = CONFIG.read().unwrap().qps();

let future = {
let future = {
let resolver = TrustDNSResolver::new(handle.clone(), status);

let futures_stream = {
let fs = tasks.into_iter()
.enumerate()
.map(move |(idx, task)| {
let idx = idx % dns.len();
let dns = dns[idx];
task.resolve(&resolver, dns)
});
stream::iter::<_, _, _>(fs.map(|x| Ok(x)))
};

let future = futures_stream
.buffer_unordered(sim_tasks)
.collect();

future.map(|_| ())
.map_err(|_| ())
stream::iter::<_,_,_>(tasks.into_iter().map(|x| Ok(x)))
.zip(task_trigger).map(move |(task, dns)|
task.resolve(&resolver, dns).map_err(|_| ()))
.buffer_unordered(qps as usize)
.collect()
};

core.run(future).unwrap();
Expand Down

0 comments on commit 64eb696

Please sign in to comment.