Skip to content

Commit

Permalink
adds std pipe iterator for stdin and stderr (#1321)
Browse files Browse the repository at this point in the history
* adds std pipe iterator for stdin and stderr

* fixes wrapped exceptions on stdout

* defines file numbers as literals, uses rstrip
  • Loading branch information
rudolfix authored May 6, 2024
1 parent cd74a97 commit b4a736c
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 10 deletions.
2 changes: 1 addition & 1 deletion dlt/common/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _init_logging(
logger = logging.getLogger(logger_name)
logger.propagate = False
logger.setLevel(level)
# get or create logging handler
# get or create logging handler, we log to stderr by default
handler = next(iter(logger.handlers), logging.StreamHandler())
logger.addHandler(handler)

Expand Down
52 changes: 47 additions & 5 deletions dlt/common/runners/stdout.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import sys
import queue
from contextlib import contextmanager
from subprocess import PIPE, CalledProcessError
from threading import Thread
from typing import Any, Generator, Iterator, List
from typing import Any, Generator, Iterator, List, Tuple, Literal

from dlt.common.runners.venv import Venv
from dlt.common.runners.synth_pickle import decode_obj, decode_last_obj, encode_obj
from dlt.common.typing import AnyFun

# file number of stdout (1) and stderr (2)
OutputStdStreamNo = Literal[1, 2]


@contextmanager
def exec_to_stdout(f: AnyFun) -> Iterator[Any]:
Expand All @@ -24,6 +28,47 @@ def exec_to_stdout(f: AnyFun) -> Iterator[Any]:
print(encode_obj(rv), flush=True)


def iter_std(
venv: Venv, command: str, *script_args: Any
) -> Iterator[Tuple[OutputStdStreamNo, str]]:
"""Starts a process `command` with `script_args` in environment `venv` and returns iterator
of (filno, line) tuples where `fileno` is 1 for stdout and 2 for stderr. `line` is
a content of a line with stripped new line character.
Use -u in scripts_args for unbuffered python execution
"""
with venv.start_command(
command, *script_args, stdout=PIPE, stderr=PIPE, bufsize=1, text=True
) as process:
exit_code: int = None
q_: queue.Queue[Tuple[OutputStdStreamNo, str]] = queue.Queue()

def _r_q(std_: OutputStdStreamNo) -> None:
stream_ = process.stderr if std_ == 2 else process.stdout
for line in iter(stream_.readline, ""):
q_.put((std_, line.rstrip("\n")))
# close queue
q_.put(None)

# read stderr with a thread, selectors do not work on windows
t_out = Thread(target=_r_q, args=(1,), daemon=True)
t_out.start()
t_err = Thread(target=_r_q, args=(2,), daemon=True)
t_err.start()
while line := q_.get():
yield line

# get exit code
exit_code = process.wait()
# wait till stderr is received
t_out.join()
t_err.join()

# we fail iterator if exit code is not 0
if exit_code != 0:
raise CalledProcessError(exit_code, command, output="", stderr="")


def iter_stdout(venv: Venv, command: str, *script_args: Any) -> Iterator[str]:
# start a process in virtual environment, assume that text comes from stdout
with venv.start_command(
Expand All @@ -44,10 +89,7 @@ def _r_stderr() -> None:

# read stdout with
for line in iter(process.stdout.readline, ""):
if line.endswith("\n"):
yield line[:-1]
else:
yield line
yield line.rstrip("\n")

# get exit code
exit_code = process.wait()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dlt.common.exceptions import UnsupportedProcessStartMethodException

from dlt.common.runners import TRunMetrics, Venv
from dlt.common.runners.stdout import iter_stdout, iter_stdout_with_result
from dlt.common.runners.stdout import iter_std, iter_stdout, iter_stdout_with_result
from dlt.common.runners.synth_pickle import encode_obj, decode_obj, decode_last_obj

from dlt.common.utils import digest128b
Expand All @@ -17,6 +17,7 @@ class _TestPickler(NamedTuple):


# this is our unknown NamedTuple
# NOTE: do not remove commented out code
# class _TestPicklex(NamedTuple):
# val_str: str
# val_int: int
Expand Down Expand Up @@ -57,23 +58,23 @@ def test_pickle_encoder_none() -> None:
def test_synth_pickler_unknown_types() -> None:
# synth unknown tuple
obj = decode_obj(
"LfDoYo19lgUOtTn0Ib6JgASVQAAAAAAAAACMH3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3RfcGlwZXOUjAxfVGVzdFBpY2tsZXiUk5SMA1hZWpRLe4aUgZQu"
"ITcR+B7x+XYsddD8ws1cgASVRAAAAAAAAACMI3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3Rfc3RkX3BpcGVzlIwMX1Rlc3RQaWNrbGV4lJOUjANYWVqUS3uGlIGULg=="
)
assert type(obj).__name__.endswith("_TestPicklex")
# this is completely different type
assert not isinstance(obj, tuple)

# synth unknown class containing other unknown types
obj = decode_obj(
"Koyo502yl4IKMqIxUTJFgASVbQAAAAAAAACMH3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3RfcGlwZXOUjApfVGVzdENsYXNzlJOUKYGUfZQojAJzMZRoAIwMX1Rlc3RQaWNrbGV4lJOUjAFZlEsXhpSBlIwCczKUjAFVlIwDX3MzlEsDdWIu"
"G5nqyni0vOTdqmmd58izgASVcQAAAAAAAACMI3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3Rfc3RkX3BpcGVzlIwKX1Rlc3RDbGFzc5STlCmBlH2UKIwCczGUaACMDF9UZXN0UGlja2xleJSTlIwBWZRLF4aUgZSMAnMylIwBVZSMA19zM5RLA3ViLg=="
)
assert type(obj).__name__.endswith("_TestClass")
# tuple inside will be synthesized as well
assert type(obj.s1).__name__.endswith("_TestPicklex")

# known class containing unknown types
obj = decode_obj(
"PozhjHuf2oS7jPcRxKoagASVbQAAAAAAAACMH3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3RfcGlwZXOUjBJfVGVzdENsYXNzVW5rRmllbGSUk5QpgZR9lCiMAnMxlGgAjAxfVGVzdFBpY2tsZXiUk5SMAVmUSxeGlIGUjAJzMpSMAVWUdWIu"
"9Ob27Bf1H05E48gxbOJZgASVcQAAAAAAAACMI3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3Rfc3RkX3BpcGVzlIwSX1Rlc3RDbGFzc1Vua0ZpZWxklJOUKYGUfZQojAJzMZRoAIwMX1Rlc3RQaWNrbGV4lJOUjAFZlEsXhpSBlIwCczKUjAFVlHViLg=="
)
assert isinstance(obj, _TestClassUnkField)
assert type(obj.s1).__name__.endswith("_TestPicklex") # type: ignore[attr-defined]
Expand Down Expand Up @@ -146,6 +147,19 @@ def test_iter_stdout_raises() -> None:
assert _i == 2


def test_std_iter() -> None:
# even -> stdout, odd -> stderr
expected = [(1, "0"), (2, "1"), (1, "2"), (2, "3")]
with pytest.raises(CalledProcessError) as cpe:
for i, line in enumerate(
iter_std(
Venv.restore_current(), "python", "-u", "tests/common/scripts/stderr_counter.py"
)
):
assert expected[i] == line
assert cpe.value.returncode == 1


def test_stdout_encode_result() -> None:
# use current venv to execute so we have dlt
venv = Venv.restore_current()
Expand Down
10 changes: 10 additions & 0 deletions tests/common/scripts/stderr_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import sys
from time import sleep


for i in range(5):
print(i, file=sys.stderr if i % 2 else sys.stdout)
if i == 3:
exit(1)
sleep(0.3)
print("exit")

0 comments on commit b4a736c

Please sign in to comment.