From b4a736c057d69a447cf91e564622acd12aa98f52 Mon Sep 17 00:00:00 2001 From: rudolfix Date: Mon, 6 May 2024 20:23:52 +0200 Subject: [PATCH] adds std pipe iterator for stdin and stderr (#1321) * adds std pipe iterator for stdin and stderr * fixes wrapped exceptions on stdout * defines file numbers as literals, uses rstrip --- dlt/common/logger.py | 2 +- dlt/common/runners/stdout.py | 52 +++++++++++++++++-- .../{test_pipes.py => test_std_pipes.py} | 22 ++++++-- tests/common/scripts/stderr_counter.py | 10 ++++ 4 files changed, 76 insertions(+), 10 deletions(-) rename tests/common/runners/{test_pipes.py => test_std_pipes.py} (86%) create mode 100644 tests/common/scripts/stderr_counter.py diff --git a/dlt/common/logger.py b/dlt/common/logger.py index 02412248c3..88abd575b0 100644 --- a/dlt/common/logger.py +++ b/dlt/common/logger.py @@ -81,7 +81,7 @@ def _init_logging( logger = logging.getLogger(logger_name) logger.propagate = False logger.setLevel(level) - # get or create logging handler + # get or create logging handler, we log to stderr by default handler = next(iter(logger.handlers), logging.StreamHandler()) logger.addHandler(handler) diff --git a/dlt/common/runners/stdout.py b/dlt/common/runners/stdout.py index 8ddfb45ee4..6a92838342 100644 --- a/dlt/common/runners/stdout.py +++ b/dlt/common/runners/stdout.py @@ -1,13 +1,17 @@ import sys +import queue from contextlib import contextmanager from subprocess import PIPE, CalledProcessError from threading import Thread -from typing import Any, Generator, Iterator, List +from typing import Any, Generator, Iterator, List, Tuple, Literal from dlt.common.runners.venv import Venv from dlt.common.runners.synth_pickle import decode_obj, decode_last_obj, encode_obj from dlt.common.typing import AnyFun +# file number of stdout (1) and stderr (2) +OutputStdStreamNo = Literal[1, 2] + @contextmanager def exec_to_stdout(f: AnyFun) -> Iterator[Any]: @@ -24,6 +28,47 @@ def exec_to_stdout(f: AnyFun) -> Iterator[Any]: print(encode_obj(rv), flush=True) +def iter_std( + venv: Venv, command: str, *script_args: Any +) -> Iterator[Tuple[OutputStdStreamNo, str]]: + """Starts a process `command` with `script_args` in environment `venv` and returns iterator + of (filno, line) tuples where `fileno` is 1 for stdout and 2 for stderr. `line` is + a content of a line with stripped new line character. + + Use -u in scripts_args for unbuffered python execution + """ + with venv.start_command( + command, *script_args, stdout=PIPE, stderr=PIPE, bufsize=1, text=True + ) as process: + exit_code: int = None + q_: queue.Queue[Tuple[OutputStdStreamNo, str]] = queue.Queue() + + def _r_q(std_: OutputStdStreamNo) -> None: + stream_ = process.stderr if std_ == 2 else process.stdout + for line in iter(stream_.readline, ""): + q_.put((std_, line.rstrip("\n"))) + # close queue + q_.put(None) + + # read stderr with a thread, selectors do not work on windows + t_out = Thread(target=_r_q, args=(1,), daemon=True) + t_out.start() + t_err = Thread(target=_r_q, args=(2,), daemon=True) + t_err.start() + while line := q_.get(): + yield line + + # get exit code + exit_code = process.wait() + # wait till stderr is received + t_out.join() + t_err.join() + + # we fail iterator if exit code is not 0 + if exit_code != 0: + raise CalledProcessError(exit_code, command, output="", stderr="") + + def iter_stdout(venv: Venv, command: str, *script_args: Any) -> Iterator[str]: # start a process in virtual environment, assume that text comes from stdout with venv.start_command( @@ -44,10 +89,7 @@ def _r_stderr() -> None: # read stdout with for line in iter(process.stdout.readline, ""): - if line.endswith("\n"): - yield line[:-1] - else: - yield line + yield line.rstrip("\n") # get exit code exit_code = process.wait() diff --git a/tests/common/runners/test_pipes.py b/tests/common/runners/test_std_pipes.py similarity index 86% rename from tests/common/runners/test_pipes.py rename to tests/common/runners/test_std_pipes.py index 6db7c2d0e2..bd94d9e697 100644 --- a/tests/common/runners/test_pipes.py +++ b/tests/common/runners/test_std_pipes.py @@ -5,7 +5,7 @@ from dlt.common.exceptions import UnsupportedProcessStartMethodException from dlt.common.runners import TRunMetrics, Venv -from dlt.common.runners.stdout import iter_stdout, iter_stdout_with_result +from dlt.common.runners.stdout import iter_std, iter_stdout, iter_stdout_with_result from dlt.common.runners.synth_pickle import encode_obj, decode_obj, decode_last_obj from dlt.common.utils import digest128b @@ -17,6 +17,7 @@ class _TestPickler(NamedTuple): # this is our unknown NamedTuple +# NOTE: do not remove commented out code # class _TestPicklex(NamedTuple): # val_str: str # val_int: int @@ -57,7 +58,7 @@ def test_pickle_encoder_none() -> None: def test_synth_pickler_unknown_types() -> None: # synth unknown tuple obj = decode_obj( - "LfDoYo19lgUOtTn0Ib6JgASVQAAAAAAAAACMH3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3RfcGlwZXOUjAxfVGVzdFBpY2tsZXiUk5SMA1hZWpRLe4aUgZQu" + "ITcR+B7x+XYsddD8ws1cgASVRAAAAAAAAACMI3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3Rfc3RkX3BpcGVzlIwMX1Rlc3RQaWNrbGV4lJOUjANYWVqUS3uGlIGULg==" ) assert type(obj).__name__.endswith("_TestPicklex") # this is completely different type @@ -65,7 +66,7 @@ def test_synth_pickler_unknown_types() -> None: # synth unknown class containing other unknown types obj = decode_obj( - "Koyo502yl4IKMqIxUTJFgASVbQAAAAAAAACMH3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3RfcGlwZXOUjApfVGVzdENsYXNzlJOUKYGUfZQojAJzMZRoAIwMX1Rlc3RQaWNrbGV4lJOUjAFZlEsXhpSBlIwCczKUjAFVlIwDX3MzlEsDdWIu" + "G5nqyni0vOTdqmmd58izgASVcQAAAAAAAACMI3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3Rfc3RkX3BpcGVzlIwKX1Rlc3RDbGFzc5STlCmBlH2UKIwCczGUaACMDF9UZXN0UGlja2xleJSTlIwBWZRLF4aUgZSMAnMylIwBVZSMA19zM5RLA3ViLg==" ) assert type(obj).__name__.endswith("_TestClass") # tuple inside will be synthesized as well @@ -73,7 +74,7 @@ def test_synth_pickler_unknown_types() -> None: # known class containing unknown types obj = decode_obj( - "PozhjHuf2oS7jPcRxKoagASVbQAAAAAAAACMH3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3RfcGlwZXOUjBJfVGVzdENsYXNzVW5rRmllbGSUk5QpgZR9lCiMAnMxlGgAjAxfVGVzdFBpY2tsZXiUk5SMAVmUSxeGlIGUjAJzMpSMAVWUdWIu" + "9Ob27Bf1H05E48gxbOJZgASVcQAAAAAAAACMI3Rlc3RzLmNvbW1vbi5ydW5uZXJzLnRlc3Rfc3RkX3BpcGVzlIwSX1Rlc3RDbGFzc1Vua0ZpZWxklJOUKYGUfZQojAJzMZRoAIwMX1Rlc3RQaWNrbGV4lJOUjAFZlEsXhpSBlIwCczKUjAFVlHViLg==" ) assert isinstance(obj, _TestClassUnkField) assert type(obj.s1).__name__.endswith("_TestPicklex") # type: ignore[attr-defined] @@ -146,6 +147,19 @@ def test_iter_stdout_raises() -> None: assert _i == 2 +def test_std_iter() -> None: + # even -> stdout, odd -> stderr + expected = [(1, "0"), (2, "1"), (1, "2"), (2, "3")] + with pytest.raises(CalledProcessError) as cpe: + for i, line in enumerate( + iter_std( + Venv.restore_current(), "python", "-u", "tests/common/scripts/stderr_counter.py" + ) + ): + assert expected[i] == line + assert cpe.value.returncode == 1 + + def test_stdout_encode_result() -> None: # use current venv to execute so we have dlt venv = Venv.restore_current() diff --git a/tests/common/scripts/stderr_counter.py b/tests/common/scripts/stderr_counter.py new file mode 100644 index 0000000000..b64a124c66 --- /dev/null +++ b/tests/common/scripts/stderr_counter.py @@ -0,0 +1,10 @@ +import sys +from time import sleep + + +for i in range(5): + print(i, file=sys.stderr if i % 2 else sys.stdout) + if i == 3: + exit(1) + sleep(0.3) +print("exit")