Skip to content

Commit

Permalink
refactor: Use io.TextIOWrapper to redirect stdout and stderr
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Dec 28, 2024
1 parent 2f91bc1 commit 07f348e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 16 deletions.
30 changes: 20 additions & 10 deletions singer_sdk/testing/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ def get_standard_target_tests(
return []


def tap_sync_test(tap: Tap) -> tuple[io.StringIO, io.StringIO]:
def tap_sync_test(
tap: Tap,
) -> tuple[
io.TextIOWrapper[io.BytesIO],
io.TextIOWrapper[io.BytesIO],
]:
"""Invokes a Tap object and return STDOUT and STDERR results in StringIO buffers.
Args:
Expand All @@ -120,8 +125,8 @@ def tap_sync_test(tap: Tap) -> tuple[io.StringIO, io.StringIO]:
Returns:
A 2-item tuple with StringIO buffers from the Tap's output: (stdout, stderr)
"""
stdout_buf = io.StringIO()
stderr_buf = io.StringIO()
stdout_buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
stderr_buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
tap.sync_all()
stdout_buf.seek(0)
Expand Down Expand Up @@ -171,10 +176,10 @@ def _select_all(catalog_dict: dict) -> dict:

def target_sync_test(
target: Target,
input: io.StringIO | None, # noqa: A002
input: t.IO[str] | None, # noqa: A002
*,
finalize: bool = True,
) -> tuple[io.StringIO, io.StringIO]:
) -> tuple[io.TextIOWrapper[io.BytesIO], io.TextIOWrapper[io.BytesIO]]:
"""Invoke the target with the provided input.
Args:
Expand All @@ -186,8 +191,8 @@ def target_sync_test(
Returns:
A 2-item tuple with StringIO buffers from the Target's output: (stdout, stderr)
"""
stdout_buf = io.StringIO()
stderr_buf = io.StringIO()
stdout_buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
stderr_buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")

with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
if input is not None:
Expand All @@ -203,7 +208,12 @@ def target_sync_test(
def tap_to_target_sync_test(
tap: Tap,
target: Target,
) -> tuple[io.StringIO, io.StringIO, io.StringIO, io.StringIO]:
) -> tuple[
io.TextIOWrapper[io.BytesIO],
io.TextIOWrapper[io.BytesIO],
io.TextIOWrapper[io.BytesIO],
io.TextIOWrapper[io.BytesIO],
]:
"""Test and end-to-end sink from the tap to the target.
Note: This method buffers all output from the tap in memory and should not be
Expand Down Expand Up @@ -236,15 +246,15 @@ def sync_end_to_end(tap: Tap, target: Target, *mappers: InlineMapper) -> None:
mappers: Zero or more inline mapper to apply in between the tap and target, in
order.
"""
buf = io.StringIO()
buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
with redirect_stdout(buf):
tap.sync_all()

buf.seek(0)
mapper_output = buf

for mapper in mappers:
buf = io.StringIO()
buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
with redirect_stdout(buf):
mapper.listen(mapper_output)

Expand Down
10 changes: 5 additions & 5 deletions singer_sdk/testing/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ def _execute_sync(self) -> tuple[str, str]:
Returns:
A 2-item tuple with StringIO buffers from the Tap's output: (stdout, stderr)
"""
stdout_buf = io.StringIO()
stderr_buf = io.StringIO()
stdout_buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
stderr_buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
self.run_sync_dry_run()
stdout_buf.seek(0)
Expand Down Expand Up @@ -281,7 +281,7 @@ def _execute_sync( # noqa: PLR6301
target_input: t.IO[str],
*,
finalize: bool = True,
) -> tuple[io.StringIO, io.StringIO]:
) -> tuple[io.TextIOWrapper[io.BytesIO], io.TextIOWrapper[io.BytesIO]]:
"""Invoke the target with the provided input.
Args:
Expand All @@ -294,8 +294,8 @@ def _execute_sync( # noqa: PLR6301
A 2-item tuple with StringIO buffers from the Target's output:
(stdout, stderr)
"""
stdout_buf = io.StringIO()
stderr_buf = io.StringIO()
stdout_buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
stderr_buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")

with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
if target_input is not None:
Expand Down
2 changes: 1 addition & 1 deletion tests/samples/test_tap_countries.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def test_batch_mode(outdir):
},
)

buf = io.StringIO()
buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
with redirect_stdout(buf):
tap.sync_all()

Expand Down

0 comments on commit 07f348e

Please sign in to comment.