diff --git a/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py b/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py index f3c4f92b09e..3de7144c3f9 100644 --- a/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py +++ b/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py @@ -31,7 +31,7 @@ _FILE_COUNT_RE: Final[str] = r" (\d+)\s*files" _PROGRESS_PERCENT_RE: Final[str] = r" (?:100|\d?\d)% " _ALL_DONE_RE: Final[str] = r"Everything is Ok" -# NOTE: the size of `chunk_to_emit` should not be too big nor too small otherwise it might skip some updates +# NOTE: the size of `chunk_to_emit` should in theory contain everything that above regexes capture _DEFAULT_CHUNK_SIZE: Final[NonNegativeInt] = 20 @@ -115,45 +115,36 @@ async def _output_reader( output_handlers: list[Callable[[str], Awaitable[None]]] | None, chunk_size: NonNegativeInt = _DEFAULT_CHUNK_SIZE, ) -> str: - command_output = "" + # NOTE: we do not read line by line but chunk by chunk otherwise we'd miss progress updates + # the key is to read the smallest possible chunks of data so that the progress can be properly parsed + if output_handlers is None: + output_handlers = [] - # Initialize buffer to store lookbehind window - lookbehind_buffer = "" + command_output = "" - undecodable_chunk: bytes | None = None + lookbehind_buffer = "" # store the last chunk while True: read_chunk = await stream.read(chunk_size) if not read_chunk: # Process remaining buffer if any - if lookbehind_buffer and output_handlers: + if lookbehind_buffer: await asyncio.gather( *[handler(lookbehind_buffer) for handler in output_handlers] ) break - try: - if undecodable_chunk: - chunk = (undecodable_chunk + read_chunk).decode("utf-8") - undecodable_chunk = None - else: - chunk = read_chunk.decode("utf-8") - except UnicodeDecodeError: - undecodable_chunk = read_chunk - continue + # `errors=replace`: avoids getting stuck when can't parse utf-8 + chunk = read_chunk.decode("utf-8", errors="replace") command_output += chunk # Combine lookbehind buffer with new chunk chunk_to_emit = lookbehind_buffer + chunk - - if output_handlers: - await asyncio.gather( - *[handler(chunk_to_emit) for handler in output_handlers] - ) - # Keep last window_size characters for next iteration - lookbehind_buffer = chunk_to_emit[-chunk_size:] + lookbehind_buffer = chunk_to_emit[-len(chunk) :] + + await asyncio.gather(*[handler(chunk_to_emit) for handler in output_handlers]) return command_output @@ -258,6 +249,7 @@ async def unarchive_dir( num_steps=1, description=IDStr(f"extracting {archive_to_extract.name}") ) + # get archive information archive_info_parser = ArchiveInfoParser() await _run_cli_command( f"7z l {archive_to_extract}", @@ -279,6 +271,7 @@ async def unarchive_dir( ) ) + # extract archive async def progress_handler(byte_progress: NonNegativeInt) -> None: if tqdm_progress.update(byte_progress) and log_cb: with log_catch(_logger, reraise=False):