
Possible error in broker.emit(). I cannot recognize that an event has been sent to the transport successfully. #1105

Closed
Vslava opened this issue Jul 17, 2022 · 10 comments


@Vslava

Vslava commented Jul 17, 2022

If I call broker.emit(), I have no way of knowing whether the event was sent to the transporter successfully. The problem is that ServiceBroker.emit() returns either this.transit.sendEvent(ctx) or this.Promise.all(promises), where promises holds the results of this.transit.sendEvent(newCtx). Inside Transit.sendEvent(ctx), you return

return this.publish(
  new Packet(P.PACKET_EVENT, ctx.endpoint ? ctx.nodeID : null, {
    id: ctx.id,
    event: ctx.eventName,
    data: ctx.params,
    groups,
    broadcast: ctx.eventType == "broadcast",
    meta: ctx.meta,
    level: ctx.level,
    tracing: ctx.tracing,
    parentID: ctx.parentID,
    requestID: ctx.requestID,
    caller: ctx.caller,
    needAck: ctx.needAck
  })
).catch(
  /* istanbul ignore next */ err => {
    this.logger.error(`Unable to send '${ctx.eventName}' event to groups.`, err);

    this.broker.broadcastLocal("$transit.error", {
      error: err,
      module: "transit",
      type: C.FAILED_SEND_EVENT_PACKET
    });
  }
);

In this situation, the returned promise is never rejected when the transporter raises an exception, because the trailing .catch() logs the error and then resolves with undefined. I think the promise returned by the publish method should be saved in a variable and returned, with the catch handler attached to it separately. Like this:

const publishResult = this.publish(
  new Packet(P.PACKET_EVENT, ctx.endpoint ? ctx.nodeID : null, {
    id: ctx.id,
    event: ctx.eventName,
    data: ctx.params,
    groups,
    broadcast: ctx.eventType == "broadcast",
    meta: ctx.meta,
    level: ctx.level,
    tracing: ctx.tracing,
    parentID: ctx.parentID,
    requestID: ctx.requestID,
    caller: ctx.caller,
    needAck: ctx.needAck
  })
);

publishResult.catch(
  /* istanbul ignore next */ err => {
    this.logger.error(`Unable to send '${ctx.eventName}' event to groups.`, err);

    this.broker.broadcastLocal("$transit.error", {
      error: err,
      module: "transit",
      type: C.FAILED_SEND_EVENT_PACKET
    });
  }
);

return publishResult;

This makes it possible to detect a problem with the transport.
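A minimal, self-contained sketch of the difference between the two shapes above. The function fakePublish is a hypothetical stand-in for a publish call on a disconnected transporter, and the two sendEvent variants only reproduce the promise handling, not the real Transit code:

```javascript
// Hypothetical stand-in for transit.publish() while the transporter is down.
const fakePublish = () => Promise.reject(new Error("transporter disconnected"));
const logged = [];

// Current shape: the trailing .catch() logs and returns undefined,
// so the promise handed to the caller always fulfills.
function sendEventSwallowing() {
  return fakePublish().catch(err => {
    logged.push(err.message);
  });
}

// Proposed shape: log on a side branch and return the original promise,
// which still rejects for the caller.
function sendEventPropagating() {
  const publishResult = fakePublish();
  publishResult.catch(err => {
    logged.push(err.message);
  });
  return publishResult;
}

sendEventSwallowing().then(
  value => console.log("swallowing: fulfilled with", value),
  err => console.log("swallowing: rejected with", err.message)
);
sendEventPropagating().then(
  value => console.log("propagating: fulfilled with", value),
  err => console.log("propagating: rejected with", err.message)
);
```

The swallowing version fulfills with undefined even though publishing failed; the propagating version rejects. In both cases the error is still logged, so the side-branch handler keeps the existing logging behavior.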

@Vslava Vslava changed the title Possible error in broker.emit() Possible error in broker.emit(). I can recognize that an event has been sent to the transport successfully. Jul 17, 2022
@Vslava Vslava changed the title Possible error in broker.emit(). I can recognize that an event has been sent to the transport successfully. Possible error in broker.emit(). I cannot recognize that an event has been sent to the transport successfully. Jul 22, 2022
@AndreMaz
Member

Hi @Vslava, can you please take a look at #1065 and see if it's somehow related to the issue that you've reported?

@Vslava
Author

Vslava commented Jul 25, 2022

Hi @AndreMaz, it seems not. That issue says that a local emit request waits until the handler code has completed. My point is that I don't get any information when emit runs into a problem. For example, if the transport goes down and I then emit an event, the operation is reported as resolved successfully, but the log shows that the event was never actually sent to the transport, so I cannot react to the problem (for example, by resending the event).

For example, here are two files. I use RabbitMQ as the transporter.

--- receiver.js ---

const { Context, ServiceBroker } = require('moleculer');

const broker = new ServiceBroker({
  nodeID: 'node-1',
  transporter: 'amqp://rabbitmq:5672',
  logLevel: 'debug',
  serializer: 'CBOR',
});

broker.createService({
  name: 'test',
  events: {
    'created': {
      handler(ctx) {
        console.log('---- event is got');
      }
    },
  }
});

broker.start()
  .catch((err) => console.error('!!! Error 1 !!!', err));

--- sender.js ---

const util = require('util');
const { ServiceBroker } = require('moleculer');

const setTimeoutProm = util.promisify(setTimeout);

const broker = new ServiceBroker({
  nodeID: 'node-2',
  transporter: 'amqp://rabbitmq:5672',
  logLevel: 'debug',
  serializer: 'CBOR',
});

broker.start()
  .then(async () => {
    while (true) {
      broker.emit('created', { aaa: 111 })
        .then(() => { console.log('----- resolved'); })
        .catch((err) => { console.log('----- error', err); });

      await setTimeoutProm(2000);
    }
  })
  .catch((err) => console.error('!!! Error 2 !!!', err));

I started both of them and then shut down RabbitMQ.
The sender.js output was:

[2022-07-25T01:45:20.971Z] INFO  node-2/BROKER: Moleculer v0.14.21 is starting...
[2022-07-25T01:45:20.972Z] INFO  node-2/BROKER: Namespace: <not defined>
[2022-07-25T01:45:20.972Z] INFO  node-2/BROKER: Node ID: node-2
[2022-07-25T01:45:20.973Z] INFO  node-2/REGISTRY: Strategy: RoundRobinStrategy
[2022-07-25T01:45:20.973Z] INFO  node-2/REGISTRY: Discoverer: LocalDiscoverer
[2022-07-25T01:45:20.978Z] INFO  node-2/BROKER: Serializer: CborSerializer
[2022-07-25T01:45:20.981Z] INFO  node-2/BROKER: Validator: FastestValidator
[2022-07-25T01:45:20.982Z] INFO  node-2/BROKER: Registered 13 middleware(s).
[2022-07-25T01:45:20.983Z] INFO  node-2/BROKER: Transporter: AmqpTransporter
[2022-07-25T01:45:20.989Z] DEBUG node-2/$NODE: Service '$node' is creating...
[2022-07-25T01:45:20.989Z] DEBUG node-2/$NODE: Service '$node' created.
[2022-07-25T01:45:20.990Z] INFO  node-2/TRANSIT: Connecting to the transporter...
[2022-07-25T01:45:21.023Z] INFO  node-2/TRANSPORTER: AMQP is connected.
[2022-07-25T01:45:21.025Z] INFO  node-2/TRANSPORTER: AMQP channel is created.
[2022-07-25T01:45:21.064Z] DEBUG node-2/BROKER: Broadcast '$services.changed' local event.
[2022-07-25T01:45:21.064Z] DEBUG node-2/BROKER: Broadcast '$node.connected' local event.
[2022-07-25T01:45:21.065Z] INFO  node-2/REGISTRY: Node 'node-1' connected.
[2022-07-25T01:45:21.563Z] DEBUG node-2/BROKER: Broadcast '$transporter.connected' local event.
[2022-07-25T01:45:21.564Z] DEBUG node-2/$NODE: Service '$node' is starting...
[2022-07-25T01:45:21.569Z] INFO  node-2/REGISTRY: '$node' service is registered.
[2022-07-25T01:45:21.569Z] DEBUG node-2/BROKER: Broadcast '$services.changed' local event.
[2022-07-25T01:45:21.569Z] INFO  node-2/$NODE: Service '$node' started.
[2022-07-25T01:45:21.569Z] DEBUG node-2/BROKER: Broadcast '$broker.started' local event.
[2022-07-25T01:45:21.570Z] INFO  node-2/BROKER: ✔ ServiceBroker with 1 service(s) started successfully in 580ms.
[2022-07-25T01:45:21.570Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:21.570Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
----- resolved
[2022-07-25T01:45:23.572Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:23.572Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
----- resolved
[2022-07-25T01:45:25.576Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:25.576Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
----- resolved
[2022-07-25T01:45:27.578Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:27.578Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
----- resolved
[2022-07-25T01:45:29.579Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:29.579Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
----- resolved
[2022-07-25T01:45:29.813Z] WARN  node-2/TRANSPORTER: AMQP channel is closed.
[2022-07-25T01:45:29.813Z] ERROR node-2/TRANSPORTER: AMQP connection is closed.
[2022-07-25T01:45:29.814Z] WARN  node-2/TRANSIT: Connection is failed. Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
[2022-07-25T01:45:29.814Z] DEBUG node-2/TRANSIT: Error: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
    at Object.accept (/app/node_modules/amqplib/lib/connection.js:91:15)
    at Connection.mainAccept (/app/node_modules/amqplib/lib/connection.js:64:33)
    at Socket.go (/app/node_modules/amqplib/lib/connection.js:478:48)
    at Socket.emit (node:events:390:28)
    at emitReadable_ (node:internal/streams/readable:578:12)
    at processTicksAndRejections (node:internal/process/task_queues:82:21) {
  code: 320
}
[2022-07-25T01:45:31.580Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:31.580Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
[2022-07-25T01:45:31.581Z] ERROR node-2/TRANSIT: Unable to send 'created' event to groups. [BrokerDisconnectedError: The broker's transporter has disconnected. Please try again when a connection is reestablished.] { code: 502, type: 'BAD_GATEWAY', data: undefined, retryable: true }
[2022-07-25T01:45:31.582Z] DEBUG node-2/BROKER: Broadcast '$transit.error' local event.
----- resolved
[2022-07-25T01:45:33.582Z] DEBUG node-2/BROKER: Emit 'created' event.
[2022-07-25T01:45:33.583Z] DEBUG node-2/TRANSIT: => Send 'created' event to 'node-1' node in 'test' group(s).
[2022-07-25T01:45:33.583Z] ERROR node-2/TRANSIT: Unable to send 'created' event to groups. [BrokerDisconnectedError: The broker's transporter has disconnected. Please try again when a connection is reestablished.] { code: 502, type: 'BAD_GATEWAY', data: undefined, retryable: true }
[2022-07-25T01:45:33.583Z] DEBUG node-2/BROKER: Broadcast '$transit.error' local event.
----- resolved

As we can see, the emit failed, but it reported success because the promise was resolved, not rejected. I think the promise returned by emit should be rejected in this situation.

@AndreMaz
Member

AndreMaz commented Jul 26, 2022

I think that this needs further discussion.

The overall idea is that events are fire-and-forget, i.e., you don't get any guarantees about the delivery of the event.

> I see in the log that actually the event hasn't been sent to the transport and I cannot react to the problem, for example, to resend the event again.

Even if the event is sent again (and it reaches the transporter), there's still no guarantee that it will be delivered to the consumer. The producer does not know whether the consumer is connected to the transporter. Not fully related, but this reminds me of an old issue/RFC of ours: moleculerjs/rfcs#2

Nevertheless, there are some internal events (e.g., $transit.error, $transporter.error) that can be used to detect issues with the transporter.

Overall, for guaranteed event delivery https://github.com/moleculerjs/moleculer-channels might be a good solution. It was designed to provide a "secure" and persistent event system.

In any case, @Vslava @Embraser01, since this issue and the #1065 are related I would like to hear your ideas about this.

What should be the behavior for the local and remote emit()?

@intech
Member

intech commented Jul 27, 2022

@AndreMaz, #1065 is a different issue. There, a local emit or broadcast is awaited until its Promise resolves, so the caller ends up waiting for all the code in the handler to finish executing.

@intech
Member

intech commented Jul 27, 2022

@Vslava the point is that you need to separate two different systems.
The Moleculer transport is a message broker that does not aim at guaranteed delivery; it only publishes messages.
You can handle sending messages asynchronously through events, which is the best option in Node.js.
If you need guaranteed delivery, you should consider streaming systems (Kafka, JetStream, etc.) or message queuing systems (RabbitMQ, Bull, etc.).
And as Andre already said, at-least-once delivery is achieved by tracking consumers and ACK packets.
Implementing ACK for events at the level of the Moleculer transport protocol would be redundant because there is no built-in storage. You can use call instead of events, which implements a guaranteed call with ACK.

@AndreMaz
Member

AndreMaz commented Jul 27, 2022

> @AndreMaz, this issue #1065 has a different issue. There, a local call to emit and broadcast occurs through await while waiting for a resolve Promise, so it turns to wait of executing all the code in the handler.

You're right @intech it's a different issue but I think that emit() should have a consistent behavior regardless of being processed locally or remotely.

In the case of a local emit(), which waits until the handler finishes processing, it is possible to catch an error. With a remote emit(), on the other hand, this is not possible. Ideally, developers should be able to expect the same behavior in both cases.

@Vslava
Author

Vslava commented Jul 28, 2022

@AndreMaz @intech

> The overall idea is that events are fire-and-forget, i.e., you don't get any guarantees about the delivery of the event.

I agree that there’s no guarantee an event will be delivered, because you don’t know whether anybody is listening to it. The event sender cannot know who is subscribed to the event.

However, a Moleculer user needs to be sure that an event reaches the transporter and is accepted for delivery, and that it is accepted by the consumer successfully, but currently neither of these is guaranteed.

Right now, if the transporter has a problem, the sender cannot detect it: broker.emit() doesn’t report any problem it runs into. This implementation gives the user no way to build even a simple subsystem for resending events. Yes, Moleculer doesn’t have built-in storage, but it can let users implement this storage on their side. You can reject the promise returned by broker.emit() if there is a problem while the event is being transmitted to the transporter. If the sender knows about the problem, it can react to it and resend the event.
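A minimal sketch of the kind of caller-side resend loop described here. It assumes emit() rejects when the event cannot be handed to the transporter (the behavior requested in this issue, not the current one). The helper emitWithRetry and its options are hypothetical, not Moleculer API, and flakyEmit is a fake emit used only for the demo:

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical helper: retries a rejecting emit function a few times.
// It only works if emit() rejects on transport failure.
async function emitWithRetry(emit, eventName, payload, { retries = 3, delayMs = 500 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await emit(eventName, payload);
    } catch (err) {
      lastError = err;
      // Stop early only if the error is explicitly marked non-retryable;
      // the BrokerDisconnectedError in the sender.js log carries retryable: true.
      if (err.retryable === false) break;
      if (attempt < retries) await sleep(delayMs);
    }
  }
  throw lastError;
}

// Fake emit that fails twice (as if the transporter were down), then succeeds.
let calls = 0;
const flakyEmit = async () => {
  calls++;
  if (calls < 3) {
    const err = new Error("transporter disconnected");
    err.retryable = true;
    throw err;
  }
  return "sent";
};

emitWithRetry(flakyEmit, "created", { aaa: 111 }, { retries: 5, delayMs: 10 })
  .then(result => console.log(result, "after", calls, "attempts"));
```

This prints "sent after 3 attempts". With the current swallowing sendEvent(), the loop would see a fulfilled promise on the first attempt and never retry, which is exactly the problem reported here.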

As I understand it, Moleculer doesn’t guarantee that the event will be accepted by the consumer successfully. Once the event has been sent, the transporter forgets about it. There isn’t any ACK that the consumer can send to the transporter to confirm that the event has been accepted successfully. Not all transporters have this feature, but it could be implemented where possible.

About $transit.error and $transporter.error: how can I tell whether an error was caused by the event I sent? Maybe my event was sent successfully and the error happened afterwards. This is especially important when the error happens at the same time as I am sending the event. The error contains nothing except a message and a code. Wouldn’t it be simpler to throw an exception from broker.emit()?
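A small sketch of this correlation problem, using Node's EventEmitter as a stand-in for the local "$transit.error" broadcast. The payload mirrors the fields shown in the Transit.sendEvent() snippet quoted at the top of the issue (error, module, type); the sendEvent stub, the event names, and the placeholder string used for type are hypothetical:

```javascript
const { EventEmitter } = require("events");

// Stand-in for the broker's local event bus.
const bus = new EventEmitter();
const received = [];
bus.on("$transit.error", payload => received.push(payload));

// Hypothetical sender: "orders.created" fails to publish, "users.created" succeeds.
function sendEvent(eventName) {
  const publish = eventName === "orders.created"
    ? Promise.reject(new Error("transporter disconnected"))
    : Promise.resolve();
  return publish.catch(err => {
    // Same shape as the current broadcast: nothing identifies the failed event.
    bus.emit("$transit.error", {
      error: err,
      module: "transit",
      type: "FAILED_SEND_EVENT_PACKET" // placeholder for C.FAILED_SEND_EVENT_PACKET
    });
  });
}

Promise.all([sendEvent("users.created"), sendEvent("orders.created")]).then(results => {
  // Both sends fulfill; one error was broadcast, but its payload has no
  // event name or context ID tying it back to the failed send.
  console.log(results, received.length, Object.keys(received[0]));
});
```

Both promises fulfill with undefined, and the single broadcast payload only carries error, module, and type, so a global listener cannot tell which of two concurrent sends failed.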

About https://github.com/moleculerjs/moleculer-channels: yes, it can be a good choice, but right now it has a bug with RabbitMQ. It doesn’t recover the connection when RabbitMQ goes down and starts again. I will open an issue about this. So it is not an option just now. And, with all respect, it is strange that Moleculer is a microservices framework with its own events feature, yet I need to use another module for events instead of the native events out of the box.

@icebob
Copy link
Member

icebob commented Aug 3, 2022

@Vslava as I see it, your main problem is that in the catch handler we don't rethrow the original error. So if we add a throw err to the catch block, the error propagates to the caller and you can catch it in your code as well, right?

@AndreMaz
Copy link
Member

AndreMaz commented Aug 3, 2022

As @icebob suggested, adding a throw would propagate the error, allowing you to catch and react to it.

However, introducing this could be a breaking change. I say could because I've seen situations like this several times:

await ctx.emit("some.event", {data}) // <--- No catch

Throwing an error without the catch() would cause an unhandled promise rejection, causing the process to exit. I'm not arguing whether this is good (or bad) code; I'm just saying that I've seen this situation several times. So this (breaking) change can only come in v0.15.
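A minimal sketch of the concern, with a hypothetical emitThatRejects() standing in for an emit() that rejects on a transporter failure. Since Node.js 15, an unhandled rejection terminates the process by default; here a process-level listener records it instead so the effect is visible. Whether a real Moleculer handler's rejection goes unhandled depends on how its promise is consumed; the sketch simulates a caller that ignores it:

```javascript
const unhandled = [];
// Recording unhandled rejections here keeps Node.js from exiting.
process.on("unhandledRejection", reason => unhandled.push(reason.message));

// Hypothetical emit() that rejects on a transporter failure.
const emitThatRejects = () => Promise.reject(new Error("transporter disconnected"));

// Existing style: awaited without try/catch, and nobody observes the
// handler's own promise, so the rejection goes unhandled.
async function legacyHandler() {
  await emitThatRejects();
}
legacyHandler();

// Defensive style: the caller handles the failure explicitly.
async function defensiveHandler() {
  try {
    await emitThatRejects();
  } catch (err) {
    console.log("handled:", err.message);
  }
}
defensiveHandler();

setTimeout(() => console.log("unhandled:", unhandled), 10);
```

Only the legacy handler ends up in the unhandled list, which is why code written against today's always-resolving emit() could start crashing after such a change.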

The workaround for you would be to override the sendEvent() method of the Transit class and rethrow the error.
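A minimal sketch of that workaround's shape. BaseTransit is a hypothetical stand-in that reproduces only the swallowing .catch() from Transit.sendEvent(); the real Moleculer Transit class, its constructor, its full packet construction, and how to make a broker use a subclass are not shown here and would need to be checked against the Moleculer source:

```javascript
// Hypothetical stand-in for Transit: only the promise handling is modeled.
class BaseTransit {
  constructor(publish, logger = console) {
    this.publish = publish;
    this.logger = logger;
  }

  sendEvent(ctx) {
    return this.publish({ event: ctx.eventName, data: ctx.params }).catch(err => {
      this.logger.error(`Unable to send '${ctx.eventName}' event to groups.`, err.message);
    });
  }
}

class RethrowingTransit extends BaseTransit {
  // Calling super.sendEvent() would not help: the base .catch() has already
  // turned the rejection into a fulfilled promise. So the publish call is
  // repeated here, and the error is rethrown after logging.
  sendEvent(ctx) {
    return this.publish({ event: ctx.eventName, data: ctx.params }).catch(err => {
      this.logger.error(`Unable to send '${ctx.eventName}' event to groups.`, err.message);
      throw err;
    });
  }
}

// Usage with a failing publish and a silent logger.
const failingPublish = () => Promise.reject(new Error("transporter disconnected"));
const quietLogger = { error: () => {} };

new RethrowingTransit(failingPublish, quietLogger)
  .sendEvent({ eventName: "created", params: { aaa: 111 } })
  .catch(err => console.log("caller sees:", err.message));
```

The override keeps the existing logging but lets the rejection reach the caller, which is the effect of adding throw err to the catch block.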

> it is strange that Moleculer is a microservices framework and it has own feature for events but I need to use another module to use events instead of using native events out of box.

I think that this statement is a little unfair. We simply can't compete with projects like NATS, Kafka, RabbitMQ, etc. They have big budgets and lots of people dedicated to designing a message delivery solution. I want to highlight that the sole purpose of those projects is reliable delivery and nothing more. Moleculer is a little different, as it provides more features.

If we were to implement a custom (and reliable) event system, then the discussion would be different. We would be discussing the performance of our solution against NATS, Kafka, etc. Again, we can't compete with those projects without having the same resources as they have.

To summarize, I think that our decision to create moleculer/channels was correct. Instead of competing with the previously mentioned solutions, we provide a common and easy-to-use interface to interact with them, along with all the benefits that they provide.

@Vslava
Author

Vslava commented Aug 4, 2022

@icebob Yes. You are right.

4 participants