Skip to content

Commit

Permalink
refactor: [#1096] run IP bans cleaner to a new thread
Browse files Browse the repository at this point in the history
Running the cleaner check on each iteration decreased the UDP tracker
performance.
  • Loading branch information
josecelano committed Dec 10, 2024
1 parent 8dcc205 commit 77cf089
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 51 deletions.
55 changes: 9 additions & 46 deletions src/servers/udp/server/banning.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
use std::net::IpAddr;
use std::time::Duration;

use bloom::{CountingBloomFilter, ASMS};
use tokio::time::Instant;
use url::Url;

use crate::servers::udp::UDP_TRACKER_LOG_TARGET;

/// The maximum number of connection id errors per ip. Clients will be banned if
/// they exceed this limit.
pub const MAX_CONNECTION_ID_ERRORS_PER_IP: u32 = 10;
pub const RESET_CONNECTION_ID_ERRORS_COUNTER_FREQUENCY_IN_SECS: u64 = 3600;

pub struct BanService {
max_connection_id_errors_per_ip: u32,
ban_duration: Duration,
connection_id_errors_per_ip: CountingBloomFilter,
local_addr: Url,
last_connection_id_errors_reset: Instant,
}

impl BanService {
#[must_use]
pub fn new(max_connection_id_errors_per_ip: u32, duration_in_seconds: u64, local_addr: Url) -> Self {
pub fn new(max_connection_id_errors_per_ip: u32, local_addr: Url) -> Self {
Self {
max_connection_id_errors_per_ip,
ban_duration: Duration::from_secs(duration_in_seconds),
local_addr,
connection_id_errors_per_ip: CountingBloomFilter::with_rate(4, 0.01, 100),
last_connection_id_errors_reset: tokio::time::Instant::now(),
Expand All @@ -48,14 +40,8 @@ impl BanService {
connection_id_errors_from_ip > self.max_connection_id_errors_per_ip
}

pub fn run_bans_cleaner(&mut self) {
if self.last_connection_id_errors_reset.elapsed() >= self.ban_duration {
self.reset_filter();
}
}

/// Resets the filter and updates the reset timestamp.
pub fn reset_filter(&mut self) {
pub fn reset_bans(&mut self) {
self.connection_id_errors_per_ip.clear();

self.last_connection_id_errors_reset = Instant::now();
Expand All @@ -68,23 +54,18 @@ impl BanService {
#[cfg(test)]
mod tests {
use std::net::IpAddr;
use std::time::Duration;

use tokio::time::sleep;

use super::BanService;

/// Sample service with one day ban duration.
fn service_with_one_day_ban(counter_limit: u32) -> BanService {
let one_day_in_seconds = 86400;
fn ban_service(counter_limit: u32) -> BanService {
let udp_tracker_url = "udp://127.0.0.1".parse().unwrap();

BanService::new(counter_limit, one_day_in_seconds, udp_tracker_url)
BanService::new(counter_limit, udp_tracker_url)
}

#[test]
fn it_should_increase_the_ip_counter() {
let mut ban_service = service_with_one_day_ban(1);
let mut ban_service = ban_service(1);

let ip: IpAddr = "127.0.0.2".parse().unwrap();

Expand All @@ -95,7 +76,7 @@ mod tests {

#[test]
fn it_should_ban_ips_with_counters_exceeding_a_predefined_limit() {
let mut ban_service = service_with_one_day_ban(1);
let mut ban_service = ban_service(1);

let ip: IpAddr = "127.0.0.2".parse().unwrap();

Expand All @@ -107,7 +88,7 @@ mod tests {

#[test]
fn it_should_not_ban_ips_whose_counters_do_not_exceed_the_predefined_limit() {
let mut ban_service = service_with_one_day_ban(1);
let mut ban_service = ban_service(1);

let ip: IpAddr = "127.0.0.2".parse().unwrap();

Expand All @@ -118,31 +99,13 @@ mod tests {

#[test]
fn it_should_allow_resetting_all_the_counters() {
let mut ban_service = service_with_one_day_ban(1);

let ip: IpAddr = "127.0.0.2".parse().unwrap();

ban_service.increase_counter(&ip); // Counter = 1

ban_service.reset_filter();

assert_eq!(ban_service.get_counter(&ip), 0);
}

#[tokio::test]
async fn it_should_allow_run_a_bans_cleaner_to_reset_the_counters_periodically() {
let udp_tracker_url = "udp://127.0.0.1".parse().unwrap();
let ban_duration_in_secs = 1;

let mut ban_service = BanService::new(1, ban_duration_in_secs, udp_tracker_url);
let mut ban_service = ban_service(1);

let ip: IpAddr = "127.0.0.2".parse().unwrap();

ban_service.increase_counter(&ip); // Counter = 1

sleep(Duration::from_secs(2)).await;

ban_service.run_bans_cleaner();
ban_service.reset_bans();

assert_eq!(ban_service.get_counter(&ip), 0);
}
Expand Down
25 changes: 20 additions & 5 deletions src/servers/udp/server/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use derive_more::Constructor;
use futures_util::StreamExt;
use tokio::select;
use tokio::sync::{oneshot, RwLock};
use tokio::time::interval;
use tracing::instrument;

use super::banning::{BanService, MAX_CONNECTION_ID_ERRORS_PER_IP, RESET_CONNECTION_ID_ERRORS_COUNTER_FREQUENCY_IN_SECS};
use super::banning::BanService;
use super::request_buffer::ActiveRequests;
use crate::bootstrap::jobs::Started;
use crate::core::Tracker;
Expand All @@ -21,6 +22,11 @@ use crate::servers::udp::server::processor::Processor;
use crate::servers::udp::server::receiver::Receiver;
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;

/// The maximum number of connection id errors per ip. Clients will be banned if
/// they exceed this limit.
const MAX_CONNECTION_ID_ERRORS_PER_IP: u32 = 10;
const IP_BANS_RESET_INTERVAL_IN_SECS: u64 = 3600;

/// A UDP server instance launcher.
#[derive(Constructor)]
pub struct Launcher;
Expand Down Expand Up @@ -123,14 +129,23 @@ impl Launcher {

let ban_service = Arc::new(RwLock::new(BanService::new(
MAX_CONNECTION_ID_ERRORS_PER_IP,
RESET_CONNECTION_ID_ERRORS_COUNTER_FREQUENCY_IN_SECS,
local_addr.parse().unwrap(),
)));

loop {
// code-review: the ban service could spawn a task to clear the bans.
ban_service.write().await.run_bans_cleaner();
let ban_cleaner = ban_service.clone();

tokio::spawn(async move {
let mut cleaner_interval = interval(Duration::from_secs(IP_BANS_RESET_INTERVAL_IN_SECS));

cleaner_interval.tick().await;

loop {
cleaner_interval.tick().await;
ban_cleaner.write().await.reset_bans();
}
});

loop {
let processor = Processor::new(receiver.socket.clone(), tracker.clone(), cookie_lifetime);

if let Some(req) = {
Expand Down

0 comments on commit 77cf089

Please sign in to comment.