Skip to content

Commit

Permalink
port to nom 8
Browse files Browse the repository at this point in the history
  • Loading branch information
Keruspe committed Apr 24, 2024
1 parent f90cfe2 commit c25820f
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 312 deletions.
2 changes: 1 addition & 1 deletion protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ default-features = false
features = ["std"]

[dependencies.nom]
version = "^7.0"
git="https://github.com/rust-bakery/nom"
features = ["std"]

[dependencies.serde]
Expand Down
55 changes: 29 additions & 26 deletions protocol/src/frame/parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,35 @@ use nom::{
bytes::streaming::{tag, take},
combinator::{all_consuming, flat_map, map, map_opt, map_res},
error::context,
sequence::{pair, tuple},
Parser,
};
use traits::ParsableInput;

/// Parse a channel id
pub fn parse_channel<I: ParsableInput>(i: I) -> ParserResult<I, AMQPChannel> {
context("parse_channel", map(parse_id, From::from))(i)
context("parse_channel", map(parse_id, From::from)).parse(i)
}

/// Parse the protocol header
pub fn parse_protocol_header<I: ParsableInput>(i: I) -> ParserResult<I, ProtocolVersion> {
context(
"parse_protocol_header",
map(
tuple((
(
tag(&metadata::NAME.as_bytes()[1..]),
tag(&[0][..]),
parse_short_short_uint,
parse_short_short_uint,
parse_short_short_uint,
)),
),
|(_, _, major, minor, revision)| ProtocolVersion {
major,
minor,
revision,
},
),
)(i)
)
.parse(i)
}

/// Parse the frame type
Expand All @@ -51,7 +52,8 @@ pub fn parse_frame_type<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrameType
constants::FRAME_HEARTBEAT => Some(AMQPFrameType::Heartbeat),
_ => None,
}),
)(i)
)
.parse(i)
}

/// Parse a full AMQP Frame (with contents)
Expand All @@ -61,7 +63,7 @@ pub fn parse_frame<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrame> {
flat_map(parse_frame_type, move |frame_type| {
move |i: I| match frame_type {
AMQPFrameType::ProtocolHeader => {
map(parse_protocol_header, AMQPFrame::ProtocolHeader)(i)
map(parse_protocol_header, AMQPFrame::ProtocolHeader).parse(i)
}
frame_type => map_res(
parse_raw_frame(frame_type),
Expand All @@ -74,20 +76,24 @@ pub fn parse_frame<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrame> {
AMQPFrameType::ProtocolHeader => {
Ok(AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1()))
}
AMQPFrameType::Method => all_consuming(parse_class)(payload)
AMQPFrameType::Method => all_consuming(parse_class)
.parse(payload)
.map(|(_, m)| AMQPFrame::Method(channel_id, m)),
AMQPFrameType::Header => all_consuming(parse_content_header)(payload)
AMQPFrameType::Header => all_consuming(parse_content_header)
.parse(payload)
.map(|(_, h)| AMQPFrame::Header(channel_id, h.class_id, Box::new(h))),
AMQPFrameType::Body => Ok(AMQPFrame::Body(
channel_id,
payload.iter_elements().collect(),
)),
AMQPFrameType::Heartbeat => Ok(AMQPFrame::Heartbeat(channel_id)),
},
)(i),
)
.parse(i),
}
}),
)(i)
)
.parse(i)
}

/// Parse a raw AMQP frame
Expand All @@ -97,19 +103,16 @@ pub fn parse_raw_frame<I: ParsableInput>(
move |i: I| {
context(
"parse_raw_frame",
flat_map(
pair(parse_id, parse_long_uint),
move |(channel_id, size)| {
map(
pair(take(size), tag(&[constants::FRAME_END][..])),
move |(payload, _)| AMQPRawFrame {
frame_type,
channel_id,
payload,
},
)
},
),
flat_map((parse_id, parse_long_uint), move |(channel_id, size)| {
map(
(take(size), tag(&[constants::FRAME_END][..])),
move |(payload, _)| AMQPRawFrame {
frame_type,
channel_id,
payload,
},
)
}),
)(i)
}
}
Expand All @@ -119,12 +122,12 @@ pub fn parse_content_header<I: ParsableInput>(i: I) -> ParserResult<I, AMQPConte
context(
"parse_content_header",
map(
tuple((
(
parse_id,
parse_short_uint,
parse_long_long_uint,
context("parse_properties", parse_properties),
)),
),
// FIXME: should we validate that weight is 0?
|(class_id, _weight, body_size, properties)| AMQPContentHeader {
class_id,
Expand Down
Loading

0 comments on commit c25820f

Please sign in to comment.