kafka_helper.py
# This file is part of the ISIS IBEX application.
# Copyright (C) 2012-2021 Science & Technology Facilities Council.
# All rights reserved.
#
# This program is distributed in the hope that it will be useful.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v1.0 which accompanies this distribution.
# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM
# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details.
#
# You should have received a copy of the Eclipse Public License v1.0
# along with this program; if not, you can obtain a copy from
# https://www.eclipse.org/org/documents/epl-v10.php or
# http://opensource.org/licenses/eclipse-1.0.php
import json
import threading
import uuid
from collections.abc import Callable
from datetime import datetime

from confluent_kafka import Consumer, Producer, TopicPartition
from dateutil.tz import tzutc

SERVER = "hinata.isis.cclrc.ac.uk:9092"
TOPIC = "{}_detector_diagnostics"


def serialiser(value):
    """Serialise a value as UTF-8 encoded JSON."""
    return json.dumps(value).encode("utf-8")


def deserialiser(value):
    """Deserialise a JSON-encoded value."""
    return json.loads(value)


def _create_consumer():
    """Create a consumer with a unique group id, so it always reads the whole topic."""
    consumer = Consumer(
        {
            "bootstrap.servers": SERVER,
            # librdkafka expects configuration values as strings, so convert
            # the UUID rather than passing the UUID object directly.
            "group.id": str(uuid.uuid4()),
        }
    )
    return consumer


producer = Producer({"bootstrap.servers": SERVER})


def send_data(data: dict, instrument="MAPS"):
    """
    Send data to Kafka.

    Args:
        data: A dictionary representing the data.
        instrument: The instrument to send data for.
    """
    producer.produce(TOPIC.format(instrument), serialiser(data))
    producer.flush()
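

# A minimal usage sketch (not executed on import). The payload below is
# hypothetical; any JSON-serialisable dictionary works, and a broker must be
# reachable at SERVER:
#
#     send_data({"spectrum": 1, "counts": [0, 4, 2]}, instrument="MAPS")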


def send_flatBuffer(data, instrument):
    """
    Send FlatBuffers-encoded data to Kafka.

    Args:
        data: The serialised FlatBuffers payload to send to Kafka.
        instrument: The instrument to send data for.
    """
    producer.produce(TOPIC.format(instrument), data)
    producer.flush()
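

# A usage sketch, assuming "buf" is a bytes object already serialised
# elsewhere (e.g. with the flatbuffers package's Builder); the name is
# hypothetical:
#
#     send_flatBuffer(buf, "MAPS")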


def do_func_on_live_data(my_func: Callable, instrument="MAPS"):
    """
    Passes any live data to my_func.

    Args:
        my_func: The function to call with the live data.
        instrument: The instrument to get data from.
    """
    consumer = _create_consumer()
    consumer.subscribe([TOPIC.format(instrument)])

    def live_data_thread():
        # Poll indefinitely; this thread runs until the process exits.
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            my_func(json.loads(str(msg.value(), encoding="utf-8")))

    thread = threading.Thread(target=live_data_thread)
    thread.start()
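

# A usage sketch: print each live message as it arrives. The callback
# receives the deserialised dictionary:
#
#     do_func_on_live_data(print, instrument="MAPS")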


def get_data_between(start_time: datetime, end_time: datetime, instrument="MAPS"):
    """
    Get the data between the two given times.

    Note that this is based on the timestamp of when the data was put into Kafka.

    Args:
        start_time: The start of the period to get data from.
        end_time: The end of the period to get data from.
        instrument: The instrument to get the data from.

    Returns:
        A list of the deserialised messages, or None if there was no data in
        the period.
    """
    consumer = _create_consumer()
    epoch = datetime.fromtimestamp(0, tzutc())
    start_time = start_time.astimezone(tzutc())
    end_time = end_time.astimezone(tzutc())

    def get_part_offset(dt):
        # Kafka timestamps are milliseconds since the epoch.
        time_since_epoch = int((dt - epoch).total_seconds() * 1000)
        return consumer.offsets_for_times(
            [TopicPartition(TOPIC.format(instrument), 0, time_since_epoch)]
        )[0]

    try:
        start_time_part_offset = get_part_offset(start_time)
        end_time_part_offset = get_part_offset(end_time)
    except Exception:
        # Sometimes the consumer isn't quite ready; try once more.
        start_time_part_offset = get_part_offset(start_time)
        end_time_part_offset = get_part_offset(end_time)

    offsets = [start_time_part_offset.offset, end_time_part_offset.offset]
    if offsets[0] == -1:
        # offsets_for_times returns -1 when no message exists at or after the time.
        print("No data found for time period")
        return
    consumer.assign([start_time_part_offset])
    if offsets[1] == -1:
        # Nothing was produced after end_time yet, so read up to the high watermark.
        offsets[1] = consumer.get_watermark_offsets(end_time_part_offset)[1]
    number_to_consume = offsets[1] - offsets[0]
    return [json.loads(str(data.value(), encoding="utf-8"))
            for data in consumer.consume(number_to_consume)]
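

# A minimal demonstration when run as a script rather than imported. This is
# a sketch: it assumes the broker at SERVER is reachable and that the MAPS
# topic already holds data for the chosen window.
if __name__ == "__main__":
    from datetime import timedelta

    end = datetime.now(tzutc())
    start = end - timedelta(minutes=10)
    messages = get_data_between(start, end)
    if messages is not None:
        print("Received {} messages from the last 10 minutes".format(len(messages)))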