Skip to content

Commit

Permalink
Add send/receive url collector stream message and testcode
Browse files Browse the repository at this point in the history
  • Loading branch information
dayeon5470 authored and sophie-cluml committed Dec 22, 2023
1 parent 72f17ed commit bdabae7
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Versioning](https://semver.org/spec/v2.0.0.html).
### Added

- Add `NodeType` and `RequestUrlCollectorStream` to url-collector.
- Add `send_url_collector_stream_start_message()`, `receive_url_collector_stream_start_message()`,
`receive_url_collector_data()` and testcode.

## [0.15.2] - 2023-11-16

Expand Down
133 changes: 131 additions & 2 deletions src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ pub async fn send_hog_stream_start_message(
Ok(())
}

/// Sends the url collector stream start message from giganto's publish module.
///
/// # Errors
///
/// * `PublishError::WriteError`: if the url collector's stream start message could not be written
pub async fn send_url_collector_stream_start_message(
send: &mut SendStream,
start_msg: RequestStreamRecord,
) -> Result<(), PublishError> {
let record: u32 = start_msg.into();
send_bytes(send, &record.to_le_bytes()).await?;
Ok(())
}

/// Sends the range data request to giganto's publish module.
///
/// # Errors
Expand Down Expand Up @@ -244,6 +258,22 @@ pub async fn receive_crusher_stream_start_message(
Ok(start_msg)
}

/// Receives the url collector stream start message sent from giganto's publish module.
///
/// # Errors
///
/// * `PublishError::ReadError`: if the url collector's stream start data could not be read
/// * `PublishError::InvalidMessageType`: if the url collector's stream start data could not be converted to valid type
pub async fn receive_url_collector_stream_start_message(
recv: &mut RecvStream,
) -> Result<RequestStreamRecord, PublishError> {
let mut record_buf = [0; mem::size_of::<u32>()];
recv_bytes(recv, &mut record_buf).await?;
let start_msg = RequestStreamRecord::try_from(u32::from_le_bytes(record_buf))
.map_err(|_| PublishError::InvalidMessageType)?;
Ok(start_msg)
}

/// Receives the record data. (timestamp / record structure)
///
/// # Errors
Expand Down Expand Up @@ -284,6 +314,25 @@ pub async fn receive_hog_data(recv: &mut RecvStream) -> Result<Vec<u8>, PublishE
Ok(result_buf)
}

/// Receives the timestamp/record data from giganto's publish module.
///
/// # Errors
///
/// * `PublishError::ReadError`: if the stream record data could not be read
pub async fn receive_url_collector_data(recv: &mut RecvStream) -> Result<Vec<u8>, PublishError> {
let mut ts_buf = [0; std::mem::size_of::<u64>()];
frame::recv_bytes(recv, &mut ts_buf).await?;

let mut record_buf = Vec::new();
frame::recv_raw(recv, &mut record_buf).await?;

let mut result_buf: Vec<u8> = Vec::new();
result_buf.extend_from_slice(&ts_buf);
result_buf.extend_from_slice(&record_buf);

Ok(result_buf)
}

/// Receives the range data request sent to giganto's publish module.
///
/// # Errors
Expand Down Expand Up @@ -446,7 +495,7 @@ pub async fn recv_ack_response(recv: &mut RecvStream) -> Result<(), PublishError
#[cfg(test)]
mod tests {
use crate::frame;
use crate::ingest::network::Conn;
use crate::ingest::network::{Conn, Http};
use crate::publish::{recv_ack_response, PublishError};
use crate::test::{channel, TOKEN};
use std::net::IpAddr;
Expand All @@ -457,7 +506,7 @@ mod tests {
use crate::frame::send_bytes;
use crate::publish::{
range::ResponseRangeData,
stream::{RequestCrusherStream, RequestHogStream},
stream::{RequestCrusherStream, RequestHogStream, RequestUrlCollectorStream},
PcapFilter,
};

Expand Down Expand Up @@ -511,6 +560,25 @@ mod tests {
assert_eq!(req_record, super::stream::RequestStreamRecord::Conn);
assert_eq!(req_data, bincode::serialize(&crusher_req).unwrap());

// send/recv url collector stream request
let url_collector_req = RequestUrlCollectorStream { start: 0 };
super::send_stream_request(
&mut channel.client.send,
super::stream::RequestStreamRecord::Http,
super::stream::NodeType::UrlCollector,
url_collector_req.clone(),
)
.await
.unwrap();

let (node_type, req_record, req_data) =
super::receive_stream_request(&mut channel.server.recv)
.await
.unwrap();
assert_eq!(node_type, super::stream::NodeType::UrlCollector);
assert_eq!(req_record, super::stream::RequestStreamRecord::Http);
assert_eq!(req_data, bincode::serialize(&url_collector_req).unwrap());

// send/recv hog stream start message
super::send_hog_stream_start_message(
&mut channel.server.send,
Expand All @@ -533,6 +601,19 @@ mod tests {
.unwrap();
assert_eq!(policy_id, "1".parse::<u32>().unwrap());

// send/recv url collector stream start message
super::send_url_collector_stream_start_message(
&mut channel.server.send,
super::stream::RequestStreamRecord::Http,
)
.await
.unwrap();
let req_record =
super::receive_url_collector_stream_start_message(&mut channel.client.recv)
.await
.unwrap();
assert_eq!(req_record, super::stream::RequestStreamRecord::Http);

// send/recv stream data with hog (hog's stream data use send_bytes function)
let conn = Conn {
orig_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
Expand Down Expand Up @@ -584,6 +665,54 @@ mod tests {
assert_eq!(timestamp, 7777);
assert_eq!(data, bincode::serialize(&conn).unwrap());

// send/recv stream data with url collector (url collector's stream data use send_bytes function)
let http = Http {
orig_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
orig_port: 46378,
resp_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
resp_port: 80,
proto: 6,
last_time: 1000,
method: "POST".to_string(),
host: "einsis".to_string(),
uri: "/einsis.gif".to_string(),
referrer: "einsis.com".to_string(),
version: String::new(),
user_agent: "giganto".to_string(),
request_len: 0,
response_len: 0,
status_code: 200,
status_msg: String::new(),
username: String::new(),
password: String::new(),
cookie: String::new(),
content_encoding: String::new(),
content_type: String::new(),
cache_control: String::new(),
orig_filenames: Vec::new(),
orig_mime_types: Vec::new(),
resp_filenames: Vec::new(),
resp_mime_types: Vec::new(),
post_body: Vec::new(),
};
let raw_event = bincode::serialize(&http).unwrap();
let raw_len = u32::try_from(raw_event.len()).unwrap().to_le_bytes();
let mut send_buf: Vec<u8> = Vec::new();
send_buf.extend_from_slice(&6666_i64.to_le_bytes());
send_buf.extend_from_slice(&raw_len);
send_buf.extend_from_slice(&raw_event);
send_bytes(&mut channel.server.send, &mut send_buf)
.await
.unwrap();

let data = super::receive_url_collector_data(&mut channel.client.recv)
.await
.unwrap();
let mut result_buf: Vec<u8> = Vec::new();
result_buf.extend_from_slice(&6666_i64.to_le_bytes());
result_buf.extend_from_slice(&raw_event);
assert_eq!(data, result_buf);

// send/recv range data request
let req_range = super::range::RequestRange {
source: String::from("world"),
Expand Down

0 comments on commit bdabae7

Please sign in to comment.