Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type annotations #49

Draft
wants to merge 3 commits into
base: python3
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions lib/cfv/BitTorrent/bencode.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
# see LICENSE.txt for license information

from builtins import object
from typing import Sized


def decode_int(x, f):
def decode_int(x, f: int) -> tuple:
f += 1
newf = x.index(b'e', f)
try:
Expand All @@ -19,7 +20,7 @@ def decode_int(x, f):
return n, newf + 1


def decode_string(x, f):
def decode_string(x, f) -> tuple:
colon = x.index(b':', f)
try:
n = int(x[f:colon])
Expand All @@ -31,15 +32,15 @@ def decode_string(x, f):
return x[colon:colon + n], colon + n


def decode_list(x, f):
def decode_list(x, f) -> tuple:
r, f = [], f + 1
while x[f:f + 1] != b'e':
v, f = decode_func[x[f:f + 1]](x, f)
r.append(v)
return r, f + 1


def decode_dict(x, f):
def decode_dict(x, f) -> tuple:
r, f = {}, f + 1
lastkey = None
while x[f:f + 1] != b'e':
Expand Down Expand Up @@ -68,7 +69,7 @@ def decode_dict(x, f):
}


def bdecode(x):
def bdecode(x: Sized):
try:
r, pos = decode_func[x[0:1]](x, 0)
except (IndexError, KeyError):
Expand All @@ -78,7 +79,7 @@ def bdecode(x):
return r


def test_bdecode():
def test_bdecode() -> None:
try:
bdecode(b'0:0:')
assert 0
Expand Down Expand Up @@ -240,30 +241,30 @@ def test_bdecode():
class Bencached(object):
__slots__ = ['bencoded']

def __init__(self, s):
def __init__(self, s) -> None:
self.bencoded = s


def encode_bencached(x, r):
def encode_bencached(x, r) -> None:
r.append(x.bencoded)


def encode_int(x, r):
def encode_int(x, r) -> None:
r.extend((b'i', b'%d' % x, b'e'))


def encode_string(x, r):
def encode_string(x: Sized, r) -> None:
r.extend((b'%d' % len(x), b':', x))


def encode_list(x, r):
def encode_list(x, r) -> None:
r.append(b'l')
for i in x:
encode_func[type(i)](i, r)
r.append(b'e')


def encode_dict(x, r):
def encode_dict(x, r) -> None:
r.append(b'd')
ilist = list(x.items())
ilist.sort()
Expand All @@ -284,13 +285,13 @@ def encode_dict(x, r):
}


def bencode(x):
def bencode(x) -> bytes:
r = []
encode_func[type(x)](x, r)
return b''.join(r)


def test_bencode():
def test_bencode() -> None:
assert bencode(4) == b'i4e'
assert bencode(0) == b'i0e'
assert bencode(-10) == b'i-10e'
Expand Down
9 changes: 5 additions & 4 deletions lib/cfv/BitTorrent/btformats.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
from re import compile

from builtins import range
from typing import Pattern


reg = compile(br'^[^/\\.~][^/\\]*$')
reg: Pattern[bytes] = compile(br'^[^/\\.~][^/\\]*$')


def check_info(info):
def check_info(info) -> None:
if type(info) != dict:
raise ValueError('bad metainfo - not a dictionary')
pieces = info.get(b'pieces')
Expand Down Expand Up @@ -53,7 +54,7 @@ def check_info(info):
raise ValueError('bad metainfo - duplicate path')


def check_message(message):
def check_message(message) -> None:
if type(message) != dict:
raise ValueError
check_info(message.get(b'info'))
Expand All @@ -62,7 +63,7 @@ def check_message(message):
raise ValueError('bad torrent file - announce is invalid')


def check_peers(message):
def check_peers(message) -> None:
if type(message) != dict:
raise ValueError
if b'failure reason' in message:
Expand Down
20 changes: 10 additions & 10 deletions lib/cfv/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# TODO: stop common.py mucking with _path_key_cache member. (Move chdir/cdup functions here, or even better don't chang directories at all.)
# TODO: make nocase_* functions not depend on current dir
class FileInfoCache(object):
def __init__(self):
def __init__(self) -> None:
# data is a mapping of <path key> -> <path cache>
# <path key> is either a (dev, inode) pair or a full pathname (from os.path.realpath)
# <path cache> is map from filename(without path, relative to path key) -> <finfo>
Expand All @@ -29,19 +29,19 @@ def __init__(self):
# this member is saved/cleared/restored by chdir and cdup functions in common.py
self._path_key_cache = {}

