Skip to content

Commit

Permalink
Squash cherry-pick of 0.15.2 to 0.15.2-tis.0.0.1 (#116)
Browse files Browse the repository at this point in the history
Conflicts are manually resolved in files:
 CHANGELOG.md
 Cargo.toml
 README.md

Code fix in files:
 src/publish.rs
  • Loading branch information
sophie-cluml authored May 30, 2024
1 parent 9747b03 commit 2be0f4d
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 3 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ file is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and
this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Add `NodeType` and `RequestUrlCollectorStream` to url-collector.
- Add `send_url_collector_stream_start_message()`, `receive_url_collector_stream_start_message()`,
`receive_url_collector_data()` and testcode.

## [0.17.0] - 2024-05-16

### Added
Expand Down Expand Up @@ -241,6 +249,7 @@ Versioning](https://semver.org/spec/v2.0.0.html).

- Move from giganto

[Unreleased]: https://github.com/aicers/giganto-client/compare/0.17.0...0.17.0+tis.0.0.1
[0.17.0]: https://github.com/aicers/giganto-client/compare/0.16.0...0.17.0
[0.16.0]: https://github.com/aicers/giganto-client/compare/0.15.2...0.16.0
[0.15.2]: https://github.com/aicers/giganto-client/compare/0.15.1...0.15.2
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "giganto-client"
version = "0.17.0"
version = "0.17.0+tis.0.1.0-alpha.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
146 changes: 144 additions & 2 deletions src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ pub async fn send_hog_stream_start_message(
Ok(())
}

/// Sends the url collector stream start message from giganto's publish module.
///
/// # Errors
///
/// * `PublishError::WriteError`: if the url collector's stream start message could not be written
pub async fn send_url_collector_stream_start_message(
send: &mut SendStream,
start_msg: RequestStreamRecord,
) -> Result<(), PublishError> {
let record: u32 = start_msg.into();
send_bytes(send, &record.to_le_bytes()).await?;
Ok(())
}

/// Sends the range data request to giganto's publish module.
///
/// # Errors
Expand Down Expand Up @@ -246,6 +260,22 @@ pub async fn receive_crusher_stream_start_message(
Ok(start_msg)
}

/// Receives the url collector stream start message sent from giganto's publish module.
///
/// # Errors
///
/// * `PublishError::ReadError`: if the url collector's stream start data could not be read
/// * `PublishError::InvalidMessageType`: if the url collector's stream start data could not be converted to valid type
pub async fn receive_url_collector_stream_start_message(
recv: &mut RecvStream,
) -> Result<RequestStreamRecord, PublishError> {
let mut record_buf = [0; mem::size_of::<u32>()];
recv_bytes(recv, &mut record_buf).await?;
let start_msg = RequestStreamRecord::try_from(u32::from_le_bytes(record_buf))
.map_err(|_| PublishError::InvalidMessageType)?;
Ok(start_msg)
}

/// Receives the record data. (timestamp / record structure)
///
/// # Errors
Expand Down Expand Up @@ -286,6 +316,29 @@ pub async fn receive_hog_data(recv: &mut RecvStream) -> Result<Vec<u8>, PublishE
Ok(result_buf)
}

/// Receives the timestamp/record data from giganto's publish module.
///
/// # Errors
///
/// * `PublishError::ReadError`: if the stream record data could not be read
pub async fn receive_url_collector_data(recv: &mut RecvStream) -> Result<Vec<u8>, PublishError> {
let mut ts_buf = [0; std::mem::size_of::<u64>()];
frame::recv_bytes(recv, &mut ts_buf).await?;

let mut source_buf = Vec::new();
frame::recv_raw(recv, &mut source_buf).await?;

let mut record_buf = Vec::new();
frame::recv_raw(recv, &mut record_buf).await?;

let mut result_buf: Vec<u8> = Vec::new();
result_buf.extend_from_slice(&ts_buf);
result_buf.extend_from_slice(&source_buf);
result_buf.extend_from_slice(&record_buf);

Ok(result_buf)
}

