-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathKafka_Start_Run.py
28 lines (25 loc) · 1.19 KB
/
Kafka_Start_Run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import Send_Kafka_RunInfo # import kafka run info sender lib
from streaming_data_types.run_start_pl72 import serialise_pl72 # import ESS Flatbuffer serialiser for PL72
import time # import time to get current time for runstart
# Timestamp for the run start: current wall-clock time, truncated to whole
# milliseconds.
starttime = int(1000 * time.time())

# Collect every run-start field in a single dict so the whole package can be
# handed to the serialiser in one call through keyword expansion.
RunInfo = dict(
    job_id="1",
    filename="test_file.nxs",
    start_time=starttime,
    # stop_time=None,
    run_name="test_run",
    nexus_structure="{}",
    service_id="IESGPythonTesting",
    instrument_name="MAP",
    broker="livedata.isis.cclrc.ac.uk:9092",
    # metadata=None,
    control_topic="MAPSTEST2_runInfo",
)

# Show what is about to be sent: header line first, then the dict on its own line.
print("Kafka start streaming test run, info: ", RunInfo, sep="\n")

# Pack the fields into a pl72 message and echo the serialised result.
Serialised_PL72 = serialise_pl72(**RunInfo)
print("Serialised data: ", Serialised_PL72)

# Hand the serialised message to the sender module, then confirm on stdout.
Send_Kafka_RunInfo.send_flatBuffer(Serialised_PL72)
print("Sent to Kafka, test run started.")