Skip to content

Commit

Permalink
Add dynamic connections to network worker
Browse files Browse the repository at this point in the history
  • Loading branch information
erthalion committed Jun 14, 2024
1 parent 6766339 commit baf1304
Showing 1 changed file with 84 additions and 0 deletions.
84 changes: 84 additions & 0 deletions src/worker/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ impl NetworkWorker {
let (mut iface, mut device, fd) = self.setup_tuntap(addr);
let cx = iface.context();

// Dynamic sockets are going to be responsible for connections that
// will be opened/closed during the test. Every record contains:
// * socket handle (just an index inside smoltcp)
// * time when the connection was opened
// * connection lifetime
let mut dynamic_sockets = HashMap::new();

// Open static set of connections, that are going to live throughout
// the whole run
let mut sockets = SocketSet::new(vec![]);
Expand Down Expand Up @@ -141,18 +148,86 @@ impl NetworkWorker {
// own.
let mut send_timer = SystemTime::now();

// Timer and waiting interval for the next new dynamic connection
let mut arrivals = SystemTime::now();
let mut interval: f64 =
thread_rng().sample(Exp::new(arrival_rate).unwrap());

// Current number of opened connections, both dynamic and static
let mut total_conns = nconnections;

// The main loop, where connection state will be updated, and dynamic
// connections will be opened/closed
loop {
// Vector of sockets to close at the end of each loop
let mut close_sockets = vec![];

let timestamp = Instant::now();
iface.poll(timestamp, &mut device, &mut sockets);

let elapsed = arrivals.elapsed().unwrap().as_millis();
if elapsed > (interval * 1000.0).round() as u128 {
// Time for a new connection, add a socket, it state is going
// to be updated during the next loop round
total_conns = total_conns + 1;

let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 1024]);
let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 1024]);
let mut socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer);

let index = total_conns as usize;
let (local_addr, local_port) =
self.get_local_addr_port(addr, index);

socket
.connect(cx, (addr, target_port), (local_addr, local_port))
.unwrap();

let handle = sockets.add(socket);
let lifetime: f64 =
thread_rng().sample(Exp::new(departure_rate).unwrap());

dynamic_sockets.insert(handle, (SystemTime::now(), lifetime));

info!(
"New connecting from {}:{}, lifetime {}, index {}",
local_addr,
local_port,
lifetime,
index - 1
);

// set new interval for the next new connection
interval = thread_rng().sample(Exp::new(arrival_rate).unwrap());
arrivals = SystemTime::now();
}

// Iterate through all sockets, update the state for each one
for (i, (h, s)) in sockets.iter_mut().enumerate() {
let socket = tcp::Socket::downcast_mut(s)
.ok_or(WorkerError::Internal)?;

info!("Process socket {}, {}", i, socket.state());
match dynamic_sockets.get(&h) {
Some((timer, life)) => {
// A dynamic connection, verify lifetime
debug!("Dynamic socket {}", i);
if timer.elapsed().unwrap().as_millis()
> (life * 1000.0).round() as u128
{
info!("Close socket {}", i);
socket.close();
dynamic_sockets.remove(&h);
close_sockets.push(h);
continue;
}
}
None => {
// Static connection, continue
debug!("Static socket {}", i);
}
}

if socket.can_recv() {
socket
.recv(|data| {
Expand Down Expand Up @@ -190,6 +265,15 @@ impl NetworkWorker {
}
}

for h in close_sockets {
info!("Close handle {}", h);
// TODO: reuse sockets
sockets.remove(h);
total_conns = total_conns - 1;
}

info!("Sockets: {}", total_conns);

// We cant wait only for iface.poll_delay(timestamp, &sockets)
// interval, since the loop could stuck without any activity
// making no progress. To prevent that specify a minimum waiting
Expand Down

0 comments on commit baf1304

Please sign in to comment.