Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error when use @RabbitRPC in one controller and listten exchange and queue that same for all actions with diffirent routing keys #751

Open
sur-ser opened this issue Jun 28, 2024 · 12 comments

Comments

@sur-ser
Copy link

sur-ser commented Jun 28, 2024

when i use RabbitRPC in one controller with multiple actions like this

@RabbitRPC({
    exchange: AuthRegister.exchange,
    routingKey: AuthRegister.topic,
    queue: AuthRegister.queue,
    errorBehavior: MessageHandlerErrorBehavior.NACK,
    errorHandler: rpcErrorHandler,
  })
  async register(@Body() dto: AuthRegister.Request) : Promise<AuthRegister.Response> {
    const newUser = await this.authService.register(dto);
    return { email: newUser.email };
  }


  @RabbitRPC({
    exchange: AuthJWTLogin.exchange,
    routingKey: AuthJWTLogin.topic,
    queue: AuthJWTLogin.queue,
    errorBehavior: MessageHandlerErrorBehavior.NACK,
    errorHandler: rpcErrorHandler,
  })
  async login(@Body() dto: AuthJWTLogin.Request): Promise<AuthJWTLogin.Response> {
    const { id } = await this.authService.validateUser(dto.email, dto.password);
    return this.authService.login(id);
  }

and create request like this

return await this.amqpConnection.request<AuthRegister.Response>({
       exchange: AuthRegister.exchange,
       routingKey: AuthRegister.topic,
       payload: dto,
       timeout: 10000
     })
  one time it works fine and second time i have got error like this

[Nest] 119672 - 06/27/2024, 7:57:17 PM ERROR [AmqpConnection] Received message with invalid routing key: sso.auth.register.command

but if i keep one action it works fine or when i change que name and keep one routing key in queue it works fine

my exchange is topic
and in controller exchange and queue is same for all actions only routing key is diffirent

@sur-ser sur-ser changed the title Erro when publish in same exchange same queue with diffirent routing key using RPC Error when use @RabbitRPC in one controller and listten exchange and queue that same for all actions with diffirent routing keys Jun 28, 2024
@slyk
Copy link

slyk commented Jul 20, 2024

Same for me.
v 4.1 works fine.
v 5 gives errors. I found that the matchesRoutingKey() function is called ant tries to match the message routing key always with different handlers. So it is not taking messages from queue that match given routing keys, it just takes all messages from queue and tries to match them with a random routingKey from a file.

In my project I have three @RabbitRPC() listeners in one file and I sometimes get listener working ok and some times errors with "invalid routing key".

Here is my code and errors as example:

@RabbitRPC({
    exchange: 'amq.topic',      //using default exchange for topics
    routingKey: 'user.upsert',  //when message has this routing key (topic)
    queue: 'user',           //message will be sent to this queue and proceed by this service
  })
@RabbitRPC({
    exchange: 'amq.topic',          //using default exchange for topics
    routingKey: 'user.find.byphone',//when client wants to find user by phone
    queue: 'user',                  //message will be sent to this queue and proceed by this service
    allowNonJsonMessages: true,     //we allow non-json messages (like strings, numbers, etc)
  })
@RabbitRPC({
    exchange: 'amq.topic',          //using default exchange for topics
    routingKey: 'user.get.byuuid',  //when client wants to get user by uuid
    queue: 'user',                  //message will be sent to this queue and proceed by this service
  })

and here is log when I send the same message to queue:
image

