Skip to content

Commit

Permalink
release setup
Browse files Browse the repository at this point in the history
  • Loading branch information
huitseeker committed Nov 29, 2024
1 parent 8a9206b commit 2b0727c
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 9 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
[package]
name = "tokio-prometheus-channel-backpressure"
name = "tokio-prometheus-metered-channel"
version = "0.1.0"
edition = "2021"
description = "Metered tokio channels with Prometheus metrics integration"
authors = ["Francois Garillot <[email protected]>"]
license = "Apache-2.0"
repository = "https://github.com/yourusername/tokio-prometheus-channel-backpressure"
documentation = "https://docs.rs/tokio-prometheus-channel-backpressure"
repository = "https://github.com/huitseeker/tokio-prometheus-metered-channel"
documentation = "https://docs.rs/tokio-prometheus-metered-channel"
readme = "README.md"
keywords = ["tokio", "prometheus", "metrics", "channel", "async"]
categories = ["asynchronous", "development-tools::monitoring", "concurrency"]
categories = ["asynchronous", "development-tools", "concurrency"]
rust-version = "1.65.0"

[dependencies]
Expand All @@ -21,3 +21,8 @@ pin-project = "1.1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-futures = "0.2"

[package.metadata.release]
sign-tag = true
push-remote = "origin"
allow-branch = ["main"]
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
# Tokio Prometheus metered channel

Add the following to your `Cargo.toml`:
```
tokio-prometheus-metered=channel = "0.1.0"
```

# Metered Bounded Channel

The metered bounded channel is a specialized threading utility designed to handle communication between threads with an upper limit on capacity while tracking the channel's occupancy through Prometheus metrics.

## Functionality
- **Bounded Capacity**: This channel ensures that no more than a predefined number of messages are held in the channel at any given time.
- **Backpressure Handling**: When the channel reaches its capacity, any additional attempts to send messages will be blocked, allowing for backpressure management until the channel has available space.
- **Prometheus Integration**: The current occupancy of the channel is exposed as a Prometheus metric, enabling real-time monitoring of how "full" the channel is.
- **Prometheus Integration**: The current occupancy of the channel is exposed as a Prometheus metric, enabling real-time monitoring of how "full" the channel is.
4 changes: 1 addition & 3 deletions release.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
sign-commit = true
sign-tag = true
pre-release-commit-message = "chore: Release version {{version}}"
post-release-commit-message = "chore: Development version {{version}}"
tag-message = "Release version {{version}}"
push = true
publish = true

pre-release-replacements = [
{file="README.md", search="tokio-prometheus-channel-backpressure = .*", replace="tokio-prometheus-channel-backpressure = \"{{version}}\""},
{file="README.md", search="tokio-prometheus-metered=channel = .*", replace="tokio-prometheus-metered-channel = \"{{version}}\""},
{file="CHANGELOG.md", search="Unreleased", replace="{{version}}"},
]
7 changes: 6 additions & 1 deletion src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::{debug, error, instrument};
/// # Examples
///
/// ```rust
/// use tokio_prometheus_channel_backpressure::{broadcast_channel, ChannelMetrics};
/// use tokio_prometheus_metered_channel::{broadcast_channel, ChannelMetrics};
/// use prometheus::Registry;
///
/// #[tokio::main]
Expand Down Expand Up @@ -130,4 +130,9 @@ impl<T: Clone> Receiver<T> {
Err(e) => Err(e),
}
}

/// Get the total messages counter if enabled
pub fn total_messages(&self) -> Option<&prometheus::IntCounter> {
self.total_messages.as_ref()
}
}
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::mpsc::error::{SendError as TokioSendError, TrySendError};
/// # Examples
///
/// ```rust
/// use tokio_prometheus_channel_backpressure::{mpsc_channel, ChannelMetrics, SendError};
/// use tokio_prometheus_metered_channel::{mpsc_channel, ChannelMetrics, SendError};
/// use prometheus::Registry;
///
/// #[tokio::main]
Expand Down
14 changes: 8 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! # Example
//!
//! ```rust
//! use tokio_prometheus_channel_backpressure::{mpsc_channel, ChannelMetrics};
//! use tokio_prometheus_metered_channel::{mpsc_channel, ChannelMetrics};
//! use prometheus::Registry;
//!
//! #[tokio::main]
Expand Down Expand Up @@ -50,11 +50,6 @@
#![warn(missing_docs)]

/// Broadcast channel implementation with prometheus metrics integration.
///
/// This channel type allows sending messages to multiple receivers.
/// Each receiver gets a copy of each message sent after they subscribed.
pub mod broadcast;

mod channel;
mod error;
Expand All @@ -66,6 +61,13 @@ mod metrics;
/// New receivers see the latest value and all subsequent changes.
pub mod watch;


/// Broadcast channel implementation with prometheus metrics integration.
///
/// This channel type allows sending messages to multiple receivers.
/// Each receiver gets a copy of each message sent after they subscribed.
pub mod broadcast;

#[cfg(test)]
mod tests;

Expand Down
31 changes: 6 additions & 25 deletions src/tests/broadcast_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::broadcast_channel;
use crate::ChannelMetrics;
use prometheus::Registry;
use std::time::Duration;
use tokio::sync::broadcast::error::{RecvError, TryRecvError};
use tracing_subscriber::fmt::format::FmtSpan;

Expand Down Expand Up @@ -36,33 +35,14 @@ async fn test_broadcast_metrics() {
let registry = Registry::new();
let metrics = ChannelMetrics::new("test_broadcast_metrics", "test broadcast metrics", &registry).unwrap();

let (tx, mut rx) = broadcast_channel(2, metrics);
let (tx, mut rx) = broadcast_channel::<i32>(2, metrics);

tx.send(1).unwrap();
tx.send(2).unwrap();

rx.recv().await.unwrap();
rx.recv().await.unwrap();
assert_eq!(rx.total_messages.as_ref().unwrap().get(), 2);
}

#[tokio::test]
async fn test_broadcast_capacity() {
let registry = Registry::new();
let metrics = ChannelMetrics::new_basic("test_capacity", "test capacity", &registry).unwrap();

let (tx, mut rx) = broadcast_channel(2, metrics);

// Fill channel to capacity
tx.send(1).unwrap();
tx.send(2).unwrap();

// Should overwrite oldest message
tx.send(3).unwrap();

// First message should be dropped
assert!(matches!(rx.recv().await, Ok(2)));
assert!(matches!(rx.recv().await, Ok(3)));
assert_eq!(rx.total_messages().unwrap().get(), 4);
}

#[tokio::test]
Expand All @@ -76,15 +56,15 @@ async fn test_broadcast_lagged() {
tx.send(2).unwrap();
tx.send(3).unwrap(); // This will cause lag for slow receiver

assert!(matches!(rx.recv().await, Err(RecvError::Lagged)));
assert!(matches!(rx.recv().await, Err(RecvError::Lagged(_))));
}

#[tokio::test]
async fn test_broadcast_closed() {
let registry = Registry::new();
let metrics = ChannelMetrics::new_basic("test_closed", "test closed", &registry).unwrap();

let (tx, mut rx) = broadcast_channel(2, metrics);
let (tx, mut rx) = broadcast_channel::<i32>(2, metrics);
drop(tx);

assert!(matches!(rx.recv().await, Err(RecvError::Closed)));
Expand Down Expand Up @@ -126,5 +106,6 @@ async fn test_broadcast_multiple_subscribers() {
assert_eq!(rx.recv().await.unwrap(), 42);
}

assert_eq!(tx.receiver_count(), 5);
// only one active receiver left
assert_eq!(tx.receiver_count(), 1);
}
2 changes: 2 additions & 0 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
mod channel_tests;
mod metrics_tests;
mod broadcast_tests;
mod watch_tests;
10 changes: 5 additions & 5 deletions src/tests/watch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn test_watch_channel() {
let registry = Registry::new();
let metrics = ChannelMetrics::new_basic("test_watch", "test watch channel", &registry).unwrap();

let (tx, mut rx1) = watch_channel::channel(0, metrics);
let (tx, mut rx1) = watch_channel(0, metrics);
let mut rx2 = rx1.clone();

// Update value
Expand All @@ -35,12 +35,12 @@ async fn test_watch_metrics() {
let registry = Registry::new();
let metrics = ChannelMetrics::new("test_watch_metrics", "test watch metrics", &registry).unwrap();

let (tx, mut rx) = watch_channel::channel(0, metrics);
let (tx, rx) = watch_channel(0, metrics);

tx.send(1).unwrap();
tx.send(2).unwrap();

rx.changed().await.unwrap();
rx.changed().await.unwrap();
assert_eq!(rx.total_messages.as_ref().unwrap().get(), 2);
// Directly assert the latest value
assert_eq!(*rx.borrow(), 2);
assert_eq!(rx.total_messages().unwrap().get(), 2);
}
7 changes: 6 additions & 1 deletion src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::{debug, error, instrument};
/// # Examples
///
/// ```rust
/// use tokio_prometheus_channel_backpressure::{watch_channel, ChannelMetrics};
/// use tokio_prometheus_metered_channel::{watch_channel, ChannelMetrics};
/// use prometheus::Registry;
///
/// #[tokio::main]
Expand Down Expand Up @@ -122,4 +122,9 @@ impl<T: Clone> Receiver<T> {
pub fn has_changed(&self) -> bool {
self.inner.has_changed().unwrap_or(false)
}

/// Get the total messages counter if enabled
pub fn total_messages(&self) -> Option<&prometheus::IntCounter> {
self.total_messages.as_ref()
}
}

0 comments on commit 2b0727c

Please sign in to comment.