Skip to content

Commit

Permalink
Merge pull request #1008 from zeenix/object-server-refactor
Browse files Browse the repository at this point in the history
♻️  zb: object_server module refactoring
  • Loading branch information
zeenix authored Sep 18, 2024
2 parents 8b7b8c9 + deb22d7 commit 0f25727
Show file tree
Hide file tree
Showing 7 changed files with 564 additions and 521 deletions.
91 changes: 91 additions & 0 deletions zbus/src/object_server/dispatch_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! The object server API.
use event_listener::{Event, EventListener};
use serde::{Deserialize, Serialize};

use zvariant::{Signature, Type};

/// A response wrapper that notifies after the response has been sent.
///
/// Sometimes in [`interface`] method implementations we need to do some other work after the
/// response has been sent off. This wrapper type allows us to do that. Instead of returning your
/// intended response type directly, wrap it in this type and return it from your method. The
/// returned `EventListener` from the `new` method will be notified when the response has been sent.
///
/// A typical use case is sending off signals after the response has been sent. The easiest way to
/// do that is to spawn a task from the method that sends the signal but only after being notified
/// of the response dispatch.
///
/// # Caveats
///
/// The notification indicates that the response has been sent off, not that destination peer has
/// received it. That can only be guaranteed for a peer-to-peer connection.
///
/// [`interface`]: crate::interface
#[derive(Debug)]
pub struct ResponseDispatchNotifier<R> {
response: R,
event: Option<Event>,
}

impl<R> ResponseDispatchNotifier<R> {
/// Create a new `NotifyResponse`.
pub fn new(response: R) -> (Self, EventListener) {
let event = Event::new();
let listener = event.listen();
(
Self {
response,
event: Some(event),
},
listener,
)
}

/// Get the response.
pub fn response(&self) -> &R {
&self.response
}
}

impl<R> Serialize for ResponseDispatchNotifier<R>
where
R: Serialize,
{
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.response.serialize(serializer)
}
}

impl<'de, R> Deserialize<'de> for ResponseDispatchNotifier<R>
where
R: Deserialize<'de>,
{
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(Self {
response: R::deserialize(deserializer)?,
event: None,
})
}
}

impl<R> Type for ResponseDispatchNotifier<R>
where
R: Type,
{
const SIGNATURE: &'static Signature = R::SIGNATURE;
}

impl<T> Drop for ResponseDispatchNotifier<T> {
fn drop(&mut self) {
if let Some(event) = self.event.take() {
event.notify(usize::MAX);
}
}
}
46 changes: 46 additions & 0 deletions zbus/src/object_server/interface/dispatch_result.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::{future::Future, pin::Pin};

use zbus::message::Flags;
use zvariant::DynamicType;

use crate::{message::Message, Connection, Result};
use tracing::trace;

/// A helper type returned by [`Interface`](`crate::object_server::Interface`) callbacks.
pub enum DispatchResult<'a> {
/// This interface does not support the given method.
NotFound,

/// Retry with [Interface::call_mut](`crate::object_server::Interface::call_mut).
///
/// This is equivalent to NotFound if returned by call_mut.
RequiresMut,

/// The method was found and will be completed by running this Future.
Async(Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>),
}

impl<'a> DispatchResult<'a> {
/// Helper for creating the Async variant.
pub fn new_async<F, T, E>(conn: &'a Connection, msg: &'a Message, f: F) -> Self
where
F: Future<Output = ::std::result::Result<T, E>> + Send + 'a,
T: serde::Serialize + DynamicType + Send + Sync,
E: zbus::DBusError + Send,
{
DispatchResult::Async(Box::pin(async move {
let hdr = msg.header();
let ret = f.await;
if !hdr.primary().flags().contains(Flags::NoReplyExpected) {
match ret {
Ok(r) => conn.reply(msg, &r).await,
Err(e) => conn.reply_dbus_error(&hdr, e).await,
}
.map(|_seq| ())
} else {
trace!("No reply expected for {:?} by the caller.", msg);
Ok(())
}
}))
}
}
51 changes: 51 additions & 0 deletions zbus/src/object_server/interface/interface_deref.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::{
marker::PhantomData,
ops::{Deref, DerefMut},
};

use crate::async_lock::{RwLockReadGuard, RwLockWriteGuard};

use super::Interface;

/// Opaque structure that derefs to an `Interface` type.
pub struct InterfaceDeref<'d, I> {
pub(super) iface: RwLockReadGuard<'d, dyn Interface>,
pub(super) phantom: PhantomData<I>,
}

impl<I> Deref for InterfaceDeref<'_, I>
where
I: Interface,
{
type Target = I;

fn deref(&self) -> &I {
self.iface.downcast_ref::<I>().unwrap()
}
}

/// Opaque structure that mutably derefs to an `Interface` type.
pub struct InterfaceDerefMut<'d, I> {
pub(super) iface: RwLockWriteGuard<'d, dyn Interface>,
pub(super) phantom: PhantomData<I>,
}

impl<I> Deref for InterfaceDerefMut<'_, I>
where
I: Interface,
{
type Target = I;

fn deref(&self) -> &I {
self.iface.downcast_ref::<I>().unwrap()
}
}

impl<I> DerefMut for InterfaceDerefMut<'_, I>
where
I: Interface,
{
fn deref_mut(&mut self) -> &mut Self::Target {
self.iface.downcast_mut::<I>().unwrap()
}
}
111 changes: 111 additions & 0 deletions zbus/src/object_server/interface/interface_ref.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use std::{marker::PhantomData, sync::Arc};

