Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality to browse queues #412

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

astro-phage
Copy link

Relevant for ticket #35

namespace ActiveMQ.Artemis.Client
{
public interface IBrowser : IEnumerator<Message>, IEnumerable<Message>, IAsyncDisposable
{
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather expect this interface to expose a method that returns IAsyncEnumerable:

IAsyncEnumerable<Message> ReceiveAllAsync(CancellationToken cancellationToken);

Than implement IEnumerable itself.

As a side note, you cannot implement IEnumerable without blocking.

A naive implementation might look as follows:

public async IAsyncEnumerable<Message> ReceiveAllAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        yield return await _consumer.ReceiveAsync(cancellationToken).ConfigureAwait(false);
        // we probably need to ack message, so the browser won't block after we run out of the consumer credit
    }
}

But I'm not sure that reusing IConsumer to do all the heavy-lifting is the right call. I'd be more inclined to do sth similar to what was done in RequestReplyClient.

@@ -25,11 +25,12 @@ public interface IConnection : IAsyncDisposable
/// </summary>
bool IsOpened { get; }
Task<ITopologyManager> CreateTopologyManagerAsync(CancellationToken cancellationToken = default);
Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default);
Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default, bool isBrowser = false);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IConsumer != IBrowser. They have different characteristics. I wouldn't pretend they represent the same thing and try to shoehorn them under a single interface umbrella.

Task<IProducer> CreateProducerAsync(ProducerConfiguration configuration, CancellationToken cancellationToken = default);
Task<IAnonymousProducer> CreateAnonymousProducerAsync(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken = default);
Task<IRequestReplyClient> CreateRequestReplyClientAsync(RequestReplyClientConfiguration configuration, CancellationToken cancellationToken = default);

Task<IBrowser> CreateBrowserAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need a special BrowserConfiguration type.

A big chunk of properties from ConsumerConfiguration doesn't make sense in a browser scenario:

I think we don't need RoutingType as the queue should already exist, Credit also seems redundant as there is no api to acknowledge message, Shared, Durable.

I'm not sure about NoLocalFilter. It definitely works for message browser scenario, but I'm not sure if it makes sense to expose it as part of the public api.

try
{
_current = null;
_current = _consumer.ReceiveAsync(token).Result;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want blocking on async call.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants