Skip to content

Commit

Permalink
More tests (#6)
Browse files Browse the repository at this point in the history
* Moved get_shards to kinesis::helpers

* Moved console tests to console/test.rs

* Moved console tests to console/test.rs

* Writer W

* exit_after_termination
  • Loading branch information
gr211 authored May 8, 2023
1 parent b998c2e commit a887ad3
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 45 deletions.
56 changes: 26 additions & 30 deletions src/console.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::kinesis::models::*;
use chrono::*;
use std::io::{self, BufWriter, Error, Stdout, Write};
use std::io::{self, BufWriter, Error, Write};
use std::rc::Rc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
Expand All @@ -12,6 +12,7 @@ pub struct Console {
print_shardid: bool,
print_timestamp: bool,
print_delimiter: bool,
exit_after_termination: bool,
rx_records: Receiver<Result<ShardProcessorADT, PanicError>>,
tx_records: Sender<Result<ShardProcessorADT, PanicError>>,
}
Expand All @@ -32,16 +33,25 @@ impl Console {
print_shardid,
print_timestamp,
print_delimiter,
exit_after_termination: true,
rx_records,
tx_records,
}
}

pub async fn run(&mut self) -> io::Result<()> {
let count = Rc::new(Mutex::new(0));

let stdout = io::stdout(); // get the global stdout entity
let mut handle: BufWriter<Stdout> = io::BufWriter::with_capacity(CONSOLE_BUF_SIZE, stdout);
let mut handle = io::BufWriter::with_capacity(CONSOLE_BUF_SIZE, stdout);

self.run_inner(&mut handle).await
}

pub async fn run_inner<W>(&mut self, handle: &mut BufWriter<W>) -> io::Result<()>
where
W: std::io::Write,
{
self.delimiter(handle).unwrap();
let count = Rc::new(Mutex::new(0));

self.handle_termination();

Expand Down Expand Up @@ -77,7 +87,7 @@ impl Console {
data.iter().for_each(|data| {
writeln!(handle, "{}", data).unwrap();
});
self.delimiter(&mut handle)?
self.delimiter(handle)?
};
}
None => {
Expand All @@ -87,7 +97,7 @@ impl Console {
data.iter().for_each(|data| {
writeln!(handle, "{}", data).unwrap();
});
self.delimiter(&mut handle)?
self.delimiter(handle)?
}
}
}
Expand All @@ -99,7 +109,10 @@ impl Console {
writeln!(handle, "{}", self.format_nb_messages(messages_processed))?;
handle.flush()?;
self.rx_records.close();
std::process::exit(0);

if self.exit_after_termination {
std::process::exit(0)
}
}
},
Err(e) => {
Expand All @@ -112,6 +125,7 @@ impl Console {

fn format_nb_messages(&self, messages_processed: u32) -> String {
match messages_processed {
0 => "0 message processed".to_string(),
1 => "1 message processed".to_string(),
_ => format!("{} messages processed", messages_processed),
}
Expand All @@ -128,7 +142,10 @@ impl Console {
.expect("Error setting Ctrl-C handler");
}

fn delimiter(&self, handle: &mut BufWriter<Stdout>) -> Result<(), Error> {
fn delimiter<W>(&self, handle: &mut BufWriter<W>) -> Result<(), Error>
where
W: std::io::Write,
{
if self.print_delimiter {
writeln!(
handle,
Expand Down Expand Up @@ -175,25 +192,4 @@ impl Console {
}

#[cfg(test)]
mod tests {
use super::*;
use tokio::sync::mpsc;

#[test]
fn format_nb_messages_ok() {
let (tx_records, rx_records) = mpsc::channel::<Result<ShardProcessorADT, PanicError>>(1);

let console = Console {
max_messages: None,
print_key: false,
print_shardid: false,
print_timestamp: false,
print_delimiter: false,
rx_records,
tx_records,
};

assert_eq!(console.format_nb_messages(1), "1 message processed");
assert_eq!(console.format_nb_messages(2), "2 messages processed");
}
}
mod tests;
57 changes: 57 additions & 0 deletions src/console/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use super::*;
use crate::kinesis::models::ShardProcessorADT::Termination;
use tokio::sync::mpsc;

#[test]
fn format_nb_messages_ok() {
let (tx_records, rx_records) = mpsc::channel::<Result<ShardProcessorADT, PanicError>>(1);

let console = Console {
max_messages: None,
print_key: false,
print_shardid: false,
print_timestamp: false,
print_delimiter: false,
exit_after_termination: false,
rx_records,
tx_records,
};

assert_eq!(console.format_nb_messages(1), "1 message processed");
assert_eq!(console.format_nb_messages(2), "2 messages processed");
}

#[tokio::test]
async fn www() {
let (tx_records, rx_records) = mpsc::channel::<Result<ShardProcessorADT, PanicError>>(1);

let tx_records_clone = tx_records.clone();

let mut console = Console {
max_messages: None,
print_key: false,
print_shardid: false,
print_timestamp: false,
print_delimiter: false,
exit_after_termination: true,
rx_records,
tx_records,
};

tokio::spawn(async move {
tx_records_clone
.send(Ok(Termination))
.await
.expect("TODO: panic message");
});

let mut handle = BufWriter::new(Vec::new());

console.run_inner(&mut handle).await.unwrap();

handle.flush().unwrap();
let bytes = handle.into_inner().unwrap();
let string = String::from_utf8(bytes).unwrap();

assert_eq!(string, "0 message processed\n");
}
13 changes: 1 addition & 12 deletions src/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::time::{sleep, Duration};

mod helpers;
pub mod helpers;
pub mod models;

pub fn new(
Expand Down Expand Up @@ -233,14 +233,3 @@ where
Ok(())
}
}

pub async fn get_shards(client: &Client, stream: &str) -> Result<Vec<String>, Error> {
let resp = client.list_shards().stream_name(stream).send().await?;

Ok(resp
.shards()
.unwrap()
.iter()
.map(|s| s.shard_id.as_ref().unwrap().clone())
.collect())
}
13 changes: 12 additions & 1 deletion src/kinesis/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput;
use aws_sdk_kinesis::Error;
use aws_sdk_kinesis::{Client, Error};
use chrono::Utc;
use tokio::sync::mpsc::Sender;

Expand Down Expand Up @@ -86,3 +86,14 @@ pub async fn handle_iterator_refresh<T>(
.await
.unwrap();
}

pub async fn get_shards(client: &Client, stream: &str) -> Result<Vec<String>, Error> {
let resp = client.list_shards().stream_name(stream).send().await?;

Ok(resp
.shards()
.unwrap()
.iter()
.map(|s| s.shard_id.as_ref().unwrap().clone())
.collect())
}
3 changes: 1 addition & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use tokio::sync::mpsc;

use crate::cli_helpers::parse_date;
use crate::console::Console;
use kinesis::helpers::get_shards;
use kinesis::models::*;
use kinesis::*;

mod console;
mod iterator;
mod kinesis;
Expand Down

0 comments on commit a887ad3

Please sign in to comment.