Skip to content

Commit

Permalink
implementing route and partition commands
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Oct 8, 2024
1 parent 551df26 commit b3e62b9
Show file tree
Hide file tree
Showing 7 changed files with 392 additions and 10 deletions.
2 changes: 1 addition & 1 deletion protocol/src/commands/create_super_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Command for CreateSuperStreamCommand {

#[cfg(test)]
mod tests {
use crate::commands::create_stream::CreateStreamCommand;

use crate::commands::tests::command_encode_decode_test;

use super::CreateSuperStreamCommand;
Expand Down
2 changes: 2 additions & 0 deletions protocol/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub mod sasl_authenticate;
pub mod sasl_handshake;
pub mod store_offset;
pub mod subscribe;
pub mod superstream_partitions;
pub mod superstream_route;
pub mod tune;
pub mod unsubscribe;

Expand Down
145 changes: 145 additions & 0 deletions protocol/src/commands/superstream_partitions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use std::io::Write;

#[cfg(test)]
use fake::Fake;

use crate::{
codec::{Decoder, Encoder},
error::{DecodeError, EncodeError},
protocol::commands::COMMAND_PARTITIONS,
FromResponse, ResponseCode,
};

use super::Command;

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct SuperStreamPartitionsRequest {
correlation_id: u32,
super_stream: String,
}

impl SuperStreamPartitionsRequest {
pub fn new(correlation_id: u32, super_stream: String) -> Self {
Self {
correlation_id,
super_stream,
}
}
}

impl Encoder for SuperStreamPartitionsRequest {
fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size() + self.super_stream.as_str().encoded_size()
}

fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.super_stream.as_str().encode(writer)?;
Ok(())
}
}

impl Decoder for SuperStreamPartitionsRequest {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, super_stream) = Option::decode(input)?;

Ok((
input,
SuperStreamPartitionsRequest {
correlation_id,
super_stream: super_stream.unwrap(),
},
))
}
}

impl Command for SuperStreamPartitionsRequest {
fn key(&self) -> u16 {
COMMAND_PARTITIONS
}
}

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct SuperStreamPartitionsResponse {
pub(crate) correlation_id: u32,
response_code: ResponseCode,
streams: Vec<String>,
}

impl SuperStreamPartitionsResponse {
pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
Self {
correlation_id,
response_code,
streams,
}
}
pub fn is_ok(&self) -> bool {
self.response_code == ResponseCode::Ok
}
}

impl Encoder for SuperStreamPartitionsResponse {
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.streams.encode(writer)?;
self.response_code.encode(writer)?;
Ok(())
}

fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size()
+ self.streams.encoded_size()
+ self.response_code.encoded_size()
}
}

impl Decoder for SuperStreamPartitionsResponse {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, streams) = Vec::decode(input)?;
let (input, response_code) = ResponseCode::decode(input)?;

Ok((
input,
SuperStreamPartitionsResponse {
correlation_id,
response_code,
streams,
},
))
}
}

impl FromResponse for SuperStreamPartitionsResponse {
fn from_response(response: crate::Response) -> Option<Self> {
match response.kind {
crate::ResponseKind::SuperStreamPartitions(partitions_response) => {
Some(partitions_response)
}
_ => None,
}
}
}

#[cfg(test)]
mod tests {

use crate::commands::tests::command_encode_decode_test;

use super::SuperStreamPartitionsRequest;
use super::SuperStreamPartitionsResponse;

#[test]
fn super_stream_partition_request_test() {
command_encode_decode_test::<SuperStreamPartitionsRequest>();
}

#[test]
fn super_stream_partition_response_test() {
command_encode_decode_test::<SuperStreamPartitionsResponse>();
}
}
150 changes: 150 additions & 0 deletions protocol/src/commands/superstream_route.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
use std::io::Write;

#[cfg(test)]
use fake::Fake;

use crate::{
codec::{Decoder, Encoder},
error::{DecodeError, EncodeError},
protocol::commands::COMMAND_ROUTE,
FromResponse, ResponseCode,
};

use super::Command;

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct SuperStreamRouteRequest {
correlation_id: u32,
routing_key: String,
super_stream: String,
}

impl SuperStreamRouteRequest {
pub fn new(correlation_id: u32, routing_key: String, super_stream: String) -> Self {
Self {
correlation_id,
routing_key,
super_stream,
}
}
}

impl Encoder for SuperStreamRouteRequest {
fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size()
+ self.routing_key.as_str().encoded_size()
+ self.super_stream.as_str().encoded_size()
}

fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.routing_key.as_str().encode(writer)?;
self.super_stream.as_str().encode(writer)?;
Ok(())
}
}

impl Decoder for SuperStreamRouteRequest {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, routing_key) = Option::decode(input)?;
let (input, super_stream) = Option::decode(input)?;

Ok((
input,
SuperStreamRouteRequest {
correlation_id,
routing_key: routing_key.unwrap(),
super_stream: super_stream.unwrap(),
},
))
}
}

