Skip to content

Commit

Permalink
Merge pull request #127 from uktrade/feat/final-types-to-enable-stric…
Browse files Browse the repository at this point in the history
…t-type-checking

feat: type annotations now allow strict type checking
  • Loading branch information
michalc authored May 27, 2024
2 parents a9ca0cb + 55ae0e1 commit 1d8a18a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
pip install ".[ci]"
- name: "Run type checking"
run: |
mypy stream_zip.py
mypy stream_zip.py --strict
- name: "Run tests"
run: |
pytest --cov
Expand Down
73 changes: 52 additions & 21 deletions stream_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
object, # Sentinel of the methods above
object, # Sentinel of auto upgrade central directory or not
_CompressObjGetter, # Function to get the zlib Compress object for
Optional[int], # The uncompressed size of the file if known
Optional[int], # The CRC32 of the file if known
int, # The uncompressed size of the file for NO_COMPRESSION_STREAMED_* types
int, # The CRC32 of the file for NO_COMPRESSION_STREAMED_* types
]

# A "Method" is an instance of a class that has a _get function that returns a _MethodTuple
Expand All @@ -46,17 +46,17 @@ def _get(self, offset: int, default_get_compressobj: _CompressObjGetter) -> _Met

class _ZIP_64_TYPE(Method):
def _get(self, offset: int, default_get_compressobj: _CompressObjGetter) -> _MethodTuple:
return _ZIP_64, _NO_AUTO_UPGRADE_CENTRAL_DIRECTORY, default_get_compressobj, None, None
return _ZIP_64, _NO_AUTO_UPGRADE_CENTRAL_DIRECTORY, default_get_compressobj, 0, 0

class _ZIP_32_TYPE(Method):
def _get(self, offset: int, default_get_compressobj: _CompressObjGetter) -> _MethodTuple:
return _ZIP_32, _NO_AUTO_UPGRADE_CENTRAL_DIRECTORY, default_get_compressobj, None, None
return _ZIP_32, _NO_AUTO_UPGRADE_CENTRAL_DIRECTORY, default_get_compressobj, 0, 0

class _NO_COMPRESSION_32_TYPE(Method):
def _get(self, offset: int, default_get_compressobj: _CompressObjGetter) -> _MethodTuple:
return _NO_COMPRESSION_BUFFERED_32, _NO_AUTO_UPGRADE_CENTRAL_DIRECTORY, default_get_compressobj, None, None
return _NO_COMPRESSION_BUFFERED_32, _NO_AUTO_UPGRADE_CENTRAL_DIRECTORY, default_get_compressobj, 0, 0

def __call__(self, uncompressed_size: int, crc_32: int, *args: Any, **kwarg: Any) -> Method:
def __call__(self, uncompressed_size: int, crc_32: int) -> Method:
class _NO_COMPRESSION_32_TYPE_STREAMED_TYPE(Method):
def _get(self, offset: int, default_get_compressobj: _CompressObjGetter) -> _MethodTuple:
return _NO_COMPRESSION_STREAMED_32, _NO_AUTO_UPGRADE_CENTRAL_DIRECTORY, default_get_compressobj, uncompressed_size, crc_32
Expand All @@ -65,7 +65,7 @@ def _get(self, offset: int, default_get_compressobj: _CompressObjGetter) -> _Met

class _NO_COMPRESSION_64_TYPE(Method):
def _get(self, offset: int, default_get_compressobj: _CompressObjGetter) -> _MethodTuple:
return _NO_COMPRESSION_BUFFERED_64, _NO_AUTO_UPGRADE_CENTRAL_DIRECTORY, default_get_compressobj, None, None
return _NO_COMPRESSION_BUFFERED_64, _NO_AUTO_UPGRADE_CENTRAL_DIRECTORY, default_get_compressobj, 0, 0

def __call__(self, uncompressed_size: int, crc_32: int) -> Method:
class _NO_COMPRESSION_64_TYPE_STREAMED_TYPE(Method):
Expand All @@ -91,7 +91,7 @@ def __call__(self, uncompressed_size: int, level: int=9) -> Method:
class _ZIP_AUTO_TYPE_INNER(Method):
def _get(self, offset: int, default_get_compressobj: _CompressObjGetter) -> _MethodTuple:
method = _ZIP_64 if uncompressed_size > 4293656841 or offset > 0xffffffff else _ZIP_32
return (method, _AUTO_UPGRADE_CENTRAL_DIRECTORY, lambda: zlib.compressobj(level=level, memLevel=8, wbits=-zlib.MAX_WBITS), None, None)
return (method, _AUTO_UPGRADE_CENTRAL_DIRECTORY, lambda: zlib.compressobj(level=level, memLevel=8, wbits=-zlib.MAX_WBITS), 0, 0)

return _ZIP_AUTO_TYPE_INNER()

