From bc67ce294f25cc83c0fcf6168b154f2f6acab031 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcell=20Szab=C3=B3?=
Date: Fri, 23 Feb 2024 12:53:12 +0100
Subject: [PATCH] added async code and modified all sync test cases to test async
---
 async_stream_unzip.py | 493 ++++++++++++++++++++++++
 docs/features.md      |   2 +
 docs/get-started.md   |  18 +
 pyproject.toml        |   1 +
 test_async.py         | 843 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 1357 insertions(+)
 create mode 100644 async_stream_unzip.py
 create mode 100644 test_async.py

diff --git a/async_stream_unzip.py b/async_stream_unzip.py
new file mode 100644
index 0000000..9e131d7
--- /dev/null
+++ b/async_stream_unzip.py
@@ -0,0 +1,493 @@
+from functools import partial
+from struct import Struct
+import bz2
+import zlib
+
+from Crypto.Cipher import AES
+from Crypto.Hash import HMAC, SHA1
+from Crypto.Util import Counter
+from Crypto.Protocol.KDF import PBKDF2
+
+from stream_inflate import stream_inflate64
+from stream_unzip import (
+    UnfinishedIterationError,
+    TruncatedDataError,
+    UnsupportedFlagsError,
+    UnsupportedCompressionTypeError,
+    UnsupportedZip64Error,
+    UnexpectedSignatureError,
+    HMACIntegrityError,
+    CRC32IntegrityError,
+    MissingZipCryptoPasswordError,
+    MissingAESPasswordError,
+    IncorrectZipCryptoPasswordError,
+    IncorrectAESPasswordError,
+    DeflateError,
+    MissingAESExtraError,
+    BZ2Error,
+    InvalidAESKeyLengthError,
+    CompressedSizeIntegrityError,
+    UncompressedSizeIntegrityError,
+    TruncatedAESExtraError,
+    TruncatedZip64ExtraError,
+)
+
+async def async_stream_unzip(zipfile_chunks, password=None, chunk_size=65536, allow_zip64=True):
+    local_file_header_signature = b'PK\x03\x04'
+    local_file_header_struct = Struct('<H2sHHHIIIHH')
+    zip64_compressed_size = 4294967295
+    zip64_size_signature = b'\x01\x00'
+    aes_extra_signature = b'\x99\x01'
+    central_directory_signature = b'PK\x01\x02'
+    end_of_central_directory_signature = b'PK\x05\x06'
+    unsigned_short = Struct('<H')
+    unsigned_long_long = Struct('<Q')
+
+    dd_optional_signature = b'PK\x07\x08'
+    dd_struct_32 = Struct('<0sIII4s')
+    dd_struct_32_with_sig = Struct('<4sIII4s')
+    dd_struct_64 = Struct('<0sIQQ4s')
+    dd_struct_64_with_sig = Struct('<4sIQQ4s')
+
+    async def next_or_truncated_error(it):
+        try:
+            return await it.__anext__()
+        except StopAsyncIteration:
+            raise TruncatedDataError from None
+
+    async def get_byte_readers(iterable):
+        # Return functions to read from the stream of input chunks:
+        # - _yield_all: async generator of chunks as they come up (for file data)
+        # - _get_num: returns a single `bytes` of a given length
+        # - _return_num_unused: puts a number of bytes "back" into the stream
+        # - _return_bytes_unused: puts a `bytes` instance "back" into the stream
+        # - _get_offset_from_start: offset of the next byte from the start of the stream
+        chunk = b''
+        offset = 0
+        offset_from_start = 0
+        it = iterable.__aiter__()
+
+        async def _yield_all():
+            nonlocal chunk, offset, offset_from_start
+
+            while True:
+                if offset == len(chunk):
+                    try:
+                        chunk = await it.__anext__()
+                    except StopAsyncIteration:
+                        break
+                    offset = 0
+                to_yield = min(len(chunk) - offset, chunk_size)
+                offset += to_yield
+                offset_from_start += to_yield
+                yield chunk[offset - to_yield:offset]
+
+        async def _yield_num(num):
+            nonlocal chunk, offset, offset_from_start
+
+            while num:
+                if offset == len(chunk):
+                    chunk = await next_or_truncated_error(it)
+                    offset = 0
+                to_yield = min(num, len(chunk) - offset, chunk_size)
+                offset += to_yield
+                offset_from_start += to_yield
+                num -= to_yield
+                yield chunk[offset - to_yield:offset]
+
+        async def _get_num(num):
+            return b''.join([c async for c in _yield_num(num)])
+
+        def _return_num_unused(num_unused):
+            nonlocal offset, offset_from_start
+            offset -= num_unused
+            offset_from_start -= num_unused
+
+        def _return_bytes_unused(bytes_unused):
+            nonlocal chunk, offset, offset_from_start
+            chunk = bytes_unused + chunk[offset:]
+            offset = 0
+            offset_from_start -= len(bytes_unused)
+
+        def _get_offset_from_start():
+            return offset_from_start
+
+        return _yield_all, _get_num, _return_num_unused, _return_bytes_unused, _get_offset_from_start
+
+    def get_decompressor_none(num_bytes):
+        num_decompressed = 0
+        num_unused = 0
+
+        def _decompress(compressed_chunk):
+            nonlocal num_decompressed, num_unused
+            to_yield = min(num_bytes - num_decompressed, len(compressed_chunk))
+            num_decompressed += to_yield
+            num_unused = len(compressed_chunk) - to_yield
+            yield compressed_chunk[:to_yield]
+
+        def _is_done():
+            return num_decompressed == num_bytes
+
+        def _num_unused():
+            return num_unused
+
+        return _decompress, _is_done, _num_unused
+
+    def get_decompressor_deflate():
+        dobj = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
+
+        def _decompress_single(compressed_chunk):
+            try:
+                return dobj.decompress(compressed_chunk, chunk_size)
+            except zlib.error as e:
+                raise DeflateError() from e
+
+        def _decompress(compressed_chunk):
+            uncompressed_chunk = _decompress_single(compressed_chunk)
+            if uncompressed_chunk:
+                yield uncompressed_chunk
+
+            while dobj.unconsumed_tail and not dobj.eof:
+                uncompressed_chunk = _decompress_single(dobj.unconsumed_tail)
+                if uncompressed_chunk:
+                    yield uncompressed_chunk
+
+        def _is_done():
+            return dobj.eof
+
+        def _num_unused():
+            return len(dobj.unused_data)
+
+        return _decompress, _is_done, _num_unused
+
+    def get_decompressor_deflate64():
+        uncompressed_chunks, is_done, num_bytes_unconsumed = stream_inflate64(chunk_size=chunk_size)
+
+        def _decompress(compressed_chunk):
+            yield from uncompressed_chunks((compressed_chunk,))
+
+        return _decompress, is_done, num_bytes_unconsumed
+
+    def get_decompressor_bz2():
+        dobj = bz2.BZ2Decompressor()
+
+        def _decompress_single(compressed_chunk):
+            try:
+                return dobj.decompress(compressed_chunk, chunk_size)
+            except OSError as e:
+                raise BZ2Error() from e
+
+        def _decompress(compressed_chunk):
+            uncompressed_chunk = _decompress_single(compressed_chunk)
+            if uncompressed_chunk:
+                yield uncompressed_chunk
+
+            while not dobj.eof and not dobj.needs_input:
+                uncompressed_chunk = _decompress_single(b'')
+                if uncompressed_chunk:
+                    yield uncompressed_chunk
+
+        def _is_done():
+            return dobj.eof
+
+        def _num_unused():
+            return len(dobj.unused_data)
+
+        return _decompress, _is_done, _num_unused
+
+    async def yield_file(yield_all, get_num, return_num_unused, return_bytes_unused, get_offset_from_start):
+
+        def get_flag_bits(flags):
+            for b in flags:
+                for i in range(8):
+                    yield (b >> i) & 1
+
+        def parse_extra(extra):
+            extra_offset = 0
+            while extra_offset <= len(extra) - 4:
+                extra_signature = extra[extra_offset:extra_offset+2]
+                extra_offset += 2
+                extra_data_size, = unsigned_short.unpack(extra[extra_offset:extra_offset+2])
+                extra_offset += 2
+                extra_data = extra[extra_offset:extra_offset+extra_data_size]
+                extra_offset += extra_data_size
+                yield (extra_signature, extra_data)
+
+        def get_extra_value(extra, if_true, signature, exception_if_missing, min_length, exception_if_too_short):
+            value = None
+
+            if if_true:
+                try:
+                    value = extra[signature]
+                except KeyError:
+                    if exception_if_missing:
+                        raise exception_if_missing()
+                else:
+                    if len(value) < min_length:
+                        raise exception_if_too_short()
+
+            return value
+
+        async def decrypt_weak_decompress(chunks, decompress, is_done, num_unused):
+            key_0 = 305419896
+            key_1 = 591751049
+            key_2 = 878082192
+            crc32 = zlib.crc32
+            bytes_c = bytes
+
+            def update_keys(byte):
+                nonlocal key_0, key_1, key_2
+                key_0 = ~crc32(bytes_c((byte,)), ~key_0) & 0xFFFFFFFF
+                key_1 = (key_1 + (key_0 & 0xFF)) & 0xFFFFFFFF
+                key_1 = ((key_1 * 134775813) + 1) & 0xFFFFFFFF
+                key_2 = ~crc32(bytes_c((key_1 >> 24,)), ~key_2) & 0xFFFFFFFF
+
+            def decrypt(chunk):
+                chunk = bytearray(chunk)
+                for i, byte in enumerate(chunk):
+                    temp = key_2 | 2
+                    byte ^= ((temp * (temp ^ 1)) >> 8) & 0xFF
+                    update_keys(byte)
+                    chunk[i] = byte
+                return bytes(chunk)
+
+            for byte in password:
+                update_keys(byte)
+
+            encryption_header = decrypt(await get_num(12))
+            check_password_byte = \
+                (mod_time >> 8) if has_data_descriptor else \
+                (crc_32_expected >> 24)
+
+            if encryption_header[11] != check_password_byte:
+                raise IncorrectZipCryptoPasswordError()
+
+            while not is_done():
+                for chunk in decompress(decrypt(await next_or_truncated_error(chunks))):
+                    yield chunk
+
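+            # Give back to the byte reader any bytes the decompressor read but
+            # did not consume, so the data descriptor or the next header is
+            # parsed from the correct position in the stream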
+            return_num_unused(num_unused())
+
+        async def decrypt_aes_decompress(chunks, decompress, is_done, num_unused, key_length_raw):
+            try:
+                key_length, salt_length = {1: (16, 8), 2: (24, 12), 3: (32, 16)}[key_length_raw]
+            except KeyError:
+                raise InvalidAESKeyLengthError(key_length_raw)
+
+            salt = await get_num(salt_length)
+            password_verification_length = 2
+
+            keys = PBKDF2(password, salt, 2 * key_length + password_verification_length, 1000)
+            if keys[-password_verification_length:] != await get_num(password_verification_length):
+                raise IncorrectAESPasswordError()
+
+            decrypter = AES.new(
+                keys[:key_length], AES.MODE_CTR,
+                counter=Counter.new(nbits=128, little_endian=True)
+            )
+            hmac = HMAC.new(keys[key_length:key_length*2], digestmod=SHA1)
+            while not is_done():
+                chunk = await next_or_truncated_error(chunks)
+                for c in decompress(decrypter.decrypt(chunk)):
+                    yield c
+                hmac.update(chunk[:len(chunk) - num_unused()])
+
+            return_num_unused(num_unused())
+
+            if await get_num(10) != hmac.digest()[:10]:
+                raise HMACIntegrityError()
+
+        async def decrypt_none_decompress(chunks, decompress, is_done, num_unused):
+            while not is_done():
+                for chunk in decompress(await next_or_truncated_error(chunks)):
+                    yield chunk
+
+            return_num_unused(num_unused())
+
+        async def read_data_and_count_and_crc32(chunks):
+            offset_1 = None
+            offset_2 = None
+            crc_32_actual = zlib.crc32(b'')
+            l = 0
+
+            async def _iter():
+                nonlocal offset_1, offset_2, crc_32_actual, l
+
+                offset_1 = get_offset_from_start()
+                async for chunk in chunks:
+                    crc_32_actual = zlib.crc32(chunk, crc_32_actual)
+                    l += len(chunk)
+                    yield chunk
+                offset_2 = get_offset_from_start()
+
+            return _iter(), lambda: offset_2 - offset_1, lambda: crc_32_actual, lambda: l
+
+        async def checked_from_local_header(chunks, is_aes_2_encrypted, get_crc_32, get_compressed_size, get_uncompressed_size):
+            async for chunk in chunks:
+                yield chunk
+
+            crc_32_data = get_crc_32()
+            compressed_size_data = get_compressed_size()
+            uncompressed_size_data = get_uncompressed_size()
+
+            if not is_aes_2_encrypted and crc_32_expected != crc_32_data:
+                raise CRC32IntegrityError()
+
+            if compressed_size_data != compressed_size:
+                raise CompressedSizeIntegrityError()
+
+            if uncompressed_size_data != uncompressed_size:
+                raise UncompressedSizeIntegrityError()
+
+        async def checked_from_data_descriptor(chunks, is_sure_zip64, is_aes_2_encrypted, get_crc_32, get_compressed_size, get_uncompressed_size):
+            # The format of the data descriptor is unfortunately not known with absolute certainty in all cases,
+            # so we use a heuristic to detect it - using the known crc32 value, compressed size, and uncompressed
+            # size of the data, and the possible signature of the next section in the stream. 
There are 4 possible + # formats, and we choose the longest one that matches + # + # Strongly inspired by Mark Adler's unzip - see his reasoning for this at + # https://github.com/madler/unzip/commit/af0d07f95809653b669d88aa0f424c6d5aa48ba0 + + async for chunk in chunks: + yield chunk + + crc_32_data = get_crc_32() + compressed_size_data = get_compressed_size() + uncompressed_size_data = get_uncompressed_size() + best_matches = (False, False, False, False, False) + must_treat_as_zip64 = is_sure_zip64 or compressed_size_data > 0xFFFFFFFF or uncompressed_size_data > 0xFFFFFFFF + + checks = (( + (dd_struct_64_with_sig, dd_optional_signature), + (dd_struct_64, b''), + ) if allow_zip64 else ()) + (( + (dd_struct_32_with_sig, dd_optional_signature), + (dd_struct_32, b''), + ) if not must_treat_as_zip64 else ()) + + dd = await get_num(checks[0][0].size) + + for dd_struct, expected_signature in checks: + signature_dd, crc_32_dd, compressed_size_dd, uncompressed_size_dd, next_signature = dd_struct.unpack(dd[:dd_struct.size]) + matches = ( + signature_dd == expected_signature, + is_aes_2_encrypted or crc_32_dd == crc_32_data, + compressed_size_dd == compressed_size_data, + uncompressed_size_dd == uncompressed_size_data, + next_signature in (local_file_header_signature, central_directory_signature), + ) + best_matches = max(best_matches, matches, key=lambda t: t.count(True)) + + if best_matches == (True, True, True, True, True): + break + + if not best_matches[0]: + raise UnexpectedSignatureError() + + if not best_matches[1]: + raise CRC32IntegrityError() + + if not best_matches[2]: + raise CompressedSizeIntegrityError() + + if not best_matches[3]: + raise UncompressedSizeIntegrityError() + + if not best_matches[4]: + raise UnexpectedSignatureError(next_signature) + + return_bytes_unused(dd[dd_struct.size - 4:]) # 4 is the length of next signature we have already taken + + version, flags, compression_raw, mod_time, mod_date, crc_32_expected, compressed_size_raw, uncompressed_size_raw, file_name_len, extra_field_len = \ + local_file_header_struct.unpack(await get_num(local_file_header_struct.size)) + + flag_bits = tuple(get_flag_bits(flags)) + if ( + flag_bits[4] # Enhanced deflating + or flag_bits[5] # Compressed patched + or flag_bits[6] # Strong encrypted + or flag_bits[13] # Masked header values + ): + raise UnsupportedFlagsError(flag_bits) + + file_name = await get_num(file_name_len) + extra = dict(parse_extra(await get_num(extra_field_len))) + + is_weak_encrypted = flag_bits[0] and compression_raw != 99 + is_aes_encrypted = flag_bits[0] and compression_raw == 99 + aes_extra = get_extra_value(extra, is_aes_encrypted, aes_extra_signature, MissingAESExtraError, 7, TruncatedAESExtraError) + is_aes_2_encrypted = is_aes_encrypted and aes_extra[0:2] == b'\x02\x00' + + if is_weak_encrypted and password is None: + raise MissingZipCryptoPasswordError() + + if is_aes_encrypted and password is None: + raise MissingAESPasswordError() + + compression = \ + unsigned_short.unpack(aes_extra[5:7])[0] if is_aes_encrypted else \ + compression_raw + + if compression not in (0, 8, 9, 12): + raise UnsupportedCompressionTypeError(compression) + + has_data_descriptor = flag_bits[3] + might_be_zip64 = compressed_size_raw == zip64_compressed_size and uncompressed_size_raw == zip64_compressed_size + zip64_extra = get_extra_value(extra, might_be_zip64, zip64_size_signature, False, 16, TruncatedZip64ExtraError) + is_sure_zip64 = bool(zip64_extra) + + if not allow_zip64 and is_sure_zip64: + raise UnsupportedZip64Error() + + 
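+        # If the entry has a data descriptor and is compressed, the sizes
+        # cannot be known at this point (None); otherwise the zip64 extra
+        # field takes precedence over the 32-bit fields of the local header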
compressed_size = \
+            None if has_data_descriptor and compression in (8, 9, 12) else \
+            unsigned_long_long.unpack(zip64_extra[8:16])[0] if is_sure_zip64 else \
+            compressed_size_raw
+
+        uncompressed_size = \
+            None if has_data_descriptor and compression in (8, 9, 12) else \
+            unsigned_long_long.unpack(zip64_extra[:8])[0] if is_sure_zip64 else \
+            uncompressed_size_raw
+
+        decompressor = \
+            get_decompressor_none(uncompressed_size) if compression == 0 else \
+            get_decompressor_deflate() if compression == 8 else \
+            get_decompressor_deflate64() if compression == 9 else \
+            get_decompressor_bz2()
+
+        decompressed_bytes = \
+            decrypt_weak_decompress(yield_all(), *decompressor) if is_weak_encrypted else \
+            decrypt_aes_decompress(yield_all(), *decompressor, key_length_raw=aes_extra[4]) if is_aes_encrypted else \
+            decrypt_none_decompress(yield_all(), *decompressor)
+
+        counted_decompressed_bytes, get_compressed_size, get_crc_32_actual, get_uncompressed_size = await read_data_and_count_and_crc32(decompressed_bytes)
+
+        checked_bytes = \
+            checked_from_data_descriptor(counted_decompressed_bytes, is_sure_zip64, is_aes_2_encrypted, get_crc_32_actual, get_compressed_size, get_uncompressed_size) if has_data_descriptor else \
+            checked_from_local_header(counted_decompressed_bytes, is_aes_2_encrypted, get_crc_32_actual, get_compressed_size, get_uncompressed_size)
+
+        return file_name, uncompressed_size, checked_bytes
+
+    async def all():
+        yield_all, get_num, return_num_unused, return_bytes_unused, get_offset_from_start = await get_byte_readers(zipfile_chunks)
+
+        while True:
+            signature = await get_num(len(local_file_header_signature))
+            if signature == local_file_header_signature:
+                yield await yield_file(yield_all, get_num, return_num_unused, return_bytes_unused, get_offset_from_start)
+            elif signature in (central_directory_signature, end_of_central_directory_signature):
+                async for _ in yield_all():
+                    pass
+                break
+            else:
+                raise UnexpectedSignatureError(signature)
+
+    async for file_name, file_size, unzipped_chunks in all():
+        yield file_name, file_size, unzipped_chunks
+        async for _ in unzipped_chunks:
+            raise UnfinishedIterationError()
diff --git a/docs/features.md b/docs/features.md
index f8f0fe9..49f0982 100644
--- a/docs/features.md
+++ b/docs/features.md
@@ -20,3 +20,5 @@ In addition to being memory efficient, stream-unzip supports:
 - ZIP files created by Java's ZipOutputStream that are larger than 4GiB. At the time of writing libarchive-based stream readers cannot read these without error.
 
 - BZip2-compressed ZIPs.
+
+- Fetching asynchronously from a data source (asyncio)
diff --git a/docs/get-started.md b/docs/get-started.md
index 5d3753b..1ccad14 100644
--- a/docs/get-started.md
+++ b/docs/get-started.md
@@ -25,6 +25,8 @@ If you regularly install stream-unzip, such as during application deployment, to
 
 ## Usage
 
+### Synchronous
+
 A single function is exposed, `stream_unzip`, that takes a single argument: an iterable that should yield the bytes of a ZIP file [with no zero-length chunks]. It returns an iterable, where each yielded item is a tuple of the file name, file size [`None` if this is not known], and another iterable itself yielding the unzipped bytes of that file.
 
 ```python
@@ -43,3 +45,19 @@ for file_name, file_size, unzipped_chunks in stream_unzip(zipped_chunks(), passw
 ```
 
 The file name and file size are extracted as reported from the file. If you don't trust the creator of the ZIP file, these should be treated as untrusted input.
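+
+For example — a minimal sketch, not from the library itself — the bytes can come from a local file, where `my.zip` and the 65536-byte read size are illustrative:
+
+```python
+from stream_unzip import stream_unzip
+
+def zipped_chunks():
+    # A plain generator yielding non-empty chunks of the ZIP file
+    with open('my.zip', 'rb') as f:
+        while chunk := f.read(65536):
+            yield chunk
+
+for file_name, file_size, unzipped_chunks in stream_unzip(zipped_chunks()):
+    num_bytes = 0
+    for chunk in unzipped_chunks:
+        num_bytes += len(chunk)
+    print(file_name, file_size, num_bytes)
+```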
+
+### Asynchronous
+
+Similarly to the synchronous API, a single function is exposed, `async_stream_unzip`, that takes an asynchronous iterable yielding the bytes of a ZIP file. It yields the same results as the synchronous version, except that both the outer iterable and the inner iterables of unzipped bytes are asynchronous.
+
+```python
+import asyncio
+import httpx
+
+from async_stream_unzip import async_stream_unzip
+
+async def main():
+    client = httpx.AsyncClient()
+    async with client.stream('GET', 'https://www.example.com/my.zip') as r:
+        async for file_name, file_size, unzipped_chunks in async_stream_unzip(r.aiter_bytes(), password=b'my-password'):
+            async for chunk in unzipped_chunks:
+                print(chunk)
+
+asyncio.run(main())
+```
diff --git a/pyproject.toml b/pyproject.toml
index af060c2..4c65efd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,5 +41,6 @@ ci = [
 
 [tool.hatch.build]
 include = [
+    "async_stream_unzip.py",
     "stream_unzip.py",
 ]
diff --git a/test_async.py b/test_async.py
new file mode 100644
index 0000000..a4e72d1
--- /dev/null
+++ b/test_async.py
@@ -0,0 +1,843 @@
+import itertools
+import io
+import unittest
+import uuid
+import random
+import zipfile
+import asyncio
+
+
+from stream_unzip import (
+    UnfinishedIterationError,
+    TruncatedDataError,
+    UnsupportedFlagsError,
+    UnsupportedCompressionTypeError,
+    UnsupportedZip64Error,
+    UnexpectedSignatureError,
+    HMACIntegrityError,
+    CRC32IntegrityError,
+    MissingZipCryptoPasswordError,
+    MissingAESPasswordError,
+    IncorrectZipCryptoPasswordError,
+    IncorrectAESPasswordError,
+    DeflateError,
+)
+from async_stream_unzip import async_stream_unzip
+
+class TestStreamUnzipAsync(unittest.IsolatedAsyncioTestCase):
+
+    async def test_methods_and_chunk_sizes_async(self):
+        rnd = random.Random()
+        rnd.seed(1)
+
+        methods = [zipfile.ZIP_BZIP2, zipfile.ZIP_DEFLATED, zipfile.ZIP_STORED]
+        input_sizes = [1, 7, 65536]
+        output_sizes = [1, 7, 65536]
+
+        contents = [
+            b'short',
+            b''.join([uuid.UUID(int=rnd.getrandbits(128), version=4).hex.encode() for _ in range(0, 10000)])
+        ]
+
+        async def yield_input(content, method, input_size):
+            file = io.BytesIO()
+            with zipfile.ZipFile(file, 'w', method) as zf:
+                zf.writestr('first.txt', content)
+                zf.writestr('second.txt', content)
+
+            zip_bytes = file.getvalue()
+
+            for i in range(0, len(zip_bytes), input_size):
+                yield zip_bytes[i:i + input_size]
+
+        async def subtest(content, method, input_size, output_size):
+            with self.subTest(content=content[:5], method=method, input_size=input_size, output_size=output_size):
+                files = []
+                async for name, size, chunks in async_stream_unzip(yield_input(content, method, input_size), chunk_size=output_size):
+                    byte_text = []
+                    async for chunk in chunks:
+                        byte_text.append(chunk)
+                    files.append((name, size, b''.join(byte_text)))
+                # list operations hang for some reason
+                self.assertEqual(files[0][0], b'first.txt')
+                self.assertEqual(files[0][1], len(content))
+                self.assertEqual(files[0][2], content)
+                self.assertEqual(files[1][0], b'second.txt')
+                self.assertEqual(files[1][1], len(content))
+                self.assertEqual(files[1][2], content)
+
+        combinations_iter = itertools.product(contents, methods, input_sizes, output_sizes)
+        tasks = []
+        for content, method, input_size, output_size in combinations_iter:
+            tasks.append(asyncio.create_task(subtest(content, method, input_size, output_size)))
+        await asyncio.gather(*tasks)
+
+
+    async def test_skipping_wrapper(self):
+        rnd = random.Random()
+        rnd.seed(1)
+
+        methods = [zipfile.ZIP_DEFLATED, zipfile.ZIP_STORED]
+        input_sizes = [1, 7, 65536]
+        output_sizes = [1, 7, 65536]
+
+        contents = [
+            b'short',
+            b''.join([uuid.UUID(int=rnd.getrandbits(128), version=4).hex.encode() for _ in range(0, 10000)])
+        ]
+
+        async def 
yield_input(content, method, input_size): + file = io.BytesIO() + with zipfile.ZipFile(file, 'w', method) as zf: + zf.writestr('first.txt', content) + zf.writestr('second.txt', content) + + zip_bytes = file.getvalue() + + for i in range(0, len(zip_bytes), input_size): + yield zip_bytes[i:i + input_size] + + async def skippable(stream_unzip_output): + async def chunk_gen_func(chunks): + async for chunk in chunks: + yield chunk + + async for name, size, chunks in stream_unzip_output: + chunks_gen = chunk_gen_func(chunks) + yield name, size, chunks_gen + async for a in chunks_gen: + pass + + async def subtest(content, method, input_size, output_size): + with self.subTest(content=content[:5], method=method, input_size=input_size, output_size=output_size): + combined = b'' + + async for name, size, chunks in skippable(async_stream_unzip(yield_input(content, method, input_size), chunk_size=output_size)): + if name == b'first.txt': + continue + byte_text = [] + async for chunk in chunks: + byte_text.append(chunk) + combined = b''.join(byte_text) + + self.assertEqual(combined, content) + + combinations_iter = itertools.product(contents, methods, input_sizes, output_sizes) + tasks = [] + for content, method, input_size, output_size in combinations_iter: + tasks.append(asyncio.create_task(subtest(content, method, input_size, output_size))) + await asyncio.gather(*tasks) + + + async def test_exception_on_skip(self): + rnd = random.Random() + rnd.seed(1) + + methods = [zipfile.ZIP_DEFLATED, zipfile.ZIP_STORED] + input_sizes = [1, 7, 65536] + output_sizes = [1, 7, 65536] + + contents = [ + b'short', + b''.join([uuid.UUID(int=rnd.getrandbits(128), version=4).hex.encode() for _ in range(0, 10000)]) + ] + + async def yield_input(content, method, input_size): + file = io.BytesIO() + with zipfile.ZipFile(file, 'w', method) as zf: + zf.writestr('first.txt', content) + zf.writestr('second.txt', content) + + zip_bytes = file.getvalue() + + for i in range(0, len(zip_bytes), input_size): + yield zip_bytes[i:i + input_size] + + async def subtest(content, method, input_size, output_size): + with self.subTest(content=content[:5], method=method, input_size=input_size, output_size=output_size): + with self.assertRaises(UnfinishedIterationError): + async for name, size, chunks in async_stream_unzip(yield_input(content, method, input_size), chunk_size=output_size): + if name == b'first.txt': + continue + + combinations_iter = itertools.product(contents, methods, input_sizes, output_sizes) + tasks = [] + for content, method, input_size, output_size in combinations_iter: + tasks.append(asyncio.create_task(subtest(content, method, input_size, output_size))) + await asyncio.gather(*tasks) + + async def test_output_size(self): + rnd = random.Random() + rnd.seed(1) + + methods = [zipfile.ZIP_DEFLATED, zipfile.ZIP_STORED] + input_sizes = [1, 7, 65536] + output_sizes = [1, 7, 65536] + + contents = [ + b'short', + b''.join([uuid.UUID(int=rnd.getrandbits(128), version=4).hex.encode() for _ in range(0, 10000)]) + ] + + async def yield_input(content, method, input_size): + file = io.BytesIO() + with zipfile.ZipFile(file, 'w', method) as zf: + zf.writestr('first.txt', content) + zf.writestr('second.txt', content) + + zip_bytes = file.getvalue() + + for i in range(0, len(zip_bytes), input_size): + yield zip_bytes[i:i + input_size] + + all_smaller = True + async def subtest(content, method, input_size, output_size): + nonlocal all_smaller + with self.subTest(content=content[:5], method=method, input_size=input_size, 
output_size=output_size):
+                async for _, _, chunks in async_stream_unzip(yield_input(content, method, input_size), chunk_size=output_size):
+                    async for chunk in chunks:
+                        all_smaller = all_smaller and len(chunk) <= output_size
+
+
+        combinations_iter = itertools.product(contents, methods, input_sizes, output_sizes)
+        tasks = []
+        for content, method, input_size, output_size in combinations_iter:
+            tasks.append(asyncio.create_task(subtest(content, method, input_size, output_size)))
+        await asyncio.gather(*tasks)
+        self.assertTrue(all_smaller)
+
+    async def test_exception_propagates(self):
+        rnd = random.Random()
+        rnd.seed(1)
+
+        methods = [zipfile.ZIP_DEFLATED, zipfile.ZIP_STORED]
+        input_sizes = [1, 7, 65536]
+        output_sizes = [1, 7, 65536]
+
+        contents = [
+            b'short',
+            b''.join([uuid.UUID(int=rnd.getrandbits(128), version=4).hex.encode() for _ in range(0, 10000)])
+        ]
+
+        async def yield_input(content, method, input_size):
+            file = io.BytesIO()
+            with zipfile.ZipFile(file, 'w', method) as zf:
+                zf.writestr('first.txt', content)
+                zf.writestr('second.txt', content)
+
+            zip_bytes = file.getvalue()
+
+            for i in range(0, len(zip_bytes), input_size):
+                yield zip_bytes[i:i + input_size]
+            raise Exception('Exception from generator')
+
+        async def subtest(content, method, input_size, output_size):
+            with self.subTest(content=content[:5], method=method, input_size=input_size, output_size=output_size):
+                with self.assertRaisesRegex(Exception, 'Exception from generator'):
+                    async for _, _, chunks in async_stream_unzip(yield_input(content, method, input_size), chunk_size=output_size):
+                        async for _ in chunks:
+                            pass
+
+        combinations_iter = itertools.product(contents, methods, input_sizes, output_sizes)
+        tasks = []
+        for content, method, input_size, output_size in combinations_iter:
+            tasks.append(asyncio.create_task(subtest(content, method, input_size, output_size)))
+        await asyncio.gather(*tasks)
+
+
+    async def test_bad_crc_32(self):
+        rnd = random.Random()
+        rnd.seed(1)
+
+        methods = [zipfile.ZIP_DEFLATED, zipfile.ZIP_STORED]
+        input_sizes = [1, 7, 65536]
+        output_sizes = [1, 7, 65536]
+
+        contents = [
+            b'short',
+            b''.join([uuid.UUID(int=rnd.getrandbits(128), version=4).hex.encode() for _ in range(0, 10000)])
+        ]
+
+        async def yield_input(content, method, input_size):
+            file = io.BytesIO()
+            with zipfile.ZipFile(file, 'w', method) as zf:
+                zf.writestr('first.txt', content)
+                zf.writestr('second.txt', content)
+
+            zip_bytes = file.getvalue()
+            zip_bytes = zip_bytes[0:16] + bytes([(zip_bytes[17] + 1) % 256]) + zip_bytes[17:]
+
+            for i in range(0, len(zip_bytes), input_size):
+                yield zip_bytes[i:i + input_size]
+
+        async def subtest(content, method, input_size, output_size):
+            with self.subTest(content=content[:5], method=method, input_size=input_size, output_size=output_size):
+                with self.assertRaises(CRC32IntegrityError):
+                    async for _, _, chunks in async_stream_unzip(yield_input(content, method, input_size), chunk_size=output_size):
+                        async for _ in chunks:
+                            pass
+
+        combinations_iter = itertools.product(contents, methods, input_sizes, output_sizes)
+        tasks = []
+        for content, method, input_size, output_size in combinations_iter:
+            tasks.append(asyncio.create_task(subtest(content, method, input_size, output_size)))
+        await asyncio.gather(*tasks)
+
+
+    async def test_bad_deflate_data(self):
+        rnd = random.Random()
+        rnd.seed(1)
+
+        input_sizes = [1, 7, 65536]
+        output_sizes = [1, 7, 65536]
+
+        content = b''.join([uuid.UUID(int=rnd.getrandbits(128), version=4).hex.encode() for _ in range(0, 
10000)])
+
+        async def yield_input(input_size):
+            file = io.BytesIO()
+            with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
+                zf.writestr('first.txt', content)
+
+            zip_bytes = file.getvalue()
+            zip_bytes = zip_bytes[0:500] + b'-' + zip_bytes[502:]
+
+            for i in range(0, len(zip_bytes), input_size):
+                yield zip_bytes[i:i + input_size]
+
+        async def subtest(input_size, output_size):
+            with self.subTest(input_size=input_size, output_size=output_size):
+                with self.assertRaises(DeflateError):
+                    async for _, _, chunks in async_stream_unzip(yield_input(input_size), chunk_size=output_size):
+                        async for _ in chunks:
+                            pass
+
+        combinations_iter = itertools.product(input_sizes, output_sizes)
+        tasks = []
+        for input_size, output_size in combinations_iter:
+            tasks.append(asyncio.create_task(subtest(input_size, output_size)))
+        await asyncio.gather(*tasks)
+
+    # Since the input is an async generator, breaking out of iteration won't send it GeneratorExit
+    async def test_break_not_raises_generator_exit(self):
+        rnd = random.Random()
+        rnd.seed(1)
+
+        input_size = 65536
+        content = b''.join([uuid.UUID(int=rnd.getrandbits(128), version=4).hex.encode() for _ in range(0, 10000)])
+
+        raised_generator_exit = False
+
+        async def yield_input():
+            nonlocal raised_generator_exit
+
+            file = io.BytesIO()
+            with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
+                zf.writestr('first.txt', content)
+                zf.writestr('second.txt', content)
+
+            zip_bytes = file.getvalue()
+
+            try:
+                for i in range(0, len(zip_bytes), input_size):
+                    yield zip_bytes[i:i + input_size]
+            except GeneratorExit:
+                raised_generator_exit = True
+
+        async for name, size, chunks in async_stream_unzip(yield_input()):
+            async for chunk in chunks:
+                pass
+
+        self.assertFalse(raised_generator_exit)
+
+        async for name, size, chunks in async_stream_unzip(yield_input()):
+            async for chunk in chunks:
+                pass
+            break
+
+        self.assertFalse(raised_generator_exit)
+
+    async def test_truncation_raises_value_error(self):
+        rnd = random.Random()
+        rnd.seed(1)
+
+        input_sizes = [65536]
+        content = b''.join([uuid.UUID(int=rnd.getrandbits(128), version=4).hex.encode() for _ in range(0, 100000)])
+
+        async def yield_input(input_size):
+            file = io.BytesIO()
+            with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
+                zf.writestr('first.txt', content)
+
+            zip_bytes = file.getvalue()
+
+            yield zip_bytes[:input_size]
+
+        async def subtest(input_size):
+            with self.subTest(input_size=input_size):
+                with self.assertRaises(TruncatedDataError):
+                    async for name, size, chunks in async_stream_unzip(yield_input(input_size)):
+                        async for chunk in chunks:
+                            pass
+
+        tasks = []
+        for input_size in input_sizes:
+            tasks.append(asyncio.create_task(subtest(input_size)))
+        await asyncio.gather(*tasks)
+
+
+    async def test_streaming(self):
+        rnd = random.Random()
+        rnd.seed(1)
+
+        contents = b''.join([uuid.UUID(int=rnd.getrandbits(128), version=4).hex.encode() for _ in range(0, 10000)])
+        latest = None
+
+        async def yield_input():
+            nonlocal latest
+
+            file = io.BytesIO()
+            with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
+                zf.writestr('first.txt', contents)
+
+            zip_bytes = file.getvalue()
+            chunk_size = 1
+
+            for i in range(0, len(zip_bytes), chunk_size):
+                yield zip_bytes[i:i + chunk_size]
+                latest = i
+
+        latest_inputs = [[latest async for _ in chunks] async for _, _, chunks in async_stream_unzip(yield_input())][0]
+
+        # Make sure the input is progressing during the output. 
In this test, there
+        # are about 100k steps, so checking that it's greater than 1000
+        # shouldn't make this test too flaky
+        num_steps = 0
+        prev_i = 0
+        for i in latest_inputs:
+            if i != prev_i:
+                num_steps += 1
+            prev_i = i
+        self.assertGreater(num_steps, 1000)
+
+    async def test_empty_file(self):
+        async def yield_input():
+            file = io.BytesIO()
+            with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
+                zf.writestr('first.txt', b'')
+
+            yield file.getvalue()
+
+        files = []
+        async for name, size, chunks in async_stream_unzip(yield_input()):
+            byte_text = []
+            async for chunk in chunks:
+                byte_text.append(chunk)
+            files.append((name, size, b''.join(byte_text)))
+
+        self.assertEqual(files, [(b'first.txt', 0, b'')])
+
+    async def test_empty_zip(self):
+        async def yield_input():
+            file = io.BytesIO()
+            with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
+                pass
+
+            yield file.getvalue()
+
+        l = [z async for z in async_stream_unzip(yield_input())]
+
+        self.assertEqual(l, [])
+
+    async def test_not_zip(self):
+        async def yield_input():
+            yield b'This is not a zip file'
+
+        with self.assertRaises(UnexpectedSignatureError):
+            async for _ in async_stream_unzip(yield_input()):
+                pass
+
+    async def test_python_zip64(self):
+        async def yield_input():
+            with open('fixtures/python38_zip64.zip', 'rb') as f:
+                while True:
+                    chunk = f.read(65536)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        num_received_bytes = 0
+        async for name, size, chunks in async_stream_unzip(yield_input()):
+            async for chunk in chunks:
+                num_received_bytes += len(chunk)
+
+        self.assertEqual(size, 5000000000)
+        self.assertEqual(num_received_bytes, 5000000000)
+
+    async def test_python_zip64_disabled(self):
+        async def yield_input():
+            with open('fixtures/python38_zip64.zip', 'rb') as f:
+                while True:
+                    chunk = f.read(65536)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        with self.assertRaises(UnsupportedZip64Error):
+            async for name, size, chunks in async_stream_unzip(yield_input(), allow_zip64=False):
+                async for chunk in chunks:
+                    pass
+
+    async def test_macos_single_file(self):
+        async def yield_input():
+            with open('fixtures/macos_10_14_5_single_file.zip', 'rb') as f:
+                yield f.read()
+
+        num_received_bytes = 0
+        files = []
+        async for name, size, chunks in async_stream_unzip(yield_input()):
+            byte_str = []
+            async for chunk in chunks:
+                byte_str.append(chunk)
+            files.append((name, size, b''.join(byte_str)))
+
+        self.assertEqual(len(files), 3)
+        self.assertEqual(files[0], (b'contents.txt', None, b'Contents of the zip'))
+
+    async def test_macos_multiple_files(self):
+        async def yield_input():
+            with open('fixtures/macos_10_14_5_multiple_files.zip', 'rb') as f:
+                yield f.read()
+
+        num_received_bytes = 0
+        files = []
+        async for name, size, chunks in async_stream_unzip(yield_input()):
+            byte_str = []
+            async for chunk in chunks:
+                byte_str.append(chunk)
+            files.append((name, size, b''.join(byte_str)))
+
+        self.assertEqual(len(files), 5)
+        self.assertEqual(files[0], (b'first.txt', None, b'Contents of the first file'))
+        self.assertEqual(files[1][0], b'__MACOSX/')
+        self.assertEqual(files[2][0], b'__MACOSX/._first.txt')
+        self.assertEqual(files[3], (b'second.txt', None, b'Contents of the second file'))
+        self.assertEqual(files[4][0], b'__MACOSX/._second.txt')
+
+    async def test_infozip_zip_limit_without_descriptors(self):
+        async def yield_input():
+            with open('fixtures/infozip_3_0_zip_limit_without_descriptors.zip', 'rb') as f:
+                while True:
+                    chunk = f.read(65536)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        num_received_bytes 
= []
+        sizes = []
+        names = []
+        async for name, size, chunks in async_stream_unzip(yield_input()):
+            names.append(name)
+            sizes.append(size)
+            num_received_bytes.append(0)
+            async for chunk in chunks:
+                num_received_bytes[-1] += len(chunk)
+
+        self.assertEqual(names, [b'-'])
+        self.assertEqual(sizes, [4294967295])
+        self.assertEqual(num_received_bytes, [4294967295])
+
+    async def test_infozip_zip_limit_with_descriptors(self):
+        async def yield_input():
+            with open('fixtures/infozip_3_0_zip_limit_with_descriptors.zip', 'rb') as f:
+                while True:
+                    chunk = f.read(65536)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        num_received_bytes = []
+        sizes = []
+        names = []
+        async for name, size, chunks in async_stream_unzip(yield_input()):
+            names.append(name)
+            sizes.append(size)
+            num_received_bytes.append(0)
+            async for chunk in chunks:
+                num_received_bytes[-1] += len(chunk)
+
+        self.assertEqual(names, [b'-'])
+        self.assertEqual(sizes, [None])
+        self.assertEqual(num_received_bytes, [4294967295])
+
+    async def test_infozip_zip_limit_stored(self):
+        # This file is uncompressed, so it's double-zipped to just store a zipped
+        # one in the repo
+        async def yield_input():
+            with open('fixtures/infozip_3_0_zip_limit_without_descriptors_stored.zip', 'rb') as f:
+                while True:
+                    chunk = f.read(65536)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        size = 0
+        async for name, _, chunks_outer in async_stream_unzip(yield_input()):
+            async for name, _, chunks in async_stream_unzip(chunks_outer):
+                async for chunk in chunks:
+                    size += len(chunk)
+
+        self.assertEqual(size, 4294967295)
+
+    async def test_infozip_zip64_with_descriptors(self):
+        async def yield_input():
+            with open('fixtures/infozip_3_0_zip64_with_descriptors.zip', 'rb') as f:
+                while True:
+                    chunk = f.read(65536)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        num_received_bytes = []
+        sizes = []
+        names = []
+        async for name, size, chunks in async_stream_unzip(yield_input()):
+            names.append(name)
+            sizes.append(size)
+            num_received_bytes.append(0)
+            async for chunk in chunks:
+                num_received_bytes[-1] += len(chunk)
+
+        self.assertEqual(names, [b'first.txt', b'second.txt'])
+        self.assertEqual(sizes, [None, None])
+        self.assertEqual(num_received_bytes, [5000000000, 19])
+
+    async def test_infozip_password_protected_file_correct_password(self):
+        async def yield_input():
+            with open('fixtures/infozip_3_0_password.zip', 'rb') as f:
+                while True:
+                    chunk = f.read(4)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        files = []
+        async for name, size, chunks in async_stream_unzip(yield_input(), password=b'password'):
+            byte_str = []
+            async for chunk in chunks:
+                byte_str.append(chunk)
+            files.append((name, size, b''.join(byte_str)))
+
+        self.assertEqual(files, [
+            (b'compressed.txt', None, b'Some content to be password protected\n' * 14),
+            (b'uncompressed.txt', 37, b'Some content to be password protected'),
+        ])
+
+    async def test_infozip_password_protected_file_no_password(self):
+        async def yield_input():
+            with open('fixtures/infozip_3_0_password.zip', 'rb') as f:
+                yield f.read()
+
+        with self.assertRaises(MissingZipCryptoPasswordError):
+            async for name, size, chunks in async_stream_unzip(yield_input()):
+                await anext(chunks)
+
+    async def test_infozip_password_protected_file_bad_password(self):
+        async def yield_input():
+            with open('fixtures/infozip_3_0_password.zip', 'rb') as f:
+                yield f.read()
+
+        with self.assertRaises(IncorrectZipCryptoPasswordError):
+            async for name, size, chunks in async_stream_unzip(yield_input(), password=b'bad-password'):
+                await anext(chunks)
+
+    async 
def test_infozip_password_protected_file_data_descriptor_correct_password(self):
+        async def yield_input():
+            with open('fixtures/infozip_3_0_password_data_descriptor.zip', 'rb') as f:
+                while True:
+                    chunk = f.read(4)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        files = []
+        async for name, size, chunks in async_stream_unzip(yield_input(), password=b'password'):
+            byte_str = []
+            async for chunk in chunks:
+                byte_str.append(chunk)
+            files.append((name, size, b''.join(byte_str)))
+
+        self.assertEqual(files, [
+            (b'-', None, b'Some encrypted content to be compressed. Yes, compressed.'),
+        ])
+
+    async def test_7za_password_protected_aes(self):
+        async def yield_input(i):
+            with open('fixtures/7za_17_4_aes.zip', 'rb') as f:
+                while True:
+                    chunk = f.read(i)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        # AES has block sizes of 16 bytes, so try to make sure there
+        # isn't some subtle dependency on chunks being a multiple of that
+        for i in tuple(range(1, 17)) + (100000,):
+            files = []
+            async for name, size, chunks in async_stream_unzip(yield_input(i), password=b'password'):
+                byte_str = []
+                async for chunk in chunks:
+                    byte_str.append(chunk)
+                files.append((name, size, b''.join(byte_str)))
+
+            self.assertEqual(files, [
+                (b'content.txt', 384, b'Some content to be compressed and AES-encrypted\n' * 8),
+            ])
+
+    async def test_7za_password_protected_aes_bad_hmac(self):
+        async def yield_input():
+            with open('fixtures/7za_17_4_aes.zip', 'rb') as f:
+                data = f.read()
+                yield data[0:130] + b'-' + data[132:]
+
+        with self.assertRaises(HMACIntegrityError):
+            async for name, size, chunks in async_stream_unzip(yield_input(), password=b'password'):
+                async for chunk in chunks:
+                    pass
+
+    async def test_7za_password_protected_aes_data_descriptor(self):
+        async def yield_input(i):
+            with open('fixtures/7za_17_4_aes_data_descriptor.zip', 'rb') as f:
+                while True:
+                    chunk = f.read(i)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        # AES has block sizes of 16 bytes, so try to make sure there
+        # isn't some subtle dependency on chunks being a multiple of that
+        for i in tuple(range(1, 17)) + (100000,):
+            files = []
+            async for name, size, chunks in async_stream_unzip(yield_input(i), password=b'password'):
+                byte_str = []
+                async for chunk in chunks:
+                    byte_str.append(chunk)
+                files.append((name, size, b''.join(byte_str)))
+
+            self.assertEqual(files, [
+                (b'', None, b'Some content to be compressed and AES-encrypted\n' * 1000),
+            ])
+
+    async def test_7za_password_protected_aes_no_password(self):
+        async def yield_input():
+            with open('fixtures/7za_17_4_aes.zip', 'rb') as f:
+                yield f.read()
+
+        with self.assertRaises(MissingAESPasswordError):
+            async for name, size, chunks in async_stream_unzip(yield_input()):
+                await anext(chunks)
+
+    async def test_7za_password_protected_aes_bad_password(self):
+        async def yield_input():
+            with open('fixtures/7za_17_4_aes.zip', 'rb') as f:
+                yield f.read()
+
+        with self.assertRaises(IncorrectAESPasswordError):
+            async for name, size, chunks in async_stream_unzip(yield_input(), password=b'not-password'):
+                await anext(chunks)
+
+    async def test_7za_deflate64(self):
+        async def yield_input():
+            with open('fixtures/7za_17_4_deflate64.zip', 'rb') as f:
+                yield f.read()
+
+        async for name, size, chunks in async_stream_unzip(yield_input()):
+            byte_str = []
+            async for chunk in chunks:
+                byte_str.append(chunk)
+            content = b''.join(byte_str)
+
+        self.assertEqual(content, b'Some content to be compressed and AES-encrypted\n' * 1000)
+
+    async def test_7z_password_data_descriptor(self):
+        async def 
yield_input(): + with open('fixtures/7z_17_4_password_data_descriptor.zip', 'rb') as f: + yield f.read() + + async for name, size, chunks in async_stream_unzip(yield_input(), password=b'password'): + byte_str = [] + async for chunk in chunks: + byte_str.append(chunk) + content = b''.join(byte_str) + + + self.assertEqual(content, b'Some content to be compressed and encrypted') + + async def test_java_zip_limit(self): + async def yield_input(): + with open('fixtures/java_19_0_1_zip_limit.zip', 'rb') as f: + yield f.read() + + l = 0 + async for name, size, chunks in async_stream_unzip(yield_input()): + async for chunk in chunks: + l += len(chunk) + + self.assertEqual(l, 4294967294) + + async def test_java_zip_limit_crc_32_error(self): + async def yield_input(): + with open('fixtures/java_19_0_1_zip_limit.zip', 'rb') as f: + b = f.read() + yield b[:-87] + b'\0' + b[-86:] + + with self.assertRaises(CRC32IntegrityError): + async for name, size, chunks in async_stream_unzip(yield_input()): + async for chunk in chunks: + pass + + async def test_java_zip64_limit(self): + async def yield_input(): + with open('fixtures/java_19_0_1_zip64_limit.zip', 'rb') as f: + yield f.read() + + l = 0 + async for name, size, chunks in async_stream_unzip(yield_input()): + async for chunk in chunks: + l += len(chunk) + + self.assertEqual(l, 4294967295) + + async def test_java_zip64_limit_crc_32_error(self): + async def yield_input(): + with open('fixtures/java_19_0_1_zip64_limit.zip', 'rb') as f: + b = f.read() + yield b[:-110] + b'\1' + b[-109:] + + with self.assertRaises(CRC32IntegrityError): + async for name, size, chunks in async_stream_unzip(yield_input()): + async for chunk in chunks: + pass + + async def test_java_zip64_limit_plus_one(self): + async def yield_input(): + with open('fixtures/java_19_0_1_zip64_limit_plus_one.zip', 'rb') as f: + yield f.read() + + l = 0 + async for name, size, chunks in async_stream_unzip(yield_input()): + async for chunk in chunks: + l += len(chunk) + + self.assertEqual(l, 4294967296) + + async def test_java_zip64_limit_plus_one_crc_32_error(self): + async def yield_input(): + with open('fixtures/java_19_0_1_zip64_limit_plus_one.zip', 'rb') as f: + b = f.read() + yield b[:-110] + b'\1' + b[-109:] + + with self.assertRaises(CRC32IntegrityError): + async for name, size, chunks in async_stream_unzip(yield_input()): + async for chunk in chunks: + pass
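+
+    # Illustrative sketch, not from the original suite: the async API allows
+    # multiple archives to be unzipped concurrently on one event loop
+    async def test_two_archives_concurrently(self):
+        async def yield_input():
+            file = io.BytesIO()
+            with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
+                zf.writestr('first.txt', b'contents')
+
+            yield file.getvalue()
+
+        async def unzip_all():
+            files = []
+            async for name, size, chunks in async_stream_unzip(yield_input()):
+                content = b''.join([chunk async for chunk in chunks])
+                files.append((name, content))
+            return files
+
+        files_1, files_2 = await asyncio.gather(unzip_all(), unzip_all())
+
+        self.assertEqual(files_1, [(b'first.txt', b'contents')])
+        self.assertEqual(files_2, [(b'first.txt', b'contents')])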