Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LazyValue #285

Merged
merged 21 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/service_bus/src/bin/cbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use dotenv::dotenv;
use fe2o3_amqp::{
connection::ConnectionHandle,
sasl_profile::SaslProfile,
Connection, Receiver, Sender, Session,
types::{messaging::Body, primitives::Value},
Connection, Receiver, Sender, Session,
};
use fe2o3_amqp_cbs::{client::CbsClient, token::CbsToken};
use hmac::{
Expand Down
7 changes: 4 additions & 3 deletions examples/service_bus/src/bin/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use dotenv::dotenv;
use fe2o3_amqp_management::{client::MgmtClient, operations::ReadRequest};
use std::env;

use fe2o3_amqp::{Connection, sasl_profile::SaslProfile, Session, Sender};
use fe2o3_amqp::{sasl_profile::SaslProfile, Connection, Sender, Session};

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -33,7 +33,8 @@ async fn main() {
// .management_node_address(format!("{}/{}", queue_name, "$management"))
.management_node_address("$management")
.attach(&mut session)
.await.unwrap();
.await
.unwrap();

let read_request = ReadRequest::name("q1", "com.microsoft:servicebus", None);
let response = mgmt_client.call(read_request).await.unwrap();
Expand All @@ -47,4 +48,4 @@ async fn main() {
mgmt_client.close().await.unwrap();
session.end().await.unwrap();
connection.close().await.unwrap();
}
}
6 changes: 3 additions & 3 deletions examples/service_bus/src/bin/queue_dlq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
use dotenv::dotenv;
use fe2o3_amqp::connection::ConnectionHandle;
use fe2o3_amqp::transaction::{Controller, Transaction};
use fe2o3_amqp::transaction::TransactionDischarge;
use fe2o3_amqp::transaction::TransactionalRetirement;
use fe2o3_amqp::transaction::{Controller, Transaction};
use fe2o3_amqp::types::messaging::Body;
use fe2o3_amqp::types::messaging::Message;
use fe2o3_amqp::types::messaging::Properties;
Expand Down Expand Up @@ -86,10 +86,10 @@ async fn main() {
println!("Received from DLQ: {:?}", delivery);

// The Azure ServiceBus SDK disposes the DLQ message in a txn
let mut controller = Controller::attach(&mut session, "controller").await.unwrap();
let txn = Transaction::declare(&mut controller, None)
let mut controller = Controller::attach(&mut session, "controller")
.await
.unwrap();
let txn = Transaction::declare(&mut controller, None).await.unwrap();
txn.accept(&mut receiver, &delivery).await.unwrap();
txn.commit().await.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions examples/service_bus/src/bin/queue_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ async fn main() {
let delivery = receiver.recv::<Body<Value>>().await.unwrap();
receiver.accept(&delivery).await.unwrap();
println!("{:?}", delivery);
let msg =
std::str::from_utf8(&delivery.body().try_as_data().unwrap().next().unwrap()[..]).unwrap();
let msg = std::str::from_utf8(&delivery.body().try_as_data().unwrap().next().unwrap()[..])
.unwrap();
println!("Received: {:?}", msg);
}

Expand Down
4 changes: 2 additions & 2 deletions examples/service_bus/src/bin/receive_data_batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use dotenv::dotenv;
use fe2o3_amqp::Delivery;
use fe2o3_amqp::types::messaging::Batch;
use fe2o3_amqp::types::messaging::Data;
use fe2o3_amqp::Delivery;
use fe2o3_amqp::Receiver;
use std::env;

Expand All @@ -11,7 +11,7 @@ use fe2o3_amqp::Session;

fn process_delivery_data(delivery: &Delivery<Batch<Data>>) {
for Data(data) in delivery.body() {
let msg = std::str::from_utf8(&data).unwrap();
let msg = std::str::from_utf8(data).unwrap();
println!("Received: {:?}", msg);
}
}
Expand Down
11 changes: 9 additions & 2 deletions examples/service_bus/src/bin/send_data_batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use std::env;

use dotenv::dotenv;
use fe2o3_amqp::{Connection, sasl_profile::SaslProfile, Session, Sender, types::{primitives::Binary, messaging::{Message, Data}}};
use fe2o3_amqp::{
sasl_profile::SaslProfile,
types::{
messaging::{Data, Message},
primitives::Binary,
},
Connection, Sender, Session,
};

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -43,4 +50,4 @@ async fn main() {

session.end().await.unwrap();
connection.close().await.unwrap();
}
}
7 changes: 5 additions & 2 deletions examples/service_bus/src/bin/topic_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::env;

use dotenv::dotenv;
use fe2o3_amqp::{
sasl_profile::SaslProfile, types::{primitives::Value, messaging::Body}, Connection, Receiver, Session,
sasl_profile::SaslProfile,
types::{messaging::Body, primitives::Value},
Connection, Receiver, Session,
};

#[tokio::main]
Expand Down Expand Up @@ -37,7 +39,8 @@ async fn main() {
for _ in 0..3 {
// All of the Microsoft AMQP clients represent the event body as an uninterpreted bag of bytes.
let delivery = receiver.recv::<Body<Value>>().await.unwrap();
let msg = std::str::from_utf8(&delivery.body().try_as_data().unwrap().next().unwrap()[..]).unwrap();
let msg = std::str::from_utf8(&delivery.body().try_as_data().unwrap().next().unwrap()[..])
.unwrap();
println!("Received: {:?}", msg);
receiver.accept(&delivery).await.unwrap();
}
Expand Down
25 changes: 25 additions & 0 deletions fe2o3-amqp-types/src/messaging/body_section.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{

use serde::{de, ser, Deserialize, Serialize};
use serde_amqp::{
lazy::LazyValue,
primitives::{
Array, Binary, Dec128, Dec32, Dec64, OrderedMap, Symbol, SymbolRef, Timestamp, Uuid,
},
Expand Down Expand Up @@ -317,6 +318,30 @@ impl FromEmptyBody for Value {
}
}

/* -------------------------------------------------------------------------- */

impl IntoBody for LazyValue {
type Body = AmqpValue<Self>;

fn into_body(self) -> Self::Body {
AmqpValue(self)
}
}

impl<'de> FromBody<'de> for LazyValue {
type Body = AmqpValue<Self>;

fn from_body(deserializable: Self::Body) -> Self {
deserializable.0
}
}

impl FromEmptyBody for LazyValue {
fn from_empty_body() -> Result<Self, serde_amqp::Error> {
serde_amqp::from_value(Value::Null)
}
}

/* -------------------------------------------------------------------------- */
/* Other common types */
/* -------------------------------------------------------------------------- */
Expand Down
12 changes: 11 additions & 1 deletion fe2o3-amqp-types/src/messaging/format/amqp_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
#[cfg(test)]
mod tests {
use serde::{Deserialize, Serialize};
use serde_amqp::{from_slice, to_vec, Value};
use serde_amqp::{from_slice, lazy::LazyValue, to_vec, Value};

use crate::messaging::{
message::__private::{Deserializable, Serializable},
Expand Down Expand Up @@ -209,4 +209,14 @@ mod tests {
let decoded: Deserializable<Message<TestExample>> = from_slice(&buf).unwrap();
assert_eq!(decoded.0.body, expected)
}

#[test]
fn test_decoding_some_str_as_lazy_value() {
let src = Message::builder().value(TEST_STR).build();
let buf = to_vec(&Serializable(src)).unwrap();
let msg: Deserializable<Message<AmqpValue<LazyValue>>> = from_slice(&buf).unwrap();

let expected = to_vec(&TEST_STR).unwrap();
assert_eq!(msg.0.body.0.as_slice(), expected);
}
}
16 changes: 11 additions & 5 deletions fe2o3-amqp-types/src/messaging/message/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Implementation of Message as defined in AMQP 1.0 protocol Part 3.2
use std::{io, marker::PhantomData};
use std::marker::PhantomData;

use serde::{
de::{self},
Expand Down Expand Up @@ -40,7 +40,9 @@ pub trait DecodeIntoMessage: Sized {
type DecodeError;

/// Decode reader into [`Message<T>`]
fn decode_into_message(reader: impl io::Read) -> Result<Message<Self>, Self::DecodeError>;
fn decode_message_from_reader<'de>(
reader: impl serde_amqp::read::Read<'de>,
) -> Result<Message<Self>, Self::DecodeError>;
}

impl<T> DecodeIntoMessage for T
Expand All @@ -49,9 +51,13 @@ where
{
type DecodeError = serde_amqp::Error;

fn decode_into_message(reader: impl io::Read) -> Result<Message<Self>, Self::DecodeError> {
let message: Deserializable<Message<T>> = serde_amqp::from_reader(reader)?;
Ok(message.0)
fn decode_message_from_reader<'de>(
reader: impl serde_amqp::read::Read<'de>,
) -> Result<Message<Self>, Self::DecodeError> {
use serde::Deserialize;

let mut de = serde_amqp::de::Deserializer::new(reader);
Deserializable::<Message<T>>::deserialize(&mut de).map(|deserializable| deserializable.0)
}
}

Expand Down
2 changes: 1 addition & 1 deletion fe2o3-amqp-types/src/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde_amqp::format_code::EncodingCodes;
use serde_bytes::ByteBuf;

pub use serde_amqp::primitives::*;
pub use serde_amqp::value::Value;
pub use serde_amqp::{lazy::LazyValue, value::Value};

mod simple_value;
pub use simple_value::SimpleValue;
4 changes: 2 additions & 2 deletions fe2o3-amqp/src/endpoint/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@ pub(crate) trait ReceiverLink: Link + LinkExt {
// More than one transfer frames should be hanlded by the
// `Receiver`
fn on_complete_transfer<'a, T, P>(
&'a mut self,
&mut self,
transfer: Transfer,
payload: P,
section_number: u32,
section_offset: u64,
) -> Result<Delivery<T>, Self::TransferError>
where
for<'de> T: FromBody<'de> + Send,
for<'b> P: IntoReader + AsByteIterator<'b> + Send + 'a;
P: IntoReader<'a> + AsByteIterator + Send + 'a;

async fn dispose(
&self,
Expand Down
24 changes: 20 additions & 4 deletions fe2o3-amqp/src/link/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ cfg_transaction! {
}

#[cfg(docsrs)]
use fe2o3_amqp_types::messaging::{AmqpSequence, AmqpValue, Batch, Body};
use fe2o3_amqp_types::messaging::{
primitives::{LazyValue, Value},
AmqpSequence, AmqpValue, Batch, Body,
};

/// Credit mode for the link
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -253,6 +256,15 @@ impl Receiver {
/// receiver.accept(&delivery).await.unwrap();
/// ```
///
/// Lazy deserialization is supported with [`Body<LazyValue>`]. The [`LazyValue`] type is a
/// a thin wrapper around the encoded bytes and can be deserialized lazily.
///
/// ```rust,ignore
/// // `Body<LazyValue>` covers all possibilities and can be deserialized lazily
/// let delivery: Delivery<Body<LazyValue>> = receiver.recv::<Body<LazyValue>>().await.unwrap();
/// receiver.accept(&delivery).await.unwrap();
/// ```
///
/// If the user is certain an [`AmqpValue`] body section is expected, then the user could use
/// [`AmqpValue<KnownType>`] if the exact message type `KnownType` is known and implements
/// [`serde::Deserialize`]. If the user is not sure about the exact message type, one could use
Expand Down Expand Up @@ -919,7 +931,7 @@ where
count_number_of_sections_and_offset(&payload);
let delivery = self.link.on_complete_transfer(
transfer,
payload,
&payload,
section_number,
section_offset,
)?;
Expand Down Expand Up @@ -969,8 +981,12 @@ where
None => {
let (section_number, section_offset) =
count_number_of_sections_and_offset(&payload);
self.link
.on_complete_transfer(transfer, payload, section_number, section_offset)?
self.link.on_complete_transfer(
transfer,
&payload,
section_number,
section_offset,
)?
}
};

Expand Down
14 changes: 7 additions & 7 deletions fe2o3-amqp/src/link/receiver_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,15 @@ where
}

fn on_complete_transfer<'a, T, P>(
&'a mut self,
&mut self,
transfer: Transfer,
payload: P,
section_number: u32,
section_offset: u64,
) -> Result<Delivery<T>, Self::TransferError>
where
for<'de> T: FromBody<'de> + Send,
for<'b> P: IntoReader + AsByteIterator<'b> + Send + 'a,
P: IntoReader<'a> + AsByteIterator + Send + 'a,
{
match self.local_state {
LinkState::Attached | LinkState::IncompleteAttachExchanged => {}
Expand All @@ -158,7 +158,7 @@ where
let (result, mode) = if settled_by_sender {
// If the message is pre-settled, there is no need to
// add to the unsettled map and no need to reply to the Sender
let result = T::decode_into_message(payload.into_reader());
let result = T::decode_message_from_reader(payload.into_reader());
(result, None)
} else {
// If the message is being sent settled by the sender, the value of this
Expand All @@ -177,7 +177,7 @@ where
None => None,
};

let result = T::decode_into_message(payload.into_reader());
let result = T::decode_message_from_reader(payload.into_reader());

let state = DeliveryState::Received(Received {
section_number, // What is section number?
Expand Down Expand Up @@ -351,9 +351,9 @@ fn consecutive_chunk_indices(delivery_infos: &[DeliveryInfo]) -> Vec<usize> {
}

/// Count number of sections in encoded message
pub(crate) fn count_number_of_sections_and_offset<'a, B>(bytes: &'a B) -> (u32, u64)
pub(crate) fn count_number_of_sections_and_offset<'a, B>(bytes: B) -> (u32, u64)
where
B: AsByteIterator<'a>,
B: AsByteIterator + 'a,
{
let b0 = bytes.as_byte_iterator();
let len = b0.len();
Expand Down Expand Up @@ -957,7 +957,7 @@ mod tests {
// let mut serializer = serde_amqp::ser::Serializer::new(&mut buf);
// message.serialize(&mut serializer).unwrap();
let buf = to_vec(&Serializable(message)).unwrap();
let (_nums, _offset) = count_number_of_sections_and_offset(&buf);
let (_nums, _offset) = count_number_of_sections_and_offset(&*buf);
}

#[test]
Expand Down
Loading
Loading