Skip to content

Commit

Permalink
Add rabbitmq: consumer/producer implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
aojiaotage committed Oct 4, 2019
1 parent 24dfaea commit 13b233e
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 1 deletion.
7 changes: 7 additions & 0 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ module.exports = app => {
app.loader.loadToApp(path.join(app.config.baseDir, 'app/error'), 'error', {
caseStyle: 'upper',
});

loadConsumers(app);
app.sessionStore = {
async get(key) {
const res = await app.redis.get(key);
Expand All @@ -21,3 +23,8 @@ module.exports = app => {
},
};
};

function loadConsumers(app) {
const unlockConsumer = new (require('./app/consumer/unlock_inventory'))();
unlockConsumer.init(app).catch(console.log);
}
47 changes: 47 additions & 0 deletions app/consumer/base_comsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
'use strict';

const rmq = require('amqplib');

class BaseConsumer {

static get ['QUEUE_NAME_PREF']() {
return 'mi_';
}

get ['queueName']() {
throw new Error('Please define your own queue name');
}

async initConn() {
const conn = await rmq.connect(this.app.config.rabbitmq.url);
this.conn = conn;
}

async initChannel() {
if (!this.conn) {
await this.initConn();
}
const channel = await this.conn.createChannel(this.queueName);
await channel.assertQueue(this.queueName, {
durable: true,
});
this.channel = channel;
return channel;
}

onMsg(msg) {
throw new Error('Please define your own message handler');
}

async init(app) {
if (!this.app) this.app = app;
if (!this.channel) {
await this.initChannel();
}
this.channel.consume(this.queueName, (msg)=>{this.onMsg(msg)});
}

}

module.exports = BaseConsumer;

18 changes: 18 additions & 0 deletions app/consumer/unlock_inventory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict';

const BaseConsumer = require('./base_comsumer');

class UnlockInventory extends BaseConsumer {

get ['queueName']() {
return BaseConsumer.QUEUE_NAME_PREF + 'inventory_unlock';
}

onMsg(msg) {
console.log(JSON.parse(msg.content.toString('utf-8')));
this.channel.ack(msg);
}

}

module.exports = UnlockInventory;
2 changes: 2 additions & 0 deletions app/controller/site.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class SiteController extends Controller {
}

async index() {
await this.ctx.service.msgProducer.sendInventoryUnlockMsg(
{ goodsId: '111', subOrderId: '2222' });
this.ctx.body = {
code: 0,
data: {
Expand Down
2 changes: 1 addition & 1 deletion app/service/inventory.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class Inventory extends Service {
return lockedAmount;
}

async removeLocksByGoodsId(goodsId, subOrderId) {
async removeLocksByGoodsId(goodsId, subOrderId, amount) {
const locksKey = Inventory.REDIS_GOODS_LOCKS_PREF + goodsId;
const locksByOrderHsetKey = Inventory.REDIS_GOODS_LOCKS_BY_ORDER_PREF +
goodsId;
Expand Down
56 changes: 56 additions & 0 deletions app/service/msg_producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict';

const { Service } = require('egg');

const rmq = require('amqplib');

const QUEUE_NAME_PREF = 'mi_';

class MsgProducer extends Service {

static get ['QUEUE_NAMES']() {
return {
INVENTORY_UNLOCK: QUEUE_NAME_PREF + 'inventory_unlock',
};
}

async initConn() {
const conn = await rmq.connect(this.app.config.rabbitmq.url);
this.conn = conn;
}

async initChannel(queueName) {
if (!this.conn) {
await this.initConn();
}
const channel = await this.conn.createChannel(queueName);
await channel.assertQueue(queueName, {
durable: true,
});
this.channels[queueName] = channel;
return channel;
}

async sendMsg(queueName, msg) {

if (!this.channels) {
this.channels = {};
}

let channel = this.channels[queueName];

if (!channel) {
channel = await this.initChannel(queueName);
}

await channel.sendToQueue(queueName, Buffer.from(msg));
}

async sendInventoryUnlockMsg(inventoryInfo) {
await this.sendMsg(MsgProducer.QUEUE_NAMES.INVENTORY_UNLOCK, JSON.stringify(inventoryInfo));
}

}

module.exports = MsgProducer;

4 changes: 4 additions & 0 deletions config/config.default.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,9 @@ module.exports = appInfo => {
match: ['/api/v1/admin'],
};

config.rabbitmq = {
url: 'amqp://localhost',
};

return config;
};

0 comments on commit 13b233e

Please sign in to comment.