Skip to content

Commit

Permalink
merging
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Oct 30, 2024
1 parent e06146d commit 3196f1c
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 20 deletions.
2 changes: 1 addition & 1 deletion tests/integration/client_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn client_create_stream_error_test() {

#[tokio::test(flavor = "multi_thread")]
async fn client_create_and_delete_super_stream_test() {
let _ = TestClient::create_super_stream().await;
let _test = TestClient::create_super_stream().await;
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
20 changes: 3 additions & 17 deletions tests/integration/consumer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,15 @@ async fn super_stream_consumer_test() {
for n in 0..message_count {
let msg = Message::builder().body(format!("message{}", n)).build();
let _ = super_stream_producer
.send(msg, |confirmation_status| async move {
println!("Message confirmed with status {:?}", confirmation_status);
})
.send(msg, |confirmation_status| async move {})
.await
.unwrap();
}

let mut received_messages = 0;
let handle = super_stream_consumer.handle();

println!("before looping");
while let delivery = super_stream_consumer.next().await.unwrap() {
println!("inside while delivery loop");
let d = delivery.unwrap();
println!(
"Got message: {:#?} from stream: {} with offset: {}",
d.message()
.data()
.map(|data| String::from_utf8(data.to_vec()).unwrap()),
d.stream(),
d.offset()
);

while let _ = super_stream_consumer.next().await.unwrap() {
received_messages = received_messages + 1;
if received_messages == 10 {
break;
Expand Down Expand Up @@ -406,7 +392,7 @@ async fn consumer_test_with_store_offset() {
// Store an offset
if i == offset_to_store {
//Store the 5th element produced
let _ = consumer_store
let _result = consumer_store
.store_offset(delivery.unwrap().offset())
.await;
}
Expand Down
3 changes: 1 addition & 2 deletions tests/integration/producer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use rabbitmq_stream_client::{
use crate::common::{Countdown, TestEnvironment};

use rabbitmq_stream_client::types::{
HashRoutingMurmurStrategy, RoutingKeyRoutingStrategy,
RoutingStrategy,
HashRoutingMurmurStrategy, RoutingKeyRoutingStrategy, RoutingStrategy,
};

use std::sync::atomic::{AtomicU32, Ordering};
Expand Down

0 comments on commit 3196f1c

Please sign in to comment.