From 8c1af7436e9d3314d020fafc4bc0526876ddd902 Mon Sep 17 00:00:00 2001 From: Paramjeet Date: Fri, 23 Jun 2023 17:13:42 +0530 Subject: [PATCH 1/2] feat: added traceId, reqStartTime support in meta --- lib/rabbit.d.ts | 7 ++++++- lib/rabbit.js | 24 ++++++++++++++++++++---- src/rabbit.ts | 15 ++++++++++++++- 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/lib/rabbit.d.ts b/lib/rabbit.d.ts index 590b2ea..f548b46 100644 --- a/lib/rabbit.d.ts +++ b/lib/rabbit.d.ts @@ -17,6 +17,8 @@ interface SubscribeCallbackArgs { replyTo: any; rKey: string; correlationId: any; + traceId: any; + reqStartTime: any; ack: () => void; nack: () => void; } @@ -115,11 +117,14 @@ declare class Rabbit { * @param {Object} options - the name of the reply queue, correlationId * @param {string} options.replyTo - the name of reply queue * @param {string} options.correlationId + * @param {Object} extra - extra message properties like traceId, reqStartTime + * @param {string} extra.traceId + * @param {string} extra.reqStartTime - the time when request initiated in our system * @param {boolean} handle=true - handle the effect to * * @return {Promise} */ - send(qname: string, message: Record, options: Options.Publish, handle?: boolean): Promise; + send(qname: string, message: Record, options: Options.Publish, extra?: Record, handle?: boolean): Promise; /** * Get a message from a queue, just get it * diff --git a/lib/rabbit.js b/lib/rabbit.js index bfa3cee..2e121be 100644 --- a/lib/rabbit.js +++ b/lib/rabbit.js @@ -8,6 +8,12 @@ const safely_parse_json_1 = __importDefault(require("safely-parse-json")); * @class Rabbit */ class Rabbit { + name; + emitter; + config; + queues; + client; + channel; /** * @param {string} name - unique name to this service * @param {EventEmitter} emitter @@ -206,10 +212,12 @@ class Rabbit { this.log(`Subscribin to ${qid}`); return ch.consume(qid, msg => { const message = { - content: safely_parse_json_1.default(msg.content.toString()), + content: (0, safely_parse_json_1.default)(msg.content.toString()), replyTo: msg.properties.replyTo, rKey: msg.fields.routingKey, correlationId: msg.properties.correlationId, + traceId: msg.properties.headers.traceId, + reqStartTime: msg.properties.headers.reqStartTime, ack: () => { q.channel.ack(msg); }, @@ -269,11 +277,19 @@ class Rabbit { * @param {Object} options - the name of the reply queue, correlationId * @param {string} options.replyTo - the name of reply queue * @param {string} options.correlationId + * @param {Object} extra - extra message properties like traceId, reqStartTime + * @param {string} extra.traceId + * @param {string} extra.reqStartTime - the time when request initiated in our system * @param {boolean} handle=true - handle the effect to * * @return {Promise} */ - send(qname, message, options, handle = true) { + send(qname, message, options, extra = {}, handle = true) { + options.headers = { + ...options.headers, + traceId: extra.traceId, + reqStartTime: extra.reqStartTime + }; const p = this.channel.sendToQueue(qname, message, options); if (handle === false) { return p; @@ -301,8 +317,8 @@ class Rabbit { if (message === false) { throw new Error(`No message on ${qname}`); } - return safely_parse_json_1.default(message.content.toString()); + return (0, safely_parse_json_1.default)(message.content.toString()); } } module.exports = Rabbit; -//# sourceMappingURL=data:application/json;base64, \ No newline at end of file +//# sourceMappingURL=data:application/json;base64, \ No newline at end of file diff --git a/src/rabbit.ts b/src/rabbit.ts index 4e37673..4767d33 100644 --- a/src/rabbit.ts +++ b/src/rabbit.ts @@ -20,6 +20,8 @@ interface SubscribeCallbackArgs { replyTo: any; rKey: string; correlationId: any; + traceId: any; + reqStartTime: any; ack: () => void; nack: () => void; } @@ -251,6 +253,8 @@ class Rabbit { replyTo: msg.properties.replyTo, rKey: msg.fields.routingKey, correlationId: msg.properties.correlationId, + traceId: msg.properties.headers.traceId, + reqStartTime: msg.properties.headers.reqStartTime, ack: () => { q.channel.ack(msg); }, @@ -313,11 +317,20 @@ class Rabbit { * @param {Object} options - the name of the reply queue, correlationId * @param {string} options.replyTo - the name of reply queue * @param {string} options.correlationId + * @param {Object} extra - extra message properties like traceId, reqStartTime + * @param {string} extra.traceId + * @param {string} extra.reqStartTime - the time when request initiated in our system * @param {boolean} handle=true - handle the effect to * * @return {Promise} */ - send(qname: string, message: Record, options: Options.Publish, handle = true) { + send(qname: string, message: Record, options: Options.Publish, extra: Record = {}, handle = true) { + options.headers = { + ...options.headers, + traceId: extra.traceId, + reqStartTime: extra.reqStartTime + }; + const p = this.channel.sendToQueue(qname, message, options); if (handle === false) { return p; From 2e79b30192aea1a6ef963b92cc7a8ba018914851 Mon Sep 17 00:00:00 2001 From: Paramjeet Date: Mon, 26 Jun 2023 17:13:58 +0530 Subject: [PATCH 2/2] feat: added request meta support --- lib/rabbit.d.ts | 11 +++++------ lib/rabbit.js | 18 ++++++++---------- src/rabbit.ts | 19 ++++++++----------- 3 files changed, 21 insertions(+), 27 deletions(-) diff --git a/lib/rabbit.d.ts b/lib/rabbit.d.ts index f548b46..0fe7306 100644 --- a/lib/rabbit.d.ts +++ b/lib/rabbit.d.ts @@ -17,8 +17,7 @@ interface SubscribeCallbackArgs { replyTo: any; rKey: string; correlationId: any; - traceId: any; - reqStartTime: any; + meta: any; ack: () => void; nack: () => void; } @@ -117,14 +116,14 @@ declare class Rabbit { * @param {Object} options - the name of the reply queue, correlationId * @param {string} options.replyTo - the name of reply queue * @param {string} options.correlationId - * @param {Object} extra - extra message properties like traceId, reqStartTime - * @param {string} extra.traceId - * @param {string} extra.reqStartTime - the time when request initiated in our system + * @param {Object} meta - meta contains message properties like traceId, reqStartTime + * @param {string} meta.traceId + * @param {string} meta.reqStartTime - the time when request initiated in our system * @param {boolean} handle=true - handle the effect to * * @return {Promise} */ - send(qname: string, message: Record, options: Options.Publish, extra?: Record, handle?: boolean): Promise; + send(qname: string, message: Record, options: Options.Publish, meta?: Record, handle?: boolean): Promise; /** * Get a message from a queue, just get it * diff --git a/lib/rabbit.js b/lib/rabbit.js index 2e121be..020c3f8 100644 --- a/lib/rabbit.js +++ b/lib/rabbit.js @@ -209,15 +209,14 @@ class Rabbit { subscribe(qid, cb) { const q = this.queues[qid]; return q.channel.addSetup((ch) => { - this.log(`Subscribin to ${qid}`); + this.log(`Subscribing to ${qid}`); return ch.consume(qid, msg => { const message = { content: (0, safely_parse_json_1.default)(msg.content.toString()), replyTo: msg.properties.replyTo, rKey: msg.fields.routingKey, correlationId: msg.properties.correlationId, - traceId: msg.properties.headers.traceId, - reqStartTime: msg.properties.headers.reqStartTime, + meta: msg.properties.headers, ack: () => { q.channel.ack(msg); }, @@ -277,18 +276,17 @@ class Rabbit { * @param {Object} options - the name of the reply queue, correlationId * @param {string} options.replyTo - the name of reply queue * @param {string} options.correlationId - * @param {Object} extra - extra message properties like traceId, reqStartTime - * @param {string} extra.traceId - * @param {string} extra.reqStartTime - the time when request initiated in our system + * @param {Object} meta - meta contains message properties like traceId, reqStartTime + * @param {string} meta.traceId + * @param {string} meta.reqStartTime - the time when request initiated in our system * @param {boolean} handle=true - handle the effect to * * @return {Promise} */ - send(qname, message, options, extra = {}, handle = true) { + send(qname, message, options, meta = {}, handle = true) { options.headers = { ...options.headers, - traceId: extra.traceId, - reqStartTime: extra.reqStartTime + ...meta, }; const p = this.channel.sendToQueue(qname, message, options); if (handle === false) { @@ -321,4 +319,4 @@ class Rabbit { } } module.exports = Rabbit; -//# sourceMappingURL=data:application/json;base64, \ No newline at end of file +//# sourceMappingURL=data:application/json;base64, \ No newline at end of file diff --git a/src/rabbit.ts b/src/rabbit.ts index 4767d33..ae8089a 100644 --- a/src/rabbit.ts +++ b/src/rabbit.ts @@ -20,8 +20,7 @@ interface SubscribeCallbackArgs { replyTo: any; rKey: string; correlationId: any; - traceId: any; - reqStartTime: any; + meta: any; ack: () => void; nack: () => void; } @@ -246,15 +245,14 @@ class Rabbit { subscribe(qid: string, cb: (args: SubscribeCallbackArgs) => void) { const q = this.queues[qid]; return q.channel.addSetup((ch: ConfirmChannel) => { - this.log(`Subscribin to ${qid}`); + this.log(`Subscribing to ${qid}`); return ch.consume(qid, msg => { const message = { content: safeJSON(msg.content.toString()), replyTo: msg.properties.replyTo, rKey: msg.fields.routingKey, correlationId: msg.properties.correlationId, - traceId: msg.properties.headers.traceId, - reqStartTime: msg.properties.headers.reqStartTime, + meta: msg.properties.headers, ack: () => { q.channel.ack(msg); }, @@ -317,18 +315,17 @@ class Rabbit { * @param {Object} options - the name of the reply queue, correlationId * @param {string} options.replyTo - the name of reply queue * @param {string} options.correlationId - * @param {Object} extra - extra message properties like traceId, reqStartTime - * @param {string} extra.traceId - * @param {string} extra.reqStartTime - the time when request initiated in our system + * @param {Object} meta - meta contains message properties like traceId, reqStartTime + * @param {string} meta.traceId + * @param {string} meta.reqStartTime - the time when request initiated in our system * @param {boolean} handle=true - handle the effect to * * @return {Promise} */ - send(qname: string, message: Record, options: Options.Publish, extra: Record = {}, handle = true) { + send(qname: string, message: Record, options: Options.Publish, meta: Record = {}, handle = true) { options.headers = { ...options.headers, - traceId: extra.traceId, - reqStartTime: extra.reqStartTime + ...meta, }; const p = this.channel.sendToQueue(qname, message, options);