Skip to content

Commit

Permalink
Merge pull request #29 from fjhunter/producer-and-consumer-can-de-and…
Browse files Browse the repository at this point in the history
…-encode-json

Adds options to the consumer and producer to convert messages from an…
peterprib authored Nov 6, 2020
2 parents fcfb9ba + 82cbb83 commit 2a4fb25
Showing 4 changed files with 25 additions and 5 deletions.
9 changes: 7 additions & 2 deletions kafkaManager/kafkaConsumer.html
Original file line number Diff line number Diff line change
@@ -114,7 +114,11 @@
<option value="buffer">raw</option>
</select>
</div>


<div class="form-row">
<label for="node-input-convertToJson"><i class="fa fa-list-ul"></i> Convert message to JSON </label>
<input id="node-input-convertToJson" type="checkbox" style="width: auto"></input>
</div>
</div>

</div>
@@ -138,7 +142,8 @@
fromOffset: {value:0,required:true,validate:RED.validators.number()},
encoding: {value:'utf8',required:true},
keyEncoding: {value:'utf8',required:true},
connectionType: {value:"",required:true}
connectionType: {value:"",required:true},
convertToJson : {value:false, required:true}
},
inputs:0,
inputLabels: "",
11 changes: 9 additions & 2 deletions kafkaManager/kafkaConsumer.js
Original file line number Diff line number Diff line change
@@ -50,10 +50,17 @@ function connect (node) {
text: 'Ready with ' + node.brokerNode.name
})
}

const sendMessage = function(node, message) {
if(node.convertToJson){
message.value = JSON.parse(message.value);
}
node.brokerNode.sendMsg(node, message)
}
if (Array.isArray(message)) {
message.forEach((r) => node.brokerNode.sendMsg(node, r))
message.forEach((r) => sendMessage(node, r))
} else {
node.brokerNode.sendMsg(node, message)
sendMessage(node, message)
}
})

7 changes: 6 additions & 1 deletion kafkaManager/kafkaProducer.html
Original file line number Diff line number Diff line change
@@ -68,6 +68,10 @@
<option value="2">Compress using snappy</option>
</select>
</div>
<div class="form-row">
<label for="node-input-convertFromJson"><i class="fa fa-list-ul"></i> Convert message from JSON </label>
<input id="node-input-convertToJson" type="checkbox" style="width: auto"></input>
</div>

</script>

@@ -84,7 +88,8 @@
key: {required:false},
partition: {value:0,required:true},
attributes: {value:0,required:true},
connectionType: {value:"Producer",required:true}
connectionType: {value:"Producer",required:true},
convertFromJson: {value: false, required: true}
},
inputs:1,
inputLabels: "",
3 changes: 3 additions & 0 deletions kafkaManager/kafkaProducer.js
Original file line number Diff line number Diff line change
@@ -223,6 +223,9 @@ module.exports = function (RED) {
}
node.on('input', function (msg) {
if (node.connected) {
if(node.convertFromJson) {
msg.payload = JSON.stringify(msg.payload)
}
producerSend(node, msg)
return
}

0 comments on commit 2a4fb25

Please sign in to comment.