"ret: ...uuid... " - is the right processing of the listener. As you can see (I've modified matcher a little to see what its trying to match) there always different matcher for same routingKey. Is it by design? :)

P.S. Also while I was looking for problem I saw project with same problem (https://github.com/pavlokobyliatskyi/demo-chat/), he just tell people to use old version :) but maybe some fix can be introduced to new versions? Interesting that not many people have this.

@slyk
Copy link

slyk commented Jul 20, 2024

The error message is in "connections.ts", looking at the v4 code in that place (line 535) instead of error there were just nack() so it was just grabbing message, check for routing key and putting it back for each of @RabbitRPC() call while it looking for perfect match?

if (rpcOptions.routingKey !== msg.fields.routingKey) {
            channel.nack(msg, false, true);
            return;
          }

and in v5 there are more complex check because now routingKey could be an array, but nack() function called with requeue option "false" so we get error and lost message.

if (
            !matchesRoutingKey(msg.fields.routingKey, rpcOptions.routingKey)
          ) {
            channel.nack(msg, false, false);
            this.logger.error(
              'Received message with invalid routing key: ' +
                msg.fields.routingKey
            );
            return;
          }

https://github.com/golevelup/nestjs/blob/master/packages/rabbitmq/src/amqp/connection.ts

I'm new to rabbit. What is the right way to handle messages from queue that has specific routingKey?

Quick fix would be set requeue back to 'true' in nack(), but then we would have many errors in log... and maybe there should be some other logic?

@slyk
Copy link

slyk commented Jul 21, 2024

Seems like good practice is to make different queues for this messages, so I changed my code and got more queues, it works and for my small first app just to became familiar with rabbitMQ its ok :)

@RabbitRPC({
    exchange: 'amq.direct',      // DIRECT here instead of topic
    routingKey: 'user.upsert',  
    queue: 'user.upsert',         // name of the qeue that is same as reoutingKey
  })

@sur-ser
Copy link
Author

sur-ser commented Aug 14, 2024

While creating separate queues for every aspect of the project might seem like a quick fix, it defeats the purpose of having routing keys in the first place. The whole idea behind using routing keys is to prevent the chaos that would arise from having too many queues, especially in large projects with 50-70 microservices. If every interaction requires a separate queue, the system becomes overly complex and hard to manage. In many cases, it’s much more efficient and organized to use different routing keys to read from the same queue, which allows for better scalability and maintainability.

@WonderPanda
Copy link
Collaborator

While creating separate queues for every aspect of the project might seem like a quick fix, it defeats the purpose of having routing keys in the first place. The whole idea behind using routing keys is to prevent the chaos that would arise from having too many queues, especially in large projects with 50-70 microservices. If every interaction requires a separate queue, the system becomes overly complex and hard to manage. In many cases, it’s much more efficient and organized to use different routing keys to read from the same queue, which allows for better scalability and maintainability.

You're right, this is definitely sensible behavior. We're definitely open to contributions to improve the handling of routing keys

@slyk
Copy link

slyk commented Aug 14, 2024

While creating separate queues for every aspect of the project might seem like a quick fix, it defeats the purpose of having routing keys in the first place. The whole idea behind using routing keys is to prevent the chaos that would arise from having too many queues, especially in large projects with 50-70 microservices. If every interaction requires a separate queue, the system becomes overly complex and hard to manage. In many cases, it’s much more efficient and organized to use different routing keys to read from the same queue, which allows for better scalability and maintainability.

Don't see API in rabbit MQ that could allow us to get/subscribe only to some routing keys in queue.

So first consumer will receive the first message, if its not match needed routing key we should nack it with requeue.

Each message could go to each consumer while searching for match. So for many routing keys and consumers we will create many useless data flow...

Also here is quote from docs:
When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again.

So if needed consumer service is down now other consumers will infinitely get the message, nack it back and so on.

So one queue for one consumer seems like bt design feature of rabbitMQ (or amqp).

The one idea that comes to me to handle our request is to handle routing of the messge on nestjs side according to routing key. Some wrapper function that knows all functions in current app that are waiting for the message with specified routing key and will send message to that function only.

@sur-ser
Copy link
Author

sur-ser commented Aug 27, 2024

thank you for sharing your thoughts, but unfortunately, you're mistaken in your understanding of how RabbitMQ works. The main idea behind using routing keys in RabbitMQ is precisely that different messages going into the same queue can be processed by different consumers depending on the matching routing keys.

In RabbitMQ, consumers can listen to the same queue but only receive messages that match their specific routing key. This allows one queue to be efficiently used by multiple consumers, each processing only the messages relevant to them, which is the core concept of routing keys in the AMQP protocol.

I opened this issue not because I want new functionality, but because there’s a bug in the current version of the nestjs-rabbitmq package that prevents routing keys from working correctly. In version 5, changes were made to the message handling logic, causing messages to be incorrectly processed and leading to errors. In version 4, everything worked as expected.

What you’re suggesting—creating separate queues for each routing key—might be a workaround, but it goes against the principle of using routing keys and leads to unnecessary system complexity. My goal is to address the issue in the current implementation, not to introduce new functionality.

@underfisk
Copy link
Contributor

@sur-ser If you have the time/willing to help us by contributing with a proper fix for this matter we would appreciate 🙏

@upundefined
Copy link

Any news?

After this commit it stopped working
#712

@slyk
Copy link

slyk commented Oct 2, 2024

Any news?

After this commit it stopped working #712

Seems like not everybody need it.
Maybe another issue with feature request tag will help to understand how many people really need this and someone could decide to implement it to public (for bounty?)

For my small project I've decided to split to bigger number of queues.

As far as I read amqp 0-9-1 specs there is only one mention of "message selectors" that could help us to not create endless loop of nack(), but no info how this should be implemented and seems like rabbitMQ does not have this in API. And we should use its API in current lib...

So if this will be the real problem in my project, I guess we should implement logic on nestjs side but rabbitMQ still wil give us ALL messages from queue in FIFO order, we just search for functions by routing_key and if nothing found we should nack()

In RabbitMQ, consumers can listen to the same queue but only receive messages that match their specific routing key.

This would be good, but I can't find that type of functionality described nor in amqp protocol specs nor in rabbitMQ. All filtering with rouiting_key end up in exchange when we bind it to queue(s). The exchange is smart part that understant logic... the queue can only FIFO to consumer.

@upundefined
Copy link

I found a library that uses a single queue, it is specified when initializing the module with the queueName parameter
https://github.com/AlariCode/nestjs-rmq
https://github.com/AlariCode/nestjs-rmq/blob/3ef5ecc62b1e6e19254a4ae76288a35da96c463c/lib/rmq.service.ts#L227

@PierreKiwi
Copy link

Hello!
I have started to use this library today and I am facing this issue too.

I have manually modified to the previous behavior (pre #713), and now it does work as expected.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants