Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add send/receive url collector stream message and testcode #89

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Versioning](https://semver.org/spec/v2.0.0.html).
### Added

- Add `NodeType` and `RequestUrlCollectorStream` to url-collector.
- Add `send_url_collector_stream_start_message()`, `receive_url_collector_stream_start_message()`,
`receive_url_collector_data()` and testcode.

## [0.15.2] - 2023-11-16

Expand Down
133 changes: 131 additions & 2 deletions src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ pub async fn send_hog_stream_start_message(
Ok(())
}

/// Sends the url collector stream start message from giganto's publish module.
///
/// # Errors
///
/// * `PublishError::WriteError`: if the url collector's stream start message could not be written
pub async fn send_url_collector_stream_start_message(
send: &mut SendStream,
start_msg: RequestStreamRecord,
) -> Result<(), PublishError> {
let record: u32 = start_msg.into();
send_bytes(send, &record.to_le_bytes()).await?;
Ok(())
}

/// Sends the range data request to giganto's publish module.
///
/// # Errors
Expand Down Expand Up @@ -244,6 +258,22 @@ pub async fn receive_crusher_stream_start_message(
Ok(start_msg)
}

/// Receives the url collector stream start message sent from giganto's publish module.
///
/// # Errors
///
/// * `PublishError::ReadError`: if the url collector's stream start data could not be read
/// * `PublishError::InvalidMessageType`: if the url collector's stream start data could not be converted to valid type
pub async fn receive_url_collector_stream_start_message(
recv: &mut RecvStream,
) -> Result<RequestStreamRecord, PublishError> {
let mut record_buf = [0; mem::size_of::<u32>()];
recv_bytes(recv, &mut record_buf).await?;
let start_msg = RequestStreamRecord::try_from(u32::from_le_bytes(record_buf))
.map_err(|_| PublishError::InvalidMessageType)?;
Ok(start_msg)
}

/// Receives the record data. (timestamp / record structure)
///
/// # Errors
Expand Down Expand Up @@ -284,6 +314,25 @@ pub async fn receive_hog_data(recv: &mut RecvStream) -> Result<Vec<u8>, PublishE
Ok(result_buf)
}

/// Receives the timestamp/record data from giganto's publish module.
///
/// # Errors
///
/// * `PublishError::ReadError`: if the stream record data could not be read
pub async fn receive_url_collector_data(recv: &mut RecvStream) -> Result<Vec<u8>, PublishError> {
let mut ts_buf = [0; std::mem::size_of::<u64>()];
frame::recv_bytes(recv, &mut ts_buf).await?;

let mut record_buf = Vec::new();
frame::recv_raw(recv, &mut record_buf).await?;

let mut result_buf: Vec<u8> = Vec::new();
result_buf.extend_from_slice(&ts_buf);
result_buf.extend_from_slice(&record_buf);

Ok(result_buf)
}

/// Receives the range data request sent to giganto's publish module.
///
/// # Errors
Expand Down Expand Up @@ -446,7 +495,7 @@ pub async fn recv_ack_response(recv: &mut RecvStream) -> Result<(), PublishError
#[cfg(test)]
mod tests {
use crate::frame;
use crate::ingest::network::Conn;
use crate::ingest::network::{Conn, Http};
use crate::publish::{recv_ack_response, PublishError};
use crate::test::{channel, TOKEN};
use std::net::IpAddr;
Expand All @@ -457,7 +506,7 @@ mod tests {
use crate::frame::send_bytes;
use crate::publish::{
range::ResponseRangeData,
stream::{RequestCrusherStream, RequestHogStream},
stream::{RequestCrusherStream, RequestHogStream, RequestUrlCollectorStream},
Copy link
Contributor

@sophie-cluml sophie-cluml Dec 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice tc 👍

PcapFilter,
};

Expand Down Expand Up @@ -511,6 +560,25 @@ mod tests {
assert_eq!(req_record, super::stream::RequestStreamRecord::Conn);
assert_eq!(req_data, bincode::serialize(&crusher_req).unwrap());

// send/recv url collector stream request
let url_collector_req = RequestUrlCollectorStream { start: 0 };
super::send_stream_request(
&mut channel.client.send,
super::stream::RequestStreamRecord::Http,
super::stream::NodeType::UrlCollector,
url_collector_req.clone(),
)
.await
.unwrap();

let (node_type, req_record, req_data) =
super::receive_stream_request(&mut channel.server.recv)
.await
.unwrap();
assert_eq!(node_type, super::stream::NodeType::UrlCollector);
assert_eq!(req_record, super::stream::RequestStreamRecord::Http);
assert_eq!(req_data, bincode::serialize(&url_collector_req).unwrap());

// send/recv hog stream start message
super::send_hog_stream_start_message(
&mut channel.server.send,
Expand All @@ -533,6 +601,19 @@ mod tests {
.unwrap();
assert_eq!(policy_id, "1".parse::<u32>().unwrap());

// send/recv url collector stream start message
super::send_url_collector_stream_start_message(
&mut channel.server.send,
super::stream::RequestStreamRecord::Http,
)
.await
.unwrap();
let req_record =
super::receive_url_collector_stream_start_message(&mut channel.client.recv)
.await
.unwrap();
assert_eq!(req_record, super::stream::RequestStreamRecord::Http);

// send/recv stream data with hog (hog's stream data use send_bytes function)
let conn = Conn {
orig_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
Expand Down Expand Up @@ -584,6 +665,54 @@ mod tests {
assert_eq!(timestamp, 7777);
assert_eq!(data, bincode::serialize(&conn).unwrap());

// send/recv stream data with url collector (url collector's stream data use send_bytes function)
let http = Http {
orig_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
orig_port: 46378,
resp_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
resp_port: 80,
proto: 6,
last_time: 1000,
method: "POST".to_string(),
host: "einsis".to_string(),
uri: "/einsis.gif".to_string(),
referrer: "einsis.com".to_string(),
version: String::new(),
user_agent: "giganto".to_string(),
request_len: 0,
response_len: 0,
status_code: 200,
status_msg: String::new(),
username: String::new(),
password: String::new(),
cookie: String::new(),
content_encoding: String::new(),
content_type: String::new(),
cache_control: String::new(),
orig_filenames: Vec::new(),
orig_mime_types: Vec::new(),
resp_filenames: Vec::new(),
resp_mime_types: Vec::new(),
post_body: Vec::new(),
};
let raw_event = bincode::serialize(&http).unwrap();
let raw_len = u32::try_from(raw_event.len()).unwrap().to_le_bytes();
let mut send_buf: Vec<u8> = Vec::new();
send_buf.extend_from_slice(&6666_i64.to_le_bytes());
send_buf.extend_from_slice(&raw_len);
send_buf.extend_from_slice(&raw_event);
send_bytes(&mut channel.server.send, &mut send_buf)
.await
.unwrap();

let data = super::receive_url_collector_data(&mut channel.client.recv)
.await
.unwrap();
let mut result_buf: Vec<u8> = Vec::new();
result_buf.extend_from_slice(&6666_i64.to_le_bytes());
result_buf.extend_from_slice(&raw_event);
assert_eq!(data, result_buf);

// send/recv range data request
let req_range = super::range::RequestRange {
source: String::from("world"),
Expand Down