From 797a6261f1945fc23150a4504f9a0cc8b7617e4f Mon Sep 17 00:00:00 2001 From: tzing Date: Fri, 6 Dec 2019 21:39:28 +0800 Subject: [PATCH 01/10] add `passwd` in ZipFS & raise error for write --- fs/errors.py | 8 ++++++++ fs/zipfs.py | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/fs/errors.py b/fs/errors.py index b70b62e3..6379bce2 100644 --- a/fs/errors.py +++ b/fs/errors.py @@ -41,6 +41,7 @@ "NoURL", "OperationFailed", "OperationTimeout", + "PasswordUnsupported", "PathError", "PermissionDenied", "RemoteConnectionError", @@ -255,6 +256,13 @@ class RemoveRootError(OperationFailed): default_message = "root directory may not be removed" +class PasswordUnsupported(Unsupported): + """Attempt to create a password protected zip file. + """ + + default_message = "can not create password protected zip" + + class ResourceError(FSError): """Base exception class for error associated with a specific resource. """ diff --git a/fs/zipfs.py b/fs/zipfs.py index 8feb9e56..422e0733 100644 --- a/fs/zipfs.py +++ b/fs/zipfs.py @@ -160,6 +160,8 @@ class ZipFS(WrapFS): defined in the `zipfile` module in the stdlib). temp_fs (str): An FS URL for the temporary filesystem used to store data prior to zipping. + passwd (str or bytes): Password for extracting file from zip file. Only + used for read mode. """ @@ -171,11 +173,14 @@ def __new__( # type: ignore compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Text + passwd=None, # type: Optional[AnyStr] ): # type: (...) -> FS # This magic returns a different class instance based on the # value of the ``write`` parameter. if write: + if passwd is not None: + raise errors.PasswordUnsupported() return WriteZipFS( file, compression=compression, encoding=encoding, temp_fs=temp_fs ) @@ -191,6 +196,7 @@ def __init__( compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Text + passwd=None, # type: Optional[AnyStr] ): # type: (...) -> None pass From 8d3aeb411a0b424cb7359da1851b1ba5ce20776d Mon Sep 17 00:00:00 2001 From: tzing Date: Fri, 6 Dec 2019 21:50:14 +0800 Subject: [PATCH 02/10] implement set password for ReadZipFS --- fs/zipfs.py | 48 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/fs/zipfs.py b/fs/zipfs.py index 422e0733..07fc8b3a 100644 --- a/fs/zipfs.py +++ b/fs/zipfs.py @@ -43,13 +43,27 @@ R = typing.TypeVar("R", bound="ReadZipFS") +def _bytes(s): + # type: (AnyStr) -> bytes + if s is None: + return None + elif isinstance(s, six.binary_type): + return s + elif isinstance(s, six.string_types): + return s.encode() + else: + raise TypeError("expected string type or byte type, not " + type(s).__name__) + + class _ZipExtFile(RawWrapper): - def __init__(self, fs, name): - # type: (ReadZipFS, Text) -> None + def __init__(self, fs, name, passwd=None): + # type: (ReadZipFS, Text, Optional[AnyStr]) -> None self._zip = _zip = fs._zip self._end = _zip.getinfo(name).file_size self._pos = 0 - super(_ZipExtFile, self).__init__(_zip.open(name), "r", name) + super(_ZipExtFile, self).__init__( + _zip.open(name, pwd=_bytes(passwd)), "r", name + ) def read(self, size=-1): # type: (int) -> bytes @@ -185,7 +199,7 @@ def __new__( # type: ignore file, compression=compression, encoding=encoding, temp_fs=temp_fs ) else: - return ReadZipFS(file, encoding=encoding) + return ReadZipFS(file, encoding=encoding, passwd=passwd) if typing.TYPE_CHECKING: @@ -296,13 +310,15 @@ class ReadZipFS(FS): } @errors.CreateFailed.catch_all - def __init__(self, file, encoding="utf-8"): - # type: (Union[BinaryIO, Text], Text) -> None + def __init__(self, file, encoding="utf-8", passwd=None): + # type: (Union[BinaryIO, Text], Text, Optional[AnyStr]) -> None super(ReadZipFS, self).__init__() self._file = file self.encoding = encoding self._zip = zipfile.ZipFile(file, "r") self._directory_fs = None # type: Optional[MemoryFS] + if passwd is not None: + self.setpassword(_bytes(passwd)) def __repr__(self): # type: () -> Text @@ -415,8 +431,8 @@ def makedir( self.check() raise errors.ResourceReadOnly(path) - def openbin(self, path, mode="r", buffering=-1, **kwargs): - # type: (Text, Text, int, **Any) -> BinaryIO + def openbin(self, path, mode="r", buffering=-1, passwd=None, **kwargs): + # type: (Text, Text, int, Optional[AnyStr], **Any) -> BinaryIO self.check() if "w" in mode or "+" in mode or "a" in mode: raise errors.ResourceReadOnly(path) @@ -427,7 +443,7 @@ def openbin(self, path, mode="r", buffering=-1, **kwargs): raise errors.FileExpected(path) zip_name = self._path_to_zip_name(path) - return _ZipExtFile(self, zip_name) # type: ignore + return _ZipExtFile(self, zip_name, passwd) # type: ignore def remove(self, path): # type: (Text) -> None @@ -445,13 +461,13 @@ def close(self): if hasattr(self, "_zip"): self._zip.close() - def readbytes(self, path): - # type: (Text) -> bytes + def readbytes(self, path, pwd=None): + # type: (Text, Optional[AnyStr]) -> bytes self.check() if not self._directory.isfile(path): raise errors.ResourceNotFound(path) zip_name = self._path_to_zip_name(path) - zip_bytes = self._zip.read(zip_name) + zip_bytes = self._zip.read(zip_name, pwd=_bytes(pwd)) return zip_bytes def geturl(self, path, purpose="download"): @@ -462,3 +478,11 @@ def geturl(self, path, purpose="download"): return "zip://{}!/{}".format(quoted_file, quoted_path) else: raise errors.NoURL(path, purpose) + + def setpassword(self, passwd): + # type: (AnyStr) -> None + """Set *passwd* as default password to extract encrypted files. + """ + if passwd is None: + raise ValueError('passwd') + self._zip.setpassword(_bytes(passwd)) From 47cca196de4423d71a6193b3db4b9215408652e2 Mon Sep 17 00:00:00 2001 From: tzing Date: Fri, 6 Dec 2019 22:24:06 +0800 Subject: [PATCH 03/10] add tests --- fs/zipfs.py | 4 ++-- tests/test_zipfs.py | 55 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/fs/zipfs.py b/fs/zipfs.py index 07fc8b3a..89dc6c46 100644 --- a/fs/zipfs.py +++ b/fs/zipfs.py @@ -461,13 +461,13 @@ def close(self): if hasattr(self, "_zip"): self._zip.close() - def readbytes(self, path, pwd=None): + def readbytes(self, path, passwd=None): # type: (Text, Optional[AnyStr]) -> bytes self.check() if not self._directory.isfile(path): raise errors.ResourceNotFound(path) zip_name = self._path_to_zip_name(path) - zip_bytes = self._zip.read(zip_name, pwd=_bytes(pwd)) + zip_bytes = self._zip.read(zip_name, pwd=_bytes(passwd)) return zip_bytes def geturl(self, path, purpose="download"): diff --git a/tests/test_zipfs.py b/tests/test_zipfs.py index 9b2e82ea..00974f8c 100644 --- a/tests/test_zipfs.py +++ b/tests/test_zipfs.py @@ -1,6 +1,7 @@ # -*- encoding: UTF-8 from __future__ import unicode_literals +import codecs import os import sys import tempfile @@ -13,13 +14,22 @@ from fs.compress import write_zip from fs.opener import open_fs from fs.opener.errors import NotWriteable -from fs.errors import NoURL +from fs.errors import NoURL, PasswordUnsupported from fs.test import FSTestCases from fs.enums import Seek from .test_archives import ArchiveTestCases +class TestBytes(unittest.TestCase): + def test_conversion(self): + self.assertIsNone(zipfs._bytes(None)) + self.assertEqual(zipfs._bytes("passwd"), b"passwd") + self.assertEqual(zipfs._bytes(b"passwd"), b"passwd") + with self.assertRaises(TypeError): + zipfs._bytes(1234) + + class TestWriteReadZipFS(unittest.TestCase): def setUp(self): fh, self._temp_path = tempfile.mkstemp() @@ -40,6 +50,10 @@ def test_unicode_paths(self): with zip_fs.openbin(path) as f: f.read() + def test_create_password(self): + with self.assertRaises(PasswordUnsupported): + zipfs.ZipFS(self._temp_path, write=True, passwd="hello") + class TestWriteZipFS(FSTestCases, unittest.TestCase): """ @@ -220,6 +234,45 @@ def test_implied(self): os.remove(path) +class TestPasswordReadZipFS(unittest.TestCase): + + ZIP_BIN = ( + b"UEsDBAoACQAAAH2whk8tOwivGAAAAAwAAAADABwAZm9vVVQJAAPNX+pdzl/qXXV4CwABBPUBAAAE" + b"FAAAAJ6pj1kohibjIq4YqnEKUZ8SCJMeUkl9oVBLBwgtOwivGAAAAAwAAABQSwECHgMKAAkAAAB9" + b"sIZPLTsIrxgAAAAMAAAAAwAYAAAAAAABAAAApIEAAAAAZm9vVVQFAAPNX+pddXgLAAEE9QEAAAQU" + b"AAAAUEsFBgAAAAABAAEASQAAAGUAAAAAAA==" + ) + + PASSWD = "P@ssw0rd" + + def setUp(self): + fh, path = tempfile.mkstemp("testzip.zip") + os.write(fh, codecs.decode(self.ZIP_BIN, "base64")) + os.close(fh) + self.path = path + + def tearDown(self): + os.remove(self.path) + + def test_openbin(self): + with zipfs.ReadZipFS(self.path, passwd=self.PASSWD) as zip_fs: + with zip_fs.openbin("foo") as fp: + self.assertEqual(fp.read(), b"hello world\n") + + with zipfs.ReadZipFS(self.path) as zip_fs: + with zip_fs.openbin("foo", passwd=self.PASSWD) as fp: + self.assertEqual(fp.read(), b"hello world\n") + + def test_readbytes(self): + with zipfs.ReadZipFS(self.path, passwd=self.PASSWD) as zip_fs: + self.assertEqual(zip_fs.readbytes("foo"), b"hello world\n") + + with zipfs.ReadZipFS(self.path) as zip_fs: + self.assertEqual( + zip_fs.readbytes("foo", passwd=self.PASSWD), b"hello world\n" + ) + + class TestOpener(unittest.TestCase): def test_not_writeable(self): with self.assertRaises(NotWriteable): From 787af8bcd673db26f6bd103fd087c96ce0267231 Mon Sep 17 00:00:00 2001 From: tzing Date: Fri, 6 Dec 2019 22:35:58 +0800 Subject: [PATCH 04/10] missing tests --- fs/zipfs.py | 2 -- tests/test_zipfs.py | 8 ++++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/fs/zipfs.py b/fs/zipfs.py index 89dc6c46..059d923f 100644 --- a/fs/zipfs.py +++ b/fs/zipfs.py @@ -483,6 +483,4 @@ def setpassword(self, passwd): # type: (AnyStr) -> None """Set *passwd* as default password to extract encrypted files. """ - if passwd is None: - raise ValueError('passwd') self._zip.setpassword(_bytes(passwd)) diff --git a/tests/test_zipfs.py b/tests/test_zipfs.py index 00974f8c..fb580cd8 100644 --- a/tests/test_zipfs.py +++ b/tests/test_zipfs.py @@ -272,6 +272,14 @@ def test_readbytes(self): zip_fs.readbytes("foo", passwd=self.PASSWD), b"hello world\n" ) + def test_setpassword(self): + with zipfs.ReadZipFS(self.path) as zip_fs: + with self.assertRaises(RuntimeError): + zip_fs._zip.read("foo") + + zip_fs.setpassword(self.PASSWD) + self.assertEqual(zip_fs._zip.read("foo"), b"hello world\n") + class TestOpener(unittest.TestCase): def test_not_writeable(self): From 052e94b60d16eaafbe25382999b85b3d81c91878 Mon Sep 17 00:00:00 2001 From: tzing Date: Fri, 6 Dec 2019 22:38:56 +0800 Subject: [PATCH 05/10] update changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7126b0a9..d1b17a44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [2.4.12] - (Unreleased) +### Added + +- Added `passwd` argument and `setpassword` for ReadZipFS to extract password + protected date from zip file. [#360](https://github.com/PyFilesystem/pyfilesystem2/issues/360) + ### Changed - Start testing on PyPy. Due to [#342](https://github.com/PyFilesystem/pyfilesystem2/issues/342) From b9eb55a0c52092d003c51864e331c7d9af30f113 Mon Sep 17 00:00:00 2001 From: tzing Date: Fri, 6 Dec 2019 23:11:29 +0800 Subject: [PATCH 06/10] fix typing --- fs/zipfs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fs/zipfs.py b/fs/zipfs.py index 059d923f..b46933ce 100644 --- a/fs/zipfs.py +++ b/fs/zipfs.py @@ -27,6 +27,7 @@ if typing.TYPE_CHECKING: from typing import ( Any, + AnyStr, BinaryIO, Collection, Dict, @@ -44,7 +45,7 @@ def _bytes(s): - # type: (AnyStr) -> bytes + # type: (AnyStr) -> Optional[bytes] if s is None: return None elif isinstance(s, six.binary_type): From 83e55575539978183d51b5cca4c7b7ef074c6ed9 Mon Sep 17 00:00:00 2001 From: tzing Date: Fri, 6 Dec 2019 23:12:27 +0800 Subject: [PATCH 07/10] missing optional tag --- fs/zipfs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/zipfs.py b/fs/zipfs.py index b46933ce..b3b0a68a 100644 --- a/fs/zipfs.py +++ b/fs/zipfs.py @@ -45,7 +45,7 @@ def _bytes(s): - # type: (AnyStr) -> Optional[bytes] + # type: (Optional[AnyStr]) -> Optional[bytes] if s is None: return None elif isinstance(s, six.binary_type): From 70d482ea0c12afdd0b800bad012e81845d8ff0a7 Mon Sep 17 00:00:00 2001 From: tzing Date: Fri, 6 Dec 2019 23:47:16 +0800 Subject: [PATCH 08/10] rename `passwd` to `password` --- fs/zipfs.py | 34 +++++++++++++++++----------------- tests/test_zipfs.py | 10 +++++----- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/fs/zipfs.py b/fs/zipfs.py index b3b0a68a..db3f6712 100644 --- a/fs/zipfs.py +++ b/fs/zipfs.py @@ -57,13 +57,13 @@ def _bytes(s): class _ZipExtFile(RawWrapper): - def __init__(self, fs, name, passwd=None): + def __init__(self, fs, name, password=None): # type: (ReadZipFS, Text, Optional[AnyStr]) -> None self._zip = _zip = fs._zip self._end = _zip.getinfo(name).file_size self._pos = 0 super(_ZipExtFile, self).__init__( - _zip.open(name, pwd=_bytes(passwd)), "r", name + _zip.open(name, pwd=_bytes(password)), "r", name ) def read(self, size=-1): @@ -175,7 +175,7 @@ class ZipFS(WrapFS): defined in the `zipfile` module in the stdlib). temp_fs (str): An FS URL for the temporary filesystem used to store data prior to zipping. - passwd (str or bytes): Password for extracting file from zip file. Only + password (str or bytes): Password for extracting file from zip file. Only used for read mode. """ @@ -188,19 +188,19 @@ def __new__( # type: ignore compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Text - passwd=None, # type: Optional[AnyStr] + password=None, # type: Optional[AnyStr] ): # type: (...) -> FS # This magic returns a different class instance based on the # value of the ``write`` parameter. if write: - if passwd is not None: + if password is not None: raise errors.PasswordUnsupported() return WriteZipFS( file, compression=compression, encoding=encoding, temp_fs=temp_fs ) else: - return ReadZipFS(file, encoding=encoding, passwd=passwd) + return ReadZipFS(file, encoding=encoding, password=password) if typing.TYPE_CHECKING: @@ -211,7 +211,7 @@ def __init__( compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Text - passwd=None, # type: Optional[AnyStr] + password=None, # type: Optional[AnyStr] ): # type: (...) -> None pass @@ -311,15 +311,15 @@ class ReadZipFS(FS): } @errors.CreateFailed.catch_all - def __init__(self, file, encoding="utf-8", passwd=None): + def __init__(self, file, encoding="utf-8", password=None): # type: (Union[BinaryIO, Text], Text, Optional[AnyStr]) -> None super(ReadZipFS, self).__init__() self._file = file self.encoding = encoding self._zip = zipfile.ZipFile(file, "r") self._directory_fs = None # type: Optional[MemoryFS] - if passwd is not None: - self.setpassword(_bytes(passwd)) + if password is not None: + self.setpassword(_bytes(password)) def __repr__(self): # type: () -> Text @@ -432,7 +432,7 @@ def makedir( self.check() raise errors.ResourceReadOnly(path) - def openbin(self, path, mode="r", buffering=-1, passwd=None, **kwargs): + def openbin(self, path, mode="r", buffering=-1, password=None, **kwargs): # type: (Text, Text, int, Optional[AnyStr], **Any) -> BinaryIO self.check() if "w" in mode or "+" in mode or "a" in mode: @@ -444,7 +444,7 @@ def openbin(self, path, mode="r", buffering=-1, passwd=None, **kwargs): raise errors.FileExpected(path) zip_name = self._path_to_zip_name(path) - return _ZipExtFile(self, zip_name, passwd) # type: ignore + return _ZipExtFile(self, zip_name, password) # type: ignore def remove(self, path): # type: (Text) -> None @@ -462,13 +462,13 @@ def close(self): if hasattr(self, "_zip"): self._zip.close() - def readbytes(self, path, passwd=None): + def readbytes(self, path, password=None): # type: (Text, Optional[AnyStr]) -> bytes self.check() if not self._directory.isfile(path): raise errors.ResourceNotFound(path) zip_name = self._path_to_zip_name(path) - zip_bytes = self._zip.read(zip_name, pwd=_bytes(passwd)) + zip_bytes = self._zip.read(zip_name, pwd=_bytes(password)) return zip_bytes def geturl(self, path, purpose="download"): @@ -480,8 +480,8 @@ def geturl(self, path, purpose="download"): else: raise errors.NoURL(path, purpose) - def setpassword(self, passwd): + def setpassword(self, password): # type: (AnyStr) -> None - """Set *passwd* as default password to extract encrypted files. + """Set *password* as default password to extract encrypted files. """ - self._zip.setpassword(_bytes(passwd)) + self._zip.setpassword(_bytes(password)) diff --git a/tests/test_zipfs.py b/tests/test_zipfs.py index fb580cd8..b0329a39 100644 --- a/tests/test_zipfs.py +++ b/tests/test_zipfs.py @@ -52,7 +52,7 @@ def test_unicode_paths(self): def test_create_password(self): with self.assertRaises(PasswordUnsupported): - zipfs.ZipFS(self._temp_path, write=True, passwd="hello") + zipfs.ZipFS(self._temp_path, write=True, password="hello") class TestWriteZipFS(FSTestCases, unittest.TestCase): @@ -255,21 +255,21 @@ def tearDown(self): os.remove(self.path) def test_openbin(self): - with zipfs.ReadZipFS(self.path, passwd=self.PASSWD) as zip_fs: + with zipfs.ReadZipFS(self.path, password=self.PASSWD) as zip_fs: with zip_fs.openbin("foo") as fp: self.assertEqual(fp.read(), b"hello world\n") with zipfs.ReadZipFS(self.path) as zip_fs: - with zip_fs.openbin("foo", passwd=self.PASSWD) as fp: + with zip_fs.openbin("foo", password=self.PASSWD) as fp: self.assertEqual(fp.read(), b"hello world\n") def test_readbytes(self): - with zipfs.ReadZipFS(self.path, passwd=self.PASSWD) as zip_fs: + with zipfs.ReadZipFS(self.path, password=self.PASSWD) as zip_fs: self.assertEqual(zip_fs.readbytes("foo"), b"hello world\n") with zipfs.ReadZipFS(self.path) as zip_fs: self.assertEqual( - zip_fs.readbytes("foo", passwd=self.PASSWD), b"hello world\n" + zip_fs.readbytes("foo", password=self.PASSWD), b"hello world\n" ) def test_setpassword(self): From b03ab32619e41aad9e3aaa2cd268cc24fa699cea Mon Sep 17 00:00:00 2001 From: tzing Date: Sat, 7 Dec 2019 00:28:59 +0800 Subject: [PATCH 09/10] Only allow bytes for password --- fs/zipfs.py | 52 +++++++++++++++++++++------------------------ tests/test_zipfs.py | 19 ++++++++--------- 2 files changed, 33 insertions(+), 38 deletions(-) diff --git a/fs/zipfs.py b/fs/zipfs.py index db3f6712..4e23a629 100644 --- a/fs/zipfs.py +++ b/fs/zipfs.py @@ -27,7 +27,6 @@ if typing.TYPE_CHECKING: from typing import ( Any, - AnyStr, BinaryIO, Collection, Dict, @@ -44,27 +43,15 @@ R = typing.TypeVar("R", bound="ReadZipFS") -def _bytes(s): - # type: (Optional[AnyStr]) -> Optional[bytes] - if s is None: - return None - elif isinstance(s, six.binary_type): - return s - elif isinstance(s, six.string_types): - return s.encode() - else: - raise TypeError("expected string type or byte type, not " + type(s).__name__) - - class _ZipExtFile(RawWrapper): def __init__(self, fs, name, password=None): - # type: (ReadZipFS, Text, Optional[AnyStr]) -> None + # type: (ReadZipFS, Text, Optional[bytes]) -> None + if password is not None: + _password_type_check(password) self._zip = _zip = fs._zip self._end = _zip.getinfo(name).file_size self._pos = 0 - super(_ZipExtFile, self).__init__( - _zip.open(name, pwd=_bytes(password)), "r", name - ) + super(_ZipExtFile, self).__init__(_zip.open(name, pwd=password), "r", name) def read(self, size=-1): # type: (int) -> bytes @@ -175,8 +162,8 @@ class ZipFS(WrapFS): defined in the `zipfile` module in the stdlib). temp_fs (str): An FS URL for the temporary filesystem used to store data prior to zipping. - password (str or bytes): Password for extracting file from zip file. Only - used for read mode. + password (bytes): Password for extracting file from zip file. Only used + for read mode. """ @@ -188,7 +175,7 @@ def __new__( # type: ignore compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Text - password=None, # type: Optional[AnyStr] + password=None, # type: Optional[bytes] ): # type: (...) -> FS # This magic returns a different class instance based on the @@ -211,7 +198,7 @@ def __init__( compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Text - password=None, # type: Optional[AnyStr] + password=None, # type: Optional[bytes] ): # type: (...) -> None pass @@ -312,14 +299,14 @@ class ReadZipFS(FS): @errors.CreateFailed.catch_all def __init__(self, file, encoding="utf-8", password=None): - # type: (Union[BinaryIO, Text], Text, Optional[AnyStr]) -> None + # type: (Union[BinaryIO, Text], Text, Optional[bytes]) -> None super(ReadZipFS, self).__init__() self._file = file self.encoding = encoding self._zip = zipfile.ZipFile(file, "r") self._directory_fs = None # type: Optional[MemoryFS] if password is not None: - self.setpassword(_bytes(password)) + self.setpassword(password) def __repr__(self): # type: () -> Text @@ -433,7 +420,7 @@ def makedir( raise errors.ResourceReadOnly(path) def openbin(self, path, mode="r", buffering=-1, password=None, **kwargs): - # type: (Text, Text, int, Optional[AnyStr], **Any) -> BinaryIO + # type: (Text, Text, int, Optional[bytes], **Any) -> BinaryIO self.check() if "w" in mode or "+" in mode or "a" in mode: raise errors.ResourceReadOnly(path) @@ -463,12 +450,14 @@ def close(self): self._zip.close() def readbytes(self, path, password=None): - # type: (Text, Optional[AnyStr]) -> bytes + # type: (Text, Optional[bytes]) -> bytes self.check() if not self._directory.isfile(path): raise errors.ResourceNotFound(path) + if password is not None: + _password_type_check(password) zip_name = self._path_to_zip_name(path) - zip_bytes = self._zip.read(zip_name, pwd=_bytes(password)) + zip_bytes = self._zip.read(zip_name, pwd=password) return zip_bytes def geturl(self, path, purpose="download"): @@ -481,7 +470,14 @@ def geturl(self, path, purpose="download"): raise errors.NoURL(path, purpose) def setpassword(self, password): - # type: (AnyStr) -> None + # type: (bytes) -> None """Set *password* as default password to extract encrypted files. """ - self._zip.setpassword(_bytes(password)) + _password_type_check(password) + self._zip.setpassword(password) + + +def _password_type_check(password): + if isinstance(password, six.binary_type): + return + raise TypeError("except bytes for password, not " + type(password).__name__) diff --git a/tests/test_zipfs.py b/tests/test_zipfs.py index b0329a39..1c82cc13 100644 --- a/tests/test_zipfs.py +++ b/tests/test_zipfs.py @@ -21,15 +21,6 @@ from .test_archives import ArchiveTestCases -class TestBytes(unittest.TestCase): - def test_conversion(self): - self.assertIsNone(zipfs._bytes(None)) - self.assertEqual(zipfs._bytes("passwd"), b"passwd") - self.assertEqual(zipfs._bytes(b"passwd"), b"passwd") - with self.assertRaises(TypeError): - zipfs._bytes(1234) - - class TestWriteReadZipFS(unittest.TestCase): def setUp(self): fh, self._temp_path = tempfile.mkstemp() @@ -243,7 +234,7 @@ class TestPasswordReadZipFS(unittest.TestCase): b"AAAAUEsFBgAAAAABAAEASQAAAGUAAAAAAA==" ) - PASSWD = "P@ssw0rd" + PASSWD = b"P@ssw0rd" def setUp(self): fh, path = tempfile.mkstemp("testzip.zip") @@ -281,6 +272,14 @@ def test_setpassword(self): self.assertEqual(zip_fs._zip.read("foo"), b"hello world\n") +class TestPasswordTypeCheck(unittest.TestCase): + def test_raise(self): + with self.assertRaises(TypeError): + zipfs._password_type_check("string") + + zipfs._password_type_check(b"bytes") + + class TestOpener(unittest.TestCase): def test_not_writeable(self): with self.assertRaises(NotWriteable): From a4da02683730ff0797f10ccaba3a978c6802bc35 Mon Sep 17 00:00:00 2001 From: tzing Date: Sat, 7 Dec 2019 00:35:24 +0800 Subject: [PATCH 10/10] add url parameter support for zipfs --- fs/opener/zipfs.py | 7 ++++++- tests/test_zipfs.py | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/fs/opener/zipfs.py b/fs/opener/zipfs.py index 81e48455..64dd7b7e 100644 --- a/fs/opener/zipfs.py +++ b/fs/opener/zipfs.py @@ -38,5 +38,10 @@ def open_fs( if not create and writeable: raise NotWriteable("Unable to open existing ZIP file for writing") - zip_fs = ZipFS(parse_result.resource, write=create) + + password = parse_result.params.get("password") + if password is not None: + password = password.encode() + + zip_fs = ZipFS(parse_result.resource, write=create, password=password) return zip_fs diff --git a/tests/test_zipfs.py b/tests/test_zipfs.py index 1c82cc13..0f438aec 100644 --- a/tests/test_zipfs.py +++ b/tests/test_zipfs.py @@ -284,3 +284,5 @@ class TestOpener(unittest.TestCase): def test_not_writeable(self): with self.assertRaises(NotWriteable): open_fs("zip://foo.zip", writeable=True) + + open_fs("zip://foo.zip?password=1234")