def set_verified(self, fn):
def set_verified(self, fn) -> None:
self.getfinfo(fn)['_verified'] = 1

def is_verified(self, fn):
return self.getfinfo(fn).get('_verified', 0)

def set_flag(self, fn, flag):
def set_flag(self, fn, flag) -> None:
self.getfinfo(fn)[flag] = 1

def has_flag(self, fn, flag):
def has_flag(self, fn, flag) -> bool:
return flag in self.getfinfo(fn)

def get_path_key(self, path):
def get_path_key(self, path) -> tuple:
dk = self._path_key_cache.get(path)
if dk is not None:
return dk
Expand All @@ -53,14 +53,14 @@ def get_path_key(self, path):
self._path_key_cache[path] = dk
return dk

def getpathcache(self, path):
def getpathcache(self, path) -> dict:
pathkey = self.get_path_key(path)
pathcache = self.data.get(pathkey)
if pathcache is None:
self.data[pathkey] = pathcache = {}
return pathcache

def getfinfo(self, fn):
def getfinfo(self, fn) -> dict:
if fn == '':
return self.stdin_finfo
else:
Expand All @@ -71,7 +71,7 @@ def getfinfo(self, fn):
pathdata[ftail] = finfo = {}
return finfo

def rename(self, oldfn, newfn):
def rename(self, oldfn, newfn) -> None:
ofinfo = self.getfinfo(oldfn)
nfinfo = self.getfinfo(newfn)
nfinfo.clear()
Expand All @@ -81,7 +81,7 @@ def rename(self, oldfn, newfn):
# nfinfo.update(ofinfo)
ofinfo.clear()

def nocase_dirfiles(self, dir, match):
def nocase_dirfiles(self, dir, match) -> list:
"""return list of filenames in dir whose lowercase value equals match
"""
dirkey = self.get_path_key(dir)
Expand All @@ -103,7 +103,7 @@ def nocase_dirfiles(self, dir, match):
_FINDFILE = 1
_FINDDIR = 0

def nocase_findfile(self, filename, find=_FINDFILE):
def nocase_findfile(self, filename, find: int=_FINDFILE):
cur = osutil.curdiru
parts = osutil.path_split(filename.lower())
# print 'nocase_findfile:',filename,parts,len(parts)
Expand Down
10 changes: 5 additions & 5 deletions lib/cfv/cftypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@
_cftypes_match_order = []


def add_user_cf_fn_regex(match, typename):
def add_user_cf_fn_regex(match, typename) -> None:
_user_cf_fn_regexs.append((re.compile(match, re.I).search, get_handler(typename)))


def auto_filename_match(*names):
def auto_filename_match(*names) -> None:
for searchfunc, cftype in _user_cf_fn_regexs + _cf_fn_matches + _cf_fn_exts + _cf_fn_searches:
for name in names:
if searchfunc(name):
return cftype
return None


def auto_chksumfile_match(file):
def auto_chksumfile_match(file) -> None:
for cftype in _cftypes_match_order:
if cftype.auto_chksumfile_match(file):
return cftype
return None


def register_cftype(cftype):
def register_cftype(cftype) -> None:
_cftypes[cftype.name] = cftype

if hasattr(cftype, 'auto_filename_match'):
Expand All @@ -49,5 +49,5 @@ def get_handler(name):
return _cftypes[name]


def has_handler(name):
def has_handler(name) -> bool:
return name in _cftypes
21 changes: 11 additions & 10 deletions lib/cfv/fileutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import codecs
import sys
from io import BytesIO, TextIOWrapper
from io import BufferedWriter, BytesIO, TextIOWrapper

from cfv import osutil
from typing import Union


_badbytesmarker = '\ufffe'
Expand All @@ -18,12 +19,12 @@ def _markbadbytes(exc):


