Skip to content

Commit

Permalink
Merge pull request #1339 from plebhash/intercept-message-cleanup
Browse files Browse the repository at this point in the history
`InterceptMessage` cleanup
  • Loading branch information
plebhash authored Jan 21, 2025
2 parents 6135ef2 + 20ffef2 commit 31433ab
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 40 deletions.
43 changes: 13 additions & 30 deletions roles/tests-integration/lib/sniffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type MsgType = u8;
enum SnifferError {
DownstreamClosed,
UpstreamClosed,
MessageInterrupted,
}

/// Allows to intercept messages sent between two roles.
Expand All @@ -47,10 +46,7 @@ enum SnifferError {
/// In order to alter the messages sent between the roles, the [`Sniffer::intercept_messages`]
/// field can be used. It will look for the [`InterceptMessage::expected_message_type`] in the
/// specified [`InterceptMessage::direction`] and replace it with
/// [`InterceptMessage::response_message`].
///
/// If `break_on` is set to `true`, the [`Sniffer`] will stop the communication after sending the
/// response message.
/// [`InterceptMessage::replacement_message`].
///
/// Can be useful for testing purposes, as it allows to assert that the roles have sent specific
/// messages in a specific order and to inspect the messages details.
Expand All @@ -69,25 +65,22 @@ pub struct Sniffer {
pub struct InterceptMessage {
direction: MessageDirection,
expected_message_type: MsgType,
response_message: PoolMessages<'static>,
response_message_type: MsgType,
break_on: bool,
replacement_message: PoolMessages<'static>,
replacement_message_type: MsgType,
}

impl InterceptMessage {
pub fn new(
direction: MessageDirection,
expected_message_type: MsgType,
response_message: PoolMessages<'static>,
response_message_type: MsgType,
break_on: bool,
replacement_message: PoolMessages<'static>,
replacement_message_type: MsgType,
) -> Self {
Self {
direction,
expected_message_type,
response_message,
response_message_type,
break_on,
replacement_message,
replacement_message_type,
}
}
}
Expand Down Expand Up @@ -234,21 +227,16 @@ impl Sniffer {
let channel_msg = false;
let frame = StandardEitherFrame::<AnyMessage<'_>>::Sv2(
Sv2Frame::from_message(
intercept_message.response_message.clone(),
intercept_message.response_message_type,
intercept_message.replacement_message.clone(),
intercept_message.replacement_message_type,
extension_type,
channel_msg,
)
.expect("Failed to create the frame"),
);
downstream_messages
.add_message(msg_type, intercept_message.response_message.clone());
.add_message(msg_type, intercept_message.replacement_message.clone());
let _ = send.send(frame).await;
if intercept_message.break_on {
return Err(SnifferError::MessageInterrupted);
} else {
continue;
}
}
}

Expand Down Expand Up @@ -276,21 +264,16 @@ impl Sniffer {
let channel_msg = false;
let frame = StandardEitherFrame::<AnyMessage<'_>>::Sv2(
Sv2Frame::from_message(
intercept_message.response_message.clone(),
intercept_message.response_message_type,
intercept_message.replacement_message.clone(),
intercept_message.replacement_message_type,
extension_type,
channel_msg,
)
.expect("Failed to create the frame"),
);
upstream_messages
.add_message(msg_type, intercept_message.response_message.clone());
.add_message(msg_type, intercept_message.replacement_message.clone());
let _ = send.send(frame).await;
if intercept_message.break_on {
return Err(SnifferError::MessageInterrupted);
} else {
continue;
}
}
}
if send.send(frame).await.is_err() {
Expand Down
35 changes: 25 additions & 10 deletions roles/tests-integration/tests/sniffer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ use roles_logic_sv2::{
use sniffer::{InterceptMessage, MessageDirection};
use std::convert::TryInto;

// this test aims to assert that Sniffer is able to intercept and replace some message
// sniffer_a replaces a SetupConnectionSuccess from TP with a SetupConnectionError directed at Pool
// sniffer_b asserts that Pool is about to receive a SetupConnectionError
// TP -> sniffer_a -> sniffer_b -> Pool
#[tokio::test]
async fn test_sniffer_interrupter() {
async fn test_sniffer_intercept() {
let (_tp, tp_addr) = start_template_provider(None).await;
let message =
let message_replacement =
PoolMessages::Common(CommonMessages::SetupConnectionError(SetupConnectionError {
flags: 0,
error_code: "unsupported-feature-flags"
Expand All @@ -22,18 +26,29 @@ async fn test_sniffer_interrupter() {
.try_into()
.unwrap(),
}));
let interrupt_msgs = InterceptMessage::new(
let intercept = InterceptMessage::new(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS,
message,
message_replacement,
MESSAGE_TYPE_SETUP_CONNECTION_ERROR,
true,
);
let (sniffer, sniffer_addr) =
start_sniffer("".to_string(), tp_addr, false, Some(vec![interrupt_msgs])).await;
let _ = start_pool(Some(sniffer_addr)).await;
assert_common_message!(&sniffer.next_message_from_downstream(), SetupConnection);
assert_common_message!(&sniffer.next_message_from_upstream(), SetupConnectionError);

// this sniffer will replace SetupConnectionSuccess with SetupConnectionError
let (_sniffer_a, sniffer_a_addr) =
start_sniffer("A".to_string(), tp_addr, false, Some(vec![intercept])).await;

// this sniffer will assert SetupConnectionSuccess was correctly replaced with
// SetupConnectionError
let (sniffer_b, sniffer_b_addr) =
start_sniffer("B".to_string(), sniffer_a_addr, false, None).await;

let _ = start_pool(Some(sniffer_b_addr)).await;

// assert sniffer_a functionality of replacing messages work as expected (goal of this test)
assert_common_message!(
&sniffer_b.next_message_from_upstream(),
SetupConnectionError
);
}

#[tokio::test]
Expand Down

0 comments on commit 31433ab

Please sign in to comment.