Skip to content

Commit

Permalink
feat: AES-2 encryption
Browse files Browse the repository at this point in the history
  • Loading branch information
michalc committed Jan 3, 2024
1 parent ec8a17b commit c348adf
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 2 deletions.
21 changes: 19 additions & 2 deletions stream_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def method_compressobj(offset, default_get_compressobj):
return method_compressobj


def stream_zip(files, chunk_size=65536, get_compressobj=lambda: zlib.compressobj(wbits=-zlib.MAX_WBITS, level=9), extended_timestamps=True):
def stream_zip(files, chunk_size=65536, get_compressobj=lambda: zlib.compressobj(wbits=-zlib.MAX_WBITS, level=9), extended_timestamps=True, password=None):

def evenly_sized(chunks):
chunk = b''
Expand Down Expand Up @@ -139,6 +139,21 @@ def _raise_if_beyond(offset, maximum, exception_class):
def _no_encryption(chunks):
return (yield from chunks)

def _aes_encrypted(chunks):
# We leverage the not-often used "return value" of generators. Here, we want to iterate
# over chunks to encrypt them, but still return the same "return value". So we use a
# bit of a trick to extract the return value but still have access to the chunks as
# we iterate over them
return_value = None
def with_return_value():
nonlocal return_value
return_value = yield from chunks

for chunk in with_returned():
yield chunk

return return_value

def _zip_64_local_header_and_data(name_encoded, mod_at_ms_dos, mod_at_unix_extra, aes_extra, external_attr, uncompressed_size, crc_32, _get_compress_obj, encryption_func, chunks):
file_offset = offset

Expand Down Expand Up @@ -577,7 +592,9 @@ def _no_compression_streamed_data(chunks, uncompressed_size, crc_32, maximum_siz
b'\x01', # Only modification time (as opposed to also other times)
int(modified_at.timestamp()),
) if extended_timestamps else b''
aes_extra, encryption_func = (b'', _no_encryption)
aes_extra, encryption_func = \
(b'', _aes_encrypted) if password is not None else \
(b'', _no_encryption)
external_attr = \
(mode << 16) | \
(0x10 if name_encoded[-1:] == b'/' else 0x0) # MS-DOS directory
Expand Down
50 changes: 50 additions & 0 deletions test_stream_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,3 +1082,53 @@ def test_unzip_modification_time_extended_timestamps_disabled(method, timezone,
subprocess.run(['unzip', f'{d}/test.zip', '-d', d], env={'TZ': timezone})

assert os.path.getmtime('my_file') == expected_modified_at.timestamp()


@pytest.mark.parametrize(
"method",
[
ZIP_32,
ZIP_64,
NO_COMPRESSION_64,
NO_COMPRESSION_32,
],
)
def test_password_unzips_with_stream_unzip(method):
now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
mode = stat.S_IFREG | 0o600
password = 'my-pass'

files = (
('file-1', now, mode, ZIP_64, (b'a' * 10000, b'b' * 10000)),
('file-2', now, mode, ZIP_32, (b'c', b'd')),
)

assert [
(b'file-1', None, b'a' * 10000 + b'b' * 10000),
(b'file-2', None, b'cd'),
] == [
(name, size, b''.join(chunks))
for name, size, chunks in stream_unzip(stream_zip(files, password=password), password=password)
]


@pytest.mark.parametrize(
"method",
[
ZIP_32,
ZIP_64,
NO_COMPRESSION_64,
NO_COMPRESSION_32,
],
)
def test_password_bytes_not_deterministic(method):
now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
mode = stat.S_IFREG | 0o600
password = 'my-pass'

files = (
('file-1', now, mode, ZIP_64, (b'a' * 10000, b'b' * 10000)),
('file-2', now, mode, ZIP_32, (b'c', b'd')),
)

assert b''.join(stream_zip(files, password=password)) != b''.join(stream_zip(files, password=password))

0 comments on commit c348adf

Please sign in to comment.