Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests fixup, fix handling stdout on python 3.11 #19

Merged
merged 10 commits into from
Dec 21, 2024
12 changes: 6 additions & 6 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
include:
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.1/gitlab-base.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.1/gitlab-dom0.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.1/gitlab-vm.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.2/gitlab-base.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.2/gitlab-host.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.2/gitlab-vm.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.3/gitlab-base.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.3/gitlab-host.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.3/gitlab-vm.yml'

checks:tests:
stage: checks
Expand Down
16 changes: 12 additions & 4 deletions splitgpg2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
from typing import Optional, Dict, Callable, Awaitable, Tuple, Pattern, List, \
Union, Any, TypeVar, Set, TYPE_CHECKING, Coroutine, Sequence, cast

from .stdiostream import StdoutWriterProtocol

if TYPE_CHECKING:
from typing_extensions import Protocol
from typing import TypeAlias
Expand Down Expand Up @@ -326,15 +328,21 @@ def setup_subkey_keyring(self) -> None:
self.gnupghome, self.source_keyring_dir)
with subprocess.Popen(export_cmd,
stdout=subprocess.PIPE,
stdin=subprocess.DEVNULL) as exporter, (
stdin=subprocess.DEVNULL,
stderr=subprocess.PIPE) as exporter, (
subprocess.Popen(import_cmd,
stdin=exporter.stdout)) as importer:
stdin=exporter.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)) as importer:
pass
if exporter.returncode or importer.returncode:
self.log.warning('Unable to export keys. If your key has a '
'passphrase, you might want to save it to a '
'file and use passphrase-file and '
'pinentry-mode loopback in gpg.conf')
'pinentry-mode loopback in gpg.conf.')
self.log.warning("Exporter output: %s", exporter.stderr)
self.log.warning("Importer output: %s %s",
importer.stdout, importer.stderr)
self.log.info('Subkey-only keyring %r created',
self.gnupghome)

Expand Down Expand Up @@ -1405,7 +1413,7 @@ def open_stdinout_connection(*,

write_transport, write_protocol = loop.run_until_complete(
loop.connect_write_pipe(
lambda: asyncio.streams.FlowControlMixin(loop),
lambda: StdoutWriterProtocol(loop),
sys.stdout.buffer))
writer = asyncio.StreamWriter(write_transport, write_protocol, None, loop)

Expand Down
76 changes: 76 additions & 0 deletions splitgpg2/stdiostream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# based on asyncio library:
# Copyright (C) 2001 Python Software Foundation
#
# Copyright (C) 2024 Marek Marczykowski-Górecki
# <[email protected]>
#

import collections
from asyncio import protocols, events, Future
from typing import Optional, Any

class StdoutWriterProtocol(protocols.Protocol):
"""Reusable flow control logic for StreamWriter.drain().
This implements the protocol methods pause_writing(),
resume_writing() and connection_lost(). If the subclass overrides
these it must call the super methods.
StreamWriter.drain() must wait for _drain_helper() coroutine.
"""

def __init__(self, loop: Optional[events.AbstractEventLoop] = None) -> None:
if loop is None:
self._loop = events.get_event_loop()
else:
self._loop = loop
self._paused = False
self._drain_waiters: collections.deque[Future[None]] = \
collections.deque()
self._connection_lost = False
self._closed = self._loop.create_future()

def pause_writing(self) -> None:
assert not self._paused
self._paused = True

def resume_writing(self) -> None:
assert self._paused
self._paused = False

for waiter in self._drain_waiters:
if not waiter.done():
waiter.set_result(None)

def connection_lost(self, exc: Optional[BaseException]) -> None:
self._connection_lost = True
# Wake up the writer(s) if currently paused.
if not self._paused:
return

for waiter in self._drain_waiters:
if not waiter.done():
if exc is None:
waiter.set_result(None)
else:
waiter.set_exception(exc)
if not self._closed.done():
if exc is None:
self._closed.set_result(None)
else:
self._closed.set_exception(exc)

async def _drain_helper(self) -> None:
if self._connection_lost:
raise ConnectionResetError('Connection lost')
if not self._paused:
return
waiter = self._loop.create_future()
self._drain_waiters.append(waiter)
try:
await waiter
finally:
self._drain_waiters.remove(waiter)

# pylint: disable=unused-argument
def _get_close_waiter(self, stream: Any) -> Future[None]:
return self._closed
Loading