forked from kytos-ng/mef_eline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
131 lines (108 loc) · 4.63 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""Utility functions."""
import functools
from pathlib import Path
from flask import request
from openapi_core import create_spec
from openapi_core.contrib.flask import FlaskOpenAPIRequest
from openapi_core.validation.request.validators import RequestValidator
from openapi_spec_validator import validate_spec
from openapi_spec_validator.readers import read_from_filename
from werkzeug.exceptions import BadRequest, UnsupportedMediaType
from kytos.core import log
from kytos.core.events import KytosEvent
def map_evc_event_content(evc, **kwargs):
    """Build the content dict describing an EVC for an event.

    Extra keyword arguments are carried into the result. When one of them
    uses the same key as an EVC-derived field, the EVC value wins.
    """
    evc_fields = {
        "evc_id": evc.id,
        "name": evc.name,
        "metadata": evc.metadata,
        "active": evc._active,
        "enabled": evc._enabled,
        "uni_a": evc.uni_a.as_dict(),
        "uni_z": evc.uni_z.as_dict(),
    }
    # Unpacking in this order keeps the caller's keys first and lets the
    # EVC fields override any clashing value.
    return {**kwargs, **evc_fields}
def emit_event(controller, name, context="kytos/mef_eline", content=None):
    """Put a KytosEvent named ``<context>.<name>`` on the app buffer.

    ``content`` is handed to the event unchanged.
    """
    kytos_event = KytosEvent(name=f"{context}.{name}", content=content)
    app_buffer = controller.buffers.app
    app_buffer.put(kytos_event)
def notify_link_available_tags(controller, link, src_func=None):
    """Emit a ``link_available_tags`` event for the given link.

    The event content carries ``link`` and ``src_func`` exactly as they
    were passed in.
    """
    payload = {"link": link, "src_func": src_func}
    emit_event(controller, "link_available_tags", content=payload)
def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step.

    The step matches when its ``dpid`` and ``port`` equal the endpoint's
    switch dpid and port number. The VLAN is compared too, but only when
    ``vlan`` is truthy and the step has a ``vlan`` key.
    """
    vlan_matters = vlan and "vlan" in trace
    if endpoint.switch.dpid != trace["dpid"]:
        return False
    if endpoint.port_number != trace["port"]:
        return False
    if vlan_matters:
        return vlan == trace["vlan"]
    return True
def compare_uni_out_trace(uni, trace):
    """Tell whether the trace's ``out`` step agrees with the UNI.

    Returns True when the trace has no ``out`` key, False when ``out`` is
    not a dict, and otherwise whether both the port and the VLAN in
    ``out`` equal the UNI's interface port number and user tag value
    (``None`` when the UNI has no user tag).
    """
    # keep compatibility for old versions of sdntrace-cp
    if "out" not in trace:
        return True
    out_step = trace["out"]
    if not isinstance(out_step, dict):
        return False
    expected_vlan = None
    if uni.user_tag:
        expected_vlan = uni.user_tag.value
    if uni.interface.port_number != out_step.get("port"):
        return False
    return expected_vlan == out_step.get("vlan")
def load_spec():
    """Load and validate the OpenAPI spec shipped next to this module.

    Reads ``openapi.yml`` from the directory containing this file,
    validates the parsed spec and returns the object built from it by
    ``create_spec``.
    """
    spec_path = Path(__file__).parent / "openapi.yml"
    parsed = read_from_filename(spec_path)
    spec_dict, _ = parsed
    validate_spec(spec_dict)
    return create_spec(spec_dict)
def validate(spec):
    """Decorator to validate a REST endpoint input.

    Uses the schema defined in the openapi.yml file
    to validate.

    The decorated function is called with its original arguments plus
    the parsed JSON body as the ``data`` keyword argument.

    The wrapped endpoint raises:
        BadRequest: when the body is not well-formed JSON, or when the
            request fails validation against ``spec``.
        UnsupportedMediaType: when ``request.get_json()`` returns None.
    """
    def validate_decorator(func):
        # Used in debug logs so they name the endpoint actually wrapped.
        endpoint_name = getattr(func, "__name__", "endpoint")

        @functools.wraps(func)
        def wrapper_validate(*args, **kwargs):
            try:
                data = request.get_json()
            except BadRequest as err:
                result = "The request body is not a well-formed JSON."
                log.debug("%s result %s %s", endpoint_name, result, 400)
                # Chain from the caught error so the traceback shows
                # the real parsing failure.
                raise BadRequest(result) from err
            if data is None:
                result = "The request body mimetype is not application/json."
                log.debug("%s result %s %s", endpoint_name, result, 415)
                raise UnsupportedMediaType(result)

            validator = RequestValidator(spec)
            openapi_request = FlaskOpenAPIRequest(request)
            result = validator.validate(openapi_request)
            if result.errors:
                error_response = (
                    "The request body contains invalid API data."
                )
                first_error = result.errors[0]
                # Only the first schema error is reported. A missing or
                # empty schema_errors falls back to the generic message
                # instead of raising IndexError.
                schema_errors = getattr(first_error, "schema_errors", None)
                if schema_errors:
                    schema_error = schema_errors[0]
                    error_log = {
                        "error_message": schema_error.message,
                        "error_validator": schema_error.validator,
                        "error_validator_value":
                            schema_error.validator_value,
                        "error_path": list(schema_error.path),
                        "error_schema": schema_error.schema,
                        "error_schema_path":
                            list(schema_error.schema_path),
                    }
                    log.debug("Invalid request (API schema): %s", error_log)
                    error_response += f" {schema_error.message} for field"
                    error_response += (
                        f" {'/'.join(map(str, schema_error.path))}."
                    )
                # No exception is being handled here, so there is no
                # cause to chain from.
                raise BadRequest(error_response)
            return func(*args, data=data, **kwargs)
        return wrapper_validate
    return validate_decorator