Skip to content

Commit

Permalink
test: [#1096] add E2E test for banning IPs sending bad connection IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
josecelano committed Dec 9, 2024
1 parent 2898a44 commit 446a906
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 28 deletions.
9 changes: 6 additions & 3 deletions src/servers/udp/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ impl CookieTimeValues {
/// - Delegating the request to the correct handler depending on the request type.
///
/// It will return an `Error` response if the request is invalid.
#[instrument(fields(request_id), skip(udp_request, tracker, cookie_time_values, cbf), ret(level = Level::TRACE))]
#[instrument(fields(request_id), skip(udp_request, tracker, cookie_time_values, connection_id_errors_per_ip), ret(level = Level::TRACE))]
pub(crate) async fn handle_packet(
udp_request: RawRequest,
tracker: &Tracker,
local_addr: SocketAddr,
cookie_time_values: CookieTimeValues,
cbf: Arc<RwLock<CountingBloomFilter>>,
connection_id_errors_per_ip: Arc<RwLock<CountingBloomFilter>>,
) -> Response {
tracing::Span::current().record("request_id", Uuid::new_v4().to_string());
tracing::debug!("Handling Packets: {udp_request:?}");
Expand All @@ -76,7 +76,10 @@ pub(crate) async fn handle_packet(
| Error::CookieValueExpired { .. }
| Error::CookieValueFromFuture { .. } => {
// code-review: should we include `RequestParseError` and `BadRequest`?
cbf.write().await.insert(&udp_request.from.ip().to_string());
connection_id_errors_per_ip
.write()
.await
.insert(&udp_request.from.ip().to_string());
}
_ => {}
}
Expand Down
6 changes: 3 additions & 3 deletions src/servers/udp/server/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl Launcher {

// Create a counting bloom filter that uses 4 bits per element and has a
// false positive rate of 0.01 when 100 items have been inserted
let cbf = Arc::new(RwLock::new(CountingBloomFilter::with_rate(4, 0.01, 100)));
let connection_id_errors_per_ip = Arc::new(RwLock::new(CountingBloomFilter::with_rate(4, 0.01, 100)));

let addr = receiver.bound_socket_address();
let local_addr = format!("udp://{addr}");
Expand Down Expand Up @@ -149,7 +149,7 @@ impl Launcher {
}
};

let connection_id_errors_from_ip = cbf.read().await.estimate_count(&req.from.ip().to_string());
let connection_id_errors_from_ip = connection_id_errors_per_ip.read().await.estimate_count(&req.from.ip().to_string());

if connection_id_errors_from_ip > MAX_CONNECTION_ID_ERRORS_PER_IP {
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop continue: (banned ip)");
Expand All @@ -168,7 +168,7 @@ impl Launcher {
// chance to finish. However, the buffer is yielding before
// aborting one tasks, giving it the chance to finish.
let abort_handle: tokio::task::AbortHandle =
tokio::task::spawn(processor.process_request(req, cbf.clone())).abort_handle();
tokio::task::spawn(processor.process_request(req, connection_id_errors_per_ip.clone())).abort_handle();

if abort_handle.is_finished() {
continue;
Expand Down
6 changes: 3 additions & 3 deletions src/servers/udp/server/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ impl Processor {
}
}

#[instrument(skip(self, request, cbf))]
pub async fn process_request(self, request: RawRequest, cbf: Arc<RwLock<CountingBloomFilter>>) {
#[instrument(skip(self, request, connection_id_errors_per_ip))]
pub async fn process_request(self, request: RawRequest, connection_id_errors_per_ip: Arc<RwLock<CountingBloomFilter>>) {
let from = request.from;
let response = handlers::handle_packet(
request,
&self.tracker,
self.socket.address(),
CookieTimeValues::new(self.cookie_lifetime),
cbf,
connection_id_errors_per_ip,
)
.await;

Expand Down
87 changes: 68 additions & 19 deletions tests/servers/udp/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,31 @@ mod receiving_an_announce_request {
use crate::servers::udp::contract::send_connection_request;
use crate::servers::udp::Started;

pub async fn send_and_get_announce(tx_id: TransactionId, c_id: ConnectionId, client: &UdpTrackerClient) {
// Send announce request
pub async fn assert_send_and_get_announce(tx_id: TransactionId, c_id: ConnectionId, client: &UdpTrackerClient) {
let response = send_and_get_announce(tx_id, c_id, client).await;
assert!(is_ipv4_announce_response(&response));
}

pub async fn send_and_get_announce(
tx_id: TransactionId,
c_id: ConnectionId,
client: &UdpTrackerClient,
) -> aquatic_udp_protocol::Response {
let announce_request = build_sample_announce_request(tx_id, c_id, client.client.socket.local_addr().unwrap().port());

match client.send(announce_request.into()).await {
Ok(_) => (),
Err(err) => panic!("{err}"),
};

let announce_request = AnnounceRequest {
match client.receive().await {
Ok(response) => response,
Err(err) => panic!("{err}"),
}
}

fn build_sample_announce_request(tx_id: TransactionId, c_id: ConnectionId, port: u16) -> AnnounceRequest {
AnnounceRequest {
connection_id: ConnectionId(c_id.0),
action_placeholder: AnnounceActionPlaceholder::default(),
transaction_id: tx_id,
Expand All @@ -146,26 +167,34 @@ mod receiving_an_announce_request {
ip_address: Ipv4Addr::new(0, 0, 0, 0).into(),
key: PeerKey::new(0i32),
peers_wanted: NumberOfPeers(1i32.into()),
port: Port(client.client.socket.local_addr().unwrap().port().into()),
};
port: Port(port.into()),
}
}

match client.send(announce_request.into()).await {
Ok(_) => (),
Err(err) => panic!("{err}"),
};
#[tokio::test]
async fn should_return_an_announce_response() {
INIT.call_once(|| {
tracing_stderr_init(LevelFilter::ERROR);
});

let response = match client.receive().await {
Ok(response) => response,
let env = Started::new(&configuration::ephemeral().into()).await;

let client = match UdpTrackerClient::new(env.bind_address(), DEFAULT_TIMEOUT).await {
Ok(udp_tracker_client) => udp_tracker_client,
Err(err) => panic!("{err}"),
};

// println!("test response {response:?}");
let tx_id = TransactionId::new(123);

assert!(is_ipv4_announce_response(&response));
let c_id = send_connection_request(tx_id, &client).await;

assert_send_and_get_announce(tx_id, c_id, &client).await;

env.stop().await;
}

#[tokio::test]
async fn should_return_an_announce_response() {
async fn should_return_many_announce_response() {
INIT.call_once(|| {
tracing_stderr_init(LevelFilter::ERROR);
});
Expand All @@ -181,13 +210,16 @@ mod receiving_an_announce_request {

let c_id = send_connection_request(tx_id, &client).await;

send_and_get_announce(tx_id, c_id, &client).await;
for x in 0..1000 {
tracing::info!("req no: {x}");
assert_send_and_get_announce(tx_id, c_id, &client).await;
}

env.stop().await;
}

#[tokio::test]
async fn should_return_many_announce_response() {
async fn should_ban_the_client_ip_if_it_sends_more_than_10_requests_with_a_cookie_value_not_normal() {
INIT.call_once(|| {
tracing_stderr_init(LevelFilter::ERROR);
});
Expand All @@ -201,13 +233,30 @@ mod receiving_an_announce_request {

let tx_id = TransactionId::new(123);

let c_id = send_connection_request(tx_id, &client).await;
// The eleven first requests should be fine

for x in 0..1000 {
let invalid_connection_id = ConnectionId::new(0); // Zero is one of the not normal values.

for x in 0..=10 {
tracing::info!("req no: {x}");
send_and_get_announce(tx_id, c_id, &client).await;
send_and_get_announce(tx_id, invalid_connection_id, &client).await;
}

// The twelfth request should be banned (timeout error)

let announce_request = build_sample_announce_request(
tx_id,
invalid_connection_id,
client.client.socket.local_addr().unwrap().port(),
);

match client.send(announce_request.into()).await {
Ok(_) => (),
Err(err) => panic!("{err}"),
};

assert!(client.receive().await.is_err());

env.stop().await;
}
}
Expand Down

0 comments on commit 446a906

Please sign in to comment.