From 20ae1f81db17911c182e8f7852a6385c06253fea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20=C3=96qvist?= Date: Wed, 1 Nov 2023 13:50:29 +0100 Subject: [PATCH] Add OpenTelemetry tracing --- nipap-cli/nipap | 18 +++- nipap-cli/nipaprc | 4 + nipap-www/nipapwww/__init__.py | 12 +++ nipap-www/nipapwww/tracing.py | 27 ++++++ nipap-www/nipapwww/xhr.py | 22 +++++ nipap/nipap.conf.dist | 6 ++ nipap/nipap/authlib.py | 16 +++- nipap/nipap/backend.py | 32 ++++++- nipap/nipap/rest.py | 13 +-- nipap/nipap/tracing.py | 154 +++++++++++++++++++++++++++++++++ nipap/nipapd | 28 ++++++ nipap/requirements.txt | 19 +++- pynipap/pynipap.py | 61 ++++++++----- pynipap/setup.py | 2 +- pynipap/tracing.py | 111 ++++++++++++++++++++++++ 15 files changed, 492 insertions(+), 33 deletions(-) create mode 100644 nipap-www/nipapwww/tracing.py create mode 100644 nipap/nipap/tracing.py create mode 100644 pynipap/tracing.py diff --git a/nipap-cli/nipap b/nipap-cli/nipap index 2d8d1900d..6238f1d75 100755 --- a/nipap-cli/nipap +++ b/nipap-cli/nipap @@ -14,7 +14,7 @@ import nipap_cli import nipap_cli.nipap_cli from nipap_cli.command import Command, CommandError from pynipap import NipapError - +import tracing # early close of stdout to avoid broken pipe, see #464 # If our output is being piped and the receiver of the pipe is killed off before @@ -88,6 +88,22 @@ if __name__ == '__main__': cfg.read(userrcfile) nipap_cli.nipap_cli.cfg = cfg + if cfg.has_section("tracing"): + try: + use_grpc = True + try: + otlp_endpoint = cfg.get("tracing", "otlp_grpc_endpoint") + except configparser.NoOptionError: + # Send trace via nipapd + use_grpc = False + use_ssl = cfg.getboolean("global", "use_ssl") + otlp_endpoint = "https://" if use_ssl else "http://" + (cfg.get("global", "hostname") + + ":" + cfg.get("global", "port") + + "/v1/traces/") + tracing.init_tracing("nipap-cli", otlp_endpoint, use_grpc) + except KeyError: + pass + # setup our configuration nipap_cli.nipap_cli.setup_connection() diff --git a/nipap-cli/nipaprc b/nipap-cli/nipaprc index 88d23c1d4..276cb3e7b 100644 --- a/nipap-cli/nipaprc +++ b/nipap-cli/nipaprc @@ -46,3 +46,7 @@ default_list_vrf_rt = all # # prefix_list_columns = vrf_rt,prefix # +# Enable OpenTelemetry tracing by uncommenting section. +# [tracing] +# Specify OTLP GRPC endpoint. If no endpoint is specified traces will be sent via nipapd to OpenTelemetry Collector +# otlp_grpc_endpoint = http://127.0.0.1:4317 diff --git a/nipap-www/nipapwww/__init__.py b/nipap-www/nipapwww/__init__.py index 6675ac25d..46dfd99b0 100644 --- a/nipap-www/nipapwww/__init__.py +++ b/nipap-www/nipapwww/__init__.py @@ -38,6 +38,18 @@ def create_app(test_config=None): # configure pynipap pynipap.xmlrpc_uri = app.config["XMLRPC_URI"] + # configure tracing + if nipap_config.has_section("tracing"): + try: + import tracing + from opentelemetry.instrumentation.wsgi import OpenTelemetryMiddleware + tracing.init_tracing("nipap-www", nipap_config.get("tracing", "otlp_grpc_endpoint")) + app.wsgi_app = OpenTelemetryMiddleware(app.wsgi_app) + except KeyError: + pass + except ImportError: + pass + # Set up blueprints from . import auth, ng, prefix, static, version, xhr app.register_blueprint(auth.bp) diff --git a/nipap-www/nipapwww/tracing.py b/nipap-www/nipapwww/tracing.py new file mode 100644 index 000000000..5bc11af81 --- /dev/null +++ b/nipap-www/nipapwww/tracing.py @@ -0,0 +1,27 @@ +import functools + +from flask import session + +try: + from opentelemetry import trace + from opentelemetry.trace import INVALID_SPAN + + def create_span(view): + @functools.wraps(view) + def wrapped_view(**kwargs): + current_span = trace.get_current_span() + if current_span != INVALID_SPAN: + if session.get("user") is None: + current_span.set_attribute("username", "unknown") + else: + current_span.set_attribute("username", session.get("user")) + return view(**kwargs) + + return wrapped_view +except ImportError: + def create_span(view): + @functools.wraps(view) + def wrapped_view(**kwargs): + return view(**kwargs) + + return wrapped_view diff --git a/nipap-www/nipapwww/xhr.py b/nipap-www/nipapwww/xhr.py index 3e67afd3f..610460f1a 100644 --- a/nipap-www/nipapwww/xhr.py +++ b/nipap-www/nipapwww/xhr.py @@ -6,6 +6,8 @@ from .auth import login_required +from .tracing import create_span + bp = Blueprint('xhr', __name__, url_prefix='/xhr') @@ -65,6 +67,7 @@ def extract_pool_attr(req): # TODO: Is this used any more? @bp.route('/list_vrf') @login_required +@create_span def list_vrf(): """ List VRFs and return JSON encoded result. """ @@ -81,6 +84,7 @@ def list_vrf(): @bp.route('/smart_search_vrf', methods=('GET', 'POST')) @login_required +@create_span def smart_search_vrf(): """ Perform a smart VRF search. @@ -131,6 +135,7 @@ def smart_search_vrf(): @bp.route('/add_vrf', methods=('GET', 'POST')) @login_required +@create_span def add_vrf(): """ Add a new VRF to NIPAP and return its data. """ @@ -159,6 +164,7 @@ def add_vrf(): @bp.route('/edit_vrf/', methods=('GET', 'POST')) @login_required +@create_span def edit_vrf(id): """ Edit a VRF. """ @@ -193,6 +199,7 @@ def edit_vrf(id): @bp.route('/remove_vrf/', methods=('GET', 'POST')) @login_required +@create_span def remove_vrf(id): """ Remove a VRF. """ @@ -211,6 +218,7 @@ def remove_vrf(id): @bp.route('/list_pool', methods=('GET', 'POST')) @login_required +@create_span def list_pool(): """ List pools and return JSON encoded result. """ @@ -233,6 +241,7 @@ def list_pool(): @bp.route('/smart_search_pool') @login_required +@create_span def smart_search_pool(): """ Perform a smart pool search. @@ -273,6 +282,7 @@ def smart_search_pool(): @bp.route('/add_pool', methods=('GET', 'POST')) @login_required +@create_span def add_pool(): """ Add a pool. """ @@ -307,6 +317,7 @@ def add_pool(): @bp.route('/edit_pool/', methods=('GET', 'POST')) @login_required +@create_span def edit_pool(id): """ Edit a pool. """ @@ -341,6 +352,7 @@ def edit_pool(id): @bp.route('/remove_pool/', methods=('GET', 'POST')) @login_required +@create_span def remove_pool(id): """ Remove a pool. """ @@ -359,6 +371,7 @@ def remove_pool(id): @bp.route('/list_prefix', methods=('GET', 'POST')) @login_required +@create_span def list_prefix(): """ List prefixes and return JSON encoded result. """ @@ -378,6 +391,7 @@ def list_prefix(): @bp.route('/search_prefix', methods=('GET', 'POST')) @login_required +@create_span def search_prefix(): """ Search prefixes. Does not yet incorporate all the functions of the search_prefix API function due to difficulties with transferring @@ -446,6 +460,7 @@ def search_prefix(): @bp.route('/smart_search_prefix', methods=('GET', 'POST')) @login_required +@create_span def smart_search_prefix(): """ Perform a smart search. @@ -559,6 +574,7 @@ def smart_search_prefix(): @bp.route('/add_prefix', methods=('GET', 'POST')) @login_required +@create_span def add_prefix(): """ Add prefix according to the specification. @@ -676,6 +692,7 @@ def add_prefix(): @bp.route('/edit_prefix/', methods=('GET', 'POST')) @login_required +@create_span def edit_prefix(id): """ Edit a prefix. """ @@ -755,6 +772,7 @@ def edit_prefix(id): @bp.route('/remove_prefix/', methods=('GET', 'POST')) @login_required +@create_span def remove_prefix(id): """ Remove a prefix. """ @@ -773,6 +791,7 @@ def remove_prefix(id): @bp.route('/add_current_vrf', methods=('GET', 'POST')) @login_required +@create_span def add_current_vrf(): """ Add VRF to filter list session variable """ @@ -798,6 +817,7 @@ def add_current_vrf(): @bp.route('/del_current_vrf', methods=('GET', 'POST')) @login_required +@create_span def del_current_vrf(): """ Remove VRF to filter list session variable """ @@ -812,6 +832,7 @@ def del_current_vrf(): @bp.route('/get_current_vrfs') @login_required +@create_span def get_current_vrfs(): """ Return VRF filter list from session variable @@ -856,6 +877,7 @@ def get_current_vrfs(): @bp.route('/list_tags') @login_required +@create_span def list_tags(self): """ List Tags and return JSON encoded result. """ diff --git a/nipap/nipap.conf.dist b/nipap/nipap.conf.dist index ebb00461f..358b13b33 100644 --- a/nipap/nipap.conf.dist +++ b/nipap/nipap.conf.dist @@ -202,3 +202,9 @@ secret_key = {{WWW_SECRET_KEY}} # the web interface, it might contain hints about what credentials to use or # just greet the user. #welcome_message = Welcome to NIPAP at COMPANY. Please login using your XYZ credentials. +# Enable OpenTelemetry tracing by uncommenting section. +# [tracing] +# Specify OTLP GRPC endpoint. Used to send traces to OpenTelemetry Collector +# otlp_grpc_endpoint=http://opentelemetry-collector:4317 +# Specify OTLP HTTP endpoint. Used when proxying traces to OpenTelemetry-Collector from nipap-cli +# otlp_http_endpoint=http://opentelemetry-collector:4318/v1/traces diff --git a/nipap/nipap/authlib.py b/nipap/nipap/authlib.py index 464f00075..5bc9ace83 100644 --- a/nipap/nipap/authlib.py +++ b/nipap/nipap/authlib.py @@ -77,6 +77,8 @@ import random import requests +from .tracing import create_span_authenticate + try: import jwt except ImportError: @@ -336,6 +338,16 @@ def __init__(self, name, jwt_token, authoritative_source, self._logger.error('Unable to load Python jwt module, please verify it is installed') raise AuthError('Unable to authenticate') + # Decode token + try: + payload = jwt.decode( + self._jwt_token, options={"verify_signature": False}) + self.username = payload.get('sub') + self.full_name = payload.get('name', payload.get('sub')) + except jwt.exceptions.DecodeError: + raise AuthError('Failed to decode jwt token') + + @create_span_authenticate def authenticate(self): """ Verify authentication. @@ -403,9 +415,7 @@ def authenticate(self): # auth succeeded if self._authenticated: - self.username = payload.get('sub') self.authenticated_as = payload.get('sub') - self.full_name = payload.get('name', payload.get('sub')) self._logger.debug('successfully authenticated as %s, username' % self.authenticated_as) self.trusted = False @@ -495,6 +505,7 @@ def __init__(self, name, username, password, authoritative_source, auth_options= self._logger.exception(exc) raise AuthError('Unable to establish secure connection to ldap server') + @create_span_authenticate def authenticate(self): """ Verify authentication. @@ -687,6 +698,7 @@ def _upgrade_database(self): pass self._db_conn.commit() + @create_span_authenticate def authenticate(self): """ Verify authentication. diff --git a/nipap/nipap/backend.py b/nipap/nipap/backend.py index 09093e418..5d98ed201 100644 --- a/nipap/nipap/backend.py +++ b/nipap/nipap/backend.py @@ -202,6 +202,7 @@ from . import smart_parsing from . import db_schema import nipap +from .tracing import create_span # support multiple versions of parsedatetime try: @@ -720,9 +721,9 @@ def _connect_db(self): # Create database connection while True: try: - self._con_pg = psycopg2.connect(**db_args) + self._con_pg = psycopg2.connect(**db_args, cursor_factory=psycopg2.extras.DictCursor) self._con_pg.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - self._curs_pg = self._con_pg.cursor(cursor_factory=psycopg2.extras.DictCursor) + self._curs_pg = self._con_pg.cursor() psycopg2.extras.register_hstore(self._con_pg, globally=True) except psycopg2.Error as exc: if re.search("database.*does not exist", str(exc)): @@ -1156,6 +1157,7 @@ def _expand_vrf_query(self, query, table_name=None): return where, opt + @create_span @requires_rw def add_vrf(self, auth, attr): """ Add a new VRF. @@ -1204,6 +1206,7 @@ def add_vrf(self, auth, attr): return vrf + @create_span @requires_rw def remove_vrf(self, auth, spec): """ Remove a VRF. @@ -1252,6 +1255,7 @@ def remove_vrf(self, auth, spec): sql, params = self._sql_expand_insert(audit_params) self._execute('INSERT INTO ip_net_log ' + sql, params) + @create_span def list_vrf(self, auth, spec=None): """ Return a list of VRFs matching `spec`. @@ -1292,6 +1296,7 @@ def list_vrf(self, auth, spec=None): return res + @create_span def _get_vrf(self, auth, spec, prefix='vrf_'): """ Get a VRF based on prefix spec @@ -1323,6 +1328,7 @@ def _get_vrf(self, auth, spec, prefix='vrf_'): raise NipapNonExistentError('No matching VRF found.') + @create_span @requires_rw def edit_vrf(self, auth, spec, attr): """ Update VRFs matching `spec` with attributes `attr`. @@ -1379,6 +1385,7 @@ def edit_vrf(self, auth, spec, attr): return updated_vrfs + @create_span def search_vrf(self, auth, query, search_options=None): """ Search VRF list for VRFs matching `query`. @@ -1508,6 +1515,7 @@ def search_vrf(self, auth, query, search_options=None): return {'search_options': search_options, 'result': result} + @create_span def smart_search_vrf(self, auth, query_str, search_options=None, extra_query=None): """ Perform a smart search on VRF list. @@ -1676,6 +1684,7 @@ def _expand_pool_query(self, query, table_name=None): return where, opt + @create_span @requires_rw def add_pool(self, auth, attr): """ Create a pool according to `attr`. @@ -1721,6 +1730,7 @@ def add_pool(self, auth, attr): return pool + @create_span @requires_rw def remove_pool(self, auth, spec): """ Remove a pool. @@ -1760,6 +1770,7 @@ def remove_pool(self, auth, spec): sql, params = self._sql_expand_insert(audit_params) self._execute('INSERT INTO ip_net_log ' + sql, params) + @create_span def list_pool(self, auth, spec=None): """Return a list of pools. @@ -1878,6 +1889,7 @@ def _get_pool(self, auth, spec): raise NipapInputError("non-existing pool specified") return pool[0] + @create_span @requires_rw def edit_pool(self, auth, spec, attr): """ Update pool given by `spec` with attributes `attr`. @@ -1933,6 +1945,7 @@ def edit_pool(self, auth, spec, attr): return updated_pools + @create_span def search_pool(self, auth, query, search_options=None): """ Search pool list for pools matching `query`. @@ -2085,6 +2098,7 @@ def search_pool(self, auth, query, search_options=None): return {'search_options': search_options, 'result': result} + @create_span def smart_search_pool(self, auth, query_str, search_options=None, extra_query=None): """ Perform a smart search on pool list. @@ -2310,6 +2324,7 @@ def _expand_prefix_query(self, query, table_name=None): return where, opt + @create_span @requires_rw def add_prefix(self, auth, attr, args=None): """ Add a prefix and return its ID. @@ -2507,6 +2522,7 @@ def add_prefix(self, auth, attr, args=None): return prefix + @create_span @requires_rw def edit_prefix(self, auth, spec, attr): """ Update prefix matching `spec` with attributes `attr`. @@ -2636,6 +2652,7 @@ def edit_prefix(self, auth, spec, attr): return updated_prefixes + @create_span def find_free_prefix(self, auth, vrf, args): """ Finds free prefixes in the sources given in `args`. @@ -2800,6 +2817,7 @@ def find_free_prefix(self, auth, vrf, args): return res + @create_span def list_prefix(self, auth, spec=None): """ List prefixes matching the `spec`. @@ -2893,6 +2911,7 @@ def _db_remove_prefix(self, spec, recursive=False): sql = "DELETE FROM ip_net_plan AS p WHERE " + where self._execute(sql, params) + @create_span @requires_rw def remove_prefix(self, auth, spec, recursive=False): """ Remove prefix matching `spec`. @@ -2972,6 +2991,7 @@ def remove_prefix(self, auth, spec, recursive=False): sql, params = self._sql_expand_insert(audit_params2) self._execute('INSERT INTO ip_net_log ' + sql, params) + @create_span def search_prefix(self, auth, query, search_options=None): """ Search prefix list for prefixes matching `query`. @@ -3370,6 +3390,7 @@ def search_prefix(self, auth, query, search_options=None): return {'search_options': search_options, 'result': result} + @create_span def smart_search_prefix(self, auth, query_str, search_options=None, extra_query=None): """ Perform a smart search on prefix list. @@ -3543,6 +3564,7 @@ def _expand_asn_spec(self, spec): return where, params + @create_span def list_asn(self, auth, asn=None): """ List AS numbers matching `spec`. @@ -3582,6 +3604,7 @@ def list_asn(self, auth, asn=None): return res + @create_span @requires_rw def add_asn(self, auth, attr): """ Add AS number to NIPAP. @@ -3626,6 +3649,7 @@ def add_asn(self, auth, attr): return asn + @create_span @requires_rw def edit_asn(self, auth, asn, attr): """ Edit AS number @@ -3679,6 +3703,7 @@ def edit_asn(self, auth, asn, attr): return updated_asns + @create_span @requires_rw def remove_asn(self, auth, asn): """ Remove an AS number. @@ -3718,6 +3743,7 @@ def remove_asn(self, auth, asn): sql, params = self._sql_expand_insert(audit_params) self._execute('INSERT INTO ip_net_log ' + sql, params) + @create_span def search_asn(self, auth, query, search_options=None): """ Search ASNs for entries matching 'query' @@ -3815,6 +3841,7 @@ def search_asn(self, auth, query, search_options=None): return {'search_options': search_options, 'result': result} + @create_span def smart_search_asn(self, auth, query_str, search_options=None, extra_query=None): """ Perform a smart search operation among AS numbers @@ -4006,6 +4033,7 @@ def _expand_tag_query(self, query, table_name=None): return where, opt + @create_span def search_tag(self, auth, query, search_options=None): """ Search Tags for entries matching 'query' diff --git a/nipap/nipap/rest.py b/nipap/nipap/rest.py index 10d3a9fb5..a9899fc25 100644 --- a/nipap/nipap/rest.py +++ b/nipap/nipap/rest.py @@ -17,6 +17,8 @@ from .backend import Nipap, NipapError import nipap from .authlib import AuthFactory, AuthError +from .tracing import create_span_rest + def setup(app): api = Api(app, prefix="/rest/v1") @@ -162,9 +164,9 @@ def decorated(self, *args, **kwargs): # authenticated? if not auth.authenticate(): - self.logger.debug("Invalid bearer token.") + self.logger.debug("Invalid bearer token") abort(401, error={"code": 401, - "message": "Invalid bearer token."}) + "message": "Invalid bearer token"}) else: auth = af.get_auth(request.authorization.username, request.authorization.password, auth_source, auth_options or {}) @@ -213,6 +215,7 @@ def __init__(self): @requires_auth + @create_span_rest def get(self, args): """ Search/list prefixes """ @@ -247,8 +250,8 @@ def get(self, args): self.logger.error(str(err)) abort(500, error={"code": 500, "message": "Internal error"}) - @requires_auth + @create_span_rest def post(self, args): """ Add prefix """ @@ -266,8 +269,8 @@ def post(self, args): self.logger.error(str(err)) abort(500, error={"code": 500, "message": "Internal error"}) - @requires_auth + @create_span_rest def put(self, args): """ Edit prefix """ @@ -286,8 +289,8 @@ def put(self, args): self.logger.error(str(err)) abort(500, error={"code": 500, "message": "Internal error"}) - @requires_auth + @create_span_rest def delete(self, args): """ Remove prefix """ diff --git a/nipap/nipap/tracing.py b/nipap/nipap/tracing.py new file mode 100644 index 000000000..96b738f63 --- /dev/null +++ b/nipap/nipap/tracing.py @@ -0,0 +1,154 @@ +try: + import inspect + from functools import wraps + + from flask import request + + from opentelemetry import trace, context + from opentelemetry.trace import SpanKind, StatusCode + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + from opentelemetry.sdk.resources import SERVICE_NAME, Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from requests import post + + tracer = trace.get_tracer("nipap") + + def init_tracing(service_name, endpoint): + resource = Resource(attributes={ + SERVICE_NAME: service_name + }) + + provider = TracerProvider(resource=resource) + processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint)) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + + + def setup(app, endpoint): + @app.route('/v1/traces/', defaults={'path': ''}, methods=["POST"]) + def proxy(path): + return post(f'{endpoint}{path}', data=request.data, headers=request.headers).content + + + def create_span(f): + """ Class decorator creating a opentelemetry span + """ + + @wraps(f) + def decorated(*args, **kwargs): + """ + """ + + auth = args[1] + signature = inspect.signature(f) + with tracer.start_as_current_span(args[0].__class__.__name__ + " " + f.__name__, context.get_current(), + SpanKind.CLIENT): + index = 0 + span = trace.get_current_span() + for parameter in signature.parameters: + if index > 1: + try: + span.set_attribute("argument." + parameter, str(args[index])) + except IndexError: + break + index += 1 + try: + span.set_attribute("username", auth.username) + if auth.full_name is not None: + span.set_attribute("full_name", auth.full_name) + span.set_attribute("authoritative_source", auth.authoritative_source) + span.set_attribute("authenticated_as", auth.authenticated_as) + except: + pass + return f(*args, **kwargs) + + return decorated + + def create_span_rest(f): + """ Class decorator creating a opentelemetry span + """ + + @wraps(f) + def decorated(*args, **kwargs): + """ + """ + + with tracer.start_as_current_span(args[0].__class__.__name__ + " " + f.__name__, context.get_current(), + SpanKind.CLIENT): + span = trace.get_current_span() + try: + auth = args[1]["auth"] + span.set_attribute("username", auth.username) + span.set_attribute("full_name", auth.full_name) + span.set_attribute("authoritative_source", auth.authoritative_source) + span.set_attribute("authenticated_as", auth.authenticated_as) + except KeyError: + pass + + try: + prefix = args[1]["prefix"] + span.set_attribute("prefix", str(prefix)) + except KeyError: + pass + + try: + attr = args[1]["attr"] + span.set_attribute("attr", str(attr)) + except KeyError: + pass + + return f(*args, **kwargs) + return decorated + + def create_span_authenticate(f): + """ Class decorator creating a opentelemetry span + """ + + @wraps(f) + def decorated(*args, **kwargs): + """ + """ + + result = f(*args, **kwargs) + + span = trace.get_current_span() + + span.set_attribute("auth_backend", args[0].auth_backend) + if args[0].username is not None: + span.set_attribute("username", args[0].username) + if args[0].authenticated_as is not None: + span.set_attribute("authenticated_as", args[0].authenticated_as) + if args[0].full_name is not None: + span.set_attribute("full_name", args[0].full_name) + if args[0].authoritative_source is not None: + span.set_attribute("authoritative_source", args[0].authoritative_source) + + if result: + span.set_status(StatusCode.OK) + else: + span.set_status(StatusCode.ERROR) + + return result + + return decorated + + +except ImportError: + def create_span(f): + @wraps(f) + def decorated(*args, **kwargs): + return f(*args, **kwargs) + return decorated + + def create_span_rest(f): + @wraps(f) + def decorated(*args, **kwargs): + return f(*args, **kwargs) + return decorated + + def create_span_authenticate(f): + @wraps(f) + def decorated(*args, **kwargs): + return f(*args, **kwargs) + return decorated \ No newline at end of file diff --git a/nipap/nipapd b/nipap/nipapd index 42f65ca22..96db55537 100755 --- a/nipap/nipapd +++ b/nipap/nipapd @@ -226,6 +226,34 @@ if __name__ == '__main__': # flask app setups app = Flask(__name__) + + # Configure tracing + if cfg.has_section("tracing"): + try: + from nipap.tracing import init_tracing, setup + from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor + from opentelemetry.instrumentation.flask import FlaskInstrumentor + otlp_grpc_endpoint = cfg.get("tracing", "otlp_grpc_endpoint") + init_tracing("nipapd", otlp_grpc_endpoint) + FlaskInstrumentor.instrument_app(app, excluded_urls="/v1/traces") + Psycopg2Instrumentor().instrument(enable_commenter=True, commenter_options={}) + + # Configure proxy of traces from nipap-cli to collector + try: + otlp_http_endpoint = cfg.get("tracing", "otlp_http_endpoint") + setup(app, otlp_http_endpoint) + except configparser.NoOptionError: + pass + logger.debug('Tracing is enabled') + except KeyError: + logger.error('Error in tracing configuration. No tracing enabled') + pass + except ImportError: + logger.error('Failed to import tracing libraries. Check dependencies. No tracing enabled') + pass + else: + logger.debug('Tracing is disabled') + Compress(app) import nipap.rest diff --git a/nipap/requirements.txt b/nipap/requirements.txt index ef817ae79..b506d47f7 100644 --- a/nipap/requirements.txt +++ b/nipap/requirements.txt @@ -17,9 +17,24 @@ psutil==5.7.2 psycopg2==2.8.6 --no-binary psycopg2 pyparsing==2.4.7 python-dateutil==2.8.1 -# optional dependency on ldap -python-ldap==3.3.1 pytz==2021.1 pyjwt==1.5.3 tornado==6.1 docutils==0.16 +# optional dependencies below +python-ldap==3.3.1 +opentelemetry-api==1.20.0 +opentelemetry-exporter-otlp==1.20.0 +opentelemetry-exporter-otlp-proto-common==1.20.0 +opentelemetry-exporter-otlp-proto-grpc==1.20.0 +opentelemetry-exporter-otlp-proto-http==1.20.0 +opentelemetry-instrumentation==0.41b0 +opentelemetry-instrumentation-dbapi==0.41b0 +opentelemetry-instrumentation-flask==0.41b0 +opentelemetry-instrumentation-psycopg2==0.41b0 +opentelemetry-instrumentation-wsgi==0.41b0 +opentelemetry-proto==1.20.0 +opentelemetry-sdk==1.20.0 +opentelemetry-semantic-conventions==0.41b0 +opentelemetry-util-http==0.41b0 + diff --git a/pynipap/pynipap.py b/pynipap/pynipap.py index 80bc01f47..5c5e2c747 100644 --- a/pynipap/pynipap.py +++ b/pynipap/pynipap.py @@ -253,6 +253,12 @@ def __init__(self, options = None): self.options = options +try: + from tracing import create_span, TracingXMLTransport + xml_transport = TracingXMLTransport +except ImportError: + xml_transport = xmlrpclib.Transport + class XMLRPCConnection: """ Handles a shared XML-RPC connection. @@ -276,8 +282,10 @@ def __init__(self): raise NipapError('XML-RPC URI not specified') # creating new instance - self.connection = xmlrpclib.ServerProxy(xmlrpc_uri, allow_none=True, - use_datetime=True) + self.connection = xmlrpclib.ServerProxy(xmlrpc_uri, + transport=xml_transport(), + allow_none=True, + use_datetime=True) self._logger = logging.getLogger(self.__class__.__name__) @@ -329,6 +337,7 @@ class Tag(Pynipap): """ @classmethod + @create_span def from_dict(cls, tag=None): """ Create new Tag-object from dict. @@ -346,6 +355,7 @@ def from_dict(cls, tag=None): @classmethod + @create_span def search(cls, query, search_opts=None): """ Search tags. @@ -423,6 +433,7 @@ def __init__(self): @classmethod + @create_span def list(cls, vrf=None): """ List VRFs. @@ -452,7 +463,8 @@ def list(cls, vrf=None): @classmethod - def from_dict(cls, parm, vrf = None): + @create_span + def from_dict(cls, parm, vrf=None): """ Create new VRF-object from dict. Suitable for creating objects from XML-RPC data. @@ -487,6 +499,7 @@ def from_dict(cls, parm, vrf = None): @classmethod + @create_span def get(cls, id): """ Get the VRF with id 'id'. """ @@ -509,6 +522,7 @@ def get(cls, id): @classmethod + @create_span def search(cls, query, search_opts=None): """ Search VRFs. @@ -541,7 +555,8 @@ def search(cls, query, search_opts=None): @classmethod - def smart_search(cls, query_string, search_options=None, extra_query = None): + @create_span + def smart_search(cls, query_string, search_options=None, extra_query=None): """ Perform a smart VRF search. Maps to the function @@ -576,8 +591,7 @@ def smart_search(cls, query_string, search_options=None, extra_query = None): return result - - + @create_span def save(self): """ Save changes made to object to NIPAP. @@ -634,8 +648,7 @@ def save(self): _cache['VRF'][self.id] = self - - + @create_span def remove(self): """ Remove VRF. @@ -689,7 +702,7 @@ def __init__(self): self.tags = {} self.avps = {} - + @create_span def save(self): """ Save changes made to pool to NIPAP. @@ -748,8 +761,7 @@ def save(self): _cache['Pool'][self.id] = self - - + @create_span def remove(self): """ Remove pool. @@ -773,6 +785,7 @@ def remove(self): @classmethod + @create_span def get(cls, id): """ Get the pool with id 'id'. """ @@ -795,6 +808,7 @@ def get(cls, id): @classmethod + @create_span def search(cls, query, search_opts=None): """ Search pools. @@ -828,7 +842,8 @@ def search(cls, query, search_opts=None): @classmethod - def smart_search(cls, query_string, search_options=None, extra_query = None): + @create_span + def smart_search(cls, query_string, search_options=None, extra_query=None): """ Perform a smart pool search. Maps to the function @@ -867,7 +882,8 @@ def smart_search(cls, query_string, search_options=None, extra_query = None): @classmethod - def from_dict(cls, parm, pool = None): + @create_span + def from_dict(cls, parm, pool=None): """ Create new Pool-object from dict. Suitable for creating objects from XML-RPC data. @@ -907,6 +923,7 @@ def from_dict(cls, parm, pool = None): @classmethod + @create_span def list(self, spec=None): """ List pools. @@ -979,6 +996,7 @@ def __init__(self): @classmethod + @create_span def get(cls, id): """ Get the prefix with id 'id'. """ @@ -1001,6 +1019,7 @@ def get(cls, id): @classmethod + @create_span def find_free(cls, vrf, args): """ Finds a free prefix. @@ -1036,6 +1055,7 @@ def find_free(cls, vrf, args): @classmethod + @create_span def search(cls, query, search_opts=None): """ Search for prefixes. @@ -1070,7 +1090,8 @@ def search(cls, query, search_opts=None): @classmethod - def smart_search(cls, query_string, search_options=None, extra_query = None): + @create_span + def smart_search(cls, query_string, search_options=None, extra_query=None): """ Perform a smart prefix search. Maps to the function @@ -1109,6 +1130,7 @@ def smart_search(cls, query_string, search_options=None, extra_query = None): @classmethod + @create_span def list(cls, spec=None): """ List prefixes. @@ -1136,8 +1158,7 @@ def list(cls, spec=None): return res - - + @create_span def save(self, args=None): """ Save prefix to NIPAP. @@ -1251,9 +1272,8 @@ def save(self, args=None): if self.pool.id in _cache['Pool']: del _cache['Pool'][self.pool.id] - - - def remove(self, recursive = False): + @create_span + def remove(self, recursive=False): """ Remove the prefix. Maps to the function :py:func:`nipap.backend.Nipap.remove_prefix` @@ -1283,7 +1303,8 @@ def remove(self, recursive = False): @classmethod - def from_dict(cls, pref, prefix = None): + @create_span + def from_dict(cls, pref, prefix=None): """ Create a Prefix object from a dict. Suitable for creating Prefix objects from XML-RPC input. diff --git a/pynipap/setup.py b/pynipap/setup.py index c1a5753c1..a0490477b 100644 --- a/pynipap/setup.py +++ b/pynipap/setup.py @@ -17,7 +17,7 @@ author_email = pynipap.__author_email__, license = pynipap.__license__, url = pynipap.__url__, - py_modules = ['pynipap'], + py_modules = ['pynipap','tracing'], keywords = ['nipap'], classifiers = [ 'Development Status :: 4 - Beta', diff --git a/pynipap/tracing.py b/pynipap/tracing.py new file mode 100644 index 000000000..0a779efaf --- /dev/null +++ b/pynipap/tracing.py @@ -0,0 +1,111 @@ +import inspect +import sys +from functools import wraps +from pynipap import AuthOptions + +if sys.version_info[0] < 3: + import xmlrpclib + + int = long +else: + import xmlrpc.client as xmlrpclib + +try: + from opentelemetry import trace, context + from opentelemetry.trace import SpanKind, INVALID_SPAN + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + from opentelemetry.sdk.resources import SERVICE_NAME, Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + import opentelemetry.exporter.otlp.proto.http.trace_exporter + + tracer = trace.get_tracer("pynipap") + + + def init_tracing(service_name, endpoint, use_grpc=True): + resource = Resource(attributes={ + SERVICE_NAME: service_name + }) + + provider = TracerProvider(resource=resource) + if use_grpc: + processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint)) + else: + processor = BatchSpanProcessor(opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter(endpoint=endpoint)) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + + + def create_span(f): + """ Class decorator creating a opentelemetry span + """ + + @wraps(f) + def decorated(*args, **kwargs): + """ + """ + + signature = inspect.signature(f) + + if args[0].__class__ == type: + span_name = args[0].__name__ + " " + f.__name__ + else: + span_name = str(args[0].__class__.__name__) + " " + f.__name__ + + with tracer.start_as_current_span(span_name, + context.get_current(), + SpanKind.CLIENT): + index = 0 + span = trace.get_current_span() + for parameter in signature.parameters: + if index > 0: + try: + span.set_attribute("argument." + parameter, str(args[index])) + except IndexError: + break + index += 1 + try: + span.set_attribute("username", AuthOptions().options['username']) + span.set_attribute("authoritative_source", AuthOptions().options['authoritative_source']) + except BaseException: + pass + return f(*args, **kwargs) + return decorated + + + class TracingXMLTransport(xmlrpclib.Transport): + + def __init__(self, use_datetime=False, use_builtin_types=False, + *, headers=()): + super().__init__(use_datetime=use_datetime, + use_builtin_types=use_builtin_types, + headers=headers) + + def request(self, host, handler, request_body, verbose=False): + with tracer.start_as_current_span("POST XML request", context.get_current(), SpanKind.CLIENT): + current_span = trace.get_current_span() + try: + result = super().request(host, handler, request_body, verbose) + current_span.set_attribute("http.status_code", 200) + except xmlrpclib.ProtocolError as protocolError: + current_span.set_attribute("http.status_code", protocolError.errcode) + current_span.record_exception(protocolError) + raise protocolError + return result + + def send_content(self, connection, request_body): + current_span = trace.get_current_span() + if current_span != INVALID_SPAN: + current_span.set_attribute("net.peer.ip", connection.host) + current_span.set_attribute("net.peer.port", connection.port) + current_span.set_attribute("net.peer.transport", "ip_tcp") + connection.putheader("traceparent", "00-" + hex(current_span.get_span_context().trace_id)[2:].zfill(32) + "-" + hex( + current_span.get_span_context().span_id)[2:].zfill(16) + "-01") + + super().send_content(connection, request_body) +except ImportError: + def create_span(f): + @wraps(f) + def decorated(*args, **kwargs): + return f(*args, **kwargs) + return decorated