Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#7168: Add tokio_test::sink_mock #7190

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,14 @@ A good commit message should describe what changed and why.

1. The first line should:

* be prefixed with the name of the module being changed; usually this is the
same as the M-* label on the PR
* start with an imperative verb
* contain a short description of the change (preferably 50 characters or less,
and no more than 72 characters)
* be entirely in lowercase with the exception of proper nouns, acronyms, and
the words that refer to code, like function/variable names
* start with an imperative verb
* not have a period at the end
* be prefixed with the name of the module being changed; usually this is the
same as the M-* label on the PR

Examples:

Expand Down
3 changes: 2 additions & 1 deletion tokio-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ async-stream = "0.3.3"

bytes = "1.0.0"
futures-core = "0.3.0"
futures-sink = "0.3.0"

[dev-dependencies]
tokio = { version = "1.2.0", path = "../tokio", features = ["full"] }
futures-util = "0.3.0"
futures-util = { version = "0.3.0", features = ["sink"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
1 change: 1 addition & 0 deletions tokio-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Tokio and Futures based testing utilities

pub mod io;
pub mod sink_mock;
pub mod stream_mock;

mod macros;
Expand Down
290 changes: 290 additions & 0 deletions tokio-test/src/sink_mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
#![cfg(not(loom))]

//! A mock sink implementing [`Sink`].
//!
//! # Overview
//! This module provides a `SinkMock` that can be used to test code that interacts with sinks.
//! It allows you to mock the behavior of a sink and control the items it expects and the waiting
//! intervals required between items.
//!
//! # Usage
//! To use the `SinkMock`, you need to create a builder using [`SinkMockBuilder`].
//! The builder allows you to enqueue actions such as
//! requiring items or requiring a pause between items.
//!
//! # Example
//!
//! ```rust
//! use tokio_test::sink_mock::SinkMockBuilder;
//! use futures_util::SinkExt;
//! use std::time::Duration;
//!
//! async fn test_sink_mock_wait() {
//! let mut sink_mock = SinkMockBuilder::new()
//! .require(1)
//! .require_wait(Duration::from_millis(300))
//! .require(2)
//! .build();
//!
//! assert_eq!(sink_mock.send(1).await, Ok(()));
//! tokio::time::sleep(Duration::from_millis(300)).await;
//! assert_eq!(sink_mock.send(2).await, Ok(()));
//! }
//! ```

use std::{
collections::VecDeque,
pin::Pin,
task::Poll,
time::{Duration, Instant},
};

use futures_sink::Sink;

#[derive(Debug, Clone, PartialEq, Eq)]
enum Action<T, E> {
Consume(T),
ConsumeWithError(T, E),
Pause(Duration),
}

/// A builder for [`SinkMock`].
#[derive(Debug, Clone)]
pub struct SinkMockBuilder<T, E> {
actions: VecDeque<Action<T, E>>,
}

impl<T: Unpin, E: Unpin> SinkMockBuilder<T, E> {
/// Create a new empty [`SinkMockBuilder`].
pub fn new() -> Self {
SinkMockBuilder::default()
}

/// Queue an item to be required by the [`Sink`].
pub fn require(mut self, value: T) -> Self {
self.actions.push_back(Action::Consume(value));
self
}

/// Queue an item to be required by the [`Sink`],
/// which shall produce the given error when polled.
pub fn require_with_error(mut self, value: T, error: E) -> Self {
let action = Action::ConsumeWithError(value, error);
self.actions.push_back(action);
self
}

/// Queue the sink to require waiting for a while before receiving another value.
pub fn require_wait(mut self, duration: Duration) -> Self {
self.actions.push_back(Action::Pause(duration));
self
}

/// Build the [`SinkMock`].
pub fn build(self) -> SinkMock<T, E> {
SinkMock {
actions: self.actions,
last_action: Instant::now(),
}
}
}

impl<T: Unpin, E: Unpin> Default for SinkMockBuilder<T, E> {
fn default() -> Self {
SinkMockBuilder {
actions: VecDeque::new(),
}
}
}

/// A mock sink implementing [`Sink`].
///
/// See [`SinkMockBuilder`] for more information.
#[derive(Debug)]
pub struct SinkMock<T, E> {
actions: VecDeque<Action<T, E>>,
last_action: Instant,
}

impl<T: Unpin + Eq + std::fmt::Debug, E: Unpin> Sink<T> for SinkMock<T, E> {
type Error = E;

fn poll_ready(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

// Requires `Eq + std::fmt::Debug` due to usage of `assert_eq!`.
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
loop {
let Some(action) = self.actions.pop_front() else {
panic!("Sink does not expect any items");
};
match action {
Action::Pause(duration) => {
let now = Instant::now();
if (self.last_action + duration) <= now {
self.last_action = now;
continue;
} else {
panic!("Sink received item too early");
}
}
Action::Consume(queued_item) => {
assert_eq!(item, queued_item);
self.last_action = Instant::now();
break Ok(());
}
Action::ConsumeWithError(queued_item, queued_error) => {
assert_eq!(item, queued_item);
self.last_action = Instant::now();
break Err(queued_error);
}
}
}
}

fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn poll_close(
mut self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.try_close();
Poll::Ready(Ok(()))
}
}

impl<T, E> Drop for SinkMock<T, E> {
fn drop(&mut self) {
// Avoid double panicking to make debugging easier.
if std::thread::panicking() {
return;
}
self.try_close();
}
}

impl<T, E> SinkMock<T, E> {
fn try_close(&mut self) {
loop {
let Some(action) = self.actions.pop_front() else {
break;
};
match action {
Action::Pause(duration) => {
let now = Instant::now();
if (self.last_action + duration) <= now {
self.last_action += duration;
continue;
} else {
panic!("Sink closed too early");
}
}
Action::Consume(..) | Action::ConsumeWithError(..) => {
panic!("Sink expects more items")
}
}
}
}
}

#[cfg(test)]
mod test {

use crate::sink_mock::{SinkMock, SinkMockBuilder};
use futures_util::SinkExt;
use std::time::Duration;

#[test]
#[should_panic(expected = "Sink expects more items")]
fn dropping_nonempty_sink_panics() {
let sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new().require(1).build();
drop(sink_mock);
}

#[tokio::test]
#[should_panic(expected = "Sink does not expect any items")]
async fn empty_sink_panics_on_send() {
let mut sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new().build();
let _ = sink_mock.send(1).await;
}

#[tokio::test]
#[should_panic(expected = "Sink received item too early")]
async fn should_reject_values_when_sent_too_early() {
let mut sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new()
.require_wait(Duration::from_secs(1))
.build();

sink_mock.send(1).await.unwrap();
}

#[test]
#[should_panic(expected = "Sink closed too early")]
fn paused_sink_panics_on_drop() {
let sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new()
.require_wait(Duration::from_secs(1))
.build();

drop(sink_mock);
}

#[tokio::test]
#[should_panic(expected = "Sink closed too early")]
async fn paused_sink_panics_on_close() {
let mut sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new()
.require_wait(Duration::from_secs(1))
.build();

sink_mock.close().await.unwrap();
}

#[tokio::test]
async fn should_yield_error() {
let mut sink_mock = SinkMockBuilder::new()
.require_with_error(1, "oh no")
.require_with_error(2, "well...")
.require_wait(Duration::from_millis(500))
.require_with_error(3, "ok.")
.build();

assert_eq!(sink_mock.send(1).await.unwrap_err(), "oh no");
assert_eq!(sink_mock.send(2).await.unwrap_err(), "well...");
tokio::time::sleep(Duration::from_millis(500)).await;
assert_eq!(sink_mock.send(3).await.unwrap_err(), "ok.");
}

#[tokio::test]
async fn should_sum_pause_durations() {
let mut sink_mock = SinkMockBuilder::<i32, ()>::new()
.require_wait(Duration::from_millis(1))
.require_wait(Duration::from_millis(1))
.require_wait(Duration::from_millis(1))
.build();

tokio::time::sleep(Duration::from_millis(3)).await;

let _ = sink_mock.close().await;
}

#[tokio::test]
async fn should_require_value_after_waiting() {
let mut sink_mock: SinkMock<i32, ()> = SinkMockBuilder::new()
.require(1)
.require_wait(Duration::from_millis(300))
.require(3)
.build();

sink_mock.send(1).await.unwrap();
tokio::time::sleep(Duration::from_millis(300)).await;
sink_mock.send(3).await.unwrap();
}
}
10 changes: 1 addition & 9 deletions tokio-test/src/stream_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! A mock stream implementing [`Stream`].
//!
//! # Overview
//! This crate provides a `StreamMock` that can be used to test code that interacts with streams.
//! This module provides a `StreamMock` that can be used to test code that interacts with streams.
//! It allows you to mock the behavior of a stream and control the items it yields and the waiting
//! intervals between items.
//!
Expand Down Expand Up @@ -67,14 +67,6 @@ impl<T: Unpin> StreamMockBuilder<T> {
self
}

// Queue an item to be consumed by the sink,
// commented out until Sink is implemented.
//
// pub fn consume(mut self, value: T) -> Self {
// self.actions.push_back(Action::Consume(value));
// self
// }

/// Queue the stream to wait for a duration
pub fn wait(mut self, duration: Duration) -> Self {
self.actions.push_back(Action::Wait(duration));
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ impl<T> Receiver<T> {
/// Returns true if there aren't any messages in the channel that the [`Receiver`]
/// has yet to receive.
///
/// [`Receiver]: create::sync::broadcast::Receiver
/// [`Receiver]: crate::sync::broadcast::Receiver
///
/// # Examples
///
Expand Down
Loading