Skip to content

Commit

Permalink
Add LazyValue (#285)
Browse files Browse the repository at this point in the history
* partial impl of lazy_value

* tested LazyValue::from_reader

* added convenience fn for LazyValue

* moved helper read methods to mod read

* use SliceReader for single payload and IoReader for multi-payload

* update unit test

* differentiate forward_read_bytes with and without len hint

* split NewType into SequenceType and NonNativeType (#286)

* partial impl of lazy_value

* tested LazyValue::from_reader

* added convenience fn for LazyValue

* moved helper read methods to mod read

* use SliceReader for single payload and IoReader for multi-payload

* update unit test

* differentiate forward_read_bytes with and without len hint

* impl Serialize and Deserialize for LazyValue

* add more LazyValue unit test

* test ser/de of LazyValue into/from Value

* tested receive LazyValue from servicebus

* updated documentation on LazyValue
  • Loading branch information
minghuaw authored Oct 2, 2024
1 parent 2f0fb1e commit 3485cfc
Show file tree
Hide file tree
Showing 31 changed files with 611 additions and 205 deletions.
2 changes: 1 addition & 1 deletion examples/service_bus/src/bin/cbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use dotenv::dotenv;
use fe2o3_amqp::{
connection::ConnectionHandle,
sasl_profile::SaslProfile,
Connection, Receiver, Sender, Session,
types::{messaging::Body, primitives::Value},
Connection, Receiver, Sender, Session,
};
use fe2o3_amqp_cbs::{client::CbsClient, token::CbsToken};
use hmac::{
Expand Down
7 changes: 4 additions & 3 deletions examples/service_bus/src/bin/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use dotenv::dotenv;
use fe2o3_amqp_management::{client::MgmtClient, operations::ReadRequest};
use std::env;

use fe2o3_amqp::{Connection, sasl_profile::SaslProfile, Session, Sender};
use fe2o3_amqp::{sasl_profile::SaslProfile, Connection, Sender, Session};

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -33,7 +33,8 @@ async fn main() {
// .management_node_address(format!("{}/{}", queue_name, "$management"))
.management_node_address("$management")
.attach(&mut session)
.await.unwrap();
.await
.unwrap();

let read_request = ReadRequest::name("q1", "com.microsoft:servicebus", None);
let response = mgmt_client.call(read_request).await.unwrap();
Expand All @@ -47,4 +48,4 @@ async fn main() {
mgmt_client.close().await.unwrap();
session.end().await.unwrap();
connection.close().await.unwrap();
}
}
6 changes: 3 additions & 3 deletions examples/service_bus/src/bin/queue_dlq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

use dotenv::dotenv;
use fe2o3_amqp::connection::ConnectionHandle;
use fe2o3_amqp::transaction::{Controller, Transaction};
use fe2o3_amqp::transaction::TransactionDischarge;
use fe2o3_amqp::transaction::TransactionalRetirement;
use fe2o3_amqp::transaction::{Controller, Transaction};
use fe2o3_amqp::types::messaging::Body;
use fe2o3_amqp::types::messaging::Message;
use fe2o3_amqp::types::messaging::Properties;
Expand Down Expand Up @@ -86,10 +86,10 @@ async fn main() {
println!("Received from DLQ: {:?}", delivery);

// The Azure ServiceBus SDK disposes the DLQ message in a txn
let mut controller = Controller::attach(&mut session, "controller").await.unwrap();
let txn = Transaction::declare(&mut controller, None)
let mut controller = Controller::attach(&mut session, "controller")
.await
.unwrap();
let txn = Transaction::declare(&mut controller, None).await.unwrap();
txn.accept(&mut receiver, &delivery).await.unwrap();
txn.commit().await.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions examples/service_bus/src/bin/queue_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ async fn main() {
let delivery = receiver.recv::<Body<Value>>().await.unwrap();
receiver.accept(&delivery).await.unwrap();
println!("{:?}", delivery);
let msg =
std::str::from_utf8(&delivery.body().try_as_data().unwrap().next().unwrap()[..]).unwrap();
let msg = std::str::from_utf8(&delivery.body().try_as_data().unwrap().next().unwrap()[..])
.unwrap();
println!("Received: {:?}", msg);
}

Expand Down
4 changes: 2 additions & 2 deletions examples/service_bus/src/bin/receive_data_batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use dotenv::dotenv;
use fe2o3_amqp::Delivery;
use fe2o3_amqp::types::messaging::Batch;
use fe2o3_amqp::types::messaging::Data;
use fe2o3_amqp::Delivery;
use fe2o3_amqp::Receiver;
use std::env;

Expand All @@ -11,7 +11,7 @@ use fe2o3_amqp::Session;

fn process_delivery_data(delivery: &Delivery<Batch<Data>>) {
for Data(data) in delivery.body() {
let msg = std::str::from_utf8(&data).unwrap();
let msg = std::str::from_utf8(data).unwrap();
println!("Received: {:?}", msg);
}
}
Expand Down
11 changes: 9 additions & 2 deletions examples/service_bus/src/bin/send_data_batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use std::env;

use dotenv::dotenv;
use fe2o3_amqp::{Connection, sasl_profile::SaslProfile, Session, Sender, types::{primitives::Binary, messaging::{Message, Data}}};
use fe2o3_amqp::{
sasl_profile::SaslProfile,
types::{
messaging::{Data, Message},
primitives::Binary,
},
Connection, Sender, Session,
};

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -43,4 +50,4 @@ async fn main() {

session.end().await.unwrap();
connection.close().await.unwrap();
}
}
7 changes: 5 additions & 2 deletions examples/service_bus/src/bin/topic_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::env;

use dotenv::dotenv;
use fe2o3_amqp::{
sasl_profile::SaslProfile, types::{primitives::Value, messaging::Body}, Connection, Receiver, Session,
sasl_profile::SaslProfile,
types::{messaging::Body, primitives::Value},
Connection, Receiver, Session,
};

#[tokio::main]
Expand Down Expand Up @@ -37,7 +39,8 @@ async fn main() {
for _ in 0..3 {
// All of the Microsoft AMQP clients represent the event body as an uninterpreted bag of bytes.
let delivery = receiver.recv::<Body<Value>>().await.unwrap();
let msg = std::str::from_utf8(&delivery.body().try_as_data().unwrap().next().unwrap()[..]).unwrap();
let msg = std::str::from_utf8(&delivery.body().try_as_data().unwrap().next().unwrap()[..])
.unwrap();
println!("Received: {:?}", msg);
receiver.accept(&delivery).await.unwrap();
}
Expand Down
25 changes: 25 additions & 0 deletions fe2o3-amqp-types/src/messaging/body_section.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{

use serde::{de, ser, Deserialize, Serialize};
use serde_amqp::{
lazy::LazyValue,
primitives::{
Array, Binary, Dec128, Dec32, Dec64, OrderedMap, Symbol, SymbolRef, Timestamp, Uuid,
},
Expand Down Expand Up @@ -317,6 +318,30 @@ impl FromEmptyBody for Value {
}
}

/* -------------------------------------------------------------------------- */

impl IntoBody for LazyValue {
type Body = AmqpValue<Self>;

fn into_body(self) -> Self::Body {
AmqpValue(self)
}
}

impl<'de> FromBody<'de> for LazyValue {
type Body = AmqpValue<Self>;

fn from_body(deserializable: Self::Body) -> Self {
deserializable.0
}
}

impl FromEmptyBody for LazyValue {
fn from_empty_body() -> Result<Self, serde_amqp::Error> {
serde_amqp::from_value(Value::Null)
}
}

/* -------------------------------------------------------------------------- */
/* Other common types */
/* -------------------------------------------------------------------------- */
Expand Down
12 changes: 11 additions & 1 deletion fe2o3-amqp-types/src/messaging/format/amqp_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
#[cfg(test)]
mod tests {
use serde::{Deserialize, Serialize};
use serde_amqp::{from_slice, to_vec, Value};
use serde_amqp::{from_slice, lazy::LazyValue, to_vec, Value};

use crate::messaging::{
message::__private::{Deserializable, Serializable},
Expand Down Expand Up @@ -209,4 +209,14 @@ mod tests {
let decoded: Deserializable<Message<TestExample>> = from_slice(&buf).unwrap();
assert_eq!(decoded.0.body, expected)
}

#[test]
fn test_decoding_some_str_as_lazy_value() {
let src = Message::builder().value(TEST_STR).build();
let buf = to_vec(&Serializable(src)).unwrap();
let msg: Deserializable<Message<AmqpValue<LazyValue>>> = from_slice(&buf).unwrap();

let expected = to_vec(&TEST_STR).unwrap();
assert_eq!(msg.0.body.0.as_slice(), expected);
}
}
16 changes: 11 additions & 5 deletions fe2o3-amqp-types/src/messaging/message/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Implementation of Message as defined in AMQP 1.0 protocol Part 3.2

use std::{io, marker::PhantomData};
use std::marker::PhantomData;

use serde::{
de::{self},
Expand Down Expand Up @@ -40,7 +40,9 @@ pub trait DecodeIntoMessage: Sized {
type DecodeError;

/// Decode reader into [`Message<T>`]
fn decode_into_message(reader: impl io::Read) -> Result<Message<Self>, Self::DecodeError>;
fn decode_message_from_reader<'de>(
reader: impl serde_amqp::read::Read<'de>,
) -> Result<Message<Self>, Self::DecodeError>;
}

impl<T> DecodeIntoMessage for T
Expand All @@ -49,9 +51,13 @@ where
{
type DecodeError = serde_amqp::Error;

fn decode_into_message(reader: impl io::Read) -> Result<Message<Self>, Self::DecodeError> {
let message: Deserializable<Message<T>> = serde_amqp::from_reader(reader)?;
Ok(message.0)
fn decode_message_from_reader<'de>(
reader: impl serde_amqp::read::Read<'de>,
) -> Result<Message<Self>, Self::DecodeError> {
use serde::Deserialize;

let mut de = serde_amqp::de::Deserializer::new(reader);
Deserializable::<Message<T>>::deserialize(&mut de).map(|deserializable| deserializable.0)
}
}

Expand Down
2 changes: 1 addition & 1 deletion fe2o3-amqp-types/src/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde_amqp::format_code::EncodingCodes;
use serde_bytes::ByteBuf;

pub use serde_amqp::primitives::*;
pub use serde_amqp::value::Value;
pub use serde_amqp::{lazy::LazyValue, value::Value};

mod simple_value;
pub use simple_value::SimpleValue;
4 changes: 2 additions & 2 deletions fe2o3-amqp/src/endpoint/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@ pub(crate) trait ReceiverLink: Link + LinkExt {
// More than one transfer frames should be hanlded by the
// `Receiver`
fn on_complete_transfer<'a, T, P>(
&'a mut self,
&mut self,
transfer: Transfer,
payload: P,
section_number: u32,
section_offset: u64,
) -> Result<Delivery<T>, Self::TransferError>
where
for<'de> T: FromBody<'de> + Send,
for<'b> P: IntoReader + AsByteIterator<'b> + Send + 'a;
P: IntoReader<'a> + AsByteIterator + Send + 'a;

async fn dispose(
&self,
Expand Down
24 changes: 20 additions & 4 deletions fe2o3-amqp/src/link/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ cfg_transaction! {
}

#[cfg(docsrs)]
use fe2o3_amqp_types::messaging::{AmqpSequence, AmqpValue, Batch, Body};
use fe2o3_amqp_types::messaging::{
primitives::{LazyValue, Value},
AmqpSequence, AmqpValue, Batch, Body,
};

/// Credit mode for the link
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -253,6 +256,15 @@ impl Receiver {
/// receiver.accept(&delivery).await.unwrap();
/// ```
///
/// Lazy deserialization is supported with [`Body<LazyValue>`]. The [`LazyValue`] type is a
/// a thin wrapper around the encoded bytes and can be deserialized lazily.
///
/// ```rust,ignore
/// // `Body<LazyValue>` covers all possibilities and can be deserialized lazily
/// let delivery: Delivery<Body<LazyValue>> = receiver.recv::<Body<LazyValue>>().await.unwrap();
/// receiver.accept(&delivery).await.unwrap();
/// ```
///
/// If the user is certain an [`AmqpValue`] body section is expected, then the user could use
/// [`AmqpValue<KnownType>`] if the exact message type `KnownType` is known and implements
/// [`serde::Deserialize`]. If the user is not sure about the exact message type, one could use
Expand Down Expand Up @@ -919,7 +931,7 @@ where
count_number_of_sections_and_offset(&payload);
let delivery = self.link.on_complete_transfer(
transfer,
payload,
&payload,
section_number,
section_offset,
)?;
Expand Down Expand Up @@ -969,8 +981,12 @@ where
None => {
let (section_number, section_offset) =
count_number_of_sections_and_offset(&payload);
self.link
.on_complete_transfer(transfer, payload, section_number, section_offset)?
self.link.on_complete_transfer(
transfer,
&payload,
section_number,
section_offset,
)?
}
};

Expand Down
14 changes: 7 additions & 7 deletions fe2o3-amqp/src/link/receiver_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,15 @@ where
}

fn on_complete_transfer<'a, T, P>(
&'a mut self,
&mut self,
transfer: Transfer,
payload: P,
section_number: u32,
section_offset: u64,
) -> Result<Delivery<T>, Self::TransferError>
where
for<'de> T: FromBody<'de> + Send,
for<'b> P: IntoReader + AsByteIterator<'b> + Send + 'a,
P: IntoReader<'a> + AsByteIterator + Send + 'a,
{
match self.local_state {
LinkState::Attached | LinkState::IncompleteAttachExchanged => {}
Expand All @@ -158,7 +158,7 @@ where
let (result, mode) = if settled_by_sender {
// If the message is pre-settled, there is no need to
// add to the unsettled map and no need to reply to the Sender
let result = T::decode_into_message(payload.into_reader());
let result = T::decode_message_from_reader(payload.into_reader());
(result, None)
} else {
// If the message is being sent settled by the sender, the value of this
Expand All @@ -177,7 +177,7 @@ where
None => None,
};

let result = T::decode_into_message(payload.into_reader());
let result = T::decode_message_from_reader(payload.into_reader());

let state = DeliveryState::Received(Received {
section_number, // What is section number?
Expand Down Expand Up @@ -351,9 +351,9 @@ fn consecutive_chunk_indices(delivery_infos: &[DeliveryInfo]) -> Vec<usize> {
}

/// Count number of sections in encoded message
pub(crate) fn count_number_of_sections_and_offset<'a, B>(bytes: &'a B) -> (u32, u64)
pub(crate) fn count_number_of_sections_and_offset<'a, B>(bytes: B) -> (u32, u64)
where
B: AsByteIterator<'a>,
B: AsByteIterator + 'a,
{
let b0 = bytes.as_byte_iterator();
let len = b0.len();
Expand Down Expand Up @@ -957,7 +957,7 @@ mod tests {
// let mut serializer = serde_amqp::ser::Serializer::new(&mut buf);
// message.serialize(&mut serializer).unwrap();
let buf = to_vec(&Serializable(message)).unwrap();
let (_nums, _offset) = count_number_of_sections_and_offset(&buf);
let (_nums, _offset) = count_number_of_sections_and_offset(&*buf);
}

#[test]
Expand Down
Loading

0 comments on commit 3485cfc

Please sign in to comment.