diff --git a/neon_api_proxy/api_connector.py b/neon_api_proxy/api_connector.py index 5259eca..1ad11e3 100644 --- a/neon_api_proxy/api_connector.py +++ b/neon_api_proxy/api_connector.py @@ -58,16 +58,24 @@ def handle_api_input(self, try: if body and isinstance(body, bytes): request = b64_to_dict(body) - message_id = request.get("message_id") + tokens = self.extract_agent_tokens(request) + + message_id = tokens.pop('message_id', request.get("message_id", None)) + LOG.info(f"request={request}; message_id={message_id}") + respond = self.proxy.resolve_query(request) - LOG.debug(f"message={message_id} status={respond.get('status_code')}") + LOG.info(f"message={message_id} status={respond.get('status_code')}") + + respond['content'] = bytes(respond.get('content', '')).decode(encoding='utf-8') + respond = {**respond, **tokens} + LOG.debug(f"respond={respond}") data = dict_to_b64(respond) + routing_key = request.get('routing_key', 'neon_api_output') # queue declare is idempotent, just making sure queue exists - channel.queue_declare(queue='neon_api_output') - + channel.queue_declare(queue=routing_key) channel.basic_publish(exchange='', - routing_key=request.get('routing_key', 'neon_api_output'), + routing_key=routing_key, body=data, properties=pika.BasicProperties(expiration='1000') ) @@ -78,12 +86,33 @@ def handle_api_input(self, LOG.error(f"message_id={message_id}") LOG.error(e) + @staticmethod + def extract_agent_tokens(msg_data: dict) -> dict: + """ + Extracts tokens from msg data based on received "agent" + + :param msg_data: desired message data + :return: dictionary containing tokens dedicated to resolved agent + """ + tokens = dict() + request_agent = msg_data.pop('agent', 'undefined') + if 'klatchat' in request_agent: + LOG.info('Resolved agent is "klatchat"') + tokens['cid'] = msg_data.pop("cid", None) + tokens['message_id'] = tokens['replied_message'] = msg_data.get('messageID', None) + else: + LOG.warning('Failed to resolve an agent from the message data') + return tokens + def handle_error(self, thread, exception): LOG.error(exception) LOG.info(f"Restarting Consumers") - self.stop_consumers() + self.stop() self.run() - def run(self): + def pre_run(self, **kwargs): self.register_consumer("neon_api_consumer", self.vhost, 'neon_api_input', self.handle_api_input, auto_ack=False) - self.run_consumers() + self.register_consumer("neon_api_consumer_targeted", + self.vhost, + f'neon_api_input_{self.service_id}', + self.handle_api_input, auto_ack=False) diff --git a/version.py b/version.py index c49dd20..624f5f9 100644 --- a/version.py +++ b/version.py @@ -17,4 +17,4 @@ # US Patents 2008-2021: US7424516, US20140161250, US20140177813, US8638908, US8068604, US8553852, US10530923, US10530924 # China Patent: CN102017585 - Europe Patent: EU2156652 - Patents Pending -__version__ = "0.1.4" +__version__ = "0.1.6"