Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement super_stream #232

Merged
merged 28 commits into from
Oct 30, 2024
Merged

Implement super_stream #232

merged 28 commits into from
Oct 30, 2024

Conversation

DanielePalaia
Copy link
Contributor

@DanielePalaia DanielePalaia commented Oct 17, 2024

This closes #204

This PR implements the superstream functionality both for producer and consumer (For the moment it exclude the single active consumer implementation that will be implemented in a different PR).

It import the murmur3 dependency in order to use the same hashing algorithm also the other clients are using.

Examples are provided in examples folder: send_super_stream.rs and receive_super_stream.rs

}

let producer = self.1.get(route.as_str()).unwrap();
let _ = producer.send(message.clone(), cb.clone()).await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wolf4ood I was wondering if we can limit the usage of clone() here for message and cb. I made some tests but apparently this is not so trivial

@DanielePalaia DanielePalaia force-pushed the implement_super_stream branch from 688ea68 to ffa07fb Compare October 19, 2024 14:34
@DanielePalaia DanielePalaia force-pushed the implement_super_stream branch 6 times, most recently from 9f574f7 to 0a325fe Compare October 23, 2024 15:58
Copy link

codecov bot commented Oct 23, 2024

Codecov Report

Attention: Patch coverage is 94.35484% with 14 lines in your changes missing coverage. Please review.

Project coverage is 89.75%. Comparing base (f2aca24) to head (2238a20).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
src/error.rs 0.00% 6 Missing ⚠️
src/consumer.rs 57.14% 3 Missing ⚠️
src/superstream_producer.rs 96.59% 3 Missing ⚠️
src/superstream_consumer.rs 97.43% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #232      +/-   ##
==========================================
+ Coverage   88.73%   89.75%   +1.02%     
==========================================
  Files          75       78       +3     
  Lines        6410     6658     +248     
==========================================
+ Hits         5688     5976     +288     
+ Misses        722      682      -40     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@DanielePalaia
Copy link
Contributor Author

Hey @Gsantomaggio @wolf4ood I know this is quite huge, but if you get time to start reviewing it when you have time it would be great. I created examples in the doc folders.

There are still a couple of small issues (written above) and probably the filtering part integration needs to be tested more carefully but I think we are in a good way now!

@DanielePalaia DanielePalaia force-pushed the implement_super_stream branch from 0a325fe to 0a3a6ec Compare October 24, 2024 08:22
@DanielePalaia DanielePalaia force-pushed the implement_super_stream branch from ed241ef to 153432e Compare October 25, 2024 08:21
@DanielePalaia DanielePalaia changed the title WIP: Implement super_stream Implement super_stream Oct 25, 2024
@DanielePalaia DanielePalaia force-pushed the implement_super_stream branch from 3325f06 to 19fa5bf Compare October 25, 2024 12:27
@Gsantomaggio
Copy link
Member

@DanielePalaia, the super-stream producer and super-stream consumer, seems to work correctly :)!

There is some problem with the connections.
Given a cluster ( I created it using https://github.com/rabbitmq/rabbitmq-stream-go-client and make rabbitmq-ha-proxy )

Changed the examples in this way:

    let environment = Environment::builder().port(5553).load_balancer_mode(true).build().await?;

Then:

 use std::io::{stdin,stdout,Write};
    let mut s=String::new();
    print!("Please enter close the super stream consumer");
    let _=stdout().flush();
    stdin().read_line(&mut s).expect("Did not enter a correct string");
    if let Some('\n')=s.chars().next_back() {
        s.pop();
    }


    let _ = super_stream_consumer.handle().close().await;

    print!("Please enter close the the example");
    let _=stdout().flush();
    stdin().read_line(&mut s).expect("Did not enter a correct string");
    if let Some('\n')=s.chars().next_back() {
        s.pop();
    }


    println!("Super stream consumer stopped");

Same for the producer:

  use std::io::{stdin,stdout,Write};
    let mut s=String::new();
    print!("Please enter close the super stream producer");
    let _=stdout().flush();
    stdin().read_line(&mut s).expect("Did not enter a correct string");
    if let Some('\n')=s.chars().next_back() {
        s.pop();
    }

    let _ = super_stream_producer.close().await;

    print!("Please enter close the example");
    let _=stdout().flush();
    stdin().read_line(&mut s).expect("Did not enter a correct string");
    if let Some('\n')=s.chars().next_back() {
        s.pop();
    }
    Ok(())

When the close() is called there are pending connections:

Screenshot 2024-10-28 at 11 41 22

But all these connections are not linked to any stream.

All the connections should be closed.

@Gsantomaggio
Copy link
Member

Ok, with this PR #235, the pending connections problem is solved.

There is still some problem with the load-banacer part #236
but we can solve in another PR.

Great job @DanielePalaia and Thank you @wolf4ood for the review!

@DanielePalaia DanielePalaia force-pushed the implement_super_stream branch 2 times, most recently from cc4f663 to 924c9eb Compare October 30, 2024 09:28
@DanielePalaia DanielePalaia force-pushed the implement_super_stream branch 3 times, most recently from d8dab9d to 6183315 Compare October 30, 2024 10:29
@DanielePalaia DanielePalaia force-pushed the implement_super_stream branch from 6183315 to 2238a20 Compare October 30, 2024 10:30
@DanielePalaia DanielePalaia merged commit 9f5c488 into main Oct 30, 2024
4 checks passed
@DanielePalaia DanielePalaia deleted the implement_super_stream branch October 30, 2024 13:03
@github-actions github-actions bot mentioned this pull request Oct 30, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement SuperStream feature
3 participants