forked from kytos/kronos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
145 lines (120 loc) · 5.54 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""Main module of kytos/kronos Network Application."""
from flask import jsonify
from kytos.core import KytosNApp, log, rest
from kytos.core.helpers import listen_to
from napps.kytos.kronos import settings
from napps.kytos.kronos.backends.csvbackend import CSVBackend
from napps.kytos.kronos.backends.influx import InfluxBackend
from napps.kytos.kronos.utils import NamespaceError, ValueConvertError
class Main(KytosNApp):
"""Main class of kytos/kronos NApp.
This class is the entry point for this napp.
"""
backend = None
def setup(self):
"""Init method for the napp."""
log.info("Time Series NApp started.")
if settings.DEFAULT_BACKEND == 'INFLUXDB':
self.backend = InfluxBackend(settings)
elif settings.DEFAULT_BACKEND == 'CSV':
self.backend = CSVBackend(settings)
@rest('v1/<namespace>/<value>', methods=['POST'])
@rest('v1/<namespace>/<value>/<timestamp>', methods=['POST'])
def rest_save(self, namespace, value, timestamp=None):
"""Save the data in one of the backends."""
try:
self.backend.save(namespace, value, timestamp)
except (NamespaceError, ValueConvertError) as exc:
exc_name = exc.__class__.__name__
return jsonify({'exc_name': exc_name, 'response': str(exc)})
return jsonify({'response': 'Value saved.'})
@rest('v1/<namespace>/', methods=['DELETE'])
@rest('v1/<namespace>/start/<start>', methods=['DELETE'])
@rest('v1/<namespace>/end/<end>', methods=['DELETE'])
@rest('v1/<namespace>/<start>/<end>', methods=['DELETE'])
def rest_delete(self, namespace, start=None, end=None):
"""Delete the data in one of the backends."""
try:
self.backend.delete(namespace, start, end)
except (NamespaceError, ValueConvertError, ValueError) as exc:
exc_name = exc.__class__.__name__
return jsonify({'exc_name': exc_name, 'response': str(exc)})
return jsonify({'response': 'Values deleted.'})
@rest('v1/namespace/', methods=['GET'])
@rest('v1/<namespace>/', methods=['GET'])
@rest('v1/<namespace>/<start>/', methods=['GET'])
@rest('v1/<namespace>/<end>/', methods=['GET'])
@rest('v1/<namespace>/<start>/<end>', methods=['GET'])
@rest('v1/<namespace>/<start>/<end>/interpol/<method>', methods=['GET'])
@rest('v1/<namespace>/<start>/<end>/interpol/<method>/<filter>/',
methods=['GET'])
@rest('v1/<namespace>/<start>/<end>/interpol/<method>/<filter>/<group>',
methods=['GET'])
def rest_get(self, namespace, start=None, end=None, method=None,
fill=None, group=None):
"""Retrieve the data from one of the backends."""
try:
result = self.backend.get(namespace, start, end, method, fill,
group)
except (NamespaceError, ValueConvertError, ValueError) as exc:
exc_name = exc.__class__.__name__
return jsonify({'exc_name': exc_name, 'response': str(exc)})
return jsonify({'response': result})
@listen_to('kytos.kronos.save')
def event_save(self, event):
"""Save the data in one of the backends."""
error = None
result = None
try:
self.backend.save(event.content['namespace'],
event.content['value'],
event.content['timestamp'])
result = 'Value saved.'
except (NamespaceError, ValueConvertError) as exc:
error = (exc.__class__.__name__, str(exc))
self._execute_callback(event, result, error)
@listen_to('kytos.kronos.get')
def event_get(self, event):
"""Get the data in one of the backends."""
error = None
result = None
try:
result = self.backend.get(event.content['namespace'],
event.content['start'],
event.content['end'])
except (NamespaceError, ValueConvertError, ValueError) as exc:
error = (exc.__class__.__name__, str(exc))
self._execute_callback(event, result, error)
@listen_to('kytos.kronos.delete')
def event_delete(self, event):
"""Delete data in one of the backends."""
error = None
result = None
try:
self.backend.delete(event.content['namespace'],
event.content['start'],
event.content['end'])
result = 'Value deleted.'
except (NamespaceError, ValueConvertError, ValueError) as exc:
error = (exc.__class__.__name__, str(exc))
self._execute_callback(event, result, error)
@staticmethod
def _execute_callback(event, data, error):
"""Run the callback function for event calls to the NApp."""
try:
event.content['callback'](event, data, error)
except KeyError:
log.error(f'Event {event} without callback function!')
except TypeError as exception:
log.error(f'Bad callback function {event.content["callback"]}!')
log.error(exception)
def execute(self):
"""Run after the setup method execution.
You can also use this method in loop mode if you add to the above setup
method a line like the following example:
self.execute_as_loop(30) # 30-second interval.
"""
log.info("EXECUTING !")
def shutdown(self):
"""Execute before tha NApp is unloaded."""
log.info("Time Series NApp is shutting down.")