Skip to content

Commit

Permalink
fix(service-broker): event emitter implementation issue moleculerjs#1065
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton-Burdin committed Jun 1, 2022
1 parent 5e393f5 commit 4a1f7d1
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
2 changes: 2 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ module.exports = {
FAILED_SEND_PONG_PACKET: "failedSendPongPacket",
/** @type {String} Emitted when transit fails to send a HEARTBEAT packet*/
FAILED_SEND_HEARTBEAT_PACKET: "failedSendHeartbeatPacket",
/** @type {String} Emitted when broker fails to handler broadcast event*/
FAILED_HANDLER_BROADCAST_EVENT: "failedHandlerBroadcastEvent",
/** @type {String} Emitted when broker fails to stop all services*/
FAILED_STOPPING_SERVICES: "failedServicesStop",
/** @type {String} Emitted when broker fails to stop all services*/
Expand Down
19 changes: 18 additions & 1 deletion src/registry/event-catalog.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const utils = require("../utils");
const Strategies = require("../strategies");
const EndpointList = require("./endpoint-list");
const EventEndpoint = require("./endpoint-event");
const EventEmitter = require("events");
const { FAILED_HANDLER_BROADCAST_EVENT } = require("../constants");

/**
* Catalog for events
Expand All @@ -35,6 +37,21 @@ class EventCatalog {
this.events = [];

this.EndpointFactory = EventEndpoint;

this._localBus = new EventEmitter();
this._localBus.on("broker.event", ctx => {
ctx.endpoint.event
.handler(ctx)
.catch(error =>
this.broker.broadcastLocal("$broker.error", {
error,
module: "broker",
type: FAILED_HANDLER_BROADCAST_EVENT
})
)
// catch unresolved error
.catch(err => this.logger.error(err));
});
}

/**
Expand Down Expand Up @@ -205,7 +222,7 @@ class EventCatalog {
* @memberof EventCatalog
*/
callEventHandler(ctx) {
return ctx.endpoint.event.handler(ctx);
return this._localBus.emit("broker.event", ctx);
}

/**
Expand Down
22 changes: 12 additions & 10 deletions test/unit/registry/event-catalog.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ let EventCatalog = require("../../../src/registry/event-catalog");
let EndpointList = require("../../../src/registry/endpoint-list");
let EventEndpoint = require("../../../src/registry/endpoint-event");
let ServiceBroker = require("../../../src/service-broker");
const { protectReject } = require("../utils");

describe("Test EventCatalog constructor", () => {
let broker = new ServiceBroker({ logger: false });
Expand Down Expand Up @@ -521,38 +520,41 @@ describe("Test EventCatalog.callEventHandler", () => {
ctx.eventGroups = ["mail", "payment"];
ctx.eventType = "emit";

it("should add catch handler to result", () => {
it("should add catch handler to result", async () => {
let resolver;
ctx.endpoint.event.handler = jest.fn(() => new Promise(res => (resolver = res)));

const p = catalog.callEventHandler(ctx);
catalog.callEventHandler(ctx);

await broker.Promise.delay(10);

expect(ctx.endpoint.event.handler).toHaveBeenCalledTimes(1);
expect(ctx.endpoint.event.handler).toHaveBeenCalledWith(ctx);

expect(errorHandler).toHaveBeenCalledTimes(0);

resolver();

return p;
});

it("should catch error", () => {
it("should catch error", async () => {
let rejecter;

const spy = jest.spyOn(broker.localBus, "emit");
ctx.endpoint.event.handler = jest.fn(() => new Promise((res, rej) => (rejecter = rej)));
broker.logger.error = jest.fn();

const p = catalog.callEventHandler(ctx);
catalog.callEventHandler(ctx);

expect(ctx.endpoint.event.handler).toHaveBeenCalledTimes(1);
expect(ctx.endpoint.event.handler).toHaveBeenCalledWith(ctx);

const err = new Error("Something went wrong");
rejecter(err);

return p.then(protectReject).catch(e => {
expect(e).toBe(err);
});
await broker.Promise.delay(10);
expect(spy.mock.calls[0][1].error).toBe(err);

spy.mockRestore();
});
});

Expand Down

0 comments on commit 4a1f7d1

Please sign in to comment.