/// Receives the range data request sent to giganto's publish module.
///
/// # Errors
Expand Down Expand Up @@ -449,7 +502,7 @@ pub async fn recv_ack_response(recv: &mut RecvStream) -> Result<(), PublishError
#[cfg(test)]
mod tests {
use crate::frame;
use crate::ingest::network::Conn;
use crate::ingest::network::{Conn, Http};
use crate::publish::{recv_ack_response, PublishError};
use crate::test::{channel, TOKEN};
use std::net::IpAddr;
Expand All @@ -460,7 +513,7 @@ mod tests {
use crate::frame::send_bytes;
use crate::publish::{
range::ResponseRangeData,
stream::{RequestCrusherStream, RequestHogStream},
stream::{RequestCrusherStream, RequestHogStream, RequestUrlCollectorStream},
PcapFilter,
};

Expand Down Expand Up @@ -514,6 +567,28 @@ mod tests {
assert_eq!(req_record, super::stream::RequestStreamRecord::Conn);
assert_eq!(req_data, bincode::serialize(&crusher_req).unwrap());

// send/recv url collector stream request
let url_collector_req = RequestUrlCollectorStream {
start: 0,
source: Some("hello".to_string()),
};
super::send_stream_request(
&mut channel.client.send,
super::stream::RequestStreamRecord::Http,
super::stream::NodeType::UrlCollector,
url_collector_req.clone(),
)
.await
.unwrap();

let (node_type, req_record, req_data) =
super::receive_stream_request(&mut channel.server.recv)
.await
.unwrap();
assert_eq!(node_type, super::stream::NodeType::UrlCollector);
assert_eq!(req_record, super::stream::RequestStreamRecord::Http);
assert_eq!(req_data, bincode::serialize(&url_collector_req).unwrap());

// send/recv hog stream start message
super::send_hog_stream_start_message(
&mut channel.server.send,
Expand All @@ -536,6 +611,19 @@ mod tests {
.unwrap();
assert_eq!(policy_id, "1".parse::<u32>().unwrap());

// send/recv url collector stream start message
super::send_url_collector_stream_start_message(
&mut channel.server.send,
super::stream::RequestStreamRecord::Http,
)
.await
.unwrap();
let req_record =
super::receive_url_collector_stream_start_message(&mut channel.client.recv)
.await
.unwrap();
assert_eq!(req_record, super::stream::RequestStreamRecord::Http);

// send/recv stream data with hog (hog's stream data use send_bytes function)
let conn = Conn {
orig_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
Expand Down Expand Up @@ -588,6 +676,60 @@ mod tests {
assert_eq!(timestamp, 7777);
assert_eq!(data, bincode::serialize(&conn).unwrap());

// send/recv stream data with url collector (url collector's stream data use send_bytes function)
let http = Http {
orig_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
orig_port: 46378,
resp_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
resp_port: 80,
proto: 6,
last_time: 1000,
method: "POST".to_string(),
host: "einsis".to_string(),
uri: "/einsis.gif".to_string(),
referrer: "einsis.com".to_string(),
version: String::new(),
user_agent: "giganto".to_string(),
request_len: 0,
response_len: 0,
status_code: 200,
status_msg: String::new(),
username: String::new(),
password: String::new(),
cookie: String::new(),
content_encoding: String::new(),
content_type: String::new(),
cache_control: String::new(),
orig_filenames: Vec::new(),
orig_mime_types: Vec::new(),
resp_filenames: Vec::new(),
resp_mime_types: Vec::new(),
post_body: Vec::new(),
state: "OK".to_string(),
};
let raw_event = bincode::serialize(&http).unwrap();
let source = bincode::serialize(&"hello").unwrap();
let raw_len = u32::try_from(raw_event.len()).unwrap().to_le_bytes();
let source_len = u32::try_from(source.len()).unwrap().to_le_bytes();
let mut send_buf: Vec<u8> = Vec::new();
send_buf.extend_from_slice(&6666_i64.to_le_bytes());
send_buf.extend_from_slice(&source_len);
send_buf.extend_from_slice(&source);
send_buf.extend_from_slice(&raw_len);
send_buf.extend_from_slice(&raw_event);
send_bytes(&mut channel.server.send, &mut send_buf)
.await
.unwrap();

let data = super::receive_url_collector_data(&mut channel.client.recv)
.await
.unwrap();
let mut result_buf: Vec<u8> = Vec::new();
result_buf.extend_from_slice(&6666_i64.to_le_bytes());
result_buf.extend_from_slice(&source);
result_buf.extend_from_slice(&raw_event);
assert_eq!(data, result_buf);

// send/recv range data request
let req_range = super::range::RequestRange {
source: String::from("world"),
Expand Down
9 changes: 9 additions & 0 deletions src/publish/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub const STREAM_REQUEST_ALL_SOURCE: &str = "all";
pub enum NodeType {
Hog = 0,
Crusher = 1,
UrlCollector = 2,
}

impl NodeType {
Expand All @@ -20,6 +21,7 @@ impl NodeType {
match self {
NodeType::Hog => "hog",
NodeType::Crusher => "crusher",
NodeType::UrlCollector => "url_collector",
}
}
}
Expand Down Expand Up @@ -123,3 +125,10 @@ pub struct RequestCrusherStream {
pub dst_ip: Option<IpAddr>,
pub source: Option<String>,
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[allow(clippy::module_name_repetitions)]
pub struct RequestUrlCollectorStream {
pub start: i64,
pub source: Option<String>,
}

0 comments on commit 2be0f4d

Please sign in to comment.