class PeekFile(object):
def __init__(self, fileobj, filename=None, encoding='auto'):
def __init__(self, fileobj, filename=None, encoding: str='auto') -> None:
self.fileobj = fileobj
self._init_decodeobj(encoding)
self.name = filename or fileobj.name

def _init_decodeobj(self, encoding):
def _init_decodeobj(self, encoding) -> None:
self._encoding = None
self._decode_start = 0
self._decode_errs = 0
Expand All @@ -42,7 +43,7 @@ def _init_decodeobj(self, encoding):
self._encodeerrors = osutil.getencodeerrors(encoding, default='markbadbytes')
self._reset_decodeobj()

def _reset_decodeobj(self):
def _reset_decodeobj(self) -> None:
self.fileobj.seek(self._decode_start)
if self._encoding is not None:
self.decodeobj = codecs.getreader(self._encoding)(self.fileobj, errors=self._encodeerrors)
Expand Down Expand Up @@ -88,7 +89,7 @@ def peeknextline(self, *args):
self._decode_errs = 1
return ''

def _done_peeking(self, raw):
def _done_peeking(self, raw) -> None:
if raw:
fileobj = self.fileobj
fileobj.seek(0)
Expand Down Expand Up @@ -118,11 +119,11 @@ def read(self, *args):
return self.read(*args)


def PeekFileNonseekable(fileobj, filename, encoding):
def PeekFileNonseekable(fileobj, filename, encoding) -> PeekFile:
return PeekFile(BytesIO(fileobj.read()), filename, encoding)


def PeekFileGzip(filename, encoding):
def PeekFileGzip(filename, encoding) -> PeekFile:
import gzip
if filename == '-':
f = gzip.GzipFile(mode='rb', fileobj=sys.stdin.buffer)
Expand All @@ -132,10 +133,10 @@ def PeekFileGzip(filename, encoding):


class NoCloseFile(object):
def __init__(self, fileobj):
def __init__(self, fileobj) -> None:
self.fileobj = fileobj

def __getattr__(self, attr):
def __getattr__(self, attr: str):
if attr == 'close':
attr = 'flush'
return getattr(self.fileobj, attr)
Expand All @@ -155,7 +156,7 @@ def open_read(filename, config):
return PeekFile(open(filename, mode), filename, config.encoding)


def open_write(filename, config, force_raw=False):
def open_write(filename, config, force_raw: bool=False):
if config.gzip >= 2 or (config.gzip >= 0 and filename[-3:].lower() == '.gz'):
import gzip
kwargs = {
Expand Down
15 changes: 8 additions & 7 deletions lib/cfv/hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
import struct
import sys
from zlib import crc32
from typing import Union


try:
if os.environ.get('CFV_NOMMAP'):
raise ImportError
import mmap

def dommap(fileno, len): # generic mmap. ACCESS_* args work on both nix and win.
def dommap(fileno, len) -> Union[bytes, mmap.mmap]: # generic mmap. ACCESS_* args work on both nix and win.
if len == 0:
return b'' # mmap doesn't like length=0
return mmap.mmap(fileno, len, access=mmap.ACCESS_READ)
Expand All @@ -29,7 +30,7 @@ def dommap(fileno, len): # generic mmap. ACCESS_* args work on both nix and wi
sha1 = hashlib.sha1


def _getfilechecksum(filename, hasher, callback):
def _getfilechecksum(filename, hasher, callback) -> tuple:
if filename == '':
f = sys.stdin.buffer
else:
Expand Down Expand Up @@ -68,7 +69,7 @@ def finish(m, s):
return m.digest(), s


def getfilechecksumgeneric(algo):
def getfilechecksumgeneric(algo: str) -> tuple:
if hasattr(hashlib, algo):
hasher = getattr(hashlib, algo)
else:
Expand All @@ -80,15 +81,15 @@ def hasher():
class CRC32(object):
digest_size = 4

def __init__(self, s=b''):
def __init__(self, s=b'') -> None:
self.value = crc32(s)

def update(self, s):
def update(self, s) -> None:
self.value = crc32(s, self.value)

def digest(self):
def digest(self) -> bytes:
return struct.pack('>I', self.value & 0xFFFFFFFF)


def getfilecrc(filename, callback):
def getfilecrc(filename, callback) -> tuple:
return _getfilechecksum(filename, CRC32, callback)
Loading