use super::{Interface, InterfaceDeref, InterfaceDerefMut, SignalContext};
use crate::async_lock::RwLock;

/// Wrapper over an interface, along with its corresponding `SignalContext`
/// instance. A reference to the underlying interface may be obtained via
/// [`InterfaceRef::get`] and [`InterfaceRef::get_mut`].
pub struct InterfaceRef<I> {
pub(crate) ctxt: SignalContext<'static>,
pub(crate) lock: Arc<RwLock<dyn Interface>>,
pub(crate) phantom: PhantomData<I>,
}

impl<I> InterfaceRef<I>
where
I: 'static,
{
/// Get a reference to the underlying interface.
///
/// **WARNING:** If methods (e.g property setters) in `ObjectServer` require `&mut self`
/// `ObjectServer` will not be able to access the interface in question until all references
/// of this method are dropped; it is highly recommended that the scope of the interface
/// returned is restricted.
pub async fn get(&self) -> InterfaceDeref<'_, I> {
let iface = self.lock.read().await;

iface
.downcast_ref::<I>()
.expect("Unexpected interface type");

InterfaceDeref {
iface,
phantom: PhantomData,
}
}

/// Get a reference to the underlying interface.
///
/// **WARNINGS:** Since the `ObjectServer` will not be able to access the interface in question
/// until the return value of this method is dropped, it is highly recommended that the scope
/// of the interface returned is restricted.
///
/// # Errors
///
/// If the interface at this instance's path is not valid, an `Error::InterfaceNotFound` error
/// is returned.
///
/// # Examples
///
/// ```no_run
/// # use std::error::Error;
/// # use async_io::block_on;
/// # use zbus::{Connection, interface};
///
/// struct MyIface(u32);
///
/// #[interface(name = "org.myiface.MyIface")]
/// impl MyIface {
/// #[zbus(property)]
/// async fn count(&self) -> u32 {
/// self.0
/// }
/// }
///
/// # block_on(async {
/// // Set up connection and object_server etc here and then in another part of the code:
/// # let connection = Connection::session().await?;
/// #
/// # let path = "/org/zbus/path";
/// # connection.object_server().at(path, MyIface(22)).await?;
/// let object_server = connection.object_server();
/// let iface_ref = object_server.interface::<_, MyIface>(path).await?;
/// let mut iface = iface_ref.get_mut().await;
/// iface.0 = 42;
/// iface.count_changed(iface_ref.signal_context()).await?;
/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
/// # })?;
/// #
/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
/// ```
pub async fn get_mut(&self) -> InterfaceDerefMut<'_, I> {
let mut iface = self.lock.write().await;

iface
.downcast_ref::<I>()
.expect("Unexpected interface type");
iface
.downcast_mut::<I>()
.expect("Unexpected interface type");

InterfaceDerefMut {
iface,
phantom: PhantomData,
}
}

pub fn signal_context(&self) -> &SignalContext<'static> {
&self.ctxt
}
}

impl<I> Clone for InterfaceRef<I> {
fn clone(&self) -> Self {
Self {
ctxt: self.ctxt.clone(),
lock: self.lock.clone(),
phantom: PhantomData,
}
}
}
Original file line number Diff line number Diff line change
@@ -1,61 +1,25 @@
mod dispatch_result;
pub use dispatch_result::*;
mod interface_ref;
pub use interface_ref::*;
mod interface_deref;
pub use interface_deref::*;

use std::{
any::{Any, TypeId},
collections::HashMap,
fmt::{self, Write},
future::Future,
pin::Pin,
sync::Arc,
};

use async_trait::async_trait;
use zbus::message::Flags;
use zbus_names::{InterfaceName, MemberName};
use zvariant::{DynamicType, OwnedValue, Value};
use zvariant::{OwnedValue, Value};

use crate::{
async_lock::RwLock, fdo, message::Message, object_server::SignalContext, Connection,
ObjectServer, Result,
ObjectServer,
};
use tracing::trace;

/// A helper type returned by [`Interface`] callbacks.
pub enum DispatchResult<'a> {
/// This interface does not support the given method.
NotFound,

/// Retry with [Interface::call_mut].
///
/// This is equivalent to NotFound if returned by call_mut.
RequiresMut,

/// The method was found and will be completed by running this Future.
Async(Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>),
}

impl<'a> DispatchResult<'a> {
/// Helper for creating the Async variant.
pub fn new_async<F, T, E>(conn: &'a Connection, msg: &'a Message, f: F) -> Self
where
F: Future<Output = ::std::result::Result<T, E>> + Send + 'a,
T: serde::Serialize + DynamicType + Send + Sync,
E: zbus::DBusError + Send,
{
DispatchResult::Async(Box::pin(async move {
let hdr = msg.header();
let ret = f.await;
if !hdr.primary().flags().contains(Flags::NoReplyExpected) {
match ret {
Ok(r) => conn.reply(msg, &r).await,
Err(e) => conn.reply_dbus_error(&hdr, e).await,
}
.map(|_seq| ())
} else {
trace!("No reply expected for {:?} by the caller.", msg);
Ok(())
}
}))
}
}

/// This trait is used to dispatch messages to an interface instance.
///
Expand Down
Loading

0 comments on commit 0f25727

Please sign in to comment.