-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.js
115 lines (98 loc) · 3.21 KB
/
producer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/* Copyright 2020 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* =============================================================================
*
* Produce messages to Confluent Cloud
* Using the node-rdkafka client for Apache Kafka
*
* =============================================================================
*/
const Kafka = require('node-rdkafka');
const { configFromCli } = require('./config');
// Numeric `err.code` reported by the admin client when the requested topic
// already exists; treated as success by ensureTopicExists().
const ERR_TOPIC_ALREADY_EXISTS = 36;

/**
 * Makes sure `config.topic` exists, creating it with 1 partition and a
 * replication factor of 3 if necessary.
 *
 * A dedicated admin client is created from the connection/SASL settings in
 * `config` and is disconnected again as soon as the create request has
 * completed, whatever the outcome.
 *
 * @param {Object} config - Client settings; reads `topic`,
 *   `bootstrap.servers`, `sasl.username`, `sasl.password`,
 *   `security.protocol` and `sasl.mechanisms`.
 * @returns {Promise<void>} Resolves when the topic was created or already
 *   existed; rejects with the admin client's error for any other failure.
 */
function ensureTopicExists(config) {
  const adminClient = Kafka.AdminClient.create({
    'bootstrap.servers': config['bootstrap.servers'],
    'sasl.username': config['sasl.username'],
    'sasl.password': config['sasl.password'],
    'security.protocol': config['security.protocol'],
    'sasl.mechanisms': config['sasl.mechanisms']
  });

  return new Promise((resolve, reject) => {
    adminClient.createTopic({
      topic: config.topic,
      num_partitions: 1,
      replication_factor: 3
    }, (err) => {
      // The admin client is only needed for this one request; release it
      // before settling so it does not outlive the call.
      adminClient.disconnect();

      if (!err) {
        console.log(`Created topic ${config.topic}`);
        return resolve();
      }

      // An already-existing topic is exactly what the caller wants.
      if (err.code === ERR_TOPIC_ALREADY_EXISTS) {
        return resolve();
      }

      return reject(err);
    });
  });
}
/**
 * Builds a Kafka producer from the connection/SASL settings in `config`,
 * enables delivery reports, and starts connecting.
 *
 * @param {Object} config - Client settings; reads `bootstrap.servers`,
 *   `sasl.username`, `sasl.password`, `security.protocol` and
 *   `sasl.mechanisms`.
 * @param {Function} onDeliveryReport - Listener registered for the
 *   producer's 'delivery-report' event.
 * @returns {Promise<Object>} Resolves with the producer once it emits
 *   'ready'; rejects with the error if 'event.error' fires first.
 */
function createProducer(config, onDeliveryReport) {
  // Copy only the connection-related settings out of the caller's config.
  const forwardedKeys = [
    'bootstrap.servers',
    'sasl.username',
    'sasl.password',
    'security.protocol',
    'sasl.mechanisms'
  ];
  const settings = {};
  for (const key of forwardedKeys) {
    settings[key] = config[key];
  }
  // Ask the client to emit a delivery report for every message.
  settings['dr_msg_cb'] = true;

  const producer = new Kafka.Producer(settings);

  return new Promise((resolve, reject) => {
    const handleReady = () => {
      resolve(producer);
    };
    const handleError = (err) => {
      console.warn('event.error', err);
      // Has no effect once the promise has already been resolved by 'ready'.
      reject(err);
    };

    producer.on('ready', handleReady);
    producer.on('delivery-report', onDeliveryReport);
    producer.on('event.error', handleError);

    producer.connect();
  });
}
/**
 * Runs the producer example end to end: reads the configuration from the
 * command line, makes sure the topic exists, connects a producer, sends ten
 * JSON records (`{ count: 0 }` .. `{ count: 9 }`) keyed 'alice', then
 * flushes and disconnects.
 *
 * If the configuration carries a `usage` text, that text is printed and
 * nothing else happens.
 *
 * @returns {Promise<void>} Resolves after the producer has been flushed and
 *   disconnected; rejects if configuration, topic creation, connecting,
 *   producing or flushing fails.
 */
async function produceExample() {
  const config = await configFromCli();

  if (config.usage) {
    return console.log(config.usage);
  }

  await ensureTopicExists(config);

  const producer = await createProducer(config, (err, report) => {
    if (err) {
      console.warn('Error producing', err);
    } else {
      const { topic, partition, value } = report;
      console.log(`Successfully produced record to topic "${topic}" partition ${partition} ${value}`);
    }
  });

  try {
    for (let idx = 0; idx < 10; ++idx) {
      const key = 'alice';
      const value = Buffer.from(JSON.stringify({ count: idx }));

      console.log(`Producing record ${key}\t${value}`);

      // Partition -1 is passed through to the client unchanged.
      producer.produce(config.topic, -1, value, key);
    }

    // Wait (up to 10 s) for queued messages to be sent, and surface a flush
    // failure to the caller instead of dropping it.
    await new Promise((resolve, reject) => {
      producer.flush(10000, (err) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  } finally {
    // Always release the connection, even when producing or flushing failed.
    producer.disconnect();
  }
}
// Script entry point: run the example; if any step rejects, print the
// failure and terminate the process with exit status 1.
const reportFailureAndExit = (failure) => {
  console.error(`Something went wrong:\n${failure}`);
  process.exit(1);
};

produceExample().catch(reportFailureAndExit);