Skip to content

Commit

Permalink
Merge pull request #226 from minghuaw/0.9-dev
Browse files Browse the repository at this point in the history
async fn in trait
  • Loading branch information
minghuaw authored Feb 22, 2024
2 parents 48d1417 + 056fbaf commit 201604f
Show file tree
Hide file tree
Showing 33 changed files with 121 additions and 122 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ exclude = [
"examples/activemq",
"examples/cancel_safety",
"examples/quick_start",
"examples/wasm32_in_browser",
"examples/wasm32-in-browser",
"examples/qpid_management_framework",
]
4 changes: 2 additions & 2 deletions examples/owned_txn_posting/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ async fn main() {
.unwrap();

// Commit
let mut txn1 = OwnedTransaction::declare(&mut session, "controller-1", None)
let txn1 = OwnedTransaction::declare(&mut session, "controller-1", None)
.await
.unwrap();
txn1.post(&mut sender, "hello").await.unwrap();
txn1.post(&mut sender, "world").await.unwrap();
txn1.commit().await.unwrap();

// Rollback
let mut txn2 = OwnedTransaction::declare(&mut session, "controller-2", None)
let txn2 = OwnedTransaction::declare(&mut session, "controller-2", None)
.await
.unwrap();
txn2.post(&mut sender, "foo").await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/owned_txn_retirement/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main() {
let delivery: Delivery<Value> = receiver.recv().await.unwrap();

// Transactionally retiring
let mut txn = OwnedTransaction::declare(&mut session, "controller", None).await.unwrap();
let txn = OwnedTransaction::declare(&mut session, "controller", None).await.unwrap();
txn.accept(&mut receiver, &delivery).await.unwrap();
txn.commit().await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion examples/receiver_auto_accept/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main() {
.unwrap();

// The delivery will be automatically accepted
let delivery: Delivery<Value> = receiver.recv().await.unwrap();
let _delivery: Delivery<Value> = receiver.recv().await.unwrap();

receiver.close().await.unwrap();
session.end().await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/rustls_connection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ tokio = { version = "1", features = ["net", "rt", "rt-multi-thread", "macros"] }
fe2o3-amqp = { features = ["rustls"], path = "../../fe2o3-amqp" }
rustls = "0.22"
tokio-rustls = "0.25"
webpki-roots = "0.26"
webpki-roots = "0.26"
2 changes: 1 addition & 1 deletion examples/tls_sasl_connection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ tokio = { version = "1", features = ["net", "rt", "rt-multi-thread", "macros"] }
fe2o3-amqp = { features = ["rustls"], path = "../../fe2o3-amqp" }
rustls = "0.22"
tokio-rustls = "0.25"
webpki-roots = "0.26"
webpki-roots = "0.26"
4 changes: 2 additions & 2 deletions examples/txn_posting/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ async fn main() {
.unwrap();

// Commit
let mut txn1 = Transaction::declare(&mut controller, None).await.unwrap();
let txn1 = Transaction::declare(&mut controller, None).await.unwrap();
txn1.post(&mut sender, "hello").await.unwrap();
txn1.post(&mut sender, "world").await.unwrap();
txn1.commit().await.unwrap();

// Rollback
let mut txn2 = Transaction::declare(&mut controller, None).await.unwrap();
let txn2 = Transaction::declare(&mut controller, None).await.unwrap();
txn2.post(&mut sender, "foo").await.unwrap();
txn2.rollback().await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion examples/txn_retirement/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async fn main() {
let delivery: Delivery<Value> = receiver.recv().await.unwrap();

// Transactionally retiring
let mut txn = Transaction::declare(&mut controller, None).await.unwrap();
let txn = Transaction::declare(&mut controller, None).await.unwrap();
txn.accept(&mut receiver, &delivery).await.unwrap();
txn.commit().await.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions fe2o3-amqp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fe2o3-amqp"
version = "0.9.3"
version = "0.9.4"
edition = "2021"
description = "An implementation of AMQP1.0 protocol based on serde and tokio"
license = "MIT/Apache-2.0"
Expand All @@ -9,6 +9,7 @@ homepage = "https://github.com/minghuaw/fe2o3-amqp"
repository = "https://github.com/minghuaw/fe2o3-amqp"
keywords = ["amqp", "serde", "tokio"]
readme = "Readme.md"
rust-version = "1.75.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down Expand Up @@ -50,7 +51,6 @@ futures-util = { version = "0.3", features = ["sink"] }
pin-project-lite = "0.2"
url = "2"
slab = "0.4"
async-trait = "0.1"
serde_bytes = "0.11"
parking_lot = { version = "0.12", features = ["send_guard"] }

Expand Down
6 changes: 6 additions & 0 deletions fe2o3-amqp/Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Change Log

## 0.9.4

1. Removed `async-trait` dependency and use return position `impl Future` and `async` function in
trait feature instead
2. Set MSRV to "1.75.0"

## 0.9.3

1. Updated deps
Expand Down
8 changes: 5 additions & 3 deletions fe2o3-amqp/Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@ dependencies = [
"check_all_features",

"check_feature_acceptor_wasm32",
"check_feature_rustls_wasm32",
"check_feature_nativetls_wasm32",
"check_feature_transaction_wasm32",
"check_feature_tracing_wasm32",
"check_feature_log_wasm32",
"check_feature_group1_wasm32",
"check_feature_group2_wasm32",
"check_feature_group3_wasm32",
"check_feature_group4_wasm32",
"check_feature_group5_wasm32",
"check_feature_group6_wasm32",

# rustls has problem with wasm target
"check_feature_rustls_wasm32",
"check_feature_group1_wasm32",
"check_feature_group3_wasm32",
"check_feature_group7_wasm32",
"check_all_features_wasm32",
]
Expand Down
4 changes: 2 additions & 2 deletions fe2o3-amqp/src/acceptor/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::{io, marker::PhantomData, time::Duration};

use async_trait::async_trait;

use fe2o3_amqp_types::{
definitions::{self},
performatives::{Begin, Close, End, Open},
Expand Down Expand Up @@ -465,7 +465,7 @@ pub struct ListenerConnection {
pub(crate) session_listener: mpsc::Sender<IncomingSession>,
}

#[async_trait]

impl endpoint::Connection for ListenerConnection {
type AllocError = <connection::Connection as endpoint::Connection>::AllocError;
type OpenError = <connection::Connection as endpoint::Connection>::OpenError;
Expand Down
6 changes: 3 additions & 3 deletions fe2o3-amqp/src/acceptor/session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Session Listener
use async_trait::async_trait;

use fe2o3_amqp_types::{
definitions::{self, ConnectionError},
performatives::{Attach, Begin, Detach, Disposition, End, Flow, Transfer},
Expand Down Expand Up @@ -347,7 +347,7 @@ pub struct ListenerSession {

impl endpoint::SessionExt for ListenerSession {}

#[async_trait]

impl endpoint::Session for ListenerSession {
type AllocError = <session::Session as endpoint::Session>::AllocError;
type BeginError = <session::Session as endpoint::Session>::BeginError;
Expand Down Expand Up @@ -536,7 +536,7 @@ cfg_transaction! {
}
}

#[async_trait]

impl endpoint::HandleDischarge for ListenerSession {
async fn commit_transaction(
&mut self,
Expand Down
4 changes: 0 additions & 4 deletions fe2o3-amqp/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
use std::{cmp::min, collections::HashMap, sync::Arc};

use async_trait::async_trait;

use fe2o3_amqp_types::{
definitions::{self},
performatives::{Begin, Close, End, Open},
Expand Down Expand Up @@ -528,8 +526,6 @@ impl Connection {
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
impl endpoint::Connection for Connection {
type AllocError = AllocSessionError;
type OpenError = ConnectionStateError;
Expand Down
17 changes: 8 additions & 9 deletions fe2o3-amqp/src/endpoint/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Defines trait for connection implementations
use async_trait::async_trait;
use std::future::Future;

use fe2o3_amqp_types::{
definitions::Error,
performatives::{Begin, Close, End, Open},
Expand All @@ -13,8 +14,6 @@ use crate::{frames::amqp::Frame, session::frame::SessionIncomingItem, SendBound}
use super::{IncomingChannel, OutgoingChannel, Session};

/// Trait for connection
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch="wasm32", async_trait(?Send))]
pub(crate) trait Connection {
type AllocError: SendBound;
type OpenError: SendBound;
Expand Down Expand Up @@ -47,18 +46,18 @@ pub(crate) trait Connection {
/// Reacting to remote Begin frame
///
/// Do NOT forward to session here. Forwarding is handled elsewhere.
async fn on_incoming_begin(
fn on_incoming_begin(
&mut self,
channel: IncomingChannel,
begin: Begin,
) -> Result<(), Self::Error>;
) -> impl Future<Output = Result<(), Self::Error>> + Send;

/// Reacting to remote End frame
async fn on_incoming_end(
fn on_incoming_end(
&mut self,
channel: IncomingChannel,
end: End,
) -> Result<(), Self::Error>;
) -> impl Future<Output = Result<(), Self::Error>> + Send;

/// Reacting to remote Close frame
fn on_incoming_close(
Expand All @@ -77,11 +76,11 @@ pub(crate) trait Connection {
W: Sink<Frame> + SendBound + Unpin,
Self::OpenError: From<W::Error>; // DO NOT remove this. This is where `Transport` will be used

async fn send_close<W>(
fn send_close<W>(
&mut self,
writer: &mut W,
error: Option<Error>,
) -> Result<(), Self::CloseError>
) -> impl Future<Output = Result<(), Self::CloseError>> + SendBound
where
W: Sink<Frame> + SendBound + Unpin,
Self::CloseError: From<W::Error>; // DO NOT remove this. This is where `Transport` will be used
Expand Down
6 changes: 0 additions & 6 deletions fe2o3-amqp/src/endpoint/link.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Defines traits for link implementations
use async_trait::async_trait;
use fe2o3_amqp_types::{
definitions::{
DeliveryNumber, DeliveryTag, Error, Fields, MessageFormat, ReceiverSettleMode, Role,
Expand All @@ -25,7 +24,6 @@ use crate::{

use super::{OutputHandle, Settlement};

#[async_trait]
pub(crate) trait LinkDetach {
type DetachError: Send;

Expand All @@ -39,7 +37,6 @@ pub(crate) trait LinkDetach {
) -> Result<(), Self::DetachError>;
}

#[async_trait]
pub(crate) trait LinkAttach {
type AttachExchange: Send;
type AttachError: Send;
Expand All @@ -61,7 +58,6 @@ pub(crate) trait Link: LinkAttach + LinkDetach {
fn role() -> Role;
}

#[async_trait]
pub(crate) trait LinkExt: Link {
type FlowState;
type Unsettled;
Expand Down Expand Up @@ -110,7 +106,6 @@ pub(crate) trait LinkExt: Link {
) -> Self::AttachError;
}

#[async_trait]
pub(crate) trait SenderLink: Link + LinkExt {
type FlowError: Send;
type TransferError: Send;
Expand Down Expand Up @@ -173,7 +168,6 @@ pub(crate) trait SenderLink: Link + LinkExt {
) -> Result<(), Self::DispositionError>;
}

#[async_trait]
pub(crate) trait ReceiverLink: Link + LinkExt {
type FlowError: Send;
type TransferError: Send;
Expand Down
26 changes: 16 additions & 10 deletions fe2o3-amqp/src/endpoint/session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Defines traits for session implementations
use async_trait::async_trait;
use std::future::Future;

use fe2o3_amqp_types::{
definitions::Error,
performatives::{Attach, Begin, Detach, Disposition, End, Flow, Transfer},
Expand All @@ -16,7 +17,6 @@ use crate::{

use super::{IncomingChannel, InputHandle, LinkFlow, OutgoingChannel, OutputHandle};

#[async_trait]
pub(crate) trait Session {
type AllocError: SendBound;
type BeginError: SendBound;
Expand Down Expand Up @@ -52,27 +52,33 @@ pub(crate) trait Session {
begin: Begin,
) -> Result<(), Self::BeginError>;

async fn on_incoming_attach(&mut self, attach: Attach) -> Result<(), Self::Error>;
fn on_incoming_attach(
&mut self,
attach: Attach,
) -> impl Future<Output = Result<(), Self::Error>> + Send;

/// An `Ok(Some(link_flow))` means an immediate echo of the link flow is requested
async fn on_incoming_flow(
fn on_incoming_flow(
&mut self,
flow: Flow,
) -> Result<Option<SessionOutgoingItem>, Self::Error>;
) -> impl Future<Output = Result<Option<SessionOutgoingItem>, Self::Error>> + Send;

async fn on_incoming_transfer(
fn on_incoming_transfer(
&mut self,
transfer: Transfer,
payload: Payload,
) -> Result<Option<Disposition>, Self::Error>;
) -> impl Future<Output = Result<Option<Disposition>, Self::Error>> + Send;

/// An `Ok(Some(Disposition))` means an immediate disposition should be sent back
fn on_incoming_disposition(
&mut self,
disposition: Disposition,
) -> Result<Option<Vec<Disposition>>, Self::Error>;

async fn on_incoming_detach(&mut self, detach: Detach) -> Result<(), Self::Error>;
fn on_incoming_detach(
&mut self,
detach: Detach,
) -> impl Future<Output = Result<(), Self::Error>> + Send;

fn on_incoming_end(&mut self, channel: IncomingChannel, end: End)
-> Result<(), Self::EndError>;
Expand All @@ -83,11 +89,11 @@ pub(crate) trait Session {
writer: &mpsc::Sender<SessionFrame>,
) -> Result<(), Self::BeginError>;

async fn send_end(
fn send_end(
&mut self,
writer: &mpsc::Sender<SessionFrame>,
error: Option<Error>,
) -> Result<(), Self::EndError>;
) -> impl Future<Output = Result<(), Self::EndError>> + Send;

// Intercepting LinkFrames
fn on_outgoing_attach(&mut self, attach: Attach) -> Result<SessionFrame, Self::Error>;
Expand Down
9 changes: 5 additions & 4 deletions fe2o3-amqp/src/endpoint/txn_resource.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use std::future::Future;

use fe2o3_amqp_types::{
messaging::Accepted,
transaction::{TransactionError, TransactionId},
Expand All @@ -12,12 +13,12 @@ pub(crate) trait HandleDeclare: Session {
fn allocate_transaction_id(&mut self) -> Result<TransactionId, AllocTxnIdError>;
}

#[async_trait]

pub(crate) trait HandleDischarge: Session {
async fn commit_transaction(
fn commit_transaction(
&mut self,
txn_id: TransactionId,
) -> Result<Result<Accepted, TransactionError>, Self::Error>;
) -> impl Future<Output = Result<Result<Accepted, TransactionError>, Self::Error>> + Send;
fn rollback_transaction(
&mut self,
txn_id: TransactionId,
Expand Down
Loading

0 comments on commit 201604f

Please sign in to comment.