Should a queue be created automatically after the first message is delivered to an exchange? #3

Closed
marcio199226 opened this issue Mar 15, 2019 · 17 comments
Assignees
Labels
bug Something isn't working

Comments

@marcio199226

marcio199226 commented Mar 15, 2019

Hi, thanks for this awesome project.

I have a question: I noticed that when I publish a new message to a queue that does not exist, the queue is not created automatically on the first message delivered, so I have to create it manually from the RabbitMQ dashboard.

Is this the desired behaviour?

P.S. Could you explain what the publisherConfirms option is for?

@skif48
Owner

skif48 commented Mar 15, 2019

Hello @marcio199226, thanks for your attention to the project!

The Publisher class is abstracted away from queue creation. It only works with exchanges, because that's the only thing RabbitMQ publishers should really work with. It's rarely good practice to work directly with the queues themselves.

What is your use case? Optional queue assertion and binding during publisher creation might be a nice feature request.

@skif48
Owner

skif48 commented Mar 15, 2019

You can read about publisher confirms here, but to keep it short, it's a way for the publisher to know that RabbitMQ did receive the message we just published.

If publisher confirmation fails for whatever reason, an instance of PublisherConfirmationError is thrown to the publisher actions stream. You can use instanceof to catch it.
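
To make the confirm flow concrete, here is a minimal sketch written against plain amqplib-style calls (`createConfirmChannel`, `publish`, `waitForConfirms`). It is not nabbitmq's internal implementation: `publishWithConfirm` and `ConfirmFailedError` are hypothetical names used only for illustration, with `ConfirmFailedError` standing in for the role that PublisherConfirmationError plays in the library, and `connection` is assumed to be an amqplib connection.

```javascript
// Hypothetical stand-in for a library-specific confirmation error class.
class ConfirmFailedError extends Error {}

// Publishes one JSON message and waits for the broker to confirm it.
// `connection` is assumed to be an amqplib connection object.
async function publishWithConfirm(connection, exchange, routingKey, payload) {
  // On a confirm channel the broker acks (or nacks) every published message.
  const channel = await connection.createConfirmChannel();
  try {
    const body = Buffer.from(JSON.stringify(payload));
    channel.publish(exchange, routingKey, body, { persistent: true });
    // Resolves when all outstanding messages are acked, rejects on a nack.
    await channel.waitForConfirms();
  } catch (cause) {
    // Wrap the failure so callers can detect it with instanceof.
    throw new ConfirmFailedError(`message was not confirmed: ${cause.message}`);
  } finally {
    await channel.close();
  }
}
```

A caller could then tell confirmation failures apart from other errors with `if (error instanceof ConfirmFailedError) { ... }` inside its own catch handler.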

@marcio199226
Author

marcio199226 commented Mar 15, 2019

OK, thanks for the explanation.

I have no special use case for not using an exchange.

I have another question: when I start consuming messages from a queue, a new exchange is created. An example:

const { RabbitMqConnectionFactory, ConsumerFactory } = require('nabbitmq');

async function main() {
  const connectionFactory = new RabbitMqConnectionFactory();
  connectionFactory.setUri('amqp://localhost:5672');
  const connection = await connectionFactory.newConnection();
  const consumerFactory = new ConsumerFactory(connection);
  consumerFactory.setConfigs({
    noDeadLetterQueue: true,
    autoAck: false,
    prefetch: 1,
    queue: {
      name: 'accounts_input_google',
      durable: true,
    },
  });
  const consumer = await consumerFactory.newConsumer();

  consumer.startConsuming().subscribe({
    next: async (message) => {
      // The body arrives as a Buffer in amqplib, so decode it before parsing
      const payload = JSON.parse(message.content.toString());
      // other stuff
    },
    complete: () => console.log('completed'),
    error: (error) => {
      console.error(error);
    },
  });
}

main().catch(console.error);

After a message is consumed, I see a new exchange named exchange_accounts_input_google.

How can I prevent a new exchange from being created?

@skif48
Owner

skif48 commented Mar 16, 2019

An exchange is created by default and bound to your queue. If for some reason you don't need an exchange, you can use a custom setup function. You can find an example here, under the "With custom setup function" section.

Not using an exchange is an unusual RabbitMQ setup, so an exchange is created by default when you instantiate a Consumer. This exchange is needed later, when you publish messages.

@marcio199226
Author

Let me explain my RabbitMQ setup better.

I want one exchange of type direct whose bindings route each message to the queue with the corresponding routing key.

The "issue" is that when I have a consumer for queue "bar", it automatically creates an exchange bar_exchange, but in my case an exchange already exists, so I don't want a new one created for each queue the consumer is subscribed to.

Thanks for all the replies. I will look into the custom setup function anyway, or rewrite my consumers using raw amqplib.

@skif48
Owner

skif48 commented Mar 18, 2019

@marcio199226 I see. Well, you can provide exchange info within the consumer configs. The Consumer class uses assertions, and these operations are idempotent: they either create the required resource or verify that it already exists. So if you provide the necessary info about your exchanges and bindings in your consumer configs, not only will it work for you, but you'll also be sure that your setup is correct before you start consuming.
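
As an illustration of the idempotent-assertion idea, the sketch below declares one shared direct exchange and binds several queues to it by routing key, using amqplib channel calls (`assertExchange`, `assertQueue`, `bindQueue`). The function name `assertDirectTopology` and the shape of the `queues` argument are assumptions made for this example, not part of nabbitmq. Running it twice against the same broker should be harmless, although an assertion fails if the existing resource was declared with different properties.

```javascript
// Declares one direct exchange and binds each queue to it by routing key.
// `channel` is assumed to be an amqplib channel; all names are illustrative.
async function assertDirectTopology(channel, exchange, queues) {
  // Creates the exchange if it is missing, otherwise verifies it exists.
  await channel.assertExchange(exchange, 'direct', { durable: true });

  const bound = [];
  for (const { name, routingKey } of queues) {
    // Same idea for each queue: create it or verify it.
    const { queue } = await channel.assertQueue(name, { durable: true });
    // Binding an already bound queue with the same key is a no-op.
    await channel.bindQueue(queue, exchange, routingKey);
    bound.push(queue);
  }
  return bound;
}
```

With a real channel this might be called as `await assertDirectTopology(channel, 'accounts', [{ name: 'bar', routingKey: 'bar' }])`, so every consumer reuses the one existing exchange instead of getting an exchange of its own.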

@skif48
Owner

skif48 commented Mar 18, 2019

@marcio199226 If you decide to use consumer configs for your case, please let me know if they don't fit your needs 🙂

@marcio199226
Author

marcio199226 commented Mar 22, 2019

Hi, I used a custom setup function, but it throws an error:

const { RabbitMqConnectionFactory, ConsumerFactory } = require('nabbitmq');

// `await` is wrapped in an async function so the script runs as a CommonJS module
async function main() {
  const connectionFactory = new RabbitMqConnectionFactory();
  connectionFactory.setUri('amqp://localhost:5672');
  const connection = await connectionFactory.newConnection();
  const consumerFactory = new ConsumerFactory(connection);
  consumerFactory.setCustomSetupFunction(async (connection) => {
    const channel = await connection.createChannel();
    // `id` is defined elsewhere in the application
    await channel.assertExchange(`dynamic_exchange_name_${id}`, 'direct', { durable: true });
    const queueMetadata = await channel.assertQueue(`dynamic_queue_name_${id}`, {
      durable: true,
    });

    await channel.bindQueue(queueMetadata.queue, `dynamic_exchange_name_${id}`, `my.routing.key.${id}`);
    await channel.prefetch(1);

    return { channel, prefetch: 1, autoAck: false };
  });
  const consumer = await consumerFactory.newConsumer();

  consumer.startConsuming().subscribe({
    next: async (message) => {
      // code
    },
    complete: () => console.log('completed'),
    error: (error) => {
      console.error(error);
    },
  });
}

main();

After executing it, I get the following error:
node consumer.js
(node:24241) UnhandledPromiseRejectionWarning: Error: Cannot read property 'queue' of null
    at ConsumerFactory.<anonymous> (/home/oskar/projects/instagram/node_modules/nabbitmq/lib/factories/consumer-factory.js:35:23)
    at Generator.throw (<anonymous>)
    at rejected (/home/oskar/projects/node_modules/nabbitmq/lib/factories/consumer-factory.js:5:65)
    at <anonymous>
    at process._tickCallback (internal/process/next_tick.js:189:7)
(node:24241) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:24241) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

But the exchange is created successfully with the right bindings, and the queue is created too.

What am I doing wrong?

P.S. I have the latest version installed, 0.1.1.

P.P.S. It seems that while constructing the publisher/consumer factory, some configs are still needed, but configs and rawConfigs are set to null. I think you should set them to at least an empty object, because of this, for example: https://github.com/skif48/nabbitmq/blob/develop/src/models/publisher.ts#L164 when config = null https://github.com/skif48/nabbitmq/blob/develop/src/models/publisher.ts#L74
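
The defensive default suggested above can be sketched in a few lines. This is a hypothetical helper, not nabbitmq's actual code: `resolveQueueName` and its `fallback` parameter are invented for illustration, and the only point is that reading a property through an empty-object default avoids the "Cannot read property 'queue' of null" failure.

```javascript
// Hypothetical helper: reads configs.queue.name without assuming that
// `configs` (or `configs.queue`) was ever set.
function resolveQueueName(configs, fallback) {
  const safeConfigs = configs || {};     // null/undefined become {}
  const queue = safeConfigs.queue || {}; // a missing queue section becomes {}
  return queue.name || fallback;
}
```

For example, `resolveQueueName(null, 'default_queue')` falls back to `'default_queue'` instead of throwing.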

@skif48
Owner

skif48 commented Mar 22, 2019

Hello again. Working on it @marcio199226, thanks for the snippet.

@skif48 skif48 self-assigned this Mar 24, 2019
@skif48 skif48 added the bug Something isn't working label Mar 24, 2019
skif48 added a commit that referenced this issue Mar 24, 2019
extend custom setup function interface and usage, fix bug from issue #3
@skif48 skif48 closed this as completed Mar 24, 2019
@skif48 skif48 reopened this Mar 24, 2019
@skif48
Owner

skif48 commented Mar 24, 2019

@marcio199226 Fixed in v0.1.2. You can go ahead and npm install it. The new version introduces a slightly different usage of the custom setup function, namely the interface of the returned object. The details can be found in the README, in the same section as the function usage. Let me know if anything goes wrong. 🙂

@skif48
Owner

skif48 commented Apr 13, 2019

Hey @marcio199226. Is it still an issue for you? If not, I'll close this one. 🙂

@marcio199226
Author

I noticed that when consuming messages, if I try to get a message's routing key through message.fields.routingKey, I get the queue name instead of the message's routing key.

@skif48
Owner

skif48 commented May 1, 2019

Hello @marcio199226!
Did you use a custom setup function, or did you use consumer configs to set up the consuming logic?

P.S. I just released version 1.0.0, which changes the consumer and publisher configs a little. You can have a look here and here. The examples in the README have also been updated.

@marcio199226
Author

marcio199226 commented May 2, 2019

I have used a custom setup function

  consumerFactory.setCustomSetupFunction(async (connection) => {
    const channel = await connection.createChannel();
    await channel.assertExchange('exchange', 'topic', { durable: true });
    const queueMetadata = await channel.assertQueue('input', {
      durable: true,
      maxPriority: 10
    });

    await channel.bindQueue(queueMetadata.queue, 'exchange', 'account.*');
    await channel.prefetch(1);

    return {channel, prefetch: 1, autoAck: false, queue: 'input'};
  });

After consuming a message, message.fields.routingKey returns my queue name "input" instead of the message's actual routing key.

I used v0.1.2.

Best regards
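
One way to narrow down a report like this is to log the exchange alongside the routing key for each delivery. The sketch below assumes the message has the amqplib shape (`fields.exchange`, `fields.routingKey`, `content` as a Buffer); `describeDelivery` is a hypothetical helper name. It relies on one general AMQP 0-9-1 fact: every queue is implicitly bound to the default exchange (named with an empty string) under a routing key equal to the queue name. It is a diagnostic aid only and does not claim to identify the cause of the behaviour reported here.

```javascript
// Summarises where a delivery came from.
// `message` is assumed to follow the amqplib message shape.
function describeDelivery(message) {
  const { exchange, routingKey } = message.fields;
  return {
    exchange,
    routingKey,
    // The default exchange is named "" and routes by queue name, so a
    // delivery through it always carries the queue name as its routing key.
    viaDefaultExchange: exchange === '',
    body: message.content.toString(),
  };
}
```

Calling `console.log(describeDelivery(message))` inside the `next` handler would show whether a message arrived through the topic exchange or through the default exchange.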

@skif48
Owner

skif48 commented May 2, 2019

@marcio199226 thanks for the snippet. Working on that.

@skif48
Owner

skif48 commented May 2, 2019

@marcio199226 I think I know where the bug is, will publish a fix soon. Thank you very much!

@skif48
Owner

skif48 commented May 2, 2019

I'll close this issue and move this conversation to another one: #7.

@skif48 skif48 closed this as completed May 2, 2019
2 participants