-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathSiddhiRuleProducer.java
71 lines (58 loc) · 2.22 KB
/
SiddhiRuleProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package org.apache.kafka.interfaces;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.utils.SiddhiRuleContract;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
/**
 * Publishes Siddhi rules to a Kafka topic.
 *
 * <p>Each rule is wrapped in a {@link SiddhiRuleContract}, rendered with
 * {@code toString()}, encoded as UTF-8 bytes and sent as the value of a
 * record whose key is a fixed string.
 *
 * <p>The underlying {@link KafkaProducer} is created in the constructor and
 * must be released by calling {@link #shutdown()} when the instance is no
 * longer needed.
 */
public class SiddhiRuleProducer {
    private final String topic;
    private final String bootstrapServers;
    private final KafkaProducer<String, byte[]> producer;

    /**
     * Constructor
     * @param topic kafka topic to produce a rule
     * @param bootstrapServers kafka broker coordinates
     * @throws NullPointerException if {@code topic} or {@code bootstrapServers} is null
     */
    public SiddhiRuleProducer(String topic, String bootstrapServers) {
        Objects.requireNonNull(topic, "Topic cannot be null");
        Objects.requireNonNull(bootstrapServers, "Bootstrap servers should point to valid kafka brokers location");
        this.topic = topic;
        this.bootstrapServers = bootstrapServers;
        // bootstrapServers must be assigned before getProperties() reads it.
        this.producer = new KafkaProducer<>(getProperties());
    }

    /**
     * Builds the producer configuration: broker list plus String key and
     * byte-array value serializers.
     */
    private Properties getProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return properties;
    }

    /**
     * Returns the record key; every rule is published under the same constant key.
     */
    private String getKey() {
        return "key";
    }

    /**
     * Api to create Siddhi rule
     * @param streamId siddhi streamId
     * @param definitions siddhi rule definitions
     * @param siddhiQuery siddhi rule Query
     * @throws NullPointerException if any argument is null
     */
    public void createRule(String streamId, ArrayList<String> definitions, String siddhiQuery) {
        Objects.requireNonNull(streamId, "Stream Id cannot be null");
        Objects.requireNonNull(definitions, "Siddhi Rule Definitions cannot be null");
        Objects.requireNonNull(siddhiQuery, "Siddhi Rule Query cannot be null");
        // Encode explicitly as UTF-8 so the bytes on the topic do not depend on
        // the JVM's platform default charset.
        byte[] rule = new SiddhiRuleContract(streamId, definitions, siddhiQuery)
                .toString()
                .getBytes(StandardCharsets.UTF_8);
        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, getKey(), rule);
        // The value returned by send() is not inspected here.
        producer.send(producerRecord);
    }

    /**
     * Api to shutdown the producer
     */
    public void shutdown() {
        producer.close();
    }
}