Expand Down Expand Up @@ -246,7 +246,12 @@ def _encrypt_aes(chunks: Generator[bytes, None, Any]) -> Generator[bytes, None,
return get_return_value()
return _encrypt_aes

def _zip_64_local_header_and_data(compression, aes_size_increase, aes_flags, name_encoded, mod_at_ms_dos, mod_at_unix_extra, aes_extra, external_attr, uncompressed_size, crc_32, crc_32_mask, _get_compress_obj, encryption_func, chunks) -> Generator[bytes, None, Any]:
def _zip_64_local_header_and_data(
compression: int, aes_size_increase: int, aes_flags: int, name_encoded: bytes, mod_at_ms_dos: bytes,
mod_at_unix_extra: bytes, aes_extra: bytes, external_attr: int, uncompressed_size: int, crc_32: int,
crc_32_mask: int, _get_compress_obj: _CompressObjGetter, encryption_func: Callable[[Generator[bytes, None, Any]], Generator[bytes, None, Any]],
chunks: Iterable[bytes],
) -> Generator[bytes, None, Tuple[bytes, bytes, bytes]]:
file_offset = offset

_raise_if_beyond(file_offset, maximum=0xffffffffffffffff, exception_class=OffsetOverflowError)
Expand Down Expand Up @@ -313,7 +318,12 @@ def _zip_64_local_header_and_data(compression, aes_size_increase, aes_flags, nam
0xffffffff, # Offset of local header - since zip64
), name_encoded, extra

def _zip_32_local_header_and_data(compression, aes_size_increase, aes_flags, name_encoded, mod_at_ms_dos, mod_at_unix_extra, aes_extra, external_attr, uncompressed_size, crc_32, crc_32_mask, _get_compress_obj, encryption_func, chunks) -> Generator[bytes, None, Any]:
def _zip_32_local_header_and_data(
compression: int, aes_size_increase: int, aes_flags: int, name_encoded: bytes, mod_at_ms_dos: bytes,
mod_at_unix_extra: bytes, aes_extra: bytes, external_attr: int, uncompressed_size: int, crc_32: int,
crc_32_mask: int, _get_compress_obj: _CompressObjGetter, encryption_func: Callable[[Generator[bytes, None, Any]], Generator[bytes, None, Any]],
chunks: Iterable[bytes],
) -> Generator[bytes, None, Tuple[bytes, bytes, bytes]]:
file_offset = offset

_raise_if_beyond(file_offset, maximum=0xffffffff, exception_class=OffsetOverflowError)
Expand Down Expand Up @@ -368,7 +378,8 @@ def _zip_32_local_header_and_data(compression, aes_size_increase, aes_flags, nam
file_offset,
), name_encoded, extra

def _zip_data(chunks, _get_compress_obj, max_uncompressed_size, max_compressed_size) -> Generator[bytes, None, Any]:
def _zip_data(chunks: Iterable[bytes], _get_compress_obj: _CompressObjGetter,
max_uncompressed_size: int, max_compressed_size: int) -> Generator[bytes, None, Tuple[int, int, int]]:
uncompressed_size = 0
compressed_size = 0
crc_32 = zlib.crc32(b'')
Expand All @@ -395,7 +406,12 @@ def _zip_data(chunks, _get_compress_obj, max_uncompressed_size, max_compressed_s

return uncompressed_size, compressed_size, crc_32

def _no_compression_64_local_header_and_data(compression, aes_size_increase, aes_flags, name_encoded, mod_at_ms_dos, mod_at_unix_extra, aes_extra, external_attr, uncompressed_size, crc_32, crc_32_mask, _get_compress_obj, encryption_func, chunks) -> Generator[bytes, None, Any]:
def _no_compression_64_local_header_and_data(
compression: int, aes_size_increase: int, aes_flags: int, name_encoded: bytes, mod_at_ms_dos: bytes,
mod_at_unix_extra: bytes, aes_extra: bytes, external_attr: int, uncompressed_size: int, crc_32: int,
crc_32_mask: int, _get_compress_obj: _CompressObjGetter, encryption_func: Callable[[Generator[bytes, None, Any]], Generator[bytes, None, Any]],
chunks: Iterable[bytes],
) -> Generator[bytes, None, Tuple[bytes, bytes, bytes]]:
file_offset = offset

_raise_if_beyond(file_offset, maximum=0xffffffffffffffff, exception_class=OffsetOverflowError)
Expand Down Expand Up @@ -427,7 +443,7 @@ def _no_compression_64_local_header_and_data(compression, aes_size_increase, aes
yield from _(name_encoded)
yield from _(extra)

yield from encryption_func(chunks)
yield from encryption_func((chunk for chunk in chunks))

extra = zip_64_central_directory_extra_struct.pack(
zip_64_extra_signature,
Expand Down Expand Up @@ -457,7 +473,12 @@ def _no_compression_64_local_header_and_data(compression, aes_size_increase, aes
), name_encoded, extra


def _no_compression_32_local_header_and_data(compression, aes_size_increase, aes_flags, name_encoded, mod_at_ms_dos, mod_at_unix_extra, aes_extra, external_attr, uncompressed_size, crc_32, crc_32_mask, _get_compress_obj, encryption_func, chunks) -> Generator[bytes, None, Any]:
def _no_compression_32_local_header_and_data(
compression: int, aes_size_increase: int, aes_flags: int, name_encoded: bytes, mod_at_ms_dos: bytes,
mod_at_unix_extra: bytes, aes_extra: bytes, external_attr: int, uncompressed_size: int, crc_32: int,
crc_32_mask: int, _get_compress_obj: _CompressObjGetter, encryption_func: Callable[[Generator[bytes, None, Any]], Generator[bytes, None, Any]],
chunks: Iterable[bytes],
) -> Generator[bytes, None, Tuple[bytes, bytes, bytes]]:
file_offset = offset

_raise_if_beyond(file_offset, maximum=0xffffffff, exception_class=OffsetOverflowError)
Expand All @@ -484,7 +505,7 @@ def _no_compression_32_local_header_and_data(compression, aes_size_increase, aes
yield from _(name_encoded)
yield from _(extra)

yield from encryption_func(chunks)
yield from encryption_func((chunk for chunk in chunks))

return central_directory_header_struct.pack(
20, # Version made by
Expand Down Expand Up @@ -513,19 +534,24 @@ def _no_compression_buffered_data_size_crc_32(chunks: Iterable[bytes], maximum_s
size = 0
crc_32 = zlib.crc32(b'')

def _chunks() -> Iterable[bytes]:
def _chunks() -> Generator[bytes, None, Any]:
nonlocal size, crc_32
for chunk in chunks:
size += len(chunk)
_raise_if_beyond(size, maximum=maximum_size, exception_class=UncompressedSizeOverflowError)
crc_32 = zlib.crc32(chunk, crc_32)
yield chunk

chunks = tuple(_chunks())
__chunks = tuple(_chunks())

return chunks, size, crc_32
return __chunks, size, crc_32

def _no_compression_streamed_64_local_header_and_data(compression, aes_size_increase, aes_flags, name_encoded, mod_at_ms_dos, mod_at_unix_extra, aes_extra, external_attr, uncompressed_size, crc_32, crc_32_mask, _get_compress_obj, encryption_func, chunks) -> Generator[bytes, None, Any]:
def _no_compression_streamed_64_local_header_and_data(
compression: int, aes_size_increase: int, aes_flags: int, name_encoded: bytes, mod_at_ms_dos: bytes,
mod_at_unix_extra: bytes, aes_extra: bytes, external_attr: int, uncompressed_size: int, crc_32: int,
crc_32_mask: int, _get_compress_obj: _CompressObjGetter, encryption_func: Callable[[Generator[bytes, None, Any]], Generator[bytes, None, Any]],
chunks: Iterable[bytes],
) -> Generator[bytes, None, Tuple[bytes, bytes, bytes]]:
file_offset = offset

_raise_if_beyond(file_offset, maximum=0xffffffffffffffff, exception_class=OffsetOverflowError)
Expand Down Expand Up @@ -585,7 +611,12 @@ def _no_compression_streamed_64_local_header_and_data(compression, aes_size_incr
), name_encoded, extra


def _no_compression_streamed_32_local_header_and_data(compression, aes_size_increase, aes_flags, name_encoded, mod_at_ms_dos, mod_at_unix_extra, aes_extra, external_attr, uncompressed_size, crc_32, crc_32_mask, _get_compress_obj, encryption_func, chunks) -> Generator[bytes, None, Any]:
def _no_compression_streamed_32_local_header_and_data(
compression: int, aes_size_increase: int, aes_flags: int, name_encoded: bytes, mod_at_ms_dos: bytes,
mod_at_unix_extra: bytes, aes_extra: bytes, external_attr: int, uncompressed_size: int, crc_32: int,
crc_32_mask: int, _get_compress_obj: _CompressObjGetter, encryption_func: Callable[[Generator[bytes, None, Any]], Generator[bytes, None, Any]],
chunks: Iterable[bytes],
) -> Generator[bytes, None, Any]:
file_offset = offset

_raise_if_beyond(file_offset, maximum=0xffffffff, exception_class=OffsetOverflowError)
Expand Down Expand Up @@ -632,7 +663,7 @@ def _no_compression_streamed_32_local_header_and_data(compression, aes_size_incr
file_offset,
), name_encoded, extra

def _no_compression_streamed_data(chunks, uncompressed_size, crc_32, maximum_size) -> Generator[bytes, None, Any]:
def _no_compression_streamed_data(chunks: Iterable[bytes], uncompressed_size: int, crc_32: int, maximum_size: int) -> Generator[bytes, None, Any]:
actual_crc_32 = zlib.crc32(b'')
size = 0
for chunk in chunks:
Expand Down

0 comments on commit 1d8a18a

Please sign in to comment.