From 5ce520335a4ef859494bfe9212a2afb5a9eba0f0 Mon Sep 17 00:00:00 2001 From: david Date: Mon, 25 Mar 2024 15:17:54 +0100 Subject: [PATCH] meeting pipeline test requirements --- logprep/connector/http/input.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py index ea01bcf62..ca4238a6b 100644 --- a/logprep/connector/http/input.py +++ b/logprep/connector/http/input.py @@ -265,6 +265,8 @@ def shut_down(self): class HttpConnector(Input): """Connector to accept log messages as http post requests""" + messages: queue.Queue = queue.Queue() + _endpoint_registry: Mapping[str, HttpEndpoint] = { "json": JSONHttpEndpoint, "plaintext": PlaintextHttpEndpoint, @@ -328,11 +330,6 @@ def __init__(self, name: str, configuration: "HttpConnector.Config", logger: Log self.host = self._config.uvicorn_config["host"] self.target = "http://" + self.host + ":" + str(self.port) - if not hasattr(self, "messages"): - self.messages = None - if not hasattr(self, "http_server"): - self.http_server = None - def setup(self): super().setup() if not hasattr(self, "pipeline_index"): @@ -343,14 +340,16 @@ def setup(self): if self.pipeline_index != 1: return - self.messages = queue.Queue(self._config.message_backlog_size) + self.messages = queue.Queue( + self._config.message_backlog_size + ) # pylint: disable=attribute-defined-outside-init endpoints_config = {} for endpoint_path, endpoint_name in self._config.endpoints.items(): endpoint_class = self._endpoint_registry.get(endpoint_name) endpoints_config[endpoint_path] = endpoint_class(self.messages) - self.http_server = ThreadingHTTPServer( + self.http_server = ThreadingHTTPServer( # pylint: disable=attribute-defined-outside-init connector_config=self._config, endpoints_config=endpoints_config, log_level=self._logger.level, @@ -375,4 +374,7 @@ def get_server_instance(self): def shut_down(self): """Raises Uvicorn HTTP Server internal stop flag and waits to join""" - self.http_server.shut_down() + try: + self.http_server.shut_down() + except: + pass