impl Command for SuperStreamRouteRequest {
fn key(&self) -> u16 {
COMMAND_ROUTE
}
}

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct SuperStreamRouteResponse {
pub(crate) correlation_id: u32,
response_code: ResponseCode,
streams: Vec<String>,
}

impl SuperStreamRouteResponse {
pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
Self {
correlation_id,
response_code,
streams,
}
}
pub fn is_ok(&self) -> bool {
self.response_code == ResponseCode::Ok
}
}

impl Encoder for SuperStreamRouteResponse {
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.response_code.encode(writer)?;
self.streams.encode(writer)?;
Ok(())
}

fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size()
+ self.streams.encoded_size()
+ self.response_code.encoded_size()
}
}

impl Decoder for SuperStreamRouteResponse {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, response_code) = ResponseCode::decode(input)?;
let (input, streams) = Vec::decode(input)?;

Ok((
input,
SuperStreamRouteResponse {
correlation_id,
response_code,
streams,
},
))
}
}

impl FromResponse for SuperStreamRouteResponse {
fn from_response(response: crate::Response) -> Option<Self> {
match response.kind {
crate::ResponseKind::SuperStreamRoute(route) => Some(route),
_ => None,
}
}
}

#[cfg(test)]
mod tests {

use crate::commands::tests::command_encode_decode_test;

use super::SuperStreamRouteRequest;
use super::SuperStreamRouteResponse;

#[test]
fn super_stream_route_request_test() {
command_encode_decode_test::<SuperStreamRouteRequest>();
}

#[test]
fn super_stream_route_response_test() {
command_encode_decode_test::<SuperStreamRouteResponse>();
}
}
34 changes: 32 additions & 2 deletions protocol/src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use crate::{
publish::PublishCommand, query_offset::QueryOffsetRequest,
query_publisher_sequence::QueryPublisherRequest,
sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand,
store_offset::StoreOffset, subscribe::SubscribeCommand, tune::TunesCommand,
store_offset::StoreOffset, subscribe::SubscribeCommand,
superstream_partitions::SuperStreamPartitionsRequest,
superstream_route::SuperStreamRouteRequest, tune::TunesCommand,
unsubscribe::UnSubscribeCommand,
},
error::{DecodeError, EncodeError},
Expand Down Expand Up @@ -64,6 +66,8 @@ pub enum RequestKind {
ExchangeCommandVersions(ExchangeCommandVersionsRequest),
CreateSuperStream(CreateSuperStreamCommand),
DeleteSuperStream(DeleteSuperStreamCommand),
SuperStreamPartitions(SuperStreamPartitionsRequest),
SuperStreamRoute(SuperStreamRouteRequest),
}

impl Encoder for RequestKind {
Expand Down Expand Up @@ -97,6 +101,10 @@ impl Encoder for RequestKind {
RequestKind::DeleteSuperStream(delete_super_stream) => {
delete_super_stream.encoded_size()
}
RequestKind::SuperStreamPartitions(super_stream_partitions) => {
super_stream_partitions.encoded_size()
}
RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(),
}
}

Expand Down Expand Up @@ -130,6 +138,10 @@ impl Encoder for RequestKind {
RequestKind::DeleteSuperStream(delete_super_stream) => {
delete_super_stream.encode(writer)
}
RequestKind::SuperStreamPartitions(super_stream_partition) => {
super_stream_partition.encode(writer)
}
RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer),
}
}
}
Expand Down Expand Up @@ -204,6 +216,12 @@ impl Decoder for Request {
COMMAND_DELETE_SUPER_STREAM => {
DeleteSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
}
COMMAND_PARTITIONS => {
SuperStreamPartitionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
}
COMMAND_ROUTE => {
SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
}
n => return Err(DecodeError::UnsupportedResponseType(n)),
};
Ok((input, Request { header, kind: cmd }))
Expand All @@ -226,7 +244,9 @@ mod tests {
peer_properties::PeerPropertiesCommand, publish::PublishCommand,
query_offset::QueryOffsetRequest, query_publisher_sequence::QueryPublisherRequest,
sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand,
store_offset::StoreOffset, subscribe::SubscribeCommand, tune::TunesCommand,
store_offset::StoreOffset, subscribe::SubscribeCommand,
superstream_partitions::SuperStreamPartitionsRequest,
superstream_route::SuperStreamRouteRequest, tune::TunesCommand,
unsubscribe::UnSubscribeCommand, Command,
},
};
Expand Down Expand Up @@ -357,4 +377,14 @@ mod tests {
fn request_delete_super_stream_test() {
request_encode_decode_test::<DeleteSuperStreamCommand>()
}

#[test]
fn request_partitions_command() {
request_encode_decode_test::<SuperStreamPartitionsRequest>()
}

#[test]
fn request_route_command() {
request_encode_decode_test::<SuperStreamRouteRequest>()
}
}
Loading

0 comments on commit b3e62b9

Please sign in to comment.