该插件是kafka-node 的封装, 方便在egg.js 环境下使用的一个egg风格的插件, 并提供了方便的api发送给kafka消息的方法 部分详细配置请参考 https://github.com/SOHU-Co/kafka-node
egg-kafka-node 版本 | egg 1.x |
---|---|
1.x | 😁 |
0.x | 😁 |
node >= 8.0.0 😁
// config/plugin.js
exports.kafkaNode = {
enable: true,
package: 'egg-kafka-node',
};
// {app_root}/config/config.default.js
exports.kafkaNode = {
kafkaHost: '127.0.0.1:9092', // kafka 连接的地址
clientOption: {}, // KafkaClient 相关配置, 更多配置可以查看kafka-node
consumerOption: [{
groupId: 'group1', // consumerGroup 消费组id
topics: [ 'testTopic1' ], // 同一消费组 consumerGroup 下的所有 topic
options: {
fetchMaxWaitMs: 100,
fetchMinBytes: 1,
fetchMaxBytes: 1024 * 1024,
}, // 每个消费组对应的相关 consumerGroup 配置
}, {
groupId: 'group2',
topics: [ 'testTopic2' ],
options: {},
}, {
groupId: 'group3',
topics: [ 'testTopic3' ],
}],
// HighLevelProducer 生产者配置, 更多配置可以查看kafka-node
producerOption: {
requireAcks: 1,
ackTimeoutMs: 100,
partitionerType: 2,
autoCreateTopic: true, // 是否开启自动创建 topic功能
topics: [ 'testTopic1', 'testTopic2', 'testTopic3' ], // 所有消费组需要包含的topics 集合
},
messageOption: {
partition: 0,
attributes: 0, // 发送消息的相关配置
},
};
请到 [config/config.default.js](test/fixtures/apps/config/config.default.js] 查看详细配置项说明。
egg-project
├── package.json
├── app.js (optional)
├── app
| ├── router.js
│ ├── controller
│ | └── home.js
│ ├── service (optional)
│ | └── user.js
│ | └── response_time.js
│ └── kafka (optional) --------> like `controller, service...`
│ ├── someTopic (optional) -------> topic name of kafka
│ ├── someKey1Consumer.js(optional) ------> `someKey1` is a key of someTopic
| └── someKey2Consumer.js(optional) ------> `someKey2` is an another key of someTopic
├── config
| ├── plugin.js
| ├── config.default.js
│ ├── config.prod.js
| ├── config.test.js (optional)
| ├── config.local.js (optional)
| └── config.unittest.js (optional)
Note: kafka配置下 生产者配置 producerOption 配置的topics必须在{app-root}/kafka 目录下创建对应的topic。kafka 会自动读取topic 目录下对应的Consumer.js ,并自动设置文件名前缀对应的 key 名, 该key需要在sendMessage 时提供 这个 key, 方便业务区分
Note: 你必须设置 app.config.baseDir, egg-kafka-node 需要基于 这个baseDir 去加载所使用的consumers
Note: sendMessage 发送消息 方法 messages参数 最大字节数 取决于 你设置的consumer配置
// {app_root}/controller/index.js
class IndexController extends Controller {
async index() {
await this.ctx.kafka.sendMessage({
topic: 'someTopic', // 指定 kafka 目录下 的topic
key: 'someKey', // 指定 kafka 下的 topic 目录 对应key的consumer
messages: JSON.stringify({
username: 'JohnApache',
userId: 10001,
gender: 0
})
});
}
async some() {
this.ctx.kafka.sendMessageSync({
topic: 'someTopic', // 指定 kafka 目录下的 topic
key: 'someKey', // 指定 kafka 下的 topic 目录 对应key 的consumer
messages: JSON.stringify({
username: 'JohnApache',
userId: 10001,
gender: 0
})
}, () => {
// success callback
}, () => {
// error callback
})
}
}
// {app_root}/kafka/someTopic/someKeyConsumer.js
class SomeKeySubscription extends Subscription {
async subscribe(message) {
const {value, topic, key} = message;
this.ctx.logger.info(`consume message ${value} by topic ${topic} key ${key} consumer`);
await asyncTask();
}
}
请到 egg issues 异步交流。