From b78511a448773ad234ea718a024c7d4b18daabca Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Wed, 24 Apr 2019 13:59:09 -0500 Subject: [PATCH 01/19] adding delete api Signed-off-by: Jesse Jaggars --- api/host.py | 183 ++++++++++++++++++++++++------------------ swagger/api.spec.yaml | 26 ++++++ test_api.py | 32 ++++++++ 3 files changed, 165 insertions(+), 76 deletions(-) diff --git a/api/host.py b/api/host.py index cad30dbd2..5932e1d95 100644 --- a/api/host.py +++ b/api/host.py @@ -29,32 +29,43 @@ def add_host_list(host_list): for host in host_list: try: (host, status_code) = _add_host(host) - response_host_list.append({'status': status_code, 'host': host}) + response_host_list.append({"status": status_code, "host": host}) except InventoryException as e: number_of_errors += 1 logger.exception("Error adding host", extra={"host": host}) response_host_list.append({**e.to_json(), "host": host}) except ValidationError as e: number_of_errors += 1 - logger.exception("Input validation error while adding host", - extra={"host": host}) - response_host_list.append({"status": 400, - "title": "Bad Request", - "detail": str(e.messages), - "type": "unknown", - "host": host}) + logger.exception( + "Input validation error while adding host", extra={"host": host} + ) + response_host_list.append( + { + "status": 400, + "title": "Bad Request", + "detail": str(e.messages), + "type": "unknown", + "host": host, + } + ) except Exception as e: number_of_errors += 1 logger.exception("Error adding host", extra={"host": host}) - response_host_list.append({"status": 500, - "title": "Error", - "type": "unknown", - "detail": "Could not complete operation", - "host": host}) - - response = {'total': len(response_host_list), - 'errors': number_of_errors, - 'data': response_host_list} + response_host_list.append( + { + "status": 500, + "title": "Error", + "type": "unknown", + "detail": "Could not complete operation", + "host": host, + } + ) + + response = { + "total": len(response_host_list), + 
"errors": number_of_errors, + "data": response_host_list, + } return _build_json_response(response, status=207) @@ -70,14 +81,17 @@ def _add_host(host): input_host = Host.from_json(validated_input_host_dict.data) - if (not current_identity.is_trusted_system and - current_identity.account_number != input_host.account): - raise InventoryException(title="Invalid request", - detail="The account number associated with the user does not " - "match the account number associated with the host") + if ( + not current_identity.is_trusted_system + and current_identity.account_number != input_host.account + ): + raise InventoryException( + title="Invalid request", + detail="The account number associated with the user does not " + "match the account number associated with the host", + ) - existing_host = find_existing_host(input_host.account, - input_host.canonical_facts) + existing_host = find_existing_host(input_host.account, input_host.canonical_facts) if existing_host: return update_existing_host(existing_host, input_host) @@ -96,17 +110,16 @@ def find_existing_host(account_number, canonical_facts): existing_host = find_host_by_insights_id(account_number, insights_id) if not existing_host: - existing_host = find_host_by_canonical_facts(account_number, - canonical_facts) + existing_host = find_host_by_canonical_facts(account_number, canonical_facts) return existing_host def find_host_by_insights_id(account_number, insights_id): existing_host = Host.query.filter( - (Host.account == account_number) - & (Host.canonical_facts["insights_id"].astext == insights_id) - ).first() + (Host.account == account_number) + & (Host.canonical_facts["insights_id"].astext == insights_id) + ).first() if existing_host: logger.debug("Found existing host using id match: %s", existing_host) @@ -160,9 +173,14 @@ def update_existing_host(existing_host, input_host): @api_operation @metrics.api_request_time.time() -def get_host_list(display_name=None, fqdn=None, - hostname_or_id=None, insights_id=None, - 
page=1, per_page=100): +def get_host_list( + display_name=None, + fqdn=None, + hostname_or_id=None, + insights_id=None, + page=1, + per_page=100, +): if fqdn: query = find_hosts_by_canonical_facts( current_identity.account_number, {"fqdn": fqdn} @@ -173,14 +191,14 @@ def get_host_list(display_name=None, fqdn=None, ) elif hostname_or_id: query = find_hosts_by_hostname_or_id( - current_identity.account_number, hostname_or_id) + current_identity.account_number, hostname_or_id + ) elif insights_id: query = find_hosts_by_canonical_facts( - current_identity.account_number, {"insights_id": insights_id}) - else: - query = Host.query.filter( - Host.account == current_identity.account_number + current_identity.account_number, {"insights_id": insights_id} ) + else: + query = Host.query.filter(Host.account == current_identity.account_number) query = query.order_by(Host.created_on, Host.id) query_results = query.paginate(page, per_page, True) @@ -193,26 +211,26 @@ def get_host_list(display_name=None, fqdn=None, def _build_paginated_host_list_response(total, page, per_page, host_list): json_host_list = [host.to_json() for host in host_list] - json_output = {"total": total, - "count": len(host_list), - "page": page, - "per_page": per_page, - "results": json_host_list, - } + json_output = { + "total": total, + "count": len(host_list), + "page": page, + "per_page": per_page, + "results": json_host_list, + } return _build_json_response(json_output, status=200) def _build_json_response(json_data, status=200): - return flask.Response(ujson.dumps(json_data), - status=status, - mimetype="application/json") + return flask.Response( + ujson.dumps(json_data), status=status, mimetype="application/json" + ) def find_hosts_by_display_name(account, display_name): logger.debug("find_hosts_by_display_name(%s)" % display_name) return Host.query.filter( - (Host.account == account) - & Host.display_name.comparator.contains(display_name) + (Host.account == account) & 
Host.display_name.comparator.contains(display_name) ) @@ -226,8 +244,10 @@ def find_hosts_by_canonical_facts(account_number, canonical_facts): def find_hosts_by_hostname_or_id(account_number, hostname): logger.debug("find_hosts_by_hostname_or_id(%s)", hostname) - filter_list = [Host.display_name.comparator.contains(hostname), - Host.canonical_facts['fqdn'].astext.contains(hostname), ] + filter_list = [ + Host.display_name.comparator.contains(hostname), + Host.canonical_facts["fqdn"].astext.contains(hostname), + ] try: uuid.UUID(hostname) @@ -236,19 +256,28 @@ def find_hosts_by_hostname_or_id(account_number, hostname): logger.debug("Adding id (uuid) to the filter list") except Exception as e: # Do not filter using the id - logger.debug("The hostname (%s) could not be converted into a UUID", - hostname, - exc_info=True) + logger.debug( + "The hostname (%s) could not be converted into a UUID", + hostname, + exc_info=True, + ) - return Host.query.filter(sqlalchemy.and_(*[Host.account == account_number, - sqlalchemy.or_(*filter_list)])) + return Host.query.filter( + sqlalchemy.and_(*[Host.account == account_number, sqlalchemy.or_(*filter_list)]) + ) + + +@api_operation +@metrics.api_request_time.time() +def delete_by_id(host_id_list): + hosts = _get_host_list_by_id_list(current_identity.account_number, host_id_list, order=False) + hosts.delete(synchronize_session='fetch') @api_operation @metrics.api_request_time.time() def get_host_by_id(host_id_list, page=1, per_page=100): - query = _get_host_list_by_id_list(current_identity.account_number, - host_id_list) + query = _get_host_list_by_id_list(current_identity.account_number, host_id_list) query_results = query.paginate(page, per_page, True) @@ -259,30 +288,30 @@ def get_host_by_id(host_id_list, page=1, per_page=100): ) -def _get_host_list_by_id_list(account_number, host_id_list): - return Host.query.filter( - (Host.account == account_number) - & Host.id.in_(host_id_list) - ).order_by(Host.created_on, Host.id) +def 
_get_host_list_by_id_list(account_number, host_id_list, order=True): + q = Host.query.filter((Host.account == account_number) & Host.id.in_(host_id_list)) + if order: + return q.order_by(Host.created_on, Host.id) + else: + return q @api_operation @metrics.api_request_time.time() def get_host_system_profile_by_id(host_id_list, page=1, per_page=100): - query = _get_host_list_by_id_list(current_identity.account_number, - host_id_list) + query = _get_host_list_by_id_list(current_identity.account_number, host_id_list) query_results = query.paginate(page, per_page, True) - response_list = [host.to_system_profile_json() - for host in query_results.items] + response_list = [host.to_system_profile_json() for host in query_results.items] - json_output = {"total": query_results.total, - "count": len(response_list), - "page": page, - "per_page": per_page, - "results": response_list, - } + json_output = { + "total": query_results.total, + "count": len(response_list), + "page": page, + "per_page": per_page, + "results": response_list, + } return _build_json_response(json_output, status=200) @@ -322,8 +351,9 @@ def patch_by_id(host_id_list, host_data): @api_operation @metrics.api_request_time.time() def replace_facts(host_id_list, namespace, fact_dict): - return update_facts_by_namespace(FactOperations.replace, host_id_list, - namespace, fact_dict) + return update_facts_by_namespace( + FactOperations.replace, host_id_list, namespace, fact_dict + ) @api_operation @@ -334,8 +364,9 @@ def merge_facts(host_id_list, namespace, fact_dict): logger.debug(error_msg) return error_msg, 400 - return update_facts_by_namespace(FactOperations.merge, host_id_list, - namespace, fact_dict) + return update_facts_by_namespace( + FactOperations.merge, host_id_list, namespace, fact_dict + ) def update_facts_by_namespace(operation, host_id_list, namespace, fact_dict): diff --git a/swagger/api.spec.yaml b/swagger/api.spec.yaml index 7ae52424e..5a498fa67 100644 --- a/swagger/api.spec.yaml +++ 
b/swagger/api.spec.yaml @@ -138,6 +138,32 @@ paths: description: Invalid request. '404': description: Host not found. + delete: + tags: + - hosts + summary: Delete hosts by IDs + description: Delete hosts by IDs + operationId: api.host.delete_by_id + security: + - ApiKeyAuth: [] + parameters: + - in: path + name: host_id_list + description: A comma separated list of host IDs. + required: true + schema: + type: array + items: + type: string + format: uuid + responses: + '200': + description: Successfully deleted hosts. + '400': + description: Invalid request. + '404': + description: Host not found. + '/hosts/{host_id}': patch: tags: - hosts diff --git a/test_api.py b/test_api.py index d659f6441..1705204a5 100755 --- a/test_api.py +++ b/test_api.py @@ -101,6 +101,13 @@ def put(self, path, data, status=200, return_response_as_json=True): self.client().put, path, data, status, return_response_as_json ) + def delete(self, path, status=200, return_response_as_json=True): + return self._response_check( + self.client().delete(path, headers=self._get_valid_auth_header()), + status, + return_response_as_json, + ) + def verify_error_response(self, response, expected_title=None, expected_status=None, expected_detail=None, expected_type=None): @@ -211,6 +218,31 @@ def _validate_host(self, received_host, expected_host, self.assertIsNotNone(received_host["updated"]) +class DeleteHostsTestCase(DBAPITestCase): + def test_create_then_delete(self): + facts = None + + host_data = HostWrapper(test_data(facts=facts)) + + # Create the host + response = self.post(HOST_URL, [host_data.data()], 207) + + self._verify_host_status(response, 0, 201) + + created_host = self._pluck_host_from_response(response, 0) + + original_id = created_host["id"] + + # Get the host + response = self.get(HOST_URL + "/" + original_id, 200) + + # Delete the host + response = self.delete(HOST_URL + "/" + original_id, 200, return_response_as_json=False) + + # Try to get the host again + response = self.get(HOST_URL 
+ "/" + original_id, 200) + + class CreateHostsTestCase(DBAPITestCase): def test_create_and_update(self): facts = None From b24a23b9847bcb730408e700f3054ded2ae5aeb0 Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Thu, 25 Apr 2019 10:44:02 -0500 Subject: [PATCH 02/19] emitting events for deletes Signed-off-by: Jesse Jaggars --- api/host.py | 11 +++++++++-- tasks/__init__.py | 21 ++++++++++++++------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/api/host.py b/api/host.py index 5932e1d95..c9485f7aa 100644 --- a/api/host.py +++ b/api/host.py @@ -14,6 +14,9 @@ from app.logging import get_logger from api import api_operation, metrics +from tasks import emit_event +import events + TAG_OPERATIONS = ("apply", "remove") FactOperations = Enum("FactOperations", ["merge", "replace"]) @@ -270,8 +273,12 @@ def find_hosts_by_hostname_or_id(account_number, hostname): @api_operation @metrics.api_request_time.time() def delete_by_id(host_id_list): - hosts = _get_host_list_by_id_list(current_identity.account_number, host_id_list, order=False) - hosts.delete(synchronize_session='fetch') + hosts = _get_host_list_by_id_list( + current_identity.account_number, host_id_list, order=False + ) + hosts.delete(synchronize_session="fetch") + for id_ in host_id_list: + emit_event(events.delete(id_)) @api_operation diff --git a/tasks/__init__.py b/tasks/__init__.py index 8683183c8..02443be35 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -1,6 +1,7 @@ import os import json from kafka import KafkaConsumer +from kafka import KafkaProducer from threading import Thread from api import metrics @@ -13,6 +14,13 @@ TOPIC = os.environ.get("KAFKA_TOPIC", "platform.system-profile") KAFKA_GROUP = os.environ.get("KAFKA_GROUP", "inventory") BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092") +EVENT_TOPIC = os.environ.get("KAFKA_EVENT_TOPIC", "platform.inventory.events") + +producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS) + + +def 
emit_event(e): + producer.send(EVENT_TOPIC, value=e) @metrics.system_profile_commit_processing_time.time() @@ -26,7 +34,9 @@ def msg_handler(parsed): if host is None: logger.error("Host with id [%s] not found!", id_) return - logger.info("Processing message id=%s request_id=%s", parsed["id"], parsed["request_id"]) + logger.info( + "Processing message id=%s request_id=%s", parsed["id"], parsed["request_id"] + ) profile = SystemProfileSchema(strict=True).load(parsed["system_profile"]).data host._update_system_profile(profile) db.session.commit() @@ -38,9 +48,8 @@ def start_consumer(flask_app, handler=msg_handler, consumer=None): if consumer is None: consumer = KafkaConsumer( - TOPIC, - group_id=KAFKA_GROUP, - bootstrap_servers=BOOTSTRAP_SERVERS) + TOPIC, group_id=KAFKA_GROUP, bootstrap_servers=BOOTSTRAP_SERVERS + ) def _f(): with flask_app.app_context(): @@ -55,7 +64,5 @@ def _f(): logger.exception("uncaught exception in handler, moving on.") metrics.system_profile_failure_count.inc() - t = Thread( - target=_f, - daemon=True) + t = Thread(target=_f, daemon=True) t.start() From 56b6e1f39387cc91bd71a5fe7600b914aa02ce5d Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Thu, 25 Apr 2019 13:27:42 -0500 Subject: [PATCH 03/19] adding events module Signed-off-by: Jesse Jaggars --- app/events.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 app/events.py diff --git a/app/events.py b/app/events.py new file mode 100644 index 000000000..d657afcaa --- /dev/null +++ b/app/events.py @@ -0,0 +1,17 @@ +import logging +from datetime import datetime +from marshmallow import Schema, fields, validate, validates, ValidationError + +logger = logging.getLogger(__name__) + + +class HostEvent(Schema): + id = fields.UUID() + timestamp = fields.DateTime() + type = fields.Str() + + +def delete(id): + return HostEvent(strict=True).load( + {"id": id, "timestamp": datetime.now(), "type": "delete"} + ) From 7b90090efa2fc26d1534ea889c616dc1bd08bb38 Mon Sep 17 00:00:00 
2001 From: Jesse Jaggars Date: Thu, 25 Apr 2019 13:30:33 -0500 Subject: [PATCH 04/19] removing unused imports Signed-off-by: Jesse Jaggars --- app/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/events.py b/app/events.py index d657afcaa..32743bca6 100644 --- a/app/events.py +++ b/app/events.py @@ -1,6 +1,6 @@ import logging from datetime import datetime -from marshmallow import Schema, fields, validate, validates, ValidationError +from marshmallow import Schema, fields logger = logging.getLogger(__name__) From 0015e222490ea4ab81bc1ed21622fabeb460d9da Mon Sep 17 00:00:00 2001 From: Derek Horton Date: Thu, 25 Apr 2019 16:05:25 -0500 Subject: [PATCH 05/19] Modified how the event json is built --- api/host.py | 4 +--- app/events.py | 8 ++++---- tasks/__init__.py | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/api/host.py b/api/host.py index c9485f7aa..93928e9c9 100644 --- a/api/host.py +++ b/api/host.py @@ -7,15 +7,13 @@ from flask_api import status from marshmallow import ValidationError -from app import db +from app import db, events from app.models import Host, HostSchema, PatchHostSchema from app.auth import current_identity from app.exceptions import InventoryException from app.logging import get_logger from api import api_operation, metrics - from tasks import emit_event -import events TAG_OPERATIONS = ("apply", "remove") diff --git a/app/events.py b/app/events.py index 32743bca6..13f3de996 100644 --- a/app/events.py +++ b/app/events.py @@ -7,11 +7,11 @@ class HostEvent(Schema): id = fields.UUID() - timestamp = fields.DateTime() + timestamp = fields.DateTime(format="iso8601") type = fields.Str() def delete(id): - return HostEvent(strict=True).load( - {"id": id, "timestamp": datetime.now(), "type": "delete"} - ) + return HostEvent(strict=True).dumps( + {"id": id, "timestamp": datetime.utcnow(), "type": "delete"} + ).data diff --git a/tasks/__init__.py b/tasks/__init__.py index 02443be35..92c1e51de 100644 --- 
a/tasks/__init__.py +++ b/tasks/__init__.py @@ -20,7 +20,7 @@ def emit_event(e): - producer.send(EVENT_TOPIC, value=e) + producer.send(EVENT_TOPIC, value=e.encode("utf-8")) @metrics.system_profile_commit_processing_time.time() From 8c8c1551c560db5b6080dad2101463581fe93a87 Mon Sep 17 00:00:00 2001 From: dehort Date: Mon, 29 Apr 2019 10:47:09 -0500 Subject: [PATCH 06/19] Changing the formatting back to the original (#247) --- api/host.py | 171 +++++++++++++++++++++------------------------- tasks/__init__.py | 13 ++-- 2 files changed, 83 insertions(+), 101 deletions(-) diff --git a/api/host.py b/api/host.py index 93928e9c9..3392960c3 100644 --- a/api/host.py +++ b/api/host.py @@ -30,43 +30,32 @@ def add_host_list(host_list): for host in host_list: try: (host, status_code) = _add_host(host) - response_host_list.append({"status": status_code, "host": host}) + response_host_list.append({'status': status_code, 'host': host}) except InventoryException as e: number_of_errors += 1 logger.exception("Error adding host", extra={"host": host}) response_host_list.append({**e.to_json(), "host": host}) except ValidationError as e: number_of_errors += 1 - logger.exception( - "Input validation error while adding host", extra={"host": host} - ) - response_host_list.append( - { - "status": 400, - "title": "Bad Request", - "detail": str(e.messages), - "type": "unknown", - "host": host, - } - ) + logger.exception("Input validation error while adding host", + extra={"host": host}) + response_host_list.append({"status": 400, + "title": "Bad Request", + "detail": str(e.messages), + "type": "unknown", + "host": host}) except Exception as e: number_of_errors += 1 logger.exception("Error adding host", extra={"host": host}) - response_host_list.append( - { - "status": 500, - "title": "Error", - "type": "unknown", - "detail": "Could not complete operation", - "host": host, - } - ) - - response = { - "total": len(response_host_list), - "errors": number_of_errors, - "data": response_host_list, - } 
+ response_host_list.append({"status": 500, + "title": "Error", + "type": "unknown", + "detail": "Could not complete operation", + "host": host}) + + response = {'total': len(response_host_list), + 'errors': number_of_errors, + 'data': response_host_list} return _build_json_response(response, status=207) @@ -82,17 +71,14 @@ def _add_host(host): input_host = Host.from_json(validated_input_host_dict.data) - if ( - not current_identity.is_trusted_system - and current_identity.account_number != input_host.account - ): - raise InventoryException( - title="Invalid request", - detail="The account number associated with the user does not " - "match the account number associated with the host", - ) + if (not current_identity.is_trusted_system and + current_identity.account_number != input_host.account): + raise InventoryException(title="Invalid request", + detail="The account number associated with the user does not " + "match the account number associated with the host") - existing_host = find_existing_host(input_host.account, input_host.canonical_facts) + existing_host = find_existing_host(input_host.account, + input_host.canonical_facts) if existing_host: return update_existing_host(existing_host, input_host) @@ -111,16 +97,17 @@ def find_existing_host(account_number, canonical_facts): existing_host = find_host_by_insights_id(account_number, insights_id) if not existing_host: - existing_host = find_host_by_canonical_facts(account_number, canonical_facts) + existing_host = find_host_by_canonical_facts(account_number, + canonical_facts) return existing_host def find_host_by_insights_id(account_number, insights_id): existing_host = Host.query.filter( - (Host.account == account_number) - & (Host.canonical_facts["insights_id"].astext == insights_id) - ).first() + (Host.account == account_number) + & (Host.canonical_facts["insights_id"].astext == insights_id) + ).first() if existing_host: logger.debug("Found existing host using id match: %s", existing_host) @@ -174,14 +161,9 
@@ def update_existing_host(existing_host, input_host): @api_operation @metrics.api_request_time.time() -def get_host_list( - display_name=None, - fqdn=None, - hostname_or_id=None, - insights_id=None, - page=1, - per_page=100, -): +def get_host_list(display_name=None, fqdn=None, + hostname_or_id=None, insights_id=None, + page=1, per_page=100): if fqdn: query = find_hosts_by_canonical_facts( current_identity.account_number, {"fqdn": fqdn} @@ -192,14 +174,14 @@ def get_host_list( ) elif hostname_or_id: query = find_hosts_by_hostname_or_id( - current_identity.account_number, hostname_or_id - ) + current_identity.account_number, hostname_or_id) elif insights_id: query = find_hosts_by_canonical_facts( - current_identity.account_number, {"insights_id": insights_id} - ) + current_identity.account_number, {"insights_id": insights_id}) else: - query = Host.query.filter(Host.account == current_identity.account_number) + query = Host.query.filter( + Host.account == current_identity.account_number + ) query = query.order_by(Host.created_on, Host.id) query_results = query.paginate(page, per_page, True) @@ -212,26 +194,26 @@ def get_host_list( def _build_paginated_host_list_response(total, page, per_page, host_list): json_host_list = [host.to_json() for host in host_list] - json_output = { - "total": total, - "count": len(host_list), - "page": page, - "per_page": per_page, - "results": json_host_list, - } + json_output = {"total": total, + "count": len(host_list), + "page": page, + "per_page": per_page, + "results": json_host_list, + } return _build_json_response(json_output, status=200) def _build_json_response(json_data, status=200): - return flask.Response( - ujson.dumps(json_data), status=status, mimetype="application/json" - ) + return flask.Response(ujson.dumps(json_data), + status=status, + mimetype="application/json") def find_hosts_by_display_name(account, display_name): logger.debug("find_hosts_by_display_name(%s)" % display_name) return Host.query.filter( - 
(Host.account == account) & Host.display_name.comparator.contains(display_name) + (Host.account == account) + & Host.display_name.comparator.contains(display_name) ) @@ -245,10 +227,8 @@ def find_hosts_by_canonical_facts(account_number, canonical_facts): def find_hosts_by_hostname_or_id(account_number, hostname): logger.debug("find_hosts_by_hostname_or_id(%s)", hostname) - filter_list = [ - Host.display_name.comparator.contains(hostname), - Host.canonical_facts["fqdn"].astext.contains(hostname), - ] + filter_list = [Host.display_name.comparator.contains(hostname), + Host.canonical_facts['fqdn'].astext.contains(hostname), ] try: uuid.UUID(hostname) @@ -257,15 +237,12 @@ def find_hosts_by_hostname_or_id(account_number, hostname): logger.debug("Adding id (uuid) to the filter list") except Exception as e: # Do not filter using the id - logger.debug( - "The hostname (%s) could not be converted into a UUID", - hostname, - exc_info=True, - ) + logger.debug("The hostname (%s) could not be converted into a UUID", + hostname, + exc_info=True) - return Host.query.filter( - sqlalchemy.and_(*[Host.account == account_number, sqlalchemy.or_(*filter_list)]) - ) + return Host.query.filter(sqlalchemy.and_(*[Host.account == account_number, + sqlalchemy.or_(*filter_list)])) @api_operation @@ -282,7 +259,8 @@ def delete_by_id(host_id_list): @api_operation @metrics.api_request_time.time() def get_host_by_id(host_id_list, page=1, per_page=100): - query = _get_host_list_by_id_list(current_identity.account_number, host_id_list) + query = _get_host_list_by_id_list(current_identity.account_number, + host_id_list) query_results = query.paginate(page, per_page, True) @@ -294,7 +272,11 @@ def get_host_by_id(host_id_list, page=1, per_page=100): def _get_host_list_by_id_list(account_number, host_id_list, order=True): - q = Host.query.filter((Host.account == account_number) & Host.id.in_(host_id_list)) + q = Host.query.filter( + (Host.account == account_number) + & Host.id.in_(host_id_list) + ) + 
if order: return q.order_by(Host.created_on, Host.id) else: @@ -304,19 +286,20 @@ def _get_host_list_by_id_list(account_number, host_id_list, order=True): @api_operation @metrics.api_request_time.time() def get_host_system_profile_by_id(host_id_list, page=1, per_page=100): - query = _get_host_list_by_id_list(current_identity.account_number, host_id_list) + query = _get_host_list_by_id_list(current_identity.account_number, + host_id_list) query_results = query.paginate(page, per_page, True) - response_list = [host.to_system_profile_json() for host in query_results.items] + response_list = [host.to_system_profile_json() + for host in query_results.items] - json_output = { - "total": query_results.total, - "count": len(response_list), - "page": page, - "per_page": per_page, - "results": response_list, - } + json_output = {"total": query_results.total, + "count": len(response_list), + "page": page, + "per_page": per_page, + "results": response_list, + } return _build_json_response(json_output, status=200) @@ -356,9 +339,8 @@ def patch_by_id(host_id_list, host_data): @api_operation @metrics.api_request_time.time() def replace_facts(host_id_list, namespace, fact_dict): - return update_facts_by_namespace( - FactOperations.replace, host_id_list, namespace, fact_dict - ) + return update_facts_by_namespace(FactOperations.replace, host_id_list, + namespace, fact_dict) @api_operation @@ -369,9 +351,8 @@ def merge_facts(host_id_list, namespace, fact_dict): logger.debug(error_msg) return error_msg, 400 - return update_facts_by_namespace( - FactOperations.merge, host_id_list, namespace, fact_dict - ) + return update_facts_by_namespace(FactOperations.merge, host_id_list, + namespace, fact_dict) def update_facts_by_namespace(operation, host_id_list, namespace, fact_dict): diff --git a/tasks/__init__.py b/tasks/__init__.py index 92c1e51de..bc698e7d5 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -34,9 +34,7 @@ def msg_handler(parsed): if host is None: logger.error("Host 
with id [%s] not found!", id_) return - logger.info( - "Processing message id=%s request_id=%s", parsed["id"], parsed["request_id"] - ) + logger.info("Processing message id=%s request_id=%s", parsed["id"], parsed["request_id"]) profile = SystemProfileSchema(strict=True).load(parsed["system_profile"]).data host._update_system_profile(profile) db.session.commit() @@ -48,8 +46,9 @@ def start_consumer(flask_app, handler=msg_handler, consumer=None): if consumer is None: consumer = KafkaConsumer( - TOPIC, group_id=KAFKA_GROUP, bootstrap_servers=BOOTSTRAP_SERVERS - ) + TOPIC, + group_id=KAFKA_GROUP, + bootstrap_servers=BOOTSTRAP_SERVERS) def _f(): with flask_app.app_context(): @@ -64,5 +63,7 @@ def _f(): logger.exception("uncaught exception in handler, moving on.") metrics.system_profile_failure_count.inc() - t = Thread(target=_f, daemon=True) + t = Thread( + target=_f, + daemon=True) t.start() From 91650a6d318c29a090b347ddfe2d24c75cd40103 Mon Sep 17 00:00:00 2001 From: dehort Date: Mon, 29 Apr 2019 10:48:46 -0500 Subject: [PATCH 07/19] Adding a couple of metrics for the host delete operation (#248) * Adding a couple of metrics for the host delete operation * Commit the deleted hosts * Only emit events for hosts that were actually deleted * Verify the host is deleted in the test case --- api/host.py | 9 ++++++--- api/metrics.py | 4 ++++ test_api.py | 3 +++ 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/api/host.py b/api/host.py index 3392960c3..c49b27545 100644 --- a/api/host.py +++ b/api/host.py @@ -251,9 +251,12 @@ def delete_by_id(host_id_list): hosts = _get_host_list_by_id_list( current_identity.account_number, host_id_list, order=False ) - hosts.delete(synchronize_session="fetch") - for id_ in host_id_list: - emit_event(events.delete(id_)) + with metrics.delete_host_processing_time.time(): + hosts.delete(synchronize_session="fetch") + db.session.commit() + metrics.delete_host_count.inc(hosts.count()) + for deleted_host in hosts: + 
emit_event(events.delete(deleted_host.id)) @api_operation diff --git a/api/metrics.py b/api/metrics.py index 13061c9fb..648443523 100644 --- a/api/metrics.py +++ b/api/metrics.py @@ -14,6 +14,10 @@ "The total amount of hosts created") update_host_count = Counter("inventory_update_host_count", "The total amount of hosts updated") +delete_host_count = Counter("inventory_delete_host_count", + "The total amount of hosts deleted") +delete_host_processing_time = Summary("inventory_delete_host_commit_seconds", + "Time spent deleting hosts from the database") login_failure_count = Counter("inventory_login_failure_count", "The total amount of failed login attempts") system_profile_deserialization_time = Summary("inventory_system_profile_deserialization_time", diff --git a/test_api.py b/test_api.py index 1705204a5..0c36e4cd0 100755 --- a/test_api.py +++ b/test_api.py @@ -242,6 +242,9 @@ def test_create_then_delete(self): # Try to get the host again response = self.get(HOST_URL + "/" + original_id, 200) + self.assertEqual(response["count"], 0) + self.assertEqual(response["total"], 0) + self.assertEqual(response["results"], []) class CreateHostsTestCase(DBAPITestCase): def test_create_and_update(self): From 961523e29ec3df5b5992d5339225c1a053ab8bbf Mon Sep 17 00:00:00 2001 From: dehort Date: Mon, 29 Apr 2019 13:05:25 -0500 Subject: [PATCH 08/19] Delete hosts add info to readme (#251) * Adding delete event info to the README --- README.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/README.md b/README.md index 03e3d42c3..bf197bd42 100644 --- a/README.md +++ b/README.md @@ -176,6 +176,21 @@ will be added to the existing host entry. If the canonical facts based lookup does not locate an existing host, then a new host entry is created. +#### Host deletion + +Hosts can be deleted by using the DELETE HTTP Method on the _/hosts/id_ endpoint. 
+When a host is deleted, the inventory service will send an event message +to the _platform.inventory.events_ message queue. The delete event message +will look like the following: + +``` + {"id": <host id>, "timestamp": <delete timestamp>, "type": "delete"} +``` + + - type: type of host change (delete in this case) + - id: Inventory host id of the host that was deleted + - timestamp: the time at which the host was deleted + +#### Testing API Calls + +It is necessary to pass an authentication header along on each call to the From 7de7ec5b2b6360b694ec3da141c581a9dcedf0e2 Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Wed, 15 May 2019 13:19:13 -0500 Subject: [PATCH 09/19] using a null producer when kafka isn't available Signed-off-by: Jesse Jaggars --- app/__init__.py | 6 +++--- app/config.py | 13 ++++++++----- tasks/__init__.py | 27 +++++++++++++++++---------- test_api.py | 10 +++++++--- test_unit.py | 10 +++------- 5 files changed, 38 insertions(+), 28 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 8cfa5f1b9..e82570a5b 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,4 +1,3 @@ -import os import connexion import yaml @@ -30,7 +29,8 @@ def create_app(config_name): # needs to be setup before the flask app is initialized.
configure_logging(config_name) - app_config = Config(config_name) + app_config = Config() + app_config.log_configuration(config_name) connexion_app = connexion.App( "inventory", specification_dir="./swagger/", options=connexion_options @@ -75,7 +75,7 @@ def set_request_id(): REQUEST_ID_HEADER, UNKNOWN_REQUEST_ID_VALUE) - if all(map(os.environ.get, ["KAFKA_TOPIC", "KAFKA_GROUP", "KAFKA_BOOTSTRAP_SERVERS"])): + if app_config.kafka_enabled: start_consumer(flask_app) return flask_app diff --git a/app/config.py b/app/config.py index e5707c83b..db777fe99 100644 --- a/app/config.py +++ b/app/config.py @@ -5,9 +5,8 @@ class Config: - def __init__(self, config_name): + def __init__(self): self.logger = get_logger(__name__) - self._config_name = config_name self._db_user = os.getenv("INVENTORY_DB_USER", "insights") self._db_password = os.getenv("INVENTORY_DB_PASS", "insights") @@ -25,7 +24,11 @@ def __init__(self, config_name): self.api_urls = [self.api_url_path_prefix, self.legacy_api_url_path_prefix] - self._log_configuration() + self.system_profile_topic = os.environ.get("KAFKA_TOPIC", "platform.system-profile") + self.consumer_group = os.environ.get("KAFKA_GROUP", "inventory") + self.bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092") + self.event_topic = os.environ.get("KAFKA_EVENT_TOPIC", "platform.inventory.events") + self.kafka_enabled = all(map(os.environ.get, ["KAFKA_TOPIC", "KAFKA_GROUP", "KAFKA_BOOTSTRAP_SERVERS"])) def _build_base_url_path(self): app_name = os.getenv("APP_NAME", "inventory") @@ -39,8 +42,8 @@ def _build_api_path(self): api_path = f"{base_url_path}/{version}" return api_path - def _log_configuration(self): - if self._config_name != "testing": + def log_configuration(self, config_name): + if config_name != "testing": self.logger.info("Insights Host Inventory Configuration:") self.logger.info("Build Version: %s" % get_build_version()) self.logger.info("API URL Path: %s" % self.api_url_path_prefix) diff --git 
a/tasks/__init__.py b/tasks/__init__.py index bc698e7d5..5a26bde0f 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -1,4 +1,3 @@ -import os import json from kafka import KafkaConsumer from kafka import KafkaProducer @@ -7,20 +6,28 @@ from api import metrics from app import db from app.logging import threadctx, get_logger +from app.config import Config from app.models import Host, SystemProfileSchema logger = get_logger(__name__) -TOPIC = os.environ.get("KAFKA_TOPIC", "platform.system-profile") -KAFKA_GROUP = os.environ.get("KAFKA_GROUP", "inventory") -BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092") -EVENT_TOPIC = os.environ.get("KAFKA_EVENT_TOPIC", "platform.inventory.events") +cfg = Config() -producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS) + +class NullProducer: + + def send(self, topic, value=None): + pass + + +if cfg.kafka_enabled: + producer = KafkaProducer(bootstrap_servers=cfg.bootstrap_servers) +else: + producer = NullProducer() def emit_event(e): - producer.send(EVENT_TOPIC, value=e.encode("utf-8")) + producer.send(cfg.event_topic, value=e.encode("utf-8")) @metrics.system_profile_commit_processing_time.time() @@ -46,9 +53,9 @@ def start_consumer(flask_app, handler=msg_handler, consumer=None): if consumer is None: consumer = KafkaConsumer( - TOPIC, - group_id=KAFKA_GROUP, - bootstrap_servers=BOOTSTRAP_SERVERS) + cfg.system_profile_topic, + group_id=cfg.consumer_group, + bootstrap_servers=cfg.bootstrap_servers) def _f(): with flask_app.app_context(): diff --git a/test_api.py b/test_api.py index 0c36e4cd0..7f4e8b73c 100755 --- a/test_api.py +++ b/test_api.py @@ -233,19 +233,22 @@ def test_create_then_delete(self): original_id = created_host["id"] + url = HOST_URL + "/" + original_id + # Get the host - response = self.get(HOST_URL + "/" + original_id, 200) + self.get(url, 200) # Delete the host - response = self.delete(HOST_URL + "/" + original_id, 200, return_response_as_json=False) + self.delete(url, 200, 
return_response_as_json=False) # Try to get the host again - response = self.get(HOST_URL + "/" + original_id, 200) + response = self.get(url, 200) self.assertEqual(response["count"], 0) self.assertEqual(response["total"], 0) self.assertEqual(response["results"], []) + class CreateHostsTestCase(DBAPITestCase): def test_create_and_update(self): facts = None @@ -1791,5 +1794,6 @@ def test_version(self): response = self.get(VERSION_URL, 200) assert response['version'] is not None + if __name__ == "__main__": unittest.main() diff --git a/test_unit.py b/test_unit.py index b56491fd5..2d054adec 100755 --- a/test_unit.py +++ b/test_unit.py @@ -1,7 +1,5 @@ #!/usr/bin/env python -import os - from api import api_operation from app.config import Config from app.auth.identity import (Identity, @@ -15,7 +13,6 @@ from unittest.mock import Mock, patch import test.support from test.support import EnvironmentVarGuard -from werkzeug.exceptions import Forbidden class ApiOperationTestCase(TestCase): @@ -211,7 +208,7 @@ def test_configuration_with_env_vars(self): env.set("PATH_PREFIX", path_prefix) env.set("INVENTORY_MANAGEMENT_URL_PATH_PREFIX", expected_mgmt_url_path_prefix) - conf = Config("testing") + conf = Config() self.assertEqual(conf.db_uri, "postgresql://fredflintstone:bedrock1234@localhost/SlateRockAndGravel") self.assertEqual(conf.db_pool_timeout, 3) @@ -232,7 +229,7 @@ def test_config_default_settings(self): "INVENTORY_MANAGEMENT_URL_PATH_PREFIX",): env.unset(env_var) - conf = Config("testing") + conf = Config() self.assertEqual(conf.db_uri, "postgresql://insights:insights@localhost/insights") self.assertEqual(conf.api_url_path_prefix, expected_api_path) @@ -244,8 +241,7 @@ def test_config_development_settings(self): with test.support.EnvironmentVarGuard() as env: env.set("INVENTORY_DB_POOL_TIMEOUT", "3") - # Test a different "type" (development) of config settings - conf = Config("development") + conf = Config() self.assertEqual(conf.db_pool_timeout, 3) From 
bfdd88aaf00e7bb978c15ce133a6d5599313b274 Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Wed, 15 May 2019 13:30:04 -0500 Subject: [PATCH 10/19] fixing up old patch spec Signed-off-by: Jesse Jaggars --- swagger/api.spec.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/swagger/api.spec.yaml b/swagger/api.spec.yaml index 5a498fa67..e1e75290b 100644 --- a/swagger/api.spec.yaml +++ b/swagger/api.spec.yaml @@ -163,7 +163,6 @@ paths: description: Invalid request. '404': description: Host not found. - '/hosts/{host_id}': patch: tags: - hosts From 7dbe78517d23102ed8d36b9c4d866e42dd029095 Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Wed, 15 May 2019 13:57:25 -0500 Subject: [PATCH 11/19] testing the event emit function Signed-off-by: Jesse Jaggars --- api/host.py | 7 ++++--- test_api.py | 14 ++++++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/api/host.py b/api/host.py index c49b27545..065fa8623 100644 --- a/api/host.py +++ b/api/host.py @@ -248,13 +248,14 @@ def find_hosts_by_hostname_or_id(account_number, hostname): @api_operation @metrics.api_request_time.time() def delete_by_id(host_id_list): - hosts = _get_host_list_by_id_list( + query = _get_host_list_by_id_list( current_identity.account_number, host_id_list, order=False ) + hosts = query.all() with metrics.delete_host_processing_time.time(): - hosts.delete(synchronize_session="fetch") + query.delete(synchronize_session="fetch") db.session.commit() - metrics.delete_host_count.inc(hosts.count()) + metrics.delete_host_count.inc(len(hosts)) for deleted_host in hosts: emit_event(events.delete(deleted_host.id)) diff --git a/test_api.py b/test_api.py index 7f4e8b73c..7f37d7a30 100755 --- a/test_api.py +++ b/test_api.py @@ -7,7 +7,7 @@ import uuid import copy import tempfile -from app import create_app, db +from app import create_app, db, events from app.auth.identity import Identity from app.utils import HostWrapper from base64 import b64encode @@ -238,8 +238,18 @@ def 
test_create_then_delete(self): # Get the host self.get(url, 200) + class MockEmitEvent: + + def __init__(self): + self.events = [] + + def __call__(self, e): + self.events.append(e) + # Delete the host - self.delete(url, 200, return_response_as_json=False) + with unittest.mock.patch("api.host.emit_event", new=MockEmitEvent()) as m: + self.delete(url, 200, return_response_as_json=False) + assert original_id in m.events[0] # Try to get the host again response = self.get(url, 200) From ac2a81f834725b0ba82c9ff664cb05c9a5b14b98 Mon Sep 17 00:00:00 2001 From: dehort Date: Wed, 15 May 2019 16:03:13 -0500 Subject: [PATCH 12/19] Update README.md Co-Authored-By: Glutexo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index bf197bd42..327a00735 100644 --- a/README.md +++ b/README.md @@ -183,7 +183,7 @@ When a host is deleted, the inventory service will send an event message to the _platform.inventory.events_ message queue. The delete event message will look like the following: -``` +```json {"id": , "timestamp": , "type": "delete"} ``` From 43b9b057d7780921e157ce5b075bf0fb277bb02d Mon Sep 17 00:00:00 2001 From: Derek Horton Date: Thu, 16 May 2019 09:03:25 -0500 Subject: [PATCH 13/19] Return a 404 when no hosts are found on the delete operation --- api/host.py | 5 +++++ test_api.py | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/api/host.py b/api/host.py index 065fa8623..d38d0de31 100644 --- a/api/host.py +++ b/api/host.py @@ -251,7 +251,12 @@ def delete_by_id(host_id_list): query = _get_host_list_by_id_list( current_identity.account_number, host_id_list, order=False ) + hosts = query.all() + + if not hosts: + return flask.abort(status.HTTP_404_NOT_FOUND) + with metrics.delete_host_processing_time.time(): query.delete(synchronize_session="fetch") db.session.commit() diff --git a/test_api.py b/test_api.py index 7f37d7a30..94b2048c6 100755 --- a/test_api.py +++ b/test_api.py @@ -258,6 +258,11 @@ def __call__(self, 
e): self.assertEqual(response["total"], 0) self.assertEqual(response["results"], []) + def test_delete_non_existent_host(self): + url = HOST_URL + "/" + generate_uuid() + + self.delete(url, 404) + class CreateHostsTestCase(DBAPITestCase): def test_create_and_update(self): From 19c25f74185fb73cd1f561ec2e246cf4047dea35 Mon Sep 17 00:00:00 2001 From: Derek Horton Date: Thu, 16 May 2019 10:04:22 -0500 Subject: [PATCH 14/19] Use the PreCreatedHostsBaseTestCase to implement the delete tests --- test_api.py | 82 +++++++++++++++++++++++------------------------------ 1 file changed, 36 insertions(+), 46 deletions(-) diff --git a/test_api.py b/test_api.py index 94b2048c6..cc9c50d38 100755 --- a/test_api.py +++ b/test_api.py @@ -218,52 +218,6 @@ def _validate_host(self, received_host, expected_host, self.assertIsNotNone(received_host["updated"]) -class DeleteHostsTestCase(DBAPITestCase): - def test_create_then_delete(self): - facts = None - - host_data = HostWrapper(test_data(facts=facts)) - - # Create the host - response = self.post(HOST_URL, [host_data.data()], 207) - - self._verify_host_status(response, 0, 201) - - created_host = self._pluck_host_from_response(response, 0) - - original_id = created_host["id"] - - url = HOST_URL + "/" + original_id - - # Get the host - self.get(url, 200) - - class MockEmitEvent: - - def __init__(self): - self.events = [] - - def __call__(self, e): - self.events.append(e) - - # Delete the host - with unittest.mock.patch("api.host.emit_event", new=MockEmitEvent()) as m: - self.delete(url, 200, return_response_as_json=False) - assert original_id in m.events[0] - - # Try to get the host again - response = self.get(url, 200) - - self.assertEqual(response["count"], 0) - self.assertEqual(response["total"], 0) - self.assertEqual(response["results"], []) - - def test_delete_non_existent_host(self): - url = HOST_URL + "/" + generate_uuid() - - self.delete(url, 404) - - class CreateHostsTestCase(DBAPITestCase): def test_create_and_update(self): facts 
= None @@ -1351,6 +1305,42 @@ def test_invalid_data(self): expected_status=400) +class DeleteHostsTestCase(PreCreatedHostsBaseTestCase): + + def test_create_then_delete(self): + original_id = self.added_hosts[0].id + + url = HOST_URL + "/" + original_id + + # Get the host + self.get(url, 200) + + class MockEmitEvent: + + def __init__(self): + self.events = [] + + def __call__(self, e): + self.events.append(e) + + # Delete the host + with unittest.mock.patch("api.host.emit_event", new=MockEmitEvent()) as m: + self.delete(url, 200, return_response_as_json=False) + assert original_id in m.events[0] + + # Try to get the host again + response = self.get(url, 200) + + self.assertEqual(response["count"], 0) + self.assertEqual(response["total"], 0) + self.assertEqual(response["results"], []) + + def test_delete_non_existent_host(self): + url = HOST_URL + "/" + generate_uuid() + + self.delete(url, 404) + + class QueryTestCase(PreCreatedHostsBaseTestCase): def test_query_all(self): response = self.get(HOST_URL, 200) From 199b835d73f4c06c259c6507648653ebfcb26afe Mon Sep 17 00:00:00 2001 From: Derek Horton Date: Thu, 16 May 2019 13:02:33 -0500 Subject: [PATCH 15/19] Add kafka server to the development docker compose file --- dev.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/dev.yml b/dev.yml index 062a47705..af1fe9f9b 100644 --- a/dev.yml +++ b/dev.yml @@ -9,3 +9,19 @@ services: POSTGRES_DB: insights ports: - "5432:5432" + zookeeper: + image: confluentinc/cp-zookeeper + environment: + - ZOOKEEPER_CLIENT_PORT=32181 + - ZOOKEEPER_SERVER_ID=1 + kafka: + image: confluentinc/cp-kafka + ports: + - 29092:29092 + depends_on: + - zookeeper + environment: + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092 + - KAFKA_BROKER_ID=1 + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181 From 47e63167bc50f44268e47b03875c55c0787fb2e4 Mon Sep 17 00:00:00 2001 From: Derek Horton Date: Thu, 16 May 2019 15:10:15 -0500 Subject: [PATCH 
16/19] Add some more logging around the event producer --- logconfig.ini | 2 +- tasks/__init__.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/logconfig.ini b/logconfig.ini index 321262ecf..d477ed78f 100644 --- a/logconfig.ini +++ b/logconfig.ini @@ -42,7 +42,7 @@ propagate=0 qualname=inventory.api [logger_tasks] -level=INFO +level=DEBUG handlers=logstash propagate=0 qualname=inventory.tasks diff --git a/tasks/__init__.py b/tasks/__init__.py index 5a26bde0f..ccc17e176 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -17,12 +17,15 @@ class NullProducer: def send(self, topic, value=None): - pass + logger.debug("NullProducer - logging message: topic (%s) - message: %s " % + (topic, value)) if cfg.kafka_enabled: + logger.info("Starting KafkaProducer()") producer = KafkaProducer(bootstrap_servers=cfg.bootstrap_servers) else: + logger.info("Starting NullProducer()") producer = NullProducer() From 5015d8904fd51a6e5321a5169e964fb819ac6e55 Mon Sep 17 00:00:00 2001 From: Derek Horton Date: Thu, 16 May 2019 15:16:44 -0500 Subject: [PATCH 17/19] Modified the way the tasks module is initialized. The init_tasks() method will initilize / start the event producer and system profile consumer if kafka is enabled. 
--- app/__init__.py | 5 ++--- tasks/__init__.py | 43 +++++++++++++++++++++++++++++-------------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index e82570a5b..120b5531c 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -10,7 +10,7 @@ from app.exceptions import InventoryException from app.logging import configure_logging, threadctx from app.validators import verify_uuid_format # noqa: 401 -from tasks import start_consumer +from tasks import init_tasks REQUEST_ID_HEADER = "x-rh-insights-request-id" UNKNOWN_REQUEST_ID_VALUE = "-1" @@ -75,7 +75,6 @@ def set_request_id(): REQUEST_ID_HEADER, UNKNOWN_REQUEST_ID_VALUE) - if app_config.kafka_enabled: - start_consumer(flask_app) + init_tasks(app_config, flask_app) return flask_app diff --git a/tasks/__init__.py b/tasks/__init__.py index ccc17e176..f61df03f2 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -6,27 +6,38 @@ from api import metrics from app import db from app.logging import threadctx, get_logger -from app.config import Config from app.models import Host, SystemProfileSchema logger = get_logger(__name__) -cfg = Config() - class NullProducer: def send(self, topic, value=None): - logger.debug("NullProducer - logging message: topic (%s) - message: %s " % + logger.debug("NullProducer - logging message: topic (%s) - message: %s" % (topic, value)) -if cfg.kafka_enabled: - logger.info("Starting KafkaProducer()") - producer = KafkaProducer(bootstrap_servers=cfg.bootstrap_servers) -else: - logger.info("Starting NullProducer()") - producer = NullProducer() +producer = None +cfg = None + + +def init_tasks(config, flask_app): + global cfg + cfg = config + + _init_event_producer(config) + _init_system_profile_consumer(config, flask_app) + + +def _init_event_producer(config): + global producer + if config.kafka_enabled: + logger.info("Starting KafkaProducer()") + producer = KafkaProducer(bootstrap_servers=config.bootstrap_servers) + else: + logger.info("Starting 
NullProducer()") + producer = NullProducer() def emit_event(e): @@ -50,15 +61,19 @@ def msg_handler(parsed): db.session.commit() -def start_consumer(flask_app, handler=msg_handler, consumer=None): +def _init_system_profile_consumer(config, flask_app, handler=msg_handler, consumer=None): + + if not config.kafka_enabled: + logger.info("System profile consumer has been disabled") + return logger.info("Starting system profile queue consumer.") if consumer is None: consumer = KafkaConsumer( - cfg.system_profile_topic, - group_id=cfg.consumer_group, - bootstrap_servers=cfg.bootstrap_servers) + config.system_profile_topic, + group_id=config.consumer_group, + bootstrap_servers=config.bootstrap_servers) def _f(): with flask_app.app_context(): From 100ac9cc5387150cdbc43bed67610209f6951b5f Mon Sep 17 00:00:00 2001 From: Derek Horton Date: Mon, 20 May 2019 15:39:35 -0500 Subject: [PATCH 18/19] Change the way a global variable is handled --- tasks/__init__.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tasks/__init__.py b/tasks/__init__.py index f61df03f2..7dd274530 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -24,20 +24,21 @@ def send(self, topic, value=None): def init_tasks(config, flask_app): global cfg + global producer + cfg = config - _init_event_producer(config) + producer = _init_event_producer(config) _init_system_profile_consumer(config, flask_app) def _init_event_producer(config): - global producer if config.kafka_enabled: logger.info("Starting KafkaProducer()") - producer = KafkaProducer(bootstrap_servers=config.bootstrap_servers) + return KafkaProducer(bootstrap_servers=config.bootstrap_servers) else: logger.info("Starting NullProducer()") - producer = NullProducer() + return NullProducer() def emit_event(e): From eee9b0a275cdc149ef29d1d62e8d293e58557316 Mon Sep 17 00:00:00 2001 From: Derek Horton Date: Tue, 21 May 2019 11:57:58 -0500 Subject: [PATCH 19/19] Added a test for deleting with an invalid host id (uuid) --- 
test_api.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test_api.py b/test_api.py index cc9c50d38..53f09e2b3 100755 --- a/test_api.py +++ b/test_api.py @@ -1340,6 +1340,11 @@ def test_delete_non_existent_host(self): self.delete(url, 404) + def test_delete_with_invalid_host_id(self): + url = HOST_URL + "/" + "notauuid" + + self.delete(url, 400) + class QueryTestCase(PreCreatedHostsBaseTestCase): def test_query_all(self):