diff --git a/src/worker/network.rs b/src/worker/network.rs index 23edb44..8ec56a9 100644 --- a/src/worker/network.rs +++ b/src/worker/network.rs @@ -103,8 +103,8 @@ impl NetworkWorker { server: _, address: _, target_port: _, - arrival_rate: _, - departure_rate: _, + arrival_rate, + departure_rate, nconnections, send_interval, } = self.workload.workload @@ -112,11 +112,18 @@ impl NetworkWorker { unreachable!() }; - debug!("Starting client at {:?}:{:?}", addr, target_port); + debug!("Starting client, target {:?}:{:?}", addr, target_port); let (mut iface, mut device, fd) = self.setup_tuntap(addr); let cx = iface.context(); + // Dynamic sockets are going to be responsible for connections that + // will be opened/closed during the test. Every record contains: + // * socket handle (just an index inside smoltcp) + // * time when the connection was opened + // * connection lifetime + let mut dynamic_sockets = HashMap::new(); + // Open static set of connections, that are going to live throughout // the whole run let mut sockets = SocketSet::new(vec![]); @@ -150,18 +157,90 @@ impl NetworkWorker { // own. let mut send_timer = SystemTime::now(); + // Timer and waiting interval for the next new dynamic connection + let mut arrivals = SystemTime::now(); + let mut interval: f64 = + thread_rng().sample(Exp::new(arrival_rate).unwrap()); + + // Current number of opened connections, both dynamic and static + let mut total_conns = nconnections; + // The main loop, where connection state will be updated, and dynamic // connections will be opened/closed loop { + // Vector of sockets to close at the end of each loop + let mut close_sockets = vec![]; + let timestamp = Instant::now(); iface.poll(timestamp, &mut device, &mut sockets); + let elapsed = arrivals.elapsed().unwrap().as_millis(); + if elapsed > (interval * 1000.0).round() as u128 { + // Time for a new connection, add a socket, it state is going + // to be updated during the next loop round + total_conns = total_conns + 1; + + let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 1024]); + let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 1024]); + let mut socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer); + + let index = total_conns as usize; + let (local_addr, local_port) = + self.get_local_addr_port(addr, index); + + socket + .connect( + iface.context(), + (addr, target_port), + (local_addr, local_port), + ) + .unwrap(); + + let handle = sockets.add(socket); + let lifetime: f64 = + thread_rng().sample(Exp::new(departure_rate).unwrap()); + + dynamic_sockets.insert(handle, (SystemTime::now(), lifetime)); + + info!( + "New connecting from {}:{}, lifetime {}, index {}", + local_addr, + local_port, + lifetime, + index - 1 + ); + + // set new interval for the next new connection + interval = thread_rng().sample(Exp::new(arrival_rate).unwrap()); + arrivals = SystemTime::now(); + } + // Iterate through all sockets, update the state for each one for (i, (h, s)) in sockets.iter_mut().enumerate() { let socket = tcp::Socket::downcast_mut(s) .ok_or(WorkerError::Internal)?; info!("Process socket {}, {}", i, socket.state()); + match dynamic_sockets.get(&h) { + Some((timer, life)) => { + // A dynamic connection, verify lifetime + debug!("Dynamic socket {}", i); + if timer.elapsed().unwrap().as_millis() + > (life * 1000.0).round() as u128 + { + info!("Close socket {}", i); + socket.close(); + dynamic_sockets.remove(&h); + close_sockets.push(h); + continue; + } + } + None => { + // Static connection, continue + debug!("Static socket {}", i); + } + } + if socket.can_recv() { socket .recv(|data| { @@ -199,14 +278,27 @@ impl NetworkWorker { } } + for h in close_sockets { + info!("Close handle {}", h); + // TODO: reuse sockets + sockets.remove(h); + total_conns = total_conns - 1; + } + + info!("Sockets: {}", total_conns); + // We cant wait only for iface.poll_delay(timestamp, &sockets) // interval, since the loop could stuck without any activity // making no progress. To prevent that specify a minimum waiting // duration of 100 milliseconds. + let min_duration = smoltcp::time::Duration::from_millis(100); + let duration = iface .poll_delay(timestamp, &sockets) - .min(Some(smoltcp::time::Duration::from_millis(100))); + .min(Some(min_duration)) + .or_else(|| Some(min_duration)); + info!("wait duration {:?}", duration); phy_wait(fd, duration).expect("wait error"); } }