From 955001340660e5dc8bf11532098fc7d80220c9a2 Mon Sep 17 00:00:00 2001 From: Emily Zheng Date: Tue, 23 Apr 2024 14:53:08 +0800 Subject: [PATCH] Retry commits --- .coveragerc | 6 + mypy.ini | 1 + requirements.txt | 1 + src/pubtools/sign/clients/msg_recv_client.py | 108 ++++-- src/pubtools/sign/clients/msg_send_client.py | 19 +- src/pubtools/sign/conf/conf.py | 1 + src/pubtools/sign/operations/base.py | 1 - src/pubtools/sign/operations/containersign.py | 9 - src/pubtools/sign/signers/cosignsigner.py | 9 +- src/pubtools/sign/signers/msgsigner.py | 320 ++++++++++++------ src/pubtools/sign/utils.py | 43 ++- tests/conftest.py | 44 ++- tests/conftest_msgsig.py | 34 +- tests/test_bundle.py | 4 - tests/test_config.py | 5 +- tests/test_cosign_signer.py | 6 - tests/test_msg_recv_client.py | 83 ++++- tests/test_msg_signer.py | 249 ++++++++++++-- tests/test_sign_operations.py | 8 - tests/test_utils.py | 11 +- 20 files changed, 759 insertions(+), 203 deletions(-) diff --git a/.coveragerc b/.coveragerc index f031487..ae6169a 100644 --- a/.coveragerc +++ b/.coveragerc @@ -4,3 +4,9 @@ fail_under = 100 exclude_lines = pragma: no cover if __name__ == .__main__.: + +[run] +dynamic_context = test_function + +[html] +show_contexts = True diff --git a/mypy.ini b/mypy.ini index 2dbf2be..fc239e4 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,4 +1,5 @@ [mypy] ignore_missing_imports = True disallow_subclassing_any = False +disallow_untyped_decorators = False strict = True diff --git a/requirements.txt b/requirements.txt index 77dfa0c..5d6850f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ requests typing_extension +mock diff --git a/src/pubtools/sign/clients/msg_recv_client.py b/src/pubtools/sign/clients/msg_recv_client.py index 4b428cc..c027d61 100644 --- a/src/pubtools/sign/clients/msg_recv_client.py +++ b/src/pubtools/sign/clients/msg_recv_client.py @@ -1,3 +1,4 @@ +import datetime import json import logging import threading @@ -18,6 +19,7 @@ class _RecvClient(_MsgClient): def __init__( self, + uid: str, topic: str, message_ids: List[str], id_key: str, @@ -42,6 +44,10 @@ def __init__( self.confirmed = 0 self.recv = recv self.timeout = timeout + self.recv_in_time = False + self.uid = uid + self.last_message_received = datetime.datetime.now() + LOG.info("Expected to receive %s messages", len(message_ids)) def on_start(self, event: proton.Event) -> None: LOG.debug("RECEIVER: On start %s %s %s", event, self.topic, self.broker_urls) @@ -49,7 +55,7 @@ def on_start(self, event: proton.Event) -> None: urls=self.broker_urls, ssl_domain=self.ssl_domain, sasl_enabled=False ) self.receiver = event.container.create_receiver(self.conn, self.topic) - self.timer_task = event.container.schedule(self.timeout, self) + self.timer_task = event.container.schedule(self.timeout / 2, self) def on_message(self, event: proton.Event) -> None: LOG.debug("RECEIVER: On message (%s)", event) @@ -60,6 +66,8 @@ def on_message(self, event: proton.Event) -> None: if msg_id in self.recv_ids: self.recv_ids[msg_id] = True self.recv[msg_id] = (outer_message, headers) + self.recv_in_time = True + self.last_message_received = datetime.datetime.now() self.accept(event.delivery) else: LOG.debug(f"RECEIVER: Ignored message {msg_id}") @@ -68,9 +76,32 @@ def on_message(self, event: proton.Event) -> None: self.timer_task.cancel() event.receiver.close() event.connection.close() + LOG.info("[%d][%s] All messages received", threading.get_ident(), self.uid) def on_timer_task(self, event: proton.Event) -> None: - LOG.debug("RECEIVER: On timeout (%s)", event) + if self.recv_in_time: + LOG.info( + "[%d][%s] RECEIVER: On timeout but messages was received " + "- continue, received: %d/%d", + threading.get_ident(), + self.uid, + len([x for x in self.recv_ids.values() if x]), + len(self.recv_ids), + ) + self.recv_in_time = False + self.timer_task = event.reactor.schedule(self.timeout / 2, self) + return + if (datetime.datetime.now() - self.last_message_received).total_seconds() < self.timeout: + self.timer_task = event.reactor.schedule(self.timeout / 2, self) + return + LOG.info( + "[%d][%s] RECEIVER: On timeout (%s) messages: %d/%d", + threading.get_ident(), + event, + self.uid, + len([x for x in self.recv_ids.values() if x]), + len(self.recv_ids), + ) self.timer_task.cancel() if event.connection: event.connection.close() # pragma: no cover @@ -78,13 +109,19 @@ def on_timer_task(self, event: proton.Event) -> None: event.receiver.close() # pragma: no cover event.container.stop() - self.errors.append( - MsgError( - source=event, - name="MessagingTimeout", - description="Out of time when receiving messages", + if not all(self.recv_ids.values()): + self.errors.append( + MsgError( + source=event, + name="MessagingTimeout", + description="[%d] Out of time when receiving messages (%d/%d)" + % ( + threading.get_ident(), + len([x for x in self.recv_ids.values() if x]), + len(self.recv_ids), + ), + ) ) - ) def close(self) -> None: if hasattr(self, "timer_task"): @@ -100,6 +137,7 @@ class RecvClient(Container): def __init__( self, + uid: str, topic: str, message_ids: List[str], id_key: str, @@ -109,6 +147,7 @@ def __init__( timeout: int, retries: int, errors: List[MsgError], + received: Dict[Any, Any], ) -> None: """Recv Client Initializer. @@ -130,10 +169,12 @@ def __init__( :type retries: int :param errors: List of errors which occured during the process :type errors: List[MsgError] + :param received: Mapping of received messages + :type errors: Dict[int, Any] """ self.message_ids = message_ids - self.recv: Dict[Any, Any] = {} - self._errors = errors + self.recv: Dict[Any, Any] = received + self._errors: List[MsgError] = errors self.topic = topic self.message_ids = message_ids self.id_key = id_key @@ -141,7 +182,10 @@ def __init__( self.cert = cert self.ca_cert = ca_cert self.timeout = timeout + self.uid = uid + self._retries = retries handler = _RecvClient( + uid=uid, topic=topic, message_ids=message_ids, id_key=id_key, @@ -152,8 +196,22 @@ def __init__( recv=self.recv, errors=self._errors, ) - self._retries = retries super().__init__(handler) + self._handler = handler + + def get_errors(self) -> List[MsgError]: + """Get errors from receiver. + + This method doesn't have any meaningfull usecase, it's only used for testing + """ + return self._errors # pragma: no cover + + def get_received(self) -> Dict[Any, Any]: + """Get received messages. + + This method doesn't have any meaningfull usecase, it's only used for testing + """ + return self.recv # pragma: no cover def run(self) -> Union[Dict[Any, Any], List[MsgError]]: """Run the receiver.""" @@ -161,28 +219,16 @@ def run(self) -> Union[Dict[Any, Any], List[MsgError]]: if not len(self.message_ids): LOG.warning("No messages to receive") return [] - - for x in range(self._retries): - super().run() - if len(self._errors) == errors_len: - break - errors_len = len(self._errors) - recv = _RecvClient( - topic=self.topic, - message_ids=self.message_ids, - id_key=self.id_key, - broker_urls=self.broker_urls, - cert=self.cert, - ca_cert=self.ca_cert, - timeout=self.timeout, - recv=self.recv, - errors=self._errors, - ) - super().__init__(recv) - else: + super().run() + if self._errors: return self._errors return self.recv + def close(self) -> None: + """Close receiver.""" + if self._handler: + self._handler.close() + class RecvThread(threading.Thread): """Receiver wrapper allows to stop receiver on demand.""" @@ -198,7 +244,7 @@ def __init__(self, recv: RecvClient): def stop(self) -> None: """Stop receiver.""" - self.recv.handler.handlers[0].close() + self.recv.close() def run(self) -> None: """Run receiver.""" diff --git a/src/pubtools/sign/clients/msg_send_client.py b/src/pubtools/sign/clients/msg_send_client.py index 136a9cb..87b0951 100644 --- a/src/pubtools/sign/clients/msg_send_client.py +++ b/src/pubtools/sign/clients/msg_send_client.py @@ -1,6 +1,6 @@ import json import logging -from typing import List +from typing import List, Dict, Any from ..models.msg import MsgMessage, MsgError @@ -21,8 +21,9 @@ def __init__( cert: str, ca_cert: str, errors: List[MsgError], - ): - super().__init__(errors=errors) + **kwargs: Dict[str, Any], + ) -> None: + super().__init__(errors=errors, **kwargs) self.broker_urls = broker_urls self.messages = messages self.ssl_domain = proton.SSLDomain(proton.SSLDomain.MODE_CLIENT) @@ -42,8 +43,7 @@ def on_start(self, event: proton.Event) -> None: self.sender = event.container.create_sender(conn) def on_sendable(self, event: proton.Event) -> None: - LOG.debug("Sender on_sendable") - if self.sent < self.total: + if event.sender.credit and self.sent < self.total: message = self.messages[self.sent] LOG.debug("Sending message: %s %s %s", message.body, message.address, message.headers) event.sender.send( @@ -56,10 +56,10 @@ def on_sendable(self, event: proton.Event) -> None: self.sent += 1 def on_accepted(self, event: proton.Event) -> None: - LOG.debug("Sender accepted") + # LOG.info("Sender accepted") self.confirmed += 1 if self.confirmed == self.total: - LOG.debug("Sender closing") + LOG.info("Sender closing") event.connection.close() def on_disconnected(self, event: proton.Event) -> None: # pragma: no cover @@ -77,7 +77,8 @@ def __init__( ca_cert: str, retries: int, errors: List[MsgError], - ): + **kwargs: Dict[str, Any], + ) -> None: """Send Client Initializer. :param messages: List of messages to send. @@ -97,7 +98,7 @@ def __init__( ) self._retries = retries self._errors = errors - super().__init__(self.handler) + super().__init__(self.handler, **kwargs) def run(self) -> List[MsgError]: """Run the SendClient.""" diff --git a/src/pubtools/sign/conf/conf.py b/src/pubtools/sign/conf/conf.py index e5e4523..043b7a8 100644 --- a/src/pubtools/sign/conf/conf.py +++ b/src/pubtools/sign/conf/conf.py @@ -18,6 +18,7 @@ class MsgSignerSchema(ma.Schema): service = ma.fields.String(required=True) timeout = ma.fields.Integer(required=True) retries = ma.fields.Integer(required=True) + send_retries = ma.fields.Integer(required=True) message_id_key = ma.fields.String(required=True) log_level = ma.fields.String(default="INFO") key_aliases = ma.fields.Dict(required=False, keys=ma.fields.String(), values=ma.fields.String()) diff --git a/src/pubtools/sign/operations/base.py b/src/pubtools/sign/operations/base.py index 80cf08e..db28c86 100644 --- a/src/pubtools/sign/operations/base.py +++ b/src/pubtools/sign/operations/base.py @@ -15,7 +15,6 @@ class SignOperation(ABC): ResultType: ClassVar[OperationResult] signing_key: str - repo: str @classmethod def doc_arguments(cls: Type[Self]) -> Dict[str, Any]: diff --git a/src/pubtools/sign/operations/containersign.py b/src/pubtools/sign/operations/containersign.py index 589d615..8a24ba9 100644 --- a/src/pubtools/sign/operations/containersign.py +++ b/src/pubtools/sign/operations/containersign.py @@ -22,14 +22,6 @@ class ContainerSignOperation(SignOperation): task_id: str = field( metadata={"description": "Usually pub task id, serves as identifier for in signing request"} ) - repo: str = field( - metadata={ - "type": "str", - "description": "Repository name", - "required": "true", - "sample": "repo", - } - ) def to_dict(self) -> dict[str, Any]: """Return a dict representation of the object.""" @@ -38,5 +30,4 @@ def to_dict(self) -> dict[str, Any]: references=self.references, signing_key=self.signing_key, task_id=self.task_id, - repo=self.repo, ) diff --git a/src/pubtools/sign/signers/cosignsigner.py b/src/pubtools/sign/signers/cosignsigner.py index c76bc8b..d79fb3f 100644 --- a/src/pubtools/sign/signers/cosignsigner.py +++ b/src/pubtools/sign/signers/cosignsigner.py @@ -265,6 +265,13 @@ def container_sign(self: CosignSigner, operation: ContainerSignOperation) -> Sig processes[f"{ref_digest}"] = run_command(common_args + [ref_digest], env=env_vars) for ref, process in processes.items(): stdout, stderr = process.communicate() + for i in range(self.retries): + if process.returncode != 0: + LOG.error(stderr) + LOG.error("Retry: %s" % i) + stdout, stderr = process.communicate() + else: + break outputs[ref] = (stdout, stderr, process.returncode) for ref, (stdout, stderr, returncode) in outputs.items(): @@ -272,6 +279,7 @@ def container_sign(self: CosignSigner, operation: ContainerSignOperation) -> Sig operation_result.results.append(stderr) operation_result.failed = True signing_results.signer_results.status = "failed" + signing_results.signer_results.error_message += stderr else: operation_result.results.append(stderr) signing_results.operation_result = operation_result @@ -342,7 +350,6 @@ def cosign_container_sign( references=reference, signing_key=signing_key, task_id="", - repo="", ) signing_result = cosign_signer.sign(operation) return { diff --git a/src/pubtools/sign/signers/msgsigner.py b/src/pubtools/sign/signers/msgsigner.py index 8f23b53..c931a1e 100644 --- a/src/pubtools/sign/signers/msgsigner.py +++ b/src/pubtools/sign/signers/msgsigner.py @@ -10,6 +10,7 @@ import uuid import os import sys +import threading from OpenSSL import crypto import click @@ -25,7 +26,14 @@ from ..clients.msg_recv_client import RecvClient, RecvThread from ..models.msg import MsgMessage, MsgError from ..conf.conf import load_config, CONFIG_PATHS -from ..utils import set_log_level, sanitize_log_level, isodate_now, _get_config_file +from ..utils import ( + set_log_level, + sanitize_log_level, + isodate_now, + _get_config_file, + run_in_parallel, + FData, +) LOG = logging.getLogger("pubtools.sign.signers.msgsigner") @@ -120,12 +128,17 @@ class MsgSigner(Signer): timeout: int = field( init=False, default=60, - metadata={"description": "Timeout for messaging sent/receive", "sample": 1}, + metadata={"description": "Timeout for messaging receive", "sample": 1}, ) retries: int = field( init=False, - default=60, - metadata={"description": "Retries for messaging sent/receive", "sample": 3}, + default=3, + metadata={"description": "Retries for messaging receive", "sample": 3}, + ) + send_retries: int = field( + init=False, + default=2, + metadata={"description": "Retries for messaging send+receive", "sample": 2}, ) message_id_key: str = field( init=False, @@ -190,6 +203,7 @@ def _construct_headers( def _create_msg_message( self: MsgSigner, data: str, + repo: str, operation: SignOperation, sig_type: SignRequestType, extra_attrs: Optional[Dict[str, Any]] = None, @@ -203,7 +217,7 @@ def _create_msg_message( body=self._construct_signing_message( data, signing_key, - repo=operation.repo, + repo, extra_attrs=extra_attrs, sig_type=sig_type.value, ), @@ -227,6 +241,7 @@ def load_config(self: MsgSigner, config_data: Dict[str, Any]) -> None: self.service = config_data["msg_signer"]["service"] self.message_id_key = config_data["msg_signer"]["message_id_key"] self.retries = config_data["msg_signer"]["retries"] + self.send_retries = config_data["msg_signer"]["send_retries"] self.log_level = config_data["msg_signer"]["log_level"] self.timeout = config_data["msg_signer"]["timeout"] self.creator = self._get_cert_subject_cn() @@ -271,6 +286,7 @@ def clear_sign(self: MsgSigner, operation: ClearSignOperation) -> SigningResults for in_data in operation.inputs: message = self._create_msg_message( base64.b64encode(in_data.encode("latin1")).decode("latin-1"), + operation.repo, operation, SignRequestType.CLEARSIGN, extra_attrs={"pub_task_id": operation.task_id}, @@ -278,6 +294,8 @@ def clear_sign(self: MsgSigner, operation: ClearSignOperation) -> SigningResults message_to_data[message.body["request_id"]] = message messages.append(message) + all_messages = [x for x in messages] + signing_key = operation.signing_key if signing_key in self.key_aliases: signing_key = self.key_aliases[signing_key] @@ -293,57 +311,102 @@ def clear_sign(self: MsgSigner, operation: ClearSignOperation) -> SigningResults signer_results=signer_results, operation_result=operation_result, ) - LOG.debug(f"{len(messages)} messages to send") - errors: List[MsgError] = [] - message_ids = [message.body["request_id"] for message in messages] - - recvc = RecvClient( - message_ids=message_ids, - topic=self.topic_listen_to.format( - **dict(list(asdict(self).items()) + list(asdict(operation).items())) - ), - id_key=self.message_id_key, - broker_urls=self.messaging_brokers, - cert=self.messaging_cert_key, - ca_cert=self.messaging_ca_cert, - timeout=self.timeout, - retries=self.retries, - errors=errors, - ) - recvt = RecvThread(recvc) - recvt.start() - - errors = SendClient( - messages=messages, - broker_urls=self.messaging_brokers, - cert=self.messaging_cert_key, - ca_cert=self.messaging_ca_cert, - retries=self.retries, - errors=errors, - ).run() - - if errors: - signer_results.status = "error" - for error in errors: - signer_results.error_message += f"{error.name} : {error.description}\n" - return signing_results - - recvt.join() + received: Dict[int, Any] = {} + LOG.info("errors " + str(errors)) + + for i in range(self.send_retries): + message_ids = [message.body["request_id"] for message in messages] + LOG.debug(f"{len(messages)} messages to send") + recvc = RecvClient( + uid=str(i), + message_ids=message_ids, + topic=self.topic_listen_to.format( + **dict(list(asdict(self).items()) + list(asdict(operation).items())) + ), + id_key=self.message_id_key, + broker_urls=self.messaging_brokers, + cert=self.messaging_cert_key, + ca_cert=self.messaging_ca_cert, + timeout=self.timeout, + retries=self.retries, + errors=errors, + received=received, + ) + recvt = RecvThread(recvc) + recvt.start() + + errors = SendClient( + messages=messages, + broker_urls=self.messaging_brokers, + cert=self.messaging_cert_key, + ca_cert=self.messaging_ca_cert, + retries=self.retries, + errors=errors, + ).run() + # check sender errors + if errors: + signer_results.status = "error" + for error in errors: + signer_results.error_message += f"{error.name} : {error.description}\n" + return signing_results + + # wait for receiver to finish + recvt.join() + recvt.stop() + LOG.info("XXXX") + + # check receiver errors + for x in range(self.retries - 1): + errors = recvc._errors + LOG.error(errors) + if errors and errors[0].name == "MessagingTimeout": + LOG.info("RETRYING %s", x) + _messages = [] + for message in messages: + if message.body["request_id"] not in received: + _messages.append(message) + if x != self.retries - 1: + errors.pop(0) + messages = _messages + message_ids = [message.body["request_id"] for message in messages] + + LOG.info("Retrying recv") + recvc = RecvClient( + uid=str(i) + "-" + str(x), + message_ids=message_ids, + topic=self.topic_listen_to.format( + **dict(list(asdict(self).items()) + list(asdict(operation).items())) + ), + id_key=self.message_id_key, + broker_urls=self.messaging_brokers, + cert=self.messaging_cert_key, + ca_cert=self.messaging_ca_cert, + timeout=self.timeout, + retries=self.retries, + errors=errors, + received=received, + ) + recvt = RecvThread(recvc) + recvt.start() + recvt.join() + elif not errors: + break errors = recvc._errors if errors: - signer_results.status = "error" - for error in errors: - signer_results.error_message += f"{error.name} : {error.description}\n" - return signing_results + if errors: + signer_results.status = "error" + for error in errors: + signer_results.error_message += f"{error.name} : {error.description}\n" + return signing_results operation_result = ClearSignResult( - signing_key=operation.signing_key, outputs=[""] * len(messages) + signing_key=operation.signing_key, outputs=[""] * len(all_messages) ) - for recv_id, received in recvc.recv.items(): - operation_result.outputs[messages.index(message_to_data[recv_id])] = received + for recv_id, _received in recvc.recv.items(): + operation_result.outputs[all_messages.index(message_to_data[recv_id])] = _received signing_results.operation_result = operation_result return signing_results @@ -383,16 +446,34 @@ def container_sign(self: MsgSigner, operation: ContainerSignOperation) -> Signin signing_key = self.key_aliases[signing_key] LOG.info(f"Using signing key alias {signing_key} for {operation.signing_key}") + LOG.info(f"Container sign operation for {len(operation.digests)}") + + fargs = [] for digest, reference in zip(operation.digests, operation.references): - message = self._create_msg_message( - self.create_manifest_claim_message(signing_key, digest=digest, reference=reference), - operation, - SignRequestType.CONTAINER, - extra_attrs={"pub_task_id": operation.task_id, "manifest_digest": digest}, + repo = reference.split(":")[0].split("/", 1)[1].split(":")[0] + fargs.append( + FData( + args=[ + self.create_manifest_claim_message( + signing_key, digest=digest, reference=reference + ), + repo, + operation, + SignRequestType.CONTAINER, + ], + kwargs={ + "extra_attrs": {"pub_task_id": operation.task_id, "manifest_digest": digest} + }, + ) ) + ret = run_in_parallel(self._create_msg_message, fargs) + for n, message in ret.items(): message_to_data[message.body["request_id"]] = message messages.append(message) + all_messages = [x for x in messages] + LOG.info(f"Signing {len(all_messages)}") + signer_results = MsgSignerResults(status="ok", error_message="") operation_result = ContainerSignResult( signing_key=operation.signing_key, results=[""] * len(operation.digests), failed=False @@ -406,43 +487,95 @@ def container_sign(self: MsgSigner, operation: ContainerSignOperation) -> Signin LOG.debug(f"{len(messages)} messages to send") errors: List[MsgError] = [] - - message_ids = [message.body["request_id"] for message in messages] - recvc = RecvClient( - message_ids=message_ids, - topic=self.topic_listen_to.format( - **dict(list(asdict(self).items()) + list(asdict(operation).items())) - ), - id_key=self.message_id_key, - broker_urls=self.messaging_brokers, - cert=self.messaging_cert_key, - ca_cert=self.messaging_ca_cert, - timeout=self.timeout, - retries=self.retries, - errors=errors, + received: Dict[int, Any] = {} + LOG.info( + "Starting signing process. Retries %d,%d, timeout: %d", + self.send_retries, + self.retries, + self.timeout, ) - recvt = RecvThread(recvc) - recvt.start() - - errors = SendClient( - messages=messages, - broker_urls=self.messaging_brokers, - cert=self.messaging_cert_key, - ca_cert=self.messaging_ca_cert, - retries=self.retries, - errors=errors, - ).run() - - if errors: - signer_results.status = "error" - for error in errors: - signer_results.error_message += f"{error.name} : {error.description}\n" - return signing_results - # wait for receiver to finish - recvt.join() + for i in range(self.send_retries): + message_ids = [message.body["request_id"] for message in messages] + recvc = RecvClient( + uid=str(i), + message_ids=message_ids, + topic=self.topic_listen_to.format( + **dict(list(asdict(self).items()) + list(asdict(operation).items())) + ), + id_key=self.message_id_key, + broker_urls=self.messaging_brokers, + cert=self.messaging_cert_key, + ca_cert=self.messaging_ca_cert, + timeout=self.timeout, + retries=self.retries, + errors=errors, + received=received, + ) + recvt = RecvThread(recvc) + recvt.start() + + errors = SendClient( + messages=messages, + broker_urls=self.messaging_brokers, + cert=self.messaging_cert_key, + ca_cert=self.messaging_ca_cert, + retries=self.retries, + errors=errors, + ).run() + + # check sender errors + if errors: + signer_results.status = "error" + for error in errors: + signer_results.error_message += f"{error.name} : {error.description}\n" + return signing_results + + # wait for receiver to finish + recvt.join() + recvt.stop() + received = recvc.get_received() + + for x in range(self.retries): + errors = recvc.get_errors() + LOG.error(errors) + if errors and errors[0].name == "MessagingTimeout": + LOG.info("Retrying receiving %s/%s", x, self.retries) + _messages = [] + for message in messages: + if message.body["request_id"] not in received: + _messages.append(message) + if x != self.retries - 1: + errors.pop(0) + messages = _messages + message_ids = [message.body["request_id"] for message in messages] + + recvc = RecvClient( + uid=str(i) + "-" + str(x), + message_ids=message_ids, + topic=self.topic_listen_to.format( + **dict(list(asdict(self).items()) + list(asdict(operation).items())) + ), + id_key=self.message_id_key, + broker_urls=self.messaging_brokers, + cert=self.messaging_cert_key, + ca_cert=self.messaging_ca_cert, + timeout=self.timeout, + retries=self.retries, + errors=errors, + received=received, + ) + recvt = RecvThread(recvc) + recvt.start() + recvt.join() + elif not errors: + break + + # check receiver errors + errors = recvc.get_errors() + if not errors: + break - errors = recvc._errors if errors: signer_results.status = "error" for error in errors: @@ -450,11 +583,11 @@ def container_sign(self: MsgSigner, operation: ContainerSignOperation) -> Signin return signing_results operation_result = ContainerSignResult( - signing_key=operation.signing_key, results=[""] * len(messages), failed=False + signing_key=operation.signing_key, results=[""] * len(all_messages), failed=False ) - for recv_id, received in recvc.recv.items(): - operation_result.failed = True if received[0]["msg"]["errors"] else False - operation_result.results[messages.index(message_to_data[recv_id])] = received + for recv_id, _received in recvc.recv.items(): + operation_result.failed = True if _received[0]["msg"]["errors"] else False + operation_result.results[all_messages.index(message_to_data[recv_id])] = _received signing_results.operation_result = operation_result return signing_results @@ -495,7 +628,6 @@ def msg_container_sign( config_file: str = "", digest: list[str] = [], reference: list[str] = [], - repo: str = "", ) -> Dict[str, Any]: """Run containersign operation with cli arguments.""" msg_signer = MsgSigner() @@ -507,7 +639,6 @@ def msg_container_sign( references=reference, signing_key=signing_key, task_id=task_id, - repo=repo, ) signing_result = msg_signer.sign(operation) return { @@ -605,7 +736,6 @@ def msg_clear_sign_main( default="INFO", help="Set log level", ) -@click.option("--repo", help="Repository reference") def msg_container_sign_main( signing_key: str = "", task_id: str = "", @@ -614,7 +744,6 @@ def msg_container_sign_main( reference: List[str] = [], raw: bool = False, log_level: str = "INFO", - repo: str = "", ) -> None: """Entry point method for containersign operation.""" ch = logging.StreamHandler() @@ -628,7 +757,6 @@ def msg_container_sign_main( config_file=config_file, digest=digest, reference=reference, - repo=repo, ) if not raw: click.echo(json.dumps(ret)) diff --git a/src/pubtools/sign/utils.py b/src/pubtools/sign/utils.py index 839e376..cc95d19 100644 --- a/src/pubtools/sign/utils.py +++ b/src/pubtools/sign/utils.py @@ -1,8 +1,11 @@ +from concurrent import futures +from concurrent.futures.thread import ThreadPoolExecutor +from dataclasses import dataclass, field import datetime import subprocess import os import logging -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Union, Callable, cast, Iterable from .conf.conf import CONFIG_PATHS @@ -60,3 +63,41 @@ def _get_config_file(config_candidate: str) -> str: "No configuration file found: %s" % list(set(CONFIG_PATHS + [config_candidate])) ) return config_candidate + + +@dataclass +class FData: + """Dataclass for holding data for a function execution. + + Args: + args (Iterable[Any]): Arguments for the function. + kwargs (Dict[str, Any]): Keyword arguments for the function. + """ + + args: Iterable[Any] + kwargs: Dict[str, Any] = field(default_factory=dict) + + +def run_in_parallel( + func: Callable[..., Any], data: List[FData], threads: int = 10 +) -> Dict[Any, Any]: + """Run method on data in parallel. + + Args: + func (function): Function to run on data + data (list): List of tuples which are used as arguments for the function + Returns: + dict: List of result in the same order as data. + """ + future_results = {} + results = {} + with ThreadPoolExecutor(max_workers=threads) as executor: + future_results = { + executor.submit(func, *data_entry.args, **data_entry.kwargs): n + for n, data_entry in enumerate(data) + } + for future in futures.as_completed(future_results): + if future.exception() is not None: + raise cast(BaseException, future.exception()) + results[future_results[future]] = future.result() + return dict(sorted(results.items(), key=lambda kv: kv[0])) diff --git a/tests/conftest.py b/tests/conftest.py index d6a18a9..f0c053c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,11 +7,14 @@ import tempfile import uuid import os +import sys + from unittest.mock import patch from .conftest_msgsig import ( # noqa: F401 f_msg_signer, # noqa: F401 f_config_msg_signer_ok, # noqa: F401 + f_config_msg_signer_ok2, # noqa: F401 f_config_msg_signer_aliases, # noqa: F401 ) # noqa: F401 from .conftest_cosignsig import ( # noqa: F401 @@ -33,6 +36,7 @@ LOG = logging.getLogger("pubtools.sign.signers.radas") +LOG.addHandler(logging.StreamHandler(sys.stdout)) class _Queue(object): @@ -87,7 +91,8 @@ def __init__(self, url): self.queues = {} def on_start(self, event): - LOG.debug("BROKER on start", self.url) + print("BROKER on start", self.url) + LOG.info("BROKER on start", self.url) self.acceptor = event.container.listen(self.url) def _queue(self, address): @@ -96,7 +101,19 @@ def _queue(self, address): return self.queues[address] def on_link_opening(self, event): - LOG.debug( + LOG.info( + "BROKER on_link_opening event", + event.link, + "source addr:", + event.link.source.address, + "remote source addr", + event.link.remote_source.address, + "target addr:", + event.link.target.address, + "remote target addr", + event.link.remote_target.address, + ) + print( "BROKER on_link_opening event", event.link, "source addr:", @@ -128,28 +145,28 @@ def _unsubscribe(self, link): del self.queues[link.source.address] def on_link_closing(self, event): - LOG.debug(">> BROKER On link closing", event) + LOG.info(">> BROKER On link closing", event) if event.link.is_sender: self._unsubscribe(event.link) def on_disconnected(self, event): - LOG.debug(">> BROKER On disconnected", event) + LOG.info(">> BROKER On disconnected", event) self.remove_stale_consumers(event.connection) def remove_stale_consumers(self, connection): link = connection.link_head(Endpoint.REMOTE_ACTIVE) - LOG.debug("BROKER removing stale consumer", link) + LOG.info("BROKER removing stale consumer", link) while link: if link.is_sender: self._unsubscribe(link) link = link.next(Endpoint.REMOTE_ACTIVE) def on_sendable(self, event): - LOG.debug("BROKER on_sendable", event.link.source.address) + LOG.info("BROKER on_sendable", event.link.source.address) self._queue(event.link.source.address).dispatch(event.link) def on_message(self, event): - LOG.debug("BROKER ON MESSAGE", event.message) + LOG.info("BROKER ON MESSAGE", event.message) address = event.link.target.address if address is None: address = event.message.address @@ -159,7 +176,7 @@ def on_message(self, event): class _BrokenBroker(_Broker): def on_sendable(self, event): - LOG.debug("BROKER on_sendable", event.link.source.address) + LOG.info("BROKER on_sendable", event.link.source.address) self._queue(event.link.source.address).dispatch(event.link) raise ValueError("Simulated broker error") event.on_link_error(event) @@ -237,6 +254,11 @@ def on_message(self, event): sender.send(reply) +def run_broker(broker, stdout): + sys.stdout = stdout + broker.run() + + @fixture(scope="session") def f_msgsigner_listen_to_topic(): return "topic://Topic.pubtools.sign" @@ -287,12 +309,12 @@ def f_find_available_port_for_broken(): @fixture(scope="session") def f_qpid_broker(f_find_available_port): - LOG.debug("starting broker", f"localhost:{f_find_available_port}") + LOG.info("starting broker", f"localhost:{f_find_available_port}") broker = Container(_Broker(f"localhost:{f_find_available_port}")) - p = Process(target=broker.run, args=()) + p = Process(target=run_broker, args=(broker, sys.stdout)) p.start() yield (broker, f_find_available_port) - LOG.debug("destroying qpid broker") + LOG.info("destroying qpid broker") p.terminate() diff --git a/tests/conftest_msgsig.py b/tests/conftest_msgsig.py index fa3bf4b..927887e 100644 --- a/tests/conftest_msgsig.py +++ b/tests/conftest_msgsig.py @@ -11,7 +11,7 @@ def f_msg_signer(f_config_msg_signer_ok): @fixture -def f_config_msg_signer_ok(f_client_certificate): +def f_config_msg_signer_ok(f_client_certificate, f_ca_certificate): with tempfile.NamedTemporaryFile() as tmpf: tmpf.write( f""" @@ -20,13 +20,42 @@ def f_config_msg_signer_ok(f_client_certificate): - amqps://broker-01:5671 - amqps://broker-02:5671 messaging_cert_key: {f_client_certificate} - messaging_ca_cert: ~/messaging/ca-cert.crt + messaging_ca_cert: {f_ca_certificate} topic_send_to: topic://Topic.sign topic_listen_to: queue://Consumer.{{creator}}.{{task_id}}.Topic.sign.{{task_id}} environment: prod service: pubtools-sign timeout: 1 retries: 3 + send_retries: 2 + message_id_key: request_id + log_level: debug + """.encode( + "utf-8" + ) + ) + tmpf.flush() + yield tmpf.name + + +@fixture +def f_config_msg_signer_ok2(f_client_certificate, f_ca_certificate, f_qpid_broker): + qpid_broker, port = f_qpid_broker + with tempfile.NamedTemporaryFile() as tmpf: + tmpf.write( + f""" +msg_signer: + messaging_brokers: + - localhost:{port} + messaging_cert_key: {f_client_certificate} + messaging_ca_cert: {f_ca_certificate} + topic_send_to: topic://Topic.sign + topic_listen_to: queue://Consumer.{{creator}}.{{task_id}}.Topic.sign.{{task_id}} + environment: prod + service: pubtools-sign + timeout: 2 + retries: 2 + send_retries: 2 message_id_key: request_id log_level: debug """.encode( @@ -54,6 +83,7 @@ def f_config_msg_signer_aliases(f_client_certificate): service: pubtools-sign timeout: 1 retries: 3 + send_retries: 2 message_id_key: request_id log_level: debug key_aliases: diff --git a/tests/test_bundle.py b/tests/test_bundle.py index e5e1ed5..04f5051 100644 --- a/tests/test_bundle.py +++ b/tests/test_bundle.py @@ -22,8 +22,6 @@ def test_bundle_msg_container_sign(f_msg_signer, f_config_msg_signer_ok): "some-reference", "--task-id", "1", - "--repo", - "repo", "--config-file", f_config_msg_signer_ok, ], @@ -46,8 +44,6 @@ def test_bundle_msg_clear_sign(f_msg_signer, f_config_msg_signer_ok): "test-signing-key", "--task-id", "1", - "--repo", - "repo", "--config-file", f_config_msg_signer_ok, "hello world", diff --git a/tests/test_config.py b/tests/test_config.py index 7093d69..c42b922 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -4,18 +4,19 @@ from pubtools.sign.conf.conf import load_config -def test_load_config_radas_ok(f_config_msg_signer_ok, f_client_certificate): +def test_load_config_radas_ok(f_config_msg_signer_ok, f_client_certificate, f_ca_certificate): assert load_config(f_config_msg_signer_ok) == { "msg_signer": { "messaging_brokers": ["amqps://broker-01:5671", "amqps://broker-02:5671"], "messaging_cert_key": f_client_certificate, - "messaging_ca_cert": "~/messaging/ca-cert.crt", + "messaging_ca_cert": f_ca_certificate, "topic_send_to": "topic://Topic.sign", "topic_listen_to": "queue://Consumer.{creator}.{task_id}.Topic.sign.{task_id}", "environment": "prod", "service": "pubtools-sign", "timeout": 1, "retries": 3, + "send_retries": 2, "message_id_key": "request_id", "log_level": "debug", } diff --git a/tests/test_cosign_signer.py b/tests/test_cosign_signer.py index 93109c2..2107f50 100644 --- a/tests/test_cosign_signer.py +++ b/tests/test_cosign_signer.py @@ -176,7 +176,6 @@ def test_sign(f_config_cosign_signer_ok): references=("some-reference",), signing_key="test-signing-key", task_id="", - repo="r", ) with patch( "pubtools.sign.signers.cosignsigner.CosignSigner.container_sign" @@ -193,7 +192,6 @@ def test_container_sign(f_config_cosign_signer_ok, f_environ, f_expected_cosign_ digests=["sha256:abcdefg"], references=["some-registry/namespace/repo:tag"], signing_key="test-signing-key", - repo="", ) with patch("subprocess.Popen") as patched_popen: @@ -232,7 +230,6 @@ def test_container_sign_alias(f_config_cosign_signer_aliases, f_environ): digests=["sha256:abcdefg"], references=["some-registry/namespace/repo:tag"], signing_key="beta", - repo="", ) with patch("subprocess.Popen") as patched_popen: @@ -287,7 +284,6 @@ def test_container_sign_error(f_config_cosign_signer_ok, f_environ, f_expected_c digests=["sha256:abcdefg"], references=["some-registry/namespace/repo:tag"], signing_key="test-signing-key", - repo="", ) with patch("subprocess.Popen") as patched_popen: @@ -328,7 +324,6 @@ def test_container_sign_digests_only( digests=["some-registry/namespace/repo@sha256:abcdefg"], references=[], signing_key="test-signing-key", - repo="", ) with patch("subprocess.Popen") as patched_popen: @@ -385,7 +380,6 @@ def test_container_sign_mismatch_refs(f_config_cosign_signer_ok): digests=["sha256:abcdefg"], references=["some-registry/namespace/repo:tag1", "some-registry/namespace/repo:tag2"], signing_key="test-signing-key", - repo="", ) with patch("subprocess.Popen") as patched_popen: diff --git a/tests/test_msg_recv_client.py b/tests/test_msg_recv_client.py index 0b2092a..1f6591b 100644 --- a/tests/test_msg_recv_client.py +++ b/tests/test_msg_recv_client.py @@ -17,7 +17,9 @@ def test_recv_client_zero_messages( ): qpid_broker, port = f_qpid_broker errors = [] + received = {} rc = RecvClient( + "uid-1", f_msgsigner_send_to_queue, [], [f"localhost:{port}"], @@ -27,6 +29,7 @@ def test_recv_client_zero_messages( 1.0, 1, errors, + received, ) rc.run() msgsigner, _, received_messages = f_fake_msgsigner @@ -49,7 +52,9 @@ def test_recv_client_recv_message( sender = SendClient([message], [f"localhost:{port}"], "", "", 10, []) errors = [] + received = {} receiver = RecvClient( + "uid-1", f_msgsigner_send_to_queue, ["1"], "request_id", @@ -59,6 +64,7 @@ def test_recv_client_recv_message( 60.0, 2, errors, + received, ) tsc = Thread(target=sender.run, args=()) @@ -93,8 +99,10 @@ def test_recv_client_timeout( body={"msg": {"message": "test_message", "request_id": "1"}}, ) errors = [] + received = {} sender = SendClient([message], [f"localhost:{port}"], "", "", 10, []) receiver = RecvClient( + "uid-1", f_msgsigner_send_to_queue + "_wrong", ["1"], "request_id", @@ -104,6 +112,7 @@ def test_recv_client_timeout( 10.0, 1, errors, + received, ) tsc = Thread(target=sender.run, args=()) @@ -126,7 +135,9 @@ def test_recv_client_transport_error( ): qpid_broker, port = f_qpid_broker errors = [] + received = {} receiver = RecvClient( + "uid-1", f_msgsigner_send_to_queue, ["1"], "request_id", @@ -136,6 +147,7 @@ def test_recv_client_transport_error( 10.0, 1, errors, + received, ) trc = Thread(target=receiver.run, args=()) @@ -161,7 +173,9 @@ def test_recv_client_link_error( ) sender = SendClient([message], [f"localhostx:{port}"], "", "", 10, []) errors = [] + received = {} receiver = RecvClient( + "uid-1", f_msgsigner_send_to_queue, ["1"], "request_id", @@ -171,6 +185,7 @@ def test_recv_client_link_error( 10.0, 1, errors, + received, ) tsc = Thread(target=sender.run, args=()) @@ -195,9 +210,11 @@ def test_recv_client_errors( address=f_msgsigner_listen_to_topic, body={"msg": {"message": "test_message", "request_id": "1"}}, ) - sender = SendClient([message], [f"localhostx:{port}"], "", "", 10, []) + sender = SendClient([message], [f"localhost:{port}"], "", "", 10, []) errors = [] + received = {} + print(id(errors)) on_message_original = _RecvClient.on_message with patch( "pubtools.sign.clients.msg_recv_client._RecvClient.on_message", autospec=True @@ -208,6 +225,7 @@ def test_recv_client_errors( ] receiver = RecvClient( + "uid-1", f_msgsigner_send_to_queue, ["1"], "request_id", @@ -217,6 +235,7 @@ def test_recv_client_errors( 10.0, 1, errors, + received, ) tsc = Thread(target=sender.run, args=()) @@ -224,10 +243,61 @@ def test_recv_client_errors( trc.start() tsc.start() - time.sleep(1) + # time.sleep(1) + trc.join() assert len(errors) == 1 +def test_recv_client_timeout_recv_in_time( + f_cleanup_msgsigner_messages, + f_qpid_broker, + f_msgsigner_listen_to_topic, + f_fake_msgsigner, + f_msgsigner_send_to_queue, +): + qpid_broker, port = f_qpid_broker + message = MsgMessage( + headers={"mtype": "test"}, + address=f_msgsigner_listen_to_topic, + body={"msg": {"message": "test_message", "request_id": "1"}}, + ) + message2 = MsgMessage( + headers={"mtype": "test"}, + address=f_msgsigner_listen_to_topic, + body={"msg": {"message": "test_message", "request_id": "2"}}, + ) + sender = SendClient([message, message2], [f"localhost:{port}"], "", "", 10, [], prefetch=1) + errors = [] + received = {} + + # on_message_original = _RecvClient.on_message + receiver = RecvClient( + "uid-1", + f_msgsigner_send_to_queue, + ["1", "2"], + "request_id", + [f"localhost:{port}"], + "", + "", + 8, + 1, + errors, + received, + ) + + sender.handler.handlers[0].on_start(Mock()) + receiver._handler.on_start(Mock()) + sender.handler.handlers[0].on_sendable(Mock()) + receiver._handler.on_message( + Mock(message=Mock(body='{"msg":{"message":"test_message","request_id":"2"}}')) + ) + receiver._handler.on_timer_task(Mock()) + receiver._handler.on_message( + Mock(message=Mock(body='{"msg":{"message":"test_message","request_id":"2"}}')) + ) + assert receiver.errors == [] + + def test_recv_client_recv_message_stray( f_cleanup_msgsigner_messages, f_qpid_broker, @@ -244,7 +314,9 @@ def test_recv_client_recv_message_stray( sender = SendClient([message], [f"localhost:{port}"], "", "", 10, []) errors = [] + received = {} receiver = RecvClient( + "uid-1", f_msgsigner_send_to_queue_stray, ["1"], "request_id", @@ -254,6 +326,7 @@ def test_recv_client_recv_message_stray( 60.0, 2, errors, + received, ) tsc = Thread(target=sender.run, args=()) @@ -297,7 +370,9 @@ def test_recv_client_recv_message_timeout( sender = SendClient([message], [f"localhost:{port}"], "", "", 10, []) errors = [] + received = {} receiver = RecvClient( + "uid-1", f_msgsigner_send_to_queue, ["1"], "request_id", @@ -307,6 +382,7 @@ def test_recv_client_recv_message_timeout( 1.0, 2, errors, + received, ) tsc = Thread(target=sender.run, args=()) @@ -381,7 +457,9 @@ def test_recv_client_close( sender = SendClient([message], [f"localhost:{port}"], "", "", 10, []) errors = [] + received = {} receiver = RecvClient( + "uid-1", f_msgsigner_send_to_queue, ["1"], "request_id", @@ -391,6 +469,7 @@ def test_recv_client_close( 1.0, 2, errors, + received, ) tsc = Thread(target=sender.run, args=()) diff --git a/tests/test_msg_signer.py b/tests/test_msg_signer.py index 41aca32..20b0090 100644 --- a/tests/test_msg_signer.py +++ b/tests/test_msg_signer.py @@ -400,7 +400,7 @@ def test_create_msg_message(f_config_msg_signer_ok): inputs=["test-data-inputs"], signing_key="test-key", task_id="1", repo="repo" ) assert signer._create_msg_message( - data, operation, SignRequestType.CONTAINER + data, "repo", operation, SignRequestType.CONTAINER ) == MsgMessage( headers={ "service": "pubtools-sign", @@ -420,7 +420,7 @@ def test_create_msg_message(f_config_msg_signer_ok): }, ) assert signer._create_msg_message( - data, operation, SignRequestType.CLEARSIGN + data, "repo", operation, SignRequestType.CLEARSIGN ) == MsgMessage( headers={ "service": "pubtools-sign", @@ -449,7 +449,6 @@ def test_sign(f_config_msg_signer_ok): references=("some-reference",), signing_key="test-signing-key", task_id="1", - repo="repo", ) clear_sign_operation = ClearSignOperation( inputs=["hello world"], signing_key="test-signing-key", task_id="1", repo="repo" @@ -538,7 +537,7 @@ def test_clear_sign_aliases(patched_uuid, f_config_msg_signer_aliases): patch_construct_signing_message.assert_called_once_with( "aGVsbG8gd29ybGQ=", "abcde1245", - repo="repo", + "repo", extra_attrs={"pub_task_id": "1"}, sig_type="clearsign_signature", ) @@ -613,13 +612,14 @@ def test_clear_sign_send_errors(patched_uuid, f_config_msg_signer_ok): @patch("uuid.uuid4", return_value="1234-5678-abcd-efgh") -def test_container_sign(patched_uuid, f_config_msg_signer_ok, f_client_certificate): +def test_container_sign( + patched_uuid, f_config_msg_signer_ok, f_client_certificate, f_ca_certificate +): container_sign_operation = ContainerSignOperation( task_id="1", digests=["sha256:abcdefg"], references=["some-registry/namespace/repo:tag"], signing_key="test-signing-key", - repo="repo", ) with patch("pubtools.sign.signers.msgsigner.SendClient") as patched_send_client: @@ -632,7 +632,7 @@ def test_container_sign(patched_uuid, f_config_msg_signer_ok, f_client_certifica {"fake": "headers"}, ) } - patched_recv_client.return_value._errors = [] + patched_recv_client.return_value.get_errors.return_value = [] signer = MsgSigner() signer.load_config(load_config(f_config_msg_signer_ok)) @@ -642,9 +642,9 @@ def test_container_sign(patched_uuid, f_config_msg_signer_ok, f_client_certifica messages=[ANY], broker_urls=["amqps://broker-01:5671", "amqps://broker-02:5671"], cert=f_client_certificate, - ca_cert=os.path.expanduser("~/messaging/ca-cert.crt"), + ca_cert=f_ca_certificate, retries=3, - errors=[], + errors=patched_recv_client.return_value.get_errors.return_value, ) assert res == SigningResults( @@ -671,7 +671,6 @@ def test_container_sign_alias(patched_uuid, f_config_msg_signer_aliases, f_clien digests=["sha256:abcdefg"], references=["some-registry/namespace/repo:tag"], signing_key="beta", - repo="repo", ) with patch("pubtools.sign.signers.msgsigner.SendClient") as patched_send_client: @@ -684,7 +683,7 @@ def test_container_sign_alias(patched_uuid, f_config_msg_signer_aliases, f_clien {"fake": "headers"}, ) } - patched_recv_client.return_value._errors = [] + patched_recv_client.return_value.get_errors.return_value = [] signer = MsgSigner() signer.load_config(load_config(f_config_msg_signer_aliases)) @@ -699,7 +698,7 @@ def test_container_sign_alias(patched_uuid, f_config_msg_signer_aliases, f_clien patch_construct_signing_message.assert_called_once_with( ANY, "abcde1245", - repo="repo", + "namespace/repo", extra_attrs={"pub_task_id": "1", "manifest_digest": "sha256:abcdefg"}, sig_type="container_signature", ) @@ -710,7 +709,7 @@ def test_container_sign_alias(patched_uuid, f_config_msg_signer_aliases, f_clien cert=f_client_certificate, ca_cert=os.path.expanduser("~/messaging/ca-cert.crt"), retries=3, - errors=[], + errors=patched_recv_client.return_value.get_errors.return_value, ) assert res == SigningResults( @@ -737,13 +736,12 @@ def test_container_sign_recv_errors(patched_uuid, f_config_msg_signer_ok): digests=["sha256:abcdefg"], references=["some-registry/namespace/repo:tag"], signing_key="test-signing-key", - repo="repo", ) with patch("pubtools.sign.signers.msgsigner.SendClient") as patched_send_client: with patch("pubtools.sign.signers.msgsigner.RecvClient") as patched_recv_client: patched_send_client.return_value.run.return_value = [] - patched_recv_client.return_value._errors = [ + patched_recv_client.return_value.get_errors.return_value = [ MsgError( name="TestError", description="test error description", source="test-source" ) @@ -773,7 +771,6 @@ def test_container_sign_send_errors(patched_uuid, f_config_msg_signer_ok): digests=["sha256:abcdefg"], references=["some-registry/namespace/repo:tag"], signing_key="test-signing-key", - repo="repo", ) with patch("pubtools.sign.signers.msgsigner.SendClient") as patched_send_client: with patch("pubtools.sign.signers.msgsigner.RecvClient") as patched_recv_client: @@ -808,7 +805,6 @@ def test_container_sign_wrong_inputs(patched_uuid, f_config_msg_signer_ok): digests=["sha256:abcdefg"], references=["some-registry/namespace/repo:tag", "some-registry/namespace/repo:tag2"], signing_key="test-signing-key", - repo="repo", ) signer = MsgSigner() @@ -817,6 +813,219 @@ def test_container_sign_wrong_inputs(patched_uuid, f_config_msg_signer_ok): signer.container_sign(container_sign_operation) +@patch("uuid.uuid4", return_value="1234-5678-abcd-efgh") +def test_container_sign_recv_timeout_fails(patched_uuid, f_config_msg_signer_ok): + container_sign_operation = ContainerSignOperation( + task_id="1", + digests=["sha256:abcdefg"], + references=["some-registry/namespace/repo:tag"], + signing_key="test-signing-key", + ) + + with patch("pubtools.sign.signers.msgsigner.SendClient") as patched_send_client: + with patch("pubtools.sign.signers.msgsigner.RecvClient") as patched_recv_client: + patched_send_client.return_value.run.return_value = [] + patched_recv_client.return_value.get_errors.return_value = [ + MsgError( + name="MessagingTimeout", + description="Out of time when receiving messages", + source="test-source", + ), + MsgError( + name="MessagingTimeout", + description="Out of time when receiving messages", + source="test-source", + ), + MsgError( + name="MessagingTimeout", + description="Out of time when receiving messages", + source="test-source", + ), + ] + patched_recv_client.return_value.recv = { + "1234-5678-abcd-efgh": ( + {"msg": {"errors": [], "signed_claim": "signed:'claim'"}}, + {"fake": "headers"}, + ) + } + + signer = MsgSigner() + signer.retries = 2 + signer.send_retries = 1 + + signer.load_config(load_config(f_config_msg_signer_ok)) + res = signer.container_sign(container_sign_operation) + + assert res == SigningResults( + signer=signer, + operation=container_sign_operation, + signer_results=MsgSignerResults( + status="error", + error_message="MessagingTimeout : Out of time when receiving messages\n" + "MessagingTimeout : Out of time when receiving messages\n", + ), + operation_result=ContainerSignResult( + results=[""], signing_key="test-signing-key", failed=False + ), + ) + + +@patch("uuid.uuid4", return_value="1234-5678-abcd-efgh") +def test_container_sign_recv_timeout_ok(patched_uuid, f_config_msg_signer_ok): + container_sign_operation = ContainerSignOperation( + task_id="1", + digests=["sha256:abcdefg"], + references=["some-registry/namespace/repo:tag"], + signing_key="test-signing-key", + ) + + with patch("pubtools.sign.signers.msgsigner.SendClient") as patched_send_client: + with patch("pubtools.sign.signers.msgsigner.RecvClient") as patched_recv_client: + patched_send_client.return_value.run.return_value = [] + patched_recv_client.return_value.get_errors.return_value = [ + MsgError( + name="MessagingTimeout", + description="Out of time when receiving messages", + source="test-source", + ), + ] + patched_recv_client.return_value.recv = { + "1234-5678-abcd-efgh": ( + {"msg": {"errors": [], "signed_claim": "signed:'claim'"}}, + {"fake": "headers"}, + ) + } + + signer = MsgSigner() + signer.retries = 2 + signer.send_retries = 1 + + signer.load_config(load_config(f_config_msg_signer_ok)) + res = signer.container_sign(container_sign_operation) + + assert res == SigningResults( + signer=signer, + operation=container_sign_operation, + signer_results=MsgSignerResults( + status="ok", + error_message="", + ), + operation_result=ContainerSignResult( + results=[ + ( + {"msg": {"errors": [], "signed_claim": "signed:'claim'"}}, + {"fake": "headers"}, + ) + ], + signing_key="test-signing-key", + failed=False, + ), + ) + + +@patch("uuid.uuid4", return_value="1234-5678-abcd-efgh") +def test_clear_sign_recv_timeout(patched_uuid, f_config_msg_signer_ok): + clear_sign_operation = ClearSignOperation( + inputs=["hello world"], signing_key="test-signing-key", task_id="1", repo="repo" + ) + + with patch("pubtools.sign.signers.msgsigner.SendClient") as patched_send_client: + with patch("pubtools.sign.signers.msgsigner.RecvClient") as patched_recv_client: + patched_send_client.return_value.run.return_value = [] + patched_recv_client.return_value._errors = [ + MsgError( + name="MessagingTimeout", + description="Out of time when receiving messages", + source="test-source", + ), + MsgError( + name="MessagingTimeout", + description="Out of time when receiving messages", + source="test-source", + ), + MsgError( + name="MessagingTimeout", + description="Out of time when receiving messages", + source="test-source", + ), + ] + patched_recv_client.return_value.recv = {"1234-5678-abcd-efgh": "signed:'hello world'"} + + signer = MsgSigner() + signer.load_config(load_config(f_config_msg_signer_ok)) + res = signer.clear_sign(clear_sign_operation) + + assert res == SigningResults( + signer=signer, + operation=clear_sign_operation, + signer_results=MsgSignerResults( + status="error", + error_message="MessagingTimeout : Out of time when receiving messages\n", + ), + operation_result=ClearSignResult(outputs=[""], signing_key="test-signing-key"), + ) + + +def test_recv_client_recv_message_break( + f_cleanup_msgsigner_messages, + f_qpid_broker, + f_msgsigner_listen_to_topic, + f_fake_msgsigner, + f_msgsigner_send_to_queue, + f_client_certificate, + f_ca_certificate, + f_config_msg_signer_ok2, +): + qpid_broker, port = f_qpid_broker + container_sign_operation = ContainerSignOperation( + digests=("some-digest",), + references=("some/reference:some-tag",), + signing_key="test-signing-key", + task_id="1", + ) + with patch( + "pubtools.sign.clients.msg_recv_client.RecvClient.get_errors", autospec=True + ) as patched_recv_get_errors, patch( + "pubtools.sign.clients.msg_recv_client.RecvClient.get_received", autospec=True + ) as patched_recv_get_received, patch( + "uuid.uuid4", return_value="1234-5678-abcd-efgh" + ) as _: + patched_recv_get_errors.side_effect = [ + [ + MsgError( + name="MessagingTimeout", + description="Out of time when receiving messages", + source=ANY, + ) + ], + [ + MsgError( + name="MessagingTimeout", + description="Out of time when receiving messages", + source=ANY, + ) + ], + ] + patched_recv_get_received.side_effect = [{}, {"1234-5678-abcd-efgh": True}] + signer = MsgSigner() + signer.load_config(load_config(f_config_msg_signer_ok2)) + signer.retries = 2 + signer.send_retries = 2 + res = signer.container_sign(container_sign_operation) + + assert res == SigningResults( + signer=signer, + operation=container_sign_operation, + signer_results=MsgSignerResults( + status="error", + error_message="MessagingTimeout : Out of time when receiving messages\n", + ), + operation_result=ContainerSignResult( + results=[""], signing_key="test-signing-key", failed=False + ), + ) + + def test_msgsig_doc_arguments(): assert MsgSigner.doc_arguments() == { "options": { @@ -830,8 +1039,9 @@ def test_msgsig_doc_arguments(): "creator": {"description": "Identification of creator of signing request"}, "environment": {"description": "Environment indetification in sent messages"}, "service": {"description": "Service identificator"}, - "timeout": {"description": "Timeout for messaging sent/receive"}, - "retries": {"description": "Retries for messaging sent/receive"}, + "timeout": {"description": "Timeout for messaging receive"}, + "retries": {"description": "Retries for messaging receive"}, + "send_retries": {"description": "Retries for messaging send+receive"}, "message_id_key": { "description": "Attribute name in message body which should be used as message id" }, @@ -851,6 +1061,7 @@ def test_msgsig_doc_arguments(): "service": "pubtools-sign", "timeout": 1, "retries": 3, + "send_retries": 2, "message_id_key": "123", "log_level": "debug", "key_aliases": "{'production':'abcde1245'}", diff --git a/tests/test_sign_operations.py b/tests/test_sign_operations.py index 9a36394..bfe539f 100644 --- a/tests/test_sign_operations.py +++ b/tests/test_sign_operations.py @@ -13,18 +13,12 @@ def test_containersign_operation_doc_argument(): "task_id": { "description": "Usually pub task id, serves as identifier for in signing request" }, - "repo": { - "type": "str", - "description": "Repository name", - "required": "true", - }, }, "examples": { "digests": "", "references": "", "signing_key": "", "task_id": "", - "repo": "repo", }, } @@ -64,13 +58,11 @@ def test_container_sign_to_dict(): references=["references"], signing_key="sig-key", task_id="task-id", - repo="repo", ).to_dict() == dict( digests=["digest"], references=["references"], signing_key="sig-key", task_id="task-id", - repo="repo", ) diff --git a/tests/test_utils.py b/tests/test_utils.py index d05aff8..2166347 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,7 +2,7 @@ import pytest -from pubtools.sign.utils import set_log_level, sanitize_log_level +from pubtools.sign.utils import set_log_level, sanitize_log_level, run_in_parallel, FData def test_set_log_level(): @@ -28,3 +28,12 @@ def test_sanitize_log_level(): assert sanitize_log_level("UNKNOWN") == "INFO" assert sanitize_log_level("unknown") == "INFO" assert sanitize_log_level("UnKnOwN") == "INFO" + + +def simulated_error(x: int) -> None: + raise ValueError("Test") + + +def test_run_in_parallel_exception(): + with pytest.raises(ValueError): + run_in_parallel(simulated_error, [FData([1])])