Skip to content

Commit

Permalink
Use an unbounded channel
Browse files Browse the repository at this point in the history
  • Loading branch information
imobachgs committed Nov 15, 2023
1 parent 345d74f commit 5295214
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 29 deletions.
25 changes: 10 additions & 15 deletions rust/agama-dbus-server/src/network/dbus/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::network::{

use agama_lib::network::types::SSID;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
use zbus::{
dbus_interface,
Expand Down Expand Up @@ -92,15 +92,15 @@ impl Device {
///
/// It offers an API to query the connections collection.
pub struct Connections {
actions: Arc<Mutex<Sender<Action>>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
objects: Arc<Mutex<ObjectsRegistry>>,
}

impl Connections {
/// Creates a Connections interface object.
///
/// * `objects`: Objects paths registry.
pub fn new(objects: Arc<Mutex<ObjectsRegistry>>, actions: Sender<Action>) -> Self {
pub fn new(objects: Arc<Mutex<ObjectsRegistry>>, actions: UnboundedSender<Action>) -> Self {
Self {
objects,
actions: Arc::new(Mutex::new(actions)),
Expand Down Expand Up @@ -128,7 +128,6 @@ impl Connections {
let actions = self.actions.lock().await;
actions
.send(Action::AddConnection(id.clone(), ty.try_into()?))
.await
.unwrap();
Ok(())
}
Expand All @@ -151,7 +150,6 @@ impl Connections {
let actions = self.actions.lock().await;
actions
.send(Action::RemoveConnection(id.to_string()))
.await
.unwrap();
Ok(())
}
Expand All @@ -161,7 +159,7 @@ impl Connections {
/// It includes adding, updating and removing connections as needed.
pub async fn apply(&self) -> zbus::fdo::Result<()> {
let actions = self.actions.lock().await;
actions.send(Action::Apply).await.unwrap();
actions.send(Action::Apply).unwrap();
Ok(())
}

Expand All @@ -178,7 +176,7 @@ impl Connections {
///
/// It offers an API to query a connection.
pub struct Connection {
actions: Arc<Mutex<Sender<Action>>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
connection: Arc<Mutex<NetworkConnection>>,
}

Expand All @@ -187,7 +185,7 @@ impl Connection {
///
/// * `actions`: sending-half of a channel to send actions.
/// * `connection`: connection to expose over D-Bus.
pub fn new(actions: Sender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
pub fn new(actions: UnboundedSender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
Self {
actions: Arc::new(Mutex::new(actions)),
connection,
Expand All @@ -209,7 +207,6 @@ impl Connection {
let actions = self.actions.lock().await;
actions
.send(Action::UpdateConnection(connection.clone()))
.await
.unwrap();
Ok(())
}
Expand Down Expand Up @@ -242,7 +239,7 @@ impl Connection {

/// D-Bus interface for Match settings
pub struct Match {
actions: Arc<Mutex<Sender<Action>>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
connection: Arc<Mutex<NetworkConnection>>,
}

Expand All @@ -251,7 +248,7 @@ impl Match {
///
/// * `actions`: sending-half of a channel to send actions.
/// * `connection`: connection to expose over D-Bus.
pub fn new(actions: Sender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
pub fn new(actions: UnboundedSender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
Self {
actions: Arc::new(Mutex::new(actions)),
connection,
Expand All @@ -273,7 +270,6 @@ impl Match {
let actions = self.actions.lock().await;
actions
.send(Action::UpdateConnection(connection.clone()))
.await
.unwrap();
Ok(())
}
Expand Down Expand Up @@ -343,7 +339,7 @@ impl Match {

/// D-Bus interface for wireless settings
pub struct Wireless {
actions: Arc<Mutex<Sender<Action>>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
connection: Arc<Mutex<NetworkConnection>>,
}

Expand All @@ -352,7 +348,7 @@ impl Wireless {
///
/// * `actions`: sending-half of a channel to send actions.
/// * `connection`: connection to expose over D-Bus.
pub fn new(actions: Sender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
pub fn new(actions: UnboundedSender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
Self {
actions: Arc::new(Mutex::new(actions)),
connection,
Expand Down Expand Up @@ -380,7 +376,6 @@ impl Wireless {
let connection = NetworkConnection::Wireless(connection.clone());
actions
.send(Action::UpdateConnection(connection))
.await
.unwrap();
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use crate::network::{
};
use cidr::IpInet;
use std::{net::IpAddr, sync::Arc};
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
use zbus::dbus_interface;

/// D-Bus interface for IPv4 and IPv6 settings
pub struct Ip {
actions: Arc<Mutex<Sender<Action>>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
connection: Arc<Mutex<NetworkConnection>>,
}

Expand All @@ -25,7 +25,7 @@ impl Ip {
///
/// * `actions`: sending-half of a channel to send actions.
/// * `connection`: connection to expose over D-Bus.
pub fn new(actions: Sender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
pub fn new(actions: UnboundedSender<Action>, connection: Arc<Mutex<NetworkConnection>>) -> Self {
Self {
actions: Arc::new(Mutex::new(actions)),
connection,
Expand All @@ -47,7 +47,6 @@ impl Ip {
let actions = self.actions.lock().await;
actions
.send(Action::UpdateConnection(connection.clone()))
.await
.unwrap();
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions rust/agama-dbus-server/src/network/dbus/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ use zbus::zvariant::{ObjectPath, OwnedObjectPath};
use crate::network::{action::Action, dbus::interfaces, model::*};
use log;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedSender;

const CONNECTIONS_PATH: &str = "/org/opensuse/Agama1/Network/connections";
const DEVICES_PATH: &str = "/org/opensuse/Agama1/Network/devices";

/// Handle the objects in the D-Bus tree for the network state
pub struct Tree {
connection: zbus::Connection,
actions: Sender<Action>,
actions: UnboundedSender<Action>,
objects: Arc<Mutex<ObjectsRegistry>>,
}

Expand All @@ -22,7 +22,7 @@ impl Tree {
///
/// * `connection`: D-Bus connection to use.
/// * `actions`: sending-half of a channel to send actions.
pub fn new(connection: zbus::Connection, actions: Sender<Action>) -> Self {
pub fn new(connection: zbus::Connection, actions: UnboundedSender<Action>) -> Self {
Self {
connection,
actions,
Expand Down
15 changes: 8 additions & 7 deletions rust/agama-dbus-server/src/network/system.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use crate::network::{dbus::Tree, model::Connection, Action, Adapter, NetworkState};
use std::error::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

/// Represents the network system using holding the state and setting up the D-Bus tree.
pub struct NetworkSystem<T: Adapter> {
/// Network state
pub state: NetworkState,
/// Side of the channel to send actions.
actions_tx: Sender<Action>,
actions_rx: Receiver<Action>,
actions_tx: UnboundedSender<Action>,
actions_rx: UnboundedReceiver<Action>,
tree: Tree,
/// Adapter to read/write the network state.
adapter: T,
}

impl<T: Adapter> NetworkSystem<T> {
pub fn new(conn: zbus::Connection, adapter: T) -> Self {
let (actions_tx, actions_rx) = mpsc::channel(100);
let (actions_tx, actions_rx) = mpsc::unbounded_channel();
let tree = Tree::new(conn, actions_tx.clone());
Self {
state: NetworkState::default(),
Expand All @@ -34,9 +34,10 @@ impl<T: Adapter> NetworkSystem<T> {
Ok(())
}

/// Returns a clone of the [Sender](https://doc.rust-lang.org/std/sync/mpsc/struct.Sender.html) to execute
/// [actions](Action).
pub fn actions_tx(&self) -> Sender<Action> {
/// Returns a clone of the
/// [UnboundedSender](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedSender.html)
/// to execute [actions](Action).
pub fn actions_tx(&self) -> UnboundedSender<Action> {
self.actions_tx.clone()
}

Expand Down

0 comments on commit 5295214

Please sign in to comment.