Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
**/*.rs.bk
.idea/azure/
.idea/inspectionProfiles/Project_Default.xml
.idea/copilot.data.migration.*

### Node
node_modules
Expand Down
73 changes: 28 additions & 45 deletions multinode_integration_tests/tests/connection_termination_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved.

use masq_lib::blockchains::chains::Chain;
use masq_lib::constants::HTTP_PORT;
use masq_lib::test_utils::utils::TEST_DEFAULT_MULTINODE_CHAIN;
use masq_lib::utils::find_free_port;
use multinode_integration_tests_lib::masq_mock_node::MASQMockNode;
Expand Down Expand Up @@ -86,13 +87,12 @@ fn reported_server_drop() {
let (_, _, lcp) = mock_node
.wait_for_package(&masquerader, Duration::from_secs(2))
.unwrap();
let (stream_key, return_route_id) =
context_from_request_lcp(lcp, real_node.main_cryptde_null().unwrap(), &exit_cryptde);
let stream_key = stream_key_from_request_lcp(lcp, &exit_cryptde);

mock_node
.transmit_package(
mock_node.port_list()[0],
create_server_drop_report(&mock_node, &real_node, stream_key, return_route_id),
create_server_drop_report(&mock_node, &real_node, stream_key),
&masquerader,
real_node.main_public_key(),
real_node.socket_addr(PortSelector::First),
Expand All @@ -115,7 +115,7 @@ fn actual_server_drop() {
let server_port = find_free_port();
let mut server = real_node.make_server(server_port);
let masquerader = JsonMasquerader::new();
let (stream_key, return_route_id) = arbitrary_context();
let stream_key = arbitrary_stream_key();
let index: u64 = 0;
request_server_payload(
index,
Expand All @@ -125,7 +125,6 @@ fn actual_server_drop() {
&mut server,
&masquerader,
stream_key,
return_route_id,
);
let index: u64 = 1;
request_server_payload(
Expand All @@ -136,7 +135,6 @@ fn actual_server_drop() {
&mut server,
&masquerader,
stream_key,
return_route_id,
);

server.shutdown();
Expand Down Expand Up @@ -174,7 +172,6 @@ fn request_server_payload(
server: &mut MASQNodeServer,
masquerader: &JsonMasquerader,
stream_key: StreamKey,
return_route_id: u32,
) {
mock_node
.transmit_package(
Expand All @@ -184,7 +181,6 @@ fn request_server_payload(
&mock_node,
&real_node,
stream_key,
return_route_id,
&server,
cluster.chain,
),
Expand Down Expand Up @@ -212,7 +208,7 @@ fn reported_client_drop() {
let server_port = find_free_port();
let mut server = real_node.make_server(server_port);
let masquerader = JsonMasquerader::new();
let (stream_key, return_route_id) = arbitrary_context();
let stream_key = arbitrary_stream_key();
let index: u64 = 0;
mock_node
.transmit_package(
Expand All @@ -222,7 +218,6 @@ fn reported_client_drop() {
&mock_node,
&real_node,
stream_key,
return_route_id,
&server,
cluster.chain,
),
Expand All @@ -240,7 +235,7 @@ fn reported_client_drop() {
mock_node
.transmit_package(
mock_node.port_list()[0],
create_client_drop_report(&mock_node, &real_node, stream_key, return_route_id),
create_client_drop_report(&mock_node, &real_node, stream_key),
&masquerader,
real_node.main_public_key(),
real_node.socket_addr(PortSelector::First),
Expand Down Expand Up @@ -322,40 +317,32 @@ fn full_neighbor(one: &mut NodeRecord, another: &mut NodeRecord) {
.unwrap();
}

fn context_from_request_lcp(
lcp: LiveCoresPackage,
originating_cryptde: &dyn CryptDE,
exit_cryptde: &dyn CryptDE,
) -> (StreamKey, u32) {
fn stream_key_from_request_lcp(lcp: LiveCoresPackage, exit_cryptde: &dyn CryptDE) -> StreamKey {
let payload = match decodex::<MessageType>(exit_cryptde, &lcp.payload).unwrap() {
MessageType::ClientRequest(vd) => vd
.extract(&node_lib::sub_lib::migrations::client_request_payload::MIGRATIONS)
.unwrap(),
mt => panic!("Unexpected: {:?}", mt),
};
let stream_key = payload.stream_key;
let return_route_id = decodex::<u32>(originating_cryptde, &lcp.route.hops[6]).unwrap();
(stream_key, return_route_id)
stream_key
}

fn arbitrary_context() -> (StreamKey, u32) {
(
StreamKey::make_meaningful_stream_key("arbitrary_context"),
12345678,
)
fn arbitrary_stream_key() -> StreamKey {
StreamKey::make_meaningful_stream_key("arbitrary_context")
}

fn create_request_icp(
index: u64,
originating_node: &MASQMockNode,
exit_node: &MASQRealNode,
stream_key: StreamKey,
return_route_id: u32,
server: &MASQNodeServer,
chain: Chain,
) -> IncipientCoresPackage {
let originating_main_cryptde = originating_node.main_cryptde_null().unwrap();
IncipientCoresPackage::new(
originating_node.main_cryptde_null().unwrap(),
originating_main_cryptde,
Route::round_trip(
RouteSegment::new(
vec![
Expand All @@ -371,9 +358,8 @@ fn create_request_icp(
],
Component::ProxyServer,
),
originating_node.main_cryptde_null().unwrap(),
originating_main_cryptde,
originating_node.consuming_wallet(),
return_route_id,
Some(chain.rec().contract),
)
.unwrap(),
Expand All @@ -382,7 +368,7 @@ fn create_request_icp(
&ClientRequestPayload_0v1 {
stream_key,
sequenced_packet: SequencedPacket::new(Vec::from(HTTP_REQUEST), index, false),
target_hostname: Some(format!("{}", server.local_addr().ip())),
target_hostname: format!("{}", server.local_addr().ip()),
target_port: server.local_addr().port(),
protocol: ProxyProtocol::HTTP,
originator_public_key: originating_node.main_public_key().clone(),
Expand All @@ -400,8 +386,9 @@ fn create_meaningless_icp(
let socket_addr = SocketAddr::from_str("3.2.1.0:7654").unwrap();
let stream_key =
StreamKey::make_meaningful_stream_key("Chancellor on brink of second bailout for banks");
let main_cryptde = originating_node.main_cryptde_null().unwrap();
IncipientCoresPackage::new(
originating_node.main_cryptde_null().unwrap(),
main_cryptde,
Route::round_trip(
RouteSegment::new(
vec![
Expand All @@ -417,9 +404,8 @@ fn create_meaningless_icp(
],
Component::ProxyServer,
),
originating_node.main_cryptde_null().unwrap(),
main_cryptde,
originating_node.consuming_wallet(),
1357,
Some(TEST_DEFAULT_MULTINODE_CHAIN.rec().contract),
)
.unwrap(),
Expand All @@ -428,7 +414,7 @@ fn create_meaningless_icp(
&ClientRequestPayload_0v1 {
stream_key,
sequenced_packet: SequencedPacket::new(Vec::from(HTTP_REQUEST), 0, false),
target_hostname: Some(format!("nowhere.com")),
target_hostname: "nowhere.com".to_string(),
target_port: socket_addr.port(),
protocol: ProxyProtocol::HTTP,
originator_public_key: originating_node.main_public_key().clone(),
Expand All @@ -443,8 +429,9 @@ fn create_server_drop_report(
exit_node: &MASQMockNode,
originating_node: &MASQRealNode,
stream_key: StreamKey,
return_route_id: u32,
) -> IncipientCoresPackage {
let exit_main_cryptde = exit_node.main_cryptde_null().unwrap();
let originating_main_cryptde = originating_node.main_cryptde_null().unwrap();
let mut route = Route::round_trip(
RouteSegment::new(
vec![
Expand All @@ -460,15 +447,12 @@ fn create_server_drop_report(
],
Component::ProxyServer,
),
originating_node.main_cryptde_null().unwrap(),
originating_main_cryptde,
originating_node.consuming_wallet(),
return_route_id,
Some(TEST_DEFAULT_MULTINODE_CHAIN.rec().contract),
)
.unwrap();
route
.shift(originating_node.main_cryptde_null().unwrap())
.unwrap();
route.shift(originating_main_cryptde).unwrap();
let payload = MessageType::ClientResponse(VersionedData::new(
&node_lib::sub_lib::migrations::client_response_payload::MIGRATIONS,
&ClientResponsePayload_0v1 {
Expand All @@ -478,7 +462,7 @@ fn create_server_drop_report(
));

IncipientCoresPackage::new(
exit_node.main_cryptde_null().unwrap(),
exit_main_cryptde,
route,
payload,
originating_node.alias_public_key(),
Expand All @@ -490,8 +474,8 @@ fn create_client_drop_report(
originating_node: &MASQMockNode,
exit_node: &MASQRealNode,
stream_key: StreamKey,
return_route_id: u32,
) -> IncipientCoresPackage {
let originating_main_cryptde = originating_node.main_cryptde_null().unwrap();
let route = Route::round_trip(
RouteSegment::new(
vec![
Expand All @@ -507,9 +491,8 @@ fn create_client_drop_report(
],
Component::ProxyServer,
),
originating_node.main_cryptde_null().unwrap(),
originating_main_cryptde,
originating_node.consuming_wallet(),
return_route_id,
Some(TEST_DEFAULT_MULTINODE_CHAIN.rec().contract),
)
.unwrap();
Expand All @@ -518,15 +501,15 @@ fn create_client_drop_report(
&ClientRequestPayload_0v1 {
stream_key,
sequenced_packet: SequencedPacket::new(vec![], 1, true),
target_hostname: Some(String::from("doesnt.matter.com")),
target_port: 80,
target_hostname: String::from("doesnt.matter.com"),
target_port: HTTP_PORT,
protocol: ProxyProtocol::HTTP,
originator_public_key: originating_node.main_public_key().clone(),
},
));

IncipientCoresPackage::new(
originating_node.main_cryptde_null().unwrap(),
originating_main_cryptde,
route,
payload,
exit_node.main_public_key(),
Expand Down
11 changes: 7 additions & 4 deletions multinode_integration_tests/tests/self_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use node_lib::sub_lib::dispatcher::Component;
use node_lib::sub_lib::hopper::IncipientCoresPackage;
use node_lib::sub_lib::route::Route;
use node_lib::sub_lib::route::RouteSegment;
use node_lib::sub_lib::stream_key::StreamKey;
use node_lib::test_utils::{make_meaningless_message_type, make_paying_wallet};
use std::collections::HashSet;
use std::io::ErrorKind;
Expand Down Expand Up @@ -68,6 +69,7 @@ fn server_relays_cores_package() {
let masquerader = JsonMasquerader::new();
let server = MASQCoresServer::new(cluster.chain);
let cryptde = server.main_cryptde();
let stream_key = StreamKey::make_meaningless_stream_key();
let mut client = MASQCoresClient::new(server.local_addr(), cryptde);
let mut route = Route::one_way(
RouteSegment::new(
Expand All @@ -82,7 +84,7 @@ fn server_relays_cores_package() {
let incipient = IncipientCoresPackage::new(
cryptde,
route.clone(),
make_meaningless_message_type(),
make_meaningless_message_type(stream_key),
&cryptde.public_key(),
)
.unwrap();
Expand All @@ -99,7 +101,7 @@ fn server_relays_cores_package() {

route.shift(cryptde).unwrap();
assert_eq!(expired.remaining_route, route);
assert_eq!(expired.payload, make_meaningless_message_type());
assert_eq!(expired.payload, make_meaningless_message_type(stream_key));
}

#[test]
Expand All @@ -111,6 +113,7 @@ fn one_mock_node_talks_to_another() {
let mock_node_1 = cluster.get_mock_node_by_name("mock_node_1").unwrap();
let mock_node_2 = cluster.get_mock_node_by_name("mock_node_2").unwrap();
let cryptde = CryptDENull::new(TEST_DEFAULT_CHAIN);
let stream_key = StreamKey::make_meaningless_stream_key();
let route = Route::one_way(
RouteSegment::new(
vec![
Expand All @@ -127,7 +130,7 @@ fn one_mock_node_talks_to_another() {
let incipient_cores_package = IncipientCoresPackage::new(
&cryptde,
route,
make_meaningless_message_type(),
make_meaningless_message_type(stream_key),
&mock_node_2.main_public_key(),
)
.unwrap();
Expand Down Expand Up @@ -156,7 +159,7 @@ fn one_mock_node_talks_to_another() {
assert_eq!(package_to, mock_node_2.socket_addr(PortSelector::First));
assert_eq!(
expired_cores_package.payload,
make_meaningless_message_type()
make_meaningless_message_type(stream_key)
);
}

Expand Down
16 changes: 8 additions & 8 deletions node/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,12 @@ mod tests {
let recording_arc = proxy_server.get_recording();
let awaiter = proxy_server.get_awaiter();
let client_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap();
let reception_port = Some(8080);
let reception_port_opt = Some(8080);
let data: Vec<u8> = vec![9, 10, 11];
let ibcd_in = InboundClientData {
timestamp: SystemTime::now(),
client_addr,
reception_port,
reception_port_opt,
sequence_number: Some(0),
last_data: false,
is_clandestine: false,
Expand Down Expand Up @@ -310,12 +310,12 @@ mod tests {
let subject_addr = subject.start();
let (hopper, hopper_awaiter, hopper_recording_arc) = make_recorder();
let client_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap();
let reception_port = Some(8080);
let reception_port_opt = Some(8080);
let data: Vec<u8> = vec![9, 10, 11];
let ibcd_in = InboundClientData {
timestamp: SystemTime::now(),
client_addr,
reception_port,
reception_port_opt,
last_data: false,
is_clandestine: true,
sequence_number: None,
Expand Down Expand Up @@ -350,12 +350,12 @@ mod tests {
let subject_addr = subject.start();
let subject_ibcd = subject_addr.recipient::<InboundClientData>();
let client_addr = SocketAddr::from_str("1.2.3.4:8765").unwrap();
let reception_port = Some(1234);
let reception_port_opt = Some(1234);
let data: Vec<u8> = vec![9, 10, 11];
let ibcd_in = InboundClientData {
timestamp: SystemTime::now(),
client_addr,
reception_port,
reception_port_opt,
last_data: false,
is_clandestine: false,
sequence_number: Some(0),
Expand All @@ -376,12 +376,12 @@ mod tests {
let subject_addr = subject.start();
let subject_ibcd = subject_addr.recipient::<InboundClientData>();
let client_addr = SocketAddr::from_str("1.2.3.4:8765").unwrap();
let reception_port = Some(1234);
let reception_port_opt = Some(1234);
let data: Vec<u8> = vec![9, 10, 11];
let ibcd_in = InboundClientData {
timestamp: SystemTime::now(),
client_addr,
reception_port,
reception_port_opt,
last_data: false,
is_clandestine: true,
sequence_number: None,
Expand Down
Loading