Skip to content

Commit

Permalink
More tests for data transfer reports
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeldjeffrey committed Oct 4, 2024
1 parent 750c89a commit 2dd8ef8
Showing 1 changed file with 143 additions and 51 deletions.
194 changes: 143 additions & 51 deletions mobile_packet_verifier/src/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,41 @@ mod tests {

use super::*;

struct MockResolver;
struct MockResolver {
gateway_known: bool,
routing_key_known: bool,
}

impl MockResolver {
fn new() -> Self {
Self {
gateway_known: true,
routing_key_known: true,
}
}

fn unknown_gateway(self) -> Self {
Self {
gateway_known: false,
routing_key_known: self.routing_key_known,
}
}
fn unknown_routing_key(self) -> Self {
Self {
gateway_known: self.gateway_known,
routing_key_known: false,
}
}
}

#[async_trait::async_trait]
impl MobileConfigResolverExt for MockResolver {
async fn is_gateway_known(&self, _public_key: &PublicKeyBinary) -> bool {
true
self.gateway_known
}

async fn is_routing_key_known(&self, _public_key: &PublicKeyBinary) -> bool {
true
self.routing_key_known
}
}

Expand All @@ -150,7 +175,7 @@ mod tests {
let invalid_data_session_report_sink = FileSinkClient::new(tx, "test");

accumulate_sessions(
&MockResolver,
&MockResolver::new(),
&mut txn,
&invalid_data_session_report_sink,
Utc::now(),
Expand All @@ -173,33 +198,18 @@ mod tests {
}

#[sqlx::test]
async fn accumulate_writes_zero_data_event_as_invalid(pool: PgPool) -> anyhow::Result<()> {
async fn accumulate_writes_zero_data_event_as_verified_but_not_for_burning(
pool: PgPool,
) -> anyhow::Result<()> {
let mut txn = pool.begin().await?;

let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let invalid_data_session_report_sink = FileSinkClient::new(tx, "test");

let report = DataTransferSessionIngestReport {
report: DataTransferSessionReq {
data_transfer_usage: DataTransferEvent {
pub_key: vec![0].into(),
upload_bytes: 0,
download_bytes: 0,
radio_access_technology: DataTransferRadioAccessTechnology::Wlan,
event_id: "test".to_string(),
payer: vec![0].into(),
timestamp: Utc::now(),
signature: vec![],
},
rewardable_bytes: 0,
pub_key: vec![0].into(),
signature: vec![],
},
received_timestamp: Utc::now(),
};
let report = make_data_transfer_with_rewardable_bytes(0);

accumulate_sessions(
&MockResolver,
&MockResolver::new(),
&mut txn,
&invalid_data_session_report_sink,
Utc::now(),
Expand All @@ -209,11 +219,14 @@ mod tests {

txn.commit().await?;

// single record written to invalid sink
match rx.try_recv() {
Ok(_) => (),
other => panic!("unexpected: {other:?}"),
}
// single record written to verified sink
rx.assert_not_empty()?;

let sessions: Vec<DataTransferSession> =
sqlx::query_as("SELECT * from data_transfer_sessions")
.fetch_all(&pool)
.await?;
assert!(sessions.is_empty());

Ok(())
}
Expand All @@ -225,27 +238,10 @@ mod tests {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let invalid_data_session_report_sink = FileSinkClient::new(tx, "test");

let report = DataTransferSessionIngestReport {
report: DataTransferSessionReq {
data_transfer_usage: DataTransferEvent {
pub_key: vec![0].into(),
upload_bytes: 1,
download_bytes: 2,
radio_access_technology: DataTransferRadioAccessTechnology::Wlan,
event_id: "test".to_string(),
payer: vec![0].into(),
timestamp: Utc::now(),
signature: vec![],
},
rewardable_bytes: 3,
pub_key: vec![0].into(),
signature: vec![],
},
received_timestamp: Utc::now(),
};
let report = make_data_transfer_with_rewardable_bytes(3);

accumulate_sessions(
&MockResolver,
&MockResolver::new(),
&mut txn,
&invalid_data_session_report_sink,
Utc::now(),
Expand All @@ -255,8 +251,8 @@ mod tests {

txn.commit().await?;

// no records written to invalid sink
rx.assert_is_empty()?;
// record written as verified
rx.assert_not_empty()?;

let sessions: Vec<DataTransferSession> =
sqlx::query_as("SELECT * from data_transfer_sessions")
Expand All @@ -267,15 +263,111 @@ mod tests {
Ok(())
}

#[sqlx::test]
async fn unknown_gateway(pool: PgPool) -> anyhow::Result<()> {
let mut txn = pool.begin().await?;

let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let invalid_data_session_report_sink = FileSinkClient::new(tx, "test");

let report = make_data_transfer_with_rewardable_bytes(3);

accumulate_sessions(
&MockResolver::new().unknown_gateway(),
&mut txn,
&invalid_data_session_report_sink,
Utc::now(),
futures::stream::iter(vec![report]),
)
.await?;

txn.commit().await?;

// record written as verified
rx.assert_not_empty()?;

let sessions: Vec<DataTransferSession> =
sqlx::query_as("SELECT * from data_transfer_sessions")
.fetch_all(&pool)
.await?;
assert!(sessions.is_empty());

Ok(())
}

#[sqlx::test]
fn unknown_routing_key(pool: PgPool) -> anyhow::Result<()> {
let mut txn = pool.begin().await?;

let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let invalid_data_session_report_sink = FileSinkClient::new(tx, "test");

let report = make_data_transfer_with_rewardable_bytes(3);

accumulate_sessions(
&MockResolver::new().unknown_routing_key(),
&mut txn,
&invalid_data_session_report_sink,
Utc::now(),
futures::stream::iter(vec![report]),
)
.await?;

txn.commit().await?;

// record written as verified
rx.assert_not_empty()?;

let sessions: Vec<DataTransferSession> =
sqlx::query_as("SELECT * from data_transfer_sessions")
.fetch_all(&pool)
.await?;
assert!(sessions.is_empty());

Ok(())
}

fn make_data_transfer_with_rewardable_bytes(
rewardable_bytes: u64,
) -> DataTransferSessionIngestReport {
DataTransferSessionIngestReport {
report: DataTransferSessionReq {
data_transfer_usage: DataTransferEvent {
pub_key: vec![0].into(),
upload_bytes: 1,
download_bytes: 2,
radio_access_technology: DataTransferRadioAccessTechnology::Wlan,
event_id: "test".to_string(),
payer: vec![0].into(),
timestamp: Utc::now(),
signature: vec![],
},
rewardable_bytes,
pub_key: vec![0].into(),
signature: vec![],
},
received_timestamp: Utc::now(),
}
}

trait ChannelExt {
fn assert_not_empty(&mut self) -> anyhow::Result<()>;
fn assert_is_empty(&mut self) -> anyhow::Result<()>;
}

impl<T: std::fmt::Debug> ChannelExt for tokio::sync::mpsc::Receiver<T> {
fn assert_not_empty(&mut self) -> anyhow::Result<()> {
match self.try_recv() {
Ok(_) => (),
other => panic!("expected message in channel: {other:?}"),
}
Ok(())
}

fn assert_is_empty(&mut self) -> anyhow::Result<()> {
match self.try_recv() {
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
other => panic!("unexpected message: {other:?}"),
other => panic!("expected channel to be empty: {other:?}"),
}
Ok(())
}
Expand Down

0 comments on commit 2dd8ef8

Please sign in to comment.