Skip to content

Commit

Permalink
Retry commits
Browse files Browse the repository at this point in the history
  • Loading branch information
emilyzheng committed Apr 23, 2024
1 parent 1ac80d8 commit 9550013
Show file tree
Hide file tree
Showing 20 changed files with 759 additions and 203 deletions.
6 changes: 6 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ fail_under = 100
exclude_lines =
pragma: no cover
if __name__ == .__main__.:

[run]
dynamic_context = test_function

[html]
show_contexts = True
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[mypy]
ignore_missing_imports = True
disallow_subclassing_any = False
disallow_untyped_decorators = False
strict = True
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
requests
typing_extension
mock

108 changes: 77 additions & 31 deletions src/pubtools/sign/clients/msg_recv_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import json
import logging
import threading
Expand All @@ -18,6 +19,7 @@
class _RecvClient(_MsgClient):
def __init__(
self,
uid: str,
topic: str,
message_ids: List[str],
id_key: str,
Expand All @@ -42,14 +44,18 @@ def __init__(
self.confirmed = 0
self.recv = recv
self.timeout = timeout
self.recv_in_time = False
self.uid = uid
self.last_message_received = datetime.datetime.now()
LOG.info("Expected to receive %s messages", len(message_ids))

def on_start(self, event: proton.Event) -> None:
LOG.debug("RECEIVER: On start %s %s %s", event, self.topic, self.broker_urls)
self.conn = event.container.connect(
urls=self.broker_urls, ssl_domain=self.ssl_domain, sasl_enabled=False
)
self.receiver = event.container.create_receiver(self.conn, self.topic)
self.timer_task = event.container.schedule(self.timeout, self)
self.timer_task = event.container.schedule(self.timeout / 2, self)

def on_message(self, event: proton.Event) -> None:
LOG.debug("RECEIVER: On message (%s)", event)
Expand All @@ -60,6 +66,8 @@ def on_message(self, event: proton.Event) -> None:
if msg_id in self.recv_ids:
self.recv_ids[msg_id] = True
self.recv[msg_id] = (outer_message, headers)
self.recv_in_time = True
self.last_message_received = datetime.datetime.now()
self.accept(event.delivery)
else:
LOG.debug(f"RECEIVER: Ignored message {msg_id}")
Expand All @@ -68,23 +76,52 @@ def on_message(self, event: proton.Event) -> None:
self.timer_task.cancel()
event.receiver.close()
event.connection.close()
LOG.info("[%d][%s] All messages received", threading.get_ident(), self.uid)

def on_timer_task(self, event: proton.Event) -> None:
LOG.debug("RECEIVER: On timeout (%s)", event)
if self.recv_in_time:
LOG.info(
"[%d][%s] RECEIVER: On timeout but messages was received "
"- continue, received: %d/%d",
threading.get_ident(),
self.uid,
len([x for x in self.recv_ids.values() if x]),
len(self.recv_ids),
)
self.recv_in_time = False
self.timer_task = event.reactor.schedule(self.timeout / 2, self)
return
if (datetime.datetime.now() - self.last_message_received).total_seconds() < self.timeout:
self.timer_task = event.reactor.schedule(self.timeout / 2, self)
return
LOG.info(
"[%d][%s] RECEIVER: On timeout (%s) messages: %d/%d",
threading.get_ident(),
event,
self.uid,
len([x for x in self.recv_ids.values() if x]),
len(self.recv_ids),
)
self.timer_task.cancel()
if event.connection:
event.connection.close() # pragma: no cover
if event.receiver:
event.receiver.close() # pragma: no cover
event.container.stop()

self.errors.append(
MsgError(
source=event,
name="MessagingTimeout",
description="Out of time when receiving messages",
if not all(self.recv_ids.values()):
self.errors.append(
MsgError(
source=event,
name="MessagingTimeout",
description="[%d] Out of time when receiving messages (%d/%d)"
% (
threading.get_ident(),
len([x for x in self.recv_ids.values() if x]),
len(self.recv_ids),
),
)
)
)

def close(self) -> None:
if hasattr(self, "timer_task"):
Expand All @@ -100,6 +137,7 @@ class RecvClient(Container):

def __init__(
self,
uid: str,
topic: str,
message_ids: List[str],
id_key: str,
Expand All @@ -109,6 +147,7 @@ def __init__(
timeout: int,
retries: int,
errors: List[MsgError],
received: Dict[Any, Any],
) -> None:
"""Recv Client Initializer.
Expand All @@ -130,18 +169,23 @@ def __init__(
:type retries: int
:param errors: List of errors which occured during the process
:type errors: List[MsgError]
:param received: Mapping of received messages
:type errors: Dict[int, Any]
"""
self.message_ids = message_ids
self.recv: Dict[Any, Any] = {}
self._errors = errors
self.recv: Dict[Any, Any] = received
self._errors: List[MsgError] = errors
self.topic = topic
self.message_ids = message_ids
self.id_key = id_key
self.broker_urls = broker_urls
self.cert = cert
self.ca_cert = ca_cert
self.timeout = timeout
self.uid = uid
self._retries = retries
handler = _RecvClient(
uid=uid,
topic=topic,
message_ids=message_ids,
id_key=id_key,
Expand All @@ -152,37 +196,39 @@ def __init__(
recv=self.recv,
errors=self._errors,
)
self._retries = retries
super().__init__(handler)
self._handler = handler

def get_errors(self) -> List[MsgError]:
"""Get errors from receiver.
This method doesn't have any meaningfull usecase, it's only used for testing
"""
return self._errors # pragma: no cover

def get_received(self) -> Dict[Any, Any]:
"""Get received messages.
This method doesn't have any meaningfull usecase, it's only used for testing
"""
return self.recv # pragma: no cover

def run(self) -> Union[Dict[Any, Any], List[MsgError]]:
"""Run the receiver."""
errors_len = 0
if not len(self.message_ids):
LOG.warning("No messages to receive")
return []

for x in range(self._retries):
super().run()
if len(self._errors) == errors_len:
break
errors_len = len(self._errors)
recv = _RecvClient(
topic=self.topic,
message_ids=self.message_ids,
id_key=self.id_key,
broker_urls=self.broker_urls,
cert=self.cert,
ca_cert=self.ca_cert,
timeout=self.timeout,
recv=self.recv,
errors=self._errors,
)
super().__init__(recv)
else:
super().run()
if self._errors:
return self._errors
return self.recv

def close(self) -> None:
"""Close receiver."""
if self._handler:
self._handler.close()


class RecvThread(threading.Thread):
"""Receiver wrapper allows to stop receiver on demand."""
Expand All @@ -198,7 +244,7 @@ def __init__(self, recv: RecvClient):

def stop(self) -> None:
"""Stop receiver."""
self.recv.handler.handlers[0].close()
self.recv.close()

def run(self) -> None:
"""Run receiver."""
Expand Down
19 changes: 10 additions & 9 deletions src/pubtools/sign/clients/msg_send_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import List
from typing import List, Dict, Any

from ..models.msg import MsgMessage, MsgError

Expand All @@ -21,8 +21,9 @@ def __init__(
cert: str,
ca_cert: str,
errors: List[MsgError],
):
super().__init__(errors=errors)
**kwargs: Dict[str, Any],
) -> None:
super().__init__(errors=errors, **kwargs)
self.broker_urls = broker_urls
self.messages = messages
self.ssl_domain = proton.SSLDomain(proton.SSLDomain.MODE_CLIENT)
Expand All @@ -42,8 +43,7 @@ def on_start(self, event: proton.Event) -> None:
self.sender = event.container.create_sender(conn)

def on_sendable(self, event: proton.Event) -> None:
LOG.debug("Sender on_sendable")
if self.sent < self.total:
if event.sender.credit and self.sent < self.total:
message = self.messages[self.sent]
LOG.debug("Sending message: %s %s %s", message.body, message.address, message.headers)
event.sender.send(
Expand All @@ -56,10 +56,10 @@ def on_sendable(self, event: proton.Event) -> None:
self.sent += 1

def on_accepted(self, event: proton.Event) -> None:
LOG.debug("Sender accepted")
# LOG.info("Sender accepted")
self.confirmed += 1
if self.confirmed == self.total:
LOG.debug("Sender closing")
LOG.info("Sender closing")
event.connection.close()

def on_disconnected(self, event: proton.Event) -> None: # pragma: no cover
Expand All @@ -77,7 +77,8 @@ def __init__(
ca_cert: str,
retries: int,
errors: List[MsgError],
):
**kwargs: Dict[str, Any],
) -> None:
"""Send Client Initializer.
:param messages: List of messages to send.
Expand All @@ -97,7 +98,7 @@ def __init__(
)
self._retries = retries
self._errors = errors
super().__init__(self.handler)
super().__init__(self.handler, **kwargs)

def run(self) -> List[MsgError]:
"""Run the SendClient."""
Expand Down
1 change: 1 addition & 0 deletions src/pubtools/sign/conf/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class MsgSignerSchema(ma.Schema):
service = ma.fields.String(required=True)
timeout = ma.fields.Integer(required=True)
retries = ma.fields.Integer(required=True)
send_retries = ma.fields.Integer(required=True)
message_id_key = ma.fields.String(required=True)
log_level = ma.fields.String(default="INFO")
key_aliases = ma.fields.Dict(required=False, keys=ma.fields.String(), values=ma.fields.String())
Expand Down
1 change: 0 additions & 1 deletion src/pubtools/sign/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class SignOperation(ABC):

ResultType: ClassVar[OperationResult]
signing_key: str
repo: str

@classmethod
def doc_arguments(cls: Type[Self]) -> Dict[str, Any]:
Expand Down
9 changes: 0 additions & 9 deletions src/pubtools/sign/operations/containersign.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@ class ContainerSignOperation(SignOperation):
task_id: str = field(
metadata={"description": "Usually pub task id, serves as identifier for in signing request"}
)
repo: str = field(
metadata={
"type": "str",
"description": "Repository name",
"required": "true",
"sample": "repo",
}
)

def to_dict(self) -> dict[str, Any]:
"""Return a dict representation of the object."""
Expand All @@ -38,5 +30,4 @@ def to_dict(self) -> dict[str, Any]:
references=self.references,
signing_key=self.signing_key,
task_id=self.task_id,
repo=self.repo,
)
9 changes: 8 additions & 1 deletion src/pubtools/sign/signers/cosignsigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,21 @@ def container_sign(self: CosignSigner, operation: ContainerSignOperation) -> Sig
processes[f"{ref_digest}"] = run_command(common_args + [ref_digest], env=env_vars)
for ref, process in processes.items():
stdout, stderr = process.communicate()
for i in range(self.retries):
if process.returncode != 0:
LOG.error(stderr)
LOG.error("Retry: %s" % i)
stdout, stderr = process.communicate()
else:
break
outputs[ref] = (stdout, stderr, process.returncode)

for ref, (stdout, stderr, returncode) in outputs.items():
if returncode != 0:
operation_result.results.append(stderr)
operation_result.failed = True
signing_results.signer_results.status = "failed"
signing_results.signer_results.error_message += stderr
else:
operation_result.results.append(stderr)
signing_results.operation_result = operation_result
Expand Down Expand Up @@ -342,7 +350,6 @@ def cosign_container_sign(
references=reference,
signing_key=signing_key,
task_id="",
repo="",
)
signing_result = cosign_signer.sign(operation)
return {
Expand Down
Loading

0 comments on commit 9550013

Please sign in to comment.