From 4bba7829876e98b23aa5561031312851955571f1 Mon Sep 17 00:00:00 2001 From: mkoppmann Date: Fri, 16 Dec 2022 15:41:55 +0100 Subject: [PATCH 1/3] Add type annotations generated by infer-types See: https://github.com/orsinium-labs/infer-types --- lib/cfv/BitTorrent/bencode.py | 26 ++++++------ lib/cfv/BitTorrent/btformats.py | 6 +-- lib/cfv/caching.py | 18 ++++----- lib/cfv/cftypes.py | 10 ++--- lib/cfv/hash.py | 10 ++--- lib/cfv/progress.py | 20 +++++----- lib/cfv/strutil.py | 12 +++--- lib/cfv/term.py | 2 +- lib/cfv/ui.py | 70 ++++++++++++++++----------------- test/benchmark.py | 20 +++++----- test/cfvtest.py | 28 +++++++------ test/test_bencode.py | 4 +- test/test_caching.py | 16 ++++---- test/test_strutil.py | 22 +++++------ 14 files changed, 133 insertions(+), 131 deletions(-) diff --git a/lib/cfv/BitTorrent/bencode.py b/lib/cfv/BitTorrent/bencode.py index c247763..3d55f40 100644 --- a/lib/cfv/BitTorrent/bencode.py +++ b/lib/cfv/BitTorrent/bencode.py @@ -4,7 +4,7 @@ from builtins import object -def decode_int(x, f): +def decode_int(x, f) -> tuple: f += 1 newf = x.index(b'e', f) try: @@ -19,7 +19,7 @@ def decode_int(x, f): return n, newf + 1 -def decode_string(x, f): +def decode_string(x, f) -> tuple: colon = x.index(b':', f) try: n = int(x[f:colon]) @@ -31,7 +31,7 @@ def decode_string(x, f): return x[colon:colon + n], colon + n -def decode_list(x, f): +def decode_list(x, f) -> tuple: r, f = [], f + 1 while x[f:f + 1] != b'e': v, f = decode_func[x[f:f + 1]](x, f) @@ -39,7 +39,7 @@ def decode_list(x, f): return r, f + 1 -def decode_dict(x, f): +def decode_dict(x, f) -> tuple: r, f = {}, f + 1 lastkey = None while x[f:f + 1] != b'e': @@ -78,7 +78,7 @@ def bdecode(x): return r -def test_bdecode(): +def test_bdecode() -> None: try: bdecode(b'0:0:') assert 0 @@ -240,30 +240,30 @@ def test_bdecode(): class Bencached(object): __slots__ = ['bencoded'] - def __init__(self, s): + def __init__(self, s) -> None: self.bencoded = s -def encode_bencached(x, r): +def encode_bencached(x, r) -> None: r.append(x.bencoded) -def encode_int(x, r): +def encode_int(x, r) -> None: r.extend((b'i', b'%d' % x, b'e')) -def encode_string(x, r): +def encode_string(x, r) -> None: r.extend((b'%d' % len(x), b':', x)) -def encode_list(x, r): +def encode_list(x, r) -> None: r.append(b'l') for i in x: encode_func[type(i)](i, r) r.append(b'e') -def encode_dict(x, r): +def encode_dict(x, r) -> None: r.append(b'd') ilist = list(x.items()) ilist.sort() @@ -284,13 +284,13 @@ def encode_dict(x, r): } -def bencode(x): +def bencode(x) -> bytes: r = [] encode_func[type(x)](x, r) return b''.join(r) -def test_bencode(): +def test_bencode() -> None: assert bencode(4) == b'i4e' assert bencode(0) == b'i0e' assert bencode(-10) == b'i-10e' diff --git a/lib/cfv/BitTorrent/btformats.py b/lib/cfv/BitTorrent/btformats.py index 3d6c12c..77e7627 100644 --- a/lib/cfv/BitTorrent/btformats.py +++ b/lib/cfv/BitTorrent/btformats.py @@ -9,7 +9,7 @@ reg = compile(br'^[^/\\.~][^/\\]*$') -def check_info(info): +def check_info(info) -> None: if type(info) != dict: raise ValueError('bad metainfo - not a dictionary') pieces = info.get(b'pieces') @@ -53,7 +53,7 @@ def check_info(info): raise ValueError('bad metainfo - duplicate path') -def check_message(message): +def check_message(message) -> None: if type(message) != dict: raise ValueError check_info(message.get(b'info')) @@ -62,7 +62,7 @@ def check_message(message): raise ValueError('bad torrent file - announce is invalid') -def check_peers(message): +def check_peers(message) -> None: if type(message) != dict: raise ValueError if b'failure reason' in message: diff --git a/lib/cfv/caching.py b/lib/cfv/caching.py index 44c04e9..0eba2ae 100644 --- a/lib/cfv/caching.py +++ b/lib/cfv/caching.py @@ -13,7 +13,7 @@ # TODO: stop common.py mucking with _path_key_cache member. (Move chdir/cdup functions here, or even better don't chang directories at all.) # TODO: make nocase_* functions not depend on current dir class FileInfoCache(object): - def __init__(self): + def __init__(self) -> None: # data is a mapping of -> # is either a (dev, inode) pair or a full pathname (from os.path.realpath) # is map from filename(without path, relative to path key) -> @@ -29,19 +29,19 @@ def __init__(self): # this member is saved/cleared/restored by chdir and cdup functions in common.py self._path_key_cache = {} - def set_verified(self, fn): + def set_verified(self, fn) -> None: self.getfinfo(fn)['_verified'] = 1 def is_verified(self, fn): return self.getfinfo(fn).get('_verified', 0) - def set_flag(self, fn, flag): + def set_flag(self, fn, flag) -> None: self.getfinfo(fn)[flag] = 1 - def has_flag(self, fn, flag): + def has_flag(self, fn, flag) -> bool: return flag in self.getfinfo(fn) - def get_path_key(self, path): + def get_path_key(self, path) -> tuple: dk = self._path_key_cache.get(path) if dk is not None: return dk @@ -53,14 +53,14 @@ def get_path_key(self, path): self._path_key_cache[path] = dk return dk - def getpathcache(self, path): + def getpathcache(self, path) -> dict: pathkey = self.get_path_key(path) pathcache = self.data.get(pathkey) if pathcache is None: self.data[pathkey] = pathcache = {} return pathcache - def getfinfo(self, fn): + def getfinfo(self, fn) -> dict: if fn == '': return self.stdin_finfo else: @@ -71,7 +71,7 @@ def getfinfo(self, fn): pathdata[ftail] = finfo = {} return finfo - def rename(self, oldfn, newfn): + def rename(self, oldfn, newfn) -> None: ofinfo = self.getfinfo(oldfn) nfinfo = self.getfinfo(newfn) nfinfo.clear() @@ -81,7 +81,7 @@ def rename(self, oldfn, newfn): # nfinfo.update(ofinfo) ofinfo.clear() - def nocase_dirfiles(self, dir, match): + def nocase_dirfiles(self, dir, match) -> list: """return list of filenames in dir whose lowercase value equals match """ dirkey = self.get_path_key(dir) diff --git a/lib/cfv/cftypes.py b/lib/cfv/cftypes.py index b45283b..7c13c9b 100644 --- a/lib/cfv/cftypes.py +++ b/lib/cfv/cftypes.py @@ -7,11 +7,11 @@ _cftypes_match_order = [] -def add_user_cf_fn_regex(match, typename): +def add_user_cf_fn_regex(match, typename) -> None: _user_cf_fn_regexs.append((re.compile(match, re.I).search, get_handler(typename))) -def auto_filename_match(*names): +def auto_filename_match(*names) -> None: for searchfunc, cftype in _user_cf_fn_regexs + _cf_fn_matches + _cf_fn_exts + _cf_fn_searches: for name in names: if searchfunc(name): @@ -19,14 +19,14 @@ def auto_filename_match(*names): return None -def auto_chksumfile_match(file): +def auto_chksumfile_match(file) -> None: for cftype in _cftypes_match_order: if cftype.auto_chksumfile_match(file): return cftype return None -def register_cftype(cftype): +def register_cftype(cftype) -> None: _cftypes[cftype.name] = cftype if hasattr(cftype, 'auto_filename_match'): @@ -49,5 +49,5 @@ def get_handler(name): return _cftypes[name] -def has_handler(name): +def has_handler(name) -> bool: return name in _cftypes diff --git a/lib/cfv/hash.py b/lib/cfv/hash.py index cf111d2..83cc175 100644 --- a/lib/cfv/hash.py +++ b/lib/cfv/hash.py @@ -29,7 +29,7 @@ def dommap(fileno, len): # generic mmap. ACCESS_* args work on both nix and wi sha1 = hashlib.sha1 -def _getfilechecksum(filename, hasher, callback): +def _getfilechecksum(filename, hasher, callback) -> tuple: if filename == '': f = sys.stdin.buffer else: @@ -68,7 +68,7 @@ def finish(m, s): return m.digest(), s -def getfilechecksumgeneric(algo): +def getfilechecksumgeneric(algo) -> tuple: if hasattr(hashlib, algo): hasher = getattr(hashlib, algo) else: @@ -80,15 +80,15 @@ def hasher(): class CRC32(object): digest_size = 4 - def __init__(self, s=b''): + def __init__(self, s=b'') -> None: self.value = crc32(s) - def update(self, s): + def update(self, s) -> None: self.value = crc32(s, self.value) def digest(self): return struct.pack('>I', self.value & 0xFFFFFFFF) -def getfilecrc(filename, callback): +def getfilecrc(filename, callback) -> tuple: return _getfilechecksum(filename, CRC32, callback) diff --git a/lib/cfv/progress.py b/lib/cfv/progress.py index a3665dc..dc4f449 100644 --- a/lib/cfv/progress.py +++ b/lib/cfv/progress.py @@ -12,10 +12,10 @@ class INF(object): """object that is always larger than what it is compared to """ - def __cmp__(self, other): + def __cmp__(self, other) -> int: return 1 - def __lt__(self, other): + def __lt__(self, other) -> bool: return False def __mul__(self, other): @@ -27,10 +27,10 @@ def __div__(self, other): def __floordiv__(self, other): return self - def __rdiv__(self, other): + def __rdiv__(self, other) -> int: return 0 - def __rfloordiv__(self, other): + def __rfloordiv__(self, other) -> int: return 0 @@ -51,7 +51,7 @@ def __rfloordiv__(self, other): class ProgressMeter(object): spinnerchars = r'\|/-' - def __init__(self, fd, steps=20, scrwidth=80, frobfn=lambda x: x): + def __init__(self, fd, steps=20, scrwidth=80, frobfn=lambda x: x) -> None: self.wantsteps = steps self.needrefresh = 1 self.filename = None @@ -59,7 +59,7 @@ def __init__(self, fd, steps=20, scrwidth=80, frobfn=lambda x: x): self.frobfn = frobfn self.scrwidth = scrwidth - def init(self, name, size=None, cursize=0): + def init(self, name, size=None, cursize=0) -> None: self.steps = self.wantsteps self.filename = name if size is None: @@ -78,7 +78,7 @@ def init(self, name, size=None, cursize=0): self.needrefresh = 1 self.update(cursize) - def update(self, cursize): + def update(self, cursize) -> None: if self.needrefresh: donesteps = cursize // self.stepsize stepsleft = self.steps - donesteps @@ -96,18 +96,18 @@ def update(self, cursize): self.fd.flush() self.spinneridx = (self.spinneridx + 1) % len(self.spinnerchars) - def cleanup(self): + def cleanup(self) -> None: if not self.needrefresh: self.fd.write('\r' + _CLR_TO_EOL) self.needrefresh = 1 class TimedProgressMeter(ProgressMeter): - def __init__(self, *args, **kw): + def __init__(self, *args, **kw) -> None: ProgressMeter.__init__(self, *args, **kw) self.nexttime = 0 - def update(self, cursize): + def update(self, cursize) -> None: curtime = time.time() if curtime > self.nexttime: self.nexttime = curtime + 0.06 diff --git a/lib/cfv/strutil.py b/lib/cfv/strutil.py index 8eebaa3..1bfeedf 100644 --- a/lib/cfv/strutil.py +++ b/lib/cfv/strutil.py @@ -6,7 +6,7 @@ from cfv import osutil -def safesort(seq): +def safesort(seq) -> None: sl = [] ul = [] for s in seq: @@ -39,7 +39,7 @@ def chompnulls(line): return line[:p] -def uwidth(u): +def uwidth(u) -> int: # TODO: should it return -1 or something for control chars, like wcswidth? # see http://www.cl.cam.ac.uk/~mgk25/ucs/wcwidth.c for a sample implementation # if isinstance(u, str): @@ -60,7 +60,7 @@ def uwidth(u): return w -def lchoplen(line, max): +def lchoplen(line, max) -> str: """Return line cut on left so it takes at most max character cells when printed. >>> lchoplen('hello world',6) @@ -89,7 +89,7 @@ def lchoplen(line, max): return ''.join(chars) -def rchoplen(line, max): +def rchoplen(line, max) -> str: """Return line cut on right so it takes at most max character cells when printed. >>> rchoplen('hello world',6) @@ -119,9 +119,9 @@ def rchoplen(line, max): return ''.join(chars) -def hexlify(data): +def hexlify(data) -> str: return binascii.hexlify(data).decode('ascii') -def unhexlify(data): +def unhexlify(data) -> bytes: return binascii.unhexlify(data) diff --git a/lib/cfv/term.py b/lib/cfv/term.py index f447b33..617982d 100644 --- a/lib/cfv/term.py +++ b/lib/cfv/term.py @@ -3,7 +3,7 @@ import sys -def getscrwidth(): +def getscrwidth() -> int: w = -1 try: from fcntl import ioctl diff --git a/lib/cfv/ui.py b/lib/cfv/ui.py index ec81a62..6ce8377 100644 --- a/lib/cfv/ui.py +++ b/lib/cfv/ui.py @@ -21,7 +21,7 @@ class View(object): - def __init__(self, config): + def __init__(self, config) -> None: self.stdout = sys.stdout self.stderr = sys.stderr self._stdout_special = 0 @@ -30,13 +30,13 @@ def __init__(self, config): self.config = config self.perhaps_showpath = config.perhaps_showpath - def set_stdout_special(self): + def set_stdout_special(self) -> None: """If stdout is being used for special purposes, redirect informational messages to stderr. """ self._stdout_special = 1 self.stdinfo = self.stderr - def setup_output(self): + def setup_output(self) -> None: self.stdinfo = self._stdout_special and self.stderr or self.stdout # if one of stdinfo (usually stdout) or stderr is a tty, use it. Otherwise use stdinfo. progressfd = self.stdinfo.isatty() and self.stdinfo or self.stderr.isatty() and self.stderr or self.stdinfo @@ -50,24 +50,24 @@ def setup_output(self): else: self.progress = None - def pverbose(self, s, nl='\n'): + def pverbose(self, s, nl='\n') -> None: if self.config.verbose > 0: self.stdinfo.write(s + nl) - def pinfo(self, s, nl='\n'): + def pinfo(self, s, nl='\n') -> None: if self.config.verbose >= 0 or self.config.verbose == -3: self.stdinfo.write(s + nl) - def perror(self, s, nl='\n'): + def perror(self, s, nl='\n') -> None: # import traceback;traceback.print_stack()#### if self.config.verbose >= -1: self.stdout.flush() # avoid inconsistent screen state if stdout has unflushed data self.stderr.write(s + nl) - def plistf(self, filename): + def plistf(self, filename) -> None: self.stdout.write(self.perhaps_showpath(filename) + self.config.listsep) - def ev_test_cf_begin(self, cftypename, filename, comment): + def ev_test_cf_begin(self, cftypename, filename, comment) -> None: if comment: comment = ', ' + comment comment = strutil.rchoplen(comment, 102) # limit the length in case its a really long one. @@ -75,79 +75,79 @@ def ev_test_cf_begin(self, cftypename, filename, comment): comment = '' self.pverbose('testing from %s (%s%s)' % (strutil.showfn(filename), cftypename.lower(), comment)) - def ev_test_cf_done(self, filename, cf_stats): + def ev_test_cf_done(self, filename, cf_stats) -> None: self.pinfo('%s: %s' % (self.perhaps_showpath(filename), cf_stats)) ev_make_cf_done = ev_test_cf_done - def ev_test_cf_unrecognized_line(self, filename, lineno): + def ev_test_cf_unrecognized_line(self, filename, lineno) -> None: self.perror('%s : unrecognized line %i (CF)' % (self.perhaps_showpath(filename), lineno)) - def ev_test_cf_lineencodingerror(self, filename, lineno, ex): + def ev_test_cf_lineencodingerror(self, filename, lineno, ex) -> None: self.perror('%s : line %i: %s (CF)' % (self.perhaps_showpath(filename), lineno, ex)) - def ev_test_cf_filenameencodingerror(self, filename, fileid, ex): + def ev_test_cf_filenameencodingerror(self, filename, fileid, ex) -> None: self.perror('%s : file %s: %s (CF)' % (self.perhaps_showpath(filename), fileid, ex)) - def ev_test_cf_invaliddata(self, filename, e): + def ev_test_cf_invaliddata(self, filename, e) -> None: self.perror('%s : %s (CF)' % (self.perhaps_showpath(filename), e)) - def ev_test_cf_unrecognized(self, filename, decode_errors): + def ev_test_cf_unrecognized(self, filename, decode_errors) -> None: if decode_errors: self.perror("I don't recognize the type or encoding of %s" % strutil.showfn(filename)) else: self.perror("I don't recognize the type of %s" % strutil.showfn(filename)) - def ev_cf_enverror(self, filename, e): + def ev_cf_enverror(self, filename, e) -> None: self.perror('%s : %s (CF)' % (self.perhaps_showpath(filename), enverrstr(e))) - def ev_make_filenameencodingerror(self, filename, e): + def ev_make_filenameencodingerror(self, filename, e) -> None: self.perror('%s : unencodable filename: %s' % (self.perhaps_showpath(filename), e)) - def ev_make_filenamedecodingerror(self, filename, e): + def ev_make_filenamedecodingerror(self, filename, e) -> None: self.perror('%s : undecodable filename: %s' % (self.perhaps_showpath(filename), e)) - def ev_make_filenameinvalid(self, filename): + def ev_make_filenameinvalid(self, filename) -> None: self.perror('%s : filename invalid for this cftype' % (self.perhaps_showpath(filename))) - def ev_make_cf_typenotsupported(self, filename, cftype): + def ev_make_cf_typenotsupported(self, filename, cftype) -> None: self.perror('%s : %s not supported in create mode' % (strutil.showfn(filename), cftype.__name__.lower())) - def ev_make_cf_alreadyexists(self, filename): + def ev_make_cf_alreadyexists(self, filename) -> None: self.perror('%s already exists' % self.perhaps_showpath(filename)) - def ev_d_enverror(self, path, ex): + def ev_d_enverror(self, path, ex) -> None: self.perror('%s%s : %s' % (strutil.showfn(path), os.sep, enverrstr(ex))) - def ev_f_enverror(self, l_filename, ex): + def ev_f_enverror(self, l_filename, ex) -> None: if isinstance(ex, EnvironmentError) and ex.errno == errno.ENOENT: if self.config.list & LISTNOTFOUND: self.plistf(l_filename) self.perror('%s : %s' % (self.perhaps_showpath(l_filename), enverrstr(ex))) - def ev_f_verifyerror(self, l_filename, msg, foundok): + def ev_f_verifyerror(self, l_filename, msg, foundok) -> None: if not foundok: if self.config.list & LISTBAD: self.plistf(l_filename) self.perror('%s : %s' % (self.perhaps_showpath(l_filename), msg)) - def ev_f_verifyerror_dupe(self, filename, msg, dupefilename, foundok): + def ev_f_verifyerror_dupe(self, filename, msg, dupefilename, foundok) -> None: self.ev_f_verifyerror(filename, msg + ' (dupe of %s removed)' % strutil.showfn(dupefilename), foundok) - def ev_f_verifyerror_renamed(self, filename, msg, newfilename, foundok): + def ev_f_verifyerror_renamed(self, filename, msg, newfilename, foundok) -> None: self.ev_f_verifyerror(filename, msg + ' (renamed to %s)' % strutil.showfn(newfilename), foundok) - def ev_f_found_renameetcerror(self, filename, filesize, filecrc, found_fn, action, e): + def ev_f_found_renameetcerror(self, filename, filesize, filecrc, found_fn, action, e) -> None: eaction = 'but error %r occured %s' % (enverrstr(e), action) self.ev_f_found(filename, filesize, filecrc, found_fn, eaction) - def ev_f_found_renameetc(self, filename, filesize, filecrc, found_fn, action): + def ev_f_found_renameetc(self, filename, filesize, filecrc, found_fn, action) -> None: self.ev_f_found(filename, filesize, filecrc, found_fn, action) - def ev_f_found(self, filename, filesize, filecrc, found_fn, action='found'): + def ev_f_found(self, filename, filesize, filecrc, found_fn, action='found') -> None: self.ev_f_ok(filename, filesize, filecrc, 'OK(%s %s)' % (action, strutil.showfn(found_fn))) - def ev_f_ok(self, filename, filesize, filecrc, msg): + def ev_f_ok(self, filename, filesize, filecrc, msg) -> None: if self.config.list & LISTOK: self.plistf(filename) if filesize >= 0: @@ -155,22 +155,22 @@ def ev_f_ok(self, filename, filesize, filecrc, msg): else: self.pverbose('%s : %s (%s)' % (self.perhaps_showpath(filename), msg, filecrc)) - def ev_generic_warning(self, msg): + def ev_generic_warning(self, msg) -> None: self.perror('warning: %s' % msg) - def ev_unverified_file(self, filename): + def ev_unverified_file(self, filename) -> None: self.perror('%s : not verified' % self.perhaps_showpath(filename)) - def ev_unverified_dir(self, path): + def ev_unverified_dir(self, path) -> None: self.ev_unverified_file(osutil.path_join(path, '*')) - def ev_unverified_dirrecursive(self, path): + def ev_unverified_dirrecursive(self, path) -> None: self.ev_unverified_file(osutil.path_join(path, '**')) - def ev_unverified_file_plistf(self, filename): + def ev_unverified_file_plistf(self, filename) -> None: if self.config.list & LISTUNVERIFIED: self.plistf(filename) -def enverrstr(e): +def enverrstr(e) -> bool: return getattr(e, 'strerror', None) or str(e) diff --git a/test/benchmark.py b/test/benchmark.py index 256308d..bac2648 100755 --- a/test/benchmark.py +++ b/test/benchmark.py @@ -32,7 +32,7 @@ import cfvtest -def human_int(value): +def human_int(value) -> int: """Convert values with size suffix to integers. >>> human_int('10') 10 @@ -62,7 +62,7 @@ def human_int(value): return int(value) * multiplier -def create_test_file(path, max_size, verbose=False): +def create_test_file(path, max_size, verbose=False) -> None: size = random.randint(1, max_size) if verbose: print('creating', path, 'size', size) @@ -73,7 +73,7 @@ def create_test_file(path, max_size, verbose=False): size -= 1 -def create_test_dir(root, num_files, branch_factor, max_size, verbose=False): +def create_test_dir(root, num_files, branch_factor, max_size, verbose=False) -> None: levels = int(math.ceil(math.log(num_files, branch_factor))) formatlen = int(math.ceil(math.log(branch_factor, 16))) path_counter = [0] * levels @@ -102,19 +102,19 @@ def create_test_dir(root, num_files, branch_factor, max_size, verbose=False): print(remaining, path_counter) -def create(args): +def create(args) -> None: start_path = os.getcwd() create_test_dir(start_path, args.files, args.branch_factor, args.max_size, verbose=args.verbose) -def print_times(name, results, iterations, verbose=False): +def print_times(name, results, iterations, verbose=False) -> None: best = min(results) print('%s: best=%.4g msec' % (name, best * 1000 / iterations)) if verbose: print(' raw results:', results) -def run_cfv(args, verbose): +def run_cfv(args, verbose) -> None: if verbose >= 2: print('running cfv', args) s, o = cfvtest.runcfv(args) @@ -125,17 +125,17 @@ def run_cfv(args, verbose): raise RuntimeError('cfv returned %s' % s) -def run_create_test(cftype, output_root, input_root, verbose): +def run_create_test(cftype, output_root, input_root, verbose) -> None: output_fn = os.path.join(output_root, 'create_test.%s.%s' % (random.randint(0, sys.maxsize), cftype)) run_cfv('-C -rr -t %s -p %s -f %s' % (cftype, input_root, output_fn), verbose) -def run_test_test(cftype, cfname, input_root, f_repeat, verbose): +def run_test_test(cftype, cfname, input_root, f_repeat, verbose) -> None: f_arg = ' -f ' + cfname run_cfv('-T -t %s -p %s %s' % (cftype, input_root, f_arg * f_repeat), verbose) -def run(args): +def run(args) -> None: cfvtest.setcfv(args.cfv, not args.run_external) output_root = tempfile.mkdtemp() input_root = os.getcwd() @@ -156,7 +156,7 @@ def run(args): print_times('multitest', times, args.iterations, args.verbose) -def main(): +def main() -> None: parser = argparse.ArgumentParser(description='Create test data and run cfv benchmarks.') parser.add_argument('-v', '--verbose', action='count') diff --git a/test/cfvtest.py b/test/cfvtest.py index e8917ac..9215e24 100644 --- a/test/cfvtest.py +++ b/test/cfvtest.py @@ -36,6 +36,8 @@ from unittest import main +from typing import Iterator +from unittest.suite import TestSuite cfvenv = '' cfvfn = None @@ -46,26 +48,26 @@ class NullFile(object): - def isatty(self): + def isatty(self) -> int: return 0 - def write(self, s): + def write(self, s) -> None: pass - def writelines(self, lines): + def writelines(self, lines) -> None: pass - def flush(self): + def flush(self) -> None: pass - def close(self): + def close(self) -> None: pass nullfile = NullFile() -def expand_cmdline(cmd): +def expand_cmdline(cmd) -> list: argv = [] for arg in shlex.split(cmd): if '*' in arg or '?' in arg or '[' in arg: @@ -75,7 +77,7 @@ def expand_cmdline(cmd): return argv -def runcfv_exe(cmd, stdin=None, stdout=None, stderr=None, need_reload=0): +def runcfv_exe(cmd, stdin=None, stdout=None, stderr=None, need_reload=0) -> tuple: import subprocess def open_output(fn): @@ -111,7 +113,7 @@ def open_output(fn): # TODO: make the runcfv_* functions (optionally?) take args as a list instead of a string -def runcfv_py(cmd, stdin=None, stdout=None, stderr=None, need_reload=0): +def runcfv_py(cmd, stdin=None, stdout=None, stderr=None, need_reload=0) -> tuple: from io import BytesIO, TextIOWrapper obuf = BytesIO() obuftext = TextIOWrapper(obuf) @@ -175,7 +177,7 @@ def open_output(file): return s, o -def get_version_flags(): +def get_version_flags() -> None: global ver_cfv, ver_mmap s, o = runcfv("--version", need_reload=1) if o.find('cfv ') >= 0: @@ -185,7 +187,7 @@ def get_version_flags(): ver_mmap = o.find('mmap') >= 0 -def setcfv(fn=None, internal=None): +def setcfv(fn=None, internal=None) -> None: global cfvfn, cfv_compiled, runcfv if internal is not None: @@ -207,7 +209,7 @@ def setcfv(fn=None, internal=None): get_version_flags() -def setenv(k, v): +def setenv(k, v) -> None: global cfvenv cfvenv = "%s=%s %s" % (k, v, cfvenv) os.environ[k] = v @@ -222,7 +224,7 @@ def my_import(name): return mod -def rfind(root, match): +def rfind(root, match) -> Iterator: root = os.path.join(root, '') for path, dirs, files in os.walk(root): subpath = path.replace(root, '', 1) @@ -231,7 +233,7 @@ def rfind(root, match): yield os.path.join(subpath, file) -def all_unittests_suite(): +def all_unittests_suite() -> TestSuite: modules_to_test = [os.path.splitext(f)[0].replace(os.sep, '.') for f in rfind(testpath, 'test_*.py')] assert modules_to_test alltests = unittest.TestSuite() diff --git a/test/test_bencode.py b/test/test_bencode.py index 43ca9f9..74b5a89 100644 --- a/test/test_bencode.py +++ b/test/test_bencode.py @@ -3,8 +3,8 @@ class BencodeTestCase(TestCase): - def test_bencode(self): + def test_bencode(self) -> None: bencode.test_bencode() - def test_bdecode(self): + def test_bdecode(self) -> None: bencode.test_bdecode() diff --git a/test/test_caching.py b/test/test_caching.py index 7db575d..bc9692d 100644 --- a/test/test_caching.py +++ b/test/test_caching.py @@ -27,10 +27,10 @@ class AbsTestCase(TestCase): - def setUp(self): + def setUp(self) -> None: self.tempdir = tempfile.mkdtemp() - def tearDown(self): + def tearDown(self) -> None: shutil.rmtree(self.tempdir) def mkpath(self, name): @@ -48,12 +48,12 @@ def mkfile(self, name, contents): class RelTestCase(TestCase): - def setUp(self): + def setUp(self) -> None: self.olddir = os.getcwd() self.tempdir = tempfile.mkdtemp() os.chdir(self.tempdir) - def tearDown(self): + def tearDown(self) -> None: os.chdir(self.olddir) shutil.rmtree(self.tempdir) @@ -70,7 +70,7 @@ def mkfile(self, name, contents): class AbsPathKeyTest(AbsTestCase): - def test_get_path_key(self): + def test_get_path_key(self) -> None: cache = FileInfoCache() with self.assertRaises(OSError): @@ -95,7 +95,7 @@ def test_get_path_key(self): else: self.assertNotEqual(keya, keyc) - def test_rename(self): + def test_rename(self) -> None: cache = FileInfoCache() a = self.mkfile('a', 'a') b = self.mkfile('b', 'b') @@ -130,7 +130,7 @@ def test_rename(self): class RelPathKeyTest(RelTestCase): - def test_nocase_findfile(self): + def test_nocase_findfile(self) -> None: cache = FileInfoCache() a1 = self.mkfile('aAaA/AaA1', '1') self.mkfile('aAaA/Aaa2', '2') @@ -149,7 +149,7 @@ def test_nocase_findfile(self): cache.nocase_findfile(self.mkpath('aaAA/aaa2')) self.assertEqual(errno.EEXIST, cm.exception.errno) - def test_nocase_findfile_parent(self): + def test_nocase_findfile_parent(self) -> None: cache = FileInfoCache() self.mkfile('aaaA/aaA1', '1') self.mkfile('aAaA/aaa2', '2') diff --git a/test/test_strutil.py b/test/test_strutil.py index f7a56f9..1785a7d 100755 --- a/test/test_strutil.py +++ b/test/test_strutil.py @@ -23,42 +23,42 @@ class uwidthTestCase(TestCase): - def test_simple(self): + def test_simple(self) -> None: self.assertEqual(uwidth('hello world'), 11) - def test_nonspacing(self): + def test_nonspacing(self) -> None: self.assertEqual(uwidth('\0'), 0) self.assertEqual(uwidth('\u200b'), 0) self.assertEqual(uwidth('\u3099'), 0) self.assertEqual(uwidth('\u0327'), 0) - def test_wide(self): + def test_wide(self) -> None: self.assertEqual(uwidth('\u3053\u3093\u306b\u3061\u306f'), 10) self.assertEqual(uwidth('\u304c\u304e\u3050\u3052\u3054'), 10) self.assertEqual(uwidth('\u304b\u3099\u304d\u3099\u304f\u3099\u3051\u3099\u3053\u3099'), 10) - def test_halfwidth(self): + def test_halfwidth(self) -> None: self.assertEqual(uwidth('\uFF79'), 1) self.assertEqual(uwidth('\uFF73\uFF79\uFF79\uFF79'), 4) - def test_compose_noprecombined(self): + def test_compose_noprecombined(self) -> None: self.assertEqual(uwidth('\u01b5\u0327\u0308\u01b6\u0327\u0308\u01b7\u0327\u0308\u01b8\u0327\u0308\u01b9\u0327\u0308\u01ba\u0327\u0308'), 6) class chopTestCase(TestCase): - def test_lchoplen_simple(self): + def test_lchoplen_simple(self) -> None: self.assertEqual(lchoplen('hello world', 12), 'hello world') self.assertEqual(lchoplen('hello world', 11), 'hello world') self.assertEqual(lchoplen('hello world', 10), '...o world') self.assertEqual(lchoplen('hello world', 3), '...') - def test_rchoplen_simple(self): + def test_rchoplen_simple(self) -> None: self.assertEqual(rchoplen('hello world', 12), 'hello world') self.assertEqual(rchoplen('hello world', 11), 'hello world') self.assertEqual(rchoplen('hello world', 10), 'hello w...') self.assertEqual(rchoplen('hello world', 3), '...') - def test_lchoplen_wide(self): + def test_lchoplen_wide(self) -> None: self.assertEqual(lchoplen('\u3053\u3093\u306b\u3061\u306f', 11), '\u3053\u3093\u306b\u3061\u306f') self.assertEqual(lchoplen('\u3053\u3093\u306b\u3061\u306f', 10), '\u3053\u3093\u306b\u3061\u306f') self.assertEqual(lchoplen('\u3053\u3093\u306b\u3061\u306f', 9), '...\u306b\u3061\u306f') @@ -66,7 +66,7 @@ def test_lchoplen_wide(self): self.assertEqual(lchoplen('\u3053\u3093\u306b\u3061\u306f', 4), '...') self.assertEqual(lchoplen('\u3053\u3093\u306b\u3061\u306f', 3), '...') - def test_rchoplen_wide(self): + def test_rchoplen_wide(self) -> None: self.assertEqual(rchoplen('\u3053\u3093\u306b\u3061\u306f', 11), '\u3053\u3093\u306b\u3061\u306f') self.assertEqual(rchoplen('\u3053\u3093\u306b\u3061\u306f', 10), '\u3053\u3093\u306b\u3061\u306f') self.assertEqual(rchoplen('\u3053\u3093\u306b\u3061\u306f', 9), '\u3053\u3093\u306b...') @@ -74,7 +74,7 @@ def test_rchoplen_wide(self): self.assertEqual(rchoplen('\u3053\u3093\u306b\u3061\u306f', 4), '...') self.assertEqual(rchoplen('\u3053\u3093\u306b\u3061\u306f', 3), '...') - def test_lchoplen_compose(self): + def test_lchoplen_compose(self) -> None: self.assertEqual(lchoplen( '\u01b5\u0327\u0308\u01b6\u0327\u0308\u01b7\u0327\u0308\u01b8\u0327\u0308\u01b9\u0327\u0308\u01ba\u0327\u0308', 7), '\u01b5\u0327\u0308\u01b6\u0327\u0308\u01b7\u0327\u0308\u01b8\u0327\u0308\u01b9\u0327\u0308\u01ba\u0327\u0308') @@ -91,7 +91,7 @@ def test_lchoplen_compose(self): '\u01b5\u0327\u0308\u01b6\u0327\u0308\u01b7\u0327\u0308\u01b8\u0327\u0308\u01b9\u0327\u0308\u01ba\u0327\u0308', 3), '...') - def test_rchoplen_compose(self): + def test_rchoplen_compose(self) -> None: self.assertEqual(rchoplen( '\u01b5\u0327\u0308\u01b6\u0327\u0308\u01b7\u0327\u0308\u01b8\u0327\u0308\u01b9\u0327\u0308\u01ba\u0327\u0308', 7), '\u01b5\u0327\u0308\u01b6\u0327\u0308\u01b7\u0327\u0308\u01b8\u0327\u0308\u01b9\u0327\u0308\u01ba\u0327\u0308') From 1edfdde5f5ab6e6c99134863251dfa189f49f7e5 Mon Sep 17 00:00:00 2001 From: mkoppmann Date: Fri, 16 Dec 2022 15:49:10 +0100 Subject: [PATCH 2/3] Add type annotations generated by pyre See: https://pyre-check.org --- lib/cfv/BitTorrent/bencode.py | 7 +- lib/cfv/BitTorrent/btformats.py | 3 +- lib/cfv/caching.py | 2 +- lib/cfv/fileutil.py | 21 +++-- lib/cfv/hash.py | 7 +- lib/cfv/osutil.py | 19 ++-- lib/cfv/strutil.py | 5 +- lib/cfv/term.py | 2 +- setup.py | 2 +- test/benchmark.py | 10 +- test/cfvtest.py | 10 +- test/test.py | 160 ++++++++++++++++---------------- test/test_caching.py | 4 +- 13 files changed, 133 insertions(+), 119 deletions(-) diff --git a/lib/cfv/BitTorrent/bencode.py b/lib/cfv/BitTorrent/bencode.py index 3d55f40..75d7ce6 100644 --- a/lib/cfv/BitTorrent/bencode.py +++ b/lib/cfv/BitTorrent/bencode.py @@ -2,9 +2,10 @@ # see LICENSE.txt for license information from builtins import object +from typing import Sized -def decode_int(x, f) -> tuple: +def decode_int(x, f: int) -> tuple: f += 1 newf = x.index(b'e', f) try: @@ -68,7 +69,7 @@ def decode_dict(x, f) -> tuple: } -def bdecode(x): +def bdecode(x: Sized): try: r, pos = decode_func[x[0:1]](x, 0) except (IndexError, KeyError): @@ -252,7 +253,7 @@ def encode_int(x, r) -> None: r.extend((b'i', b'%d' % x, b'e')) -def encode_string(x, r) -> None: +def encode_string(x: Sized, r) -> None: r.extend((b'%d' % len(x), b':', x)) diff --git a/lib/cfv/BitTorrent/btformats.py b/lib/cfv/BitTorrent/btformats.py index 77e7627..62af90c 100644 --- a/lib/cfv/BitTorrent/btformats.py +++ b/lib/cfv/BitTorrent/btformats.py @@ -4,9 +4,10 @@ from re import compile from builtins import range +from typing import Pattern -reg = compile(br'^[^/\\.~][^/\\]*$') +reg: Pattern[bytes] = compile(br'^[^/\\.~][^/\\]*$') def check_info(info) -> None: diff --git a/lib/cfv/caching.py b/lib/cfv/caching.py index 0eba2ae..40a5ad5 100644 --- a/lib/cfv/caching.py +++ b/lib/cfv/caching.py @@ -103,7 +103,7 @@ def nocase_dirfiles(self, dir, match) -> list: _FINDFILE = 1 _FINDDIR = 0 - def nocase_findfile(self, filename, find=_FINDFILE): + def nocase_findfile(self, filename, find: int=_FINDFILE): cur = osutil.curdiru parts = osutil.path_split(filename.lower()) # print 'nocase_findfile:',filename,parts,len(parts) diff --git a/lib/cfv/fileutil.py b/lib/cfv/fileutil.py index cc3dcce..1776627 100644 --- a/lib/cfv/fileutil.py +++ b/lib/cfv/fileutil.py @@ -2,9 +2,10 @@ import codecs import sys -from io import BytesIO, TextIOWrapper +from io import BufferedWriter, BytesIO, TextIOWrapper from cfv import osutil +from typing import Union _badbytesmarker = '\ufffe' @@ -18,12 +19,12 @@ def _markbadbytes(exc): class PeekFile(object): - def __init__(self, fileobj, filename=None, encoding='auto'): + def __init__(self, fileobj, filename=None, encoding: str='auto') -> None: self.fileobj = fileobj self._init_decodeobj(encoding) self.name = filename or fileobj.name - def _init_decodeobj(self, encoding): + def _init_decodeobj(self, encoding) -> None: self._encoding = None self._decode_start = 0 self._decode_errs = 0 @@ -42,7 +43,7 @@ def _init_decodeobj(self, encoding): self._encodeerrors = osutil.getencodeerrors(encoding, default='markbadbytes') self._reset_decodeobj() - def _reset_decodeobj(self): + def _reset_decodeobj(self) -> None: self.fileobj.seek(self._decode_start) if self._encoding is not None: self.decodeobj = codecs.getreader(self._encoding)(self.fileobj, errors=self._encodeerrors) @@ -88,7 +89,7 @@ def peeknextline(self, *args): self._decode_errs = 1 return '' - def _done_peeking(self, raw): + def _done_peeking(self, raw) -> None: if raw: fileobj = self.fileobj fileobj.seek(0) @@ -118,11 +119,11 @@ def read(self, *args): return self.read(*args) -def PeekFileNonseekable(fileobj, filename, encoding): +def PeekFileNonseekable(fileobj, filename, encoding) -> PeekFile: return PeekFile(BytesIO(fileobj.read()), filename, encoding) -def PeekFileGzip(filename, encoding): +def PeekFileGzip(filename, encoding) -> PeekFile: import gzip if filename == '-': f = gzip.GzipFile(mode='rb', fileobj=sys.stdin.buffer) @@ -132,10 +133,10 @@ def PeekFileGzip(filename, encoding): class NoCloseFile(object): - def __init__(self, fileobj): + def __init__(self, fileobj) -> None: self.fileobj = fileobj - def __getattr__(self, attr): + def __getattr__(self, attr: str): if attr == 'close': attr = 'flush' return getattr(self.fileobj, attr) @@ -155,7 +156,7 @@ def open_read(filename, config): return PeekFile(open(filename, mode), filename, config.encoding) -def open_write(filename, config, force_raw=False): +def open_write(filename, config, force_raw: bool=False): if config.gzip >= 2 or (config.gzip >= 0 and filename[-3:].lower() == '.gz'): import gzip kwargs = { diff --git a/lib/cfv/hash.py b/lib/cfv/hash.py index 83cc175..f54a6bb 100644 --- a/lib/cfv/hash.py +++ b/lib/cfv/hash.py @@ -5,6 +5,7 @@ import struct import sys from zlib import crc32 +from typing import Union try: @@ -12,7 +13,7 @@ raise ImportError import mmap - def dommap(fileno, len): # generic mmap. ACCESS_* args work on both nix and win. + def dommap(fileno, len) -> Union[bytes, mmap.mmap]: # generic mmap. ACCESS_* args work on both nix and win. if len == 0: return b'' # mmap doesn't like length=0 return mmap.mmap(fileno, len, access=mmap.ACCESS_READ) @@ -68,7 +69,7 @@ def finish(m, s): return m.digest(), s -def getfilechecksumgeneric(algo) -> tuple: +def getfilechecksumgeneric(algo: str) -> tuple: if hasattr(hashlib, algo): hasher = getattr(hashlib, algo) else: @@ -86,7 +87,7 @@ def __init__(self, s=b'') -> None: def update(self, s) -> None: self.value = crc32(s, self.value) - def digest(self): + def digest(self) -> bytes: return struct.pack('>I', self.value & 0xFFFFFFFF) diff --git a/lib/cfv/osutil.py b/lib/cfv/osutil.py index a6a3beb..303eae4 100644 --- a/lib/cfv/osutil.py +++ b/lib/cfv/osutil.py @@ -2,23 +2,24 @@ import os import re import sys +from typing import Pattern, Union if hasattr(locale, 'getpreferredencoding'): - preferredencoding = locale.getpreferredencoding() or 'ascii' + preferredencoding: str = locale.getpreferredencoding() or 'ascii' else: - preferredencoding = 'ascii' + preferredencoding: str = 'ascii' if hasattr(sys, 'getfilesystemencoding'): - fsencoding = sys.getfilesystemencoding() + fsencoding: str = sys.getfilesystemencoding() else: - fsencoding = preferredencoding + fsencoding: str = preferredencoding if hasattr(sys, 'getfilesystemencodeerrors'): - fsencodeerrors = sys.getfilesystemencodeerrors() + fsencodeerrors: str = sys.getfilesystemencodeerrors() else: - fsencodeerrors = 'surrogateescape' + fsencodeerrors: str = 'surrogateescape' def getencoding(encoding, preferred=None): @@ -52,7 +53,7 @@ def getencodeerrors(encoding, default=None): getcwdu = os.getcwd -curdiru = os.curdir +curdiru: str = os.curdir listdir = os.listdir @@ -83,7 +84,7 @@ def path_split(filename): return parts -def strippath(filename, num='a', _splitdrivere=re.compile(r'[a-z]:[/\\]', re.IGNORECASE)): +def strippath(filename, num: str='a', _splitdrivere: Pattern[str]=re.compile(r'[a-z]:[/\\]', re.IGNORECASE)): """Strip off path components from the left side of the filename. >>> strippath(os.path.join('c:','foo','bar','baz')) @@ -120,6 +121,6 @@ def strippath(filename, num='a', _splitdrivere=re.compile(r'[a-z]:[/\\]', re.IGN return os.path.join(*parts[num:]) -def fcmp(f1, f2): +def fcmp(f1: Union[os.PathLike[bytes], os.PathLike[str], bytes, str], f2: Union[os.PathLike[bytes], os.PathLike[str], bytes, str]) -> bool: import filecmp return filecmp.cmp(f1, f2, shallow=0) diff --git a/lib/cfv/strutil.py b/lib/cfv/strutil.py index 1bfeedf..62f7e98 100644 --- a/lib/cfv/strutil.py +++ b/lib/cfv/strutil.py @@ -4,6 +4,7 @@ import unicodedata from cfv import osutil +from typing import Union def safesort(seq) -> None: @@ -60,7 +61,7 @@ def uwidth(u) -> int: return w -def lchoplen(line, max) -> str: +def lchoplen(line: Union[bytes, str], max) -> str: """Return line cut on left so it takes at most max character cells when printed. >>> lchoplen('hello world',6) @@ -89,7 +90,7 @@ def lchoplen(line, max) -> str: return ''.join(chars) -def rchoplen(line, max) -> str: +def rchoplen(line: Union[bytes, str], max) -> str: """Return line cut on right so it takes at most max character cells when printed. >>> rchoplen('hello world',6) diff --git a/lib/cfv/term.py b/lib/cfv/term.py index 617982d..475da34 100644 --- a/lib/cfv/term.py +++ b/lib/cfv/term.py @@ -25,4 +25,4 @@ def getscrwidth() -> int: return 80 -scrwidth = getscrwidth() +scrwidth: int = getscrwidth() diff --git a/setup.py b/setup.py index 0369c6e..d834e03 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ RE_VERSION = r"^__version__\s*=\s*'([^']*)'$" -def _read(path): +def _read(path) -> str: with open(path, 'rt', encoding='utf8') as f: return f.read() diff --git a/test/benchmark.py b/test/benchmark.py index bac2648..bb05b31 100755 --- a/test/benchmark.py +++ b/test/benchmark.py @@ -30,9 +30,11 @@ from functools import partial import cfvtest +from typing import Sized, SupportsFloat, Union +from typing_extensions import SupportsIndex -def human_int(value) -> int: +def human_int(value: Sized) -> int: """Convert values with size suffix to integers. >>> human_int('10') 10 @@ -62,7 +64,7 @@ def human_int(value) -> int: return int(value) * multiplier -def create_test_file(path, max_size, verbose=False) -> None: +def create_test_file(path, max_size, verbose: bool=False) -> None: size = random.randint(1, max_size) if verbose: print('creating', path, 'size', size) @@ -73,7 +75,7 @@ def create_test_file(path, max_size, verbose=False) -> None: size -= 1 -def create_test_dir(root, num_files, branch_factor, max_size, verbose=False) -> None: +def create_test_dir(root, num_files: Union[SupportsFloat, SupportsIndex], branch_factor: Union[SupportsFloat, SupportsIndex], max_size, verbose: bool=False) -> None: levels = int(math.ceil(math.log(num_files, branch_factor))) formatlen = int(math.ceil(math.log(branch_factor, 16))) path_counter = [0] * levels @@ -107,7 +109,7 @@ def create(args) -> None: create_test_dir(start_path, args.files, args.branch_factor, args.max_size, verbose=args.verbose) -def print_times(name, results, iterations, verbose=False) -> None: +def print_times(name, results, iterations, verbose: bool=False) -> None: best = min(results) print('%s: best=%.4g msec' % (name, best * 1000 / iterations)) if verbose: diff --git a/test/cfvtest.py b/test/cfvtest.py index 9215e24..460e63d 100644 --- a/test/cfvtest.py +++ b/test/cfvtest.py @@ -38,6 +38,8 @@ from typing import Iterator from unittest.suite import TestSuite +from types import ModuleType + cfvenv = '' cfvfn = None @@ -77,7 +79,7 @@ def expand_cmdline(cmd) -> list: return argv -def runcfv_exe(cmd, stdin=None, stdout=None, stderr=None, need_reload=0) -> tuple: +def runcfv_exe(cmd, stdin=None, stdout=None, stderr=None, need_reload: int=0) -> tuple: import subprocess def open_output(fn): @@ -113,7 +115,7 @@ def open_output(fn): # TODO: make the runcfv_* functions (optionally?) take args as a list instead of a string -def runcfv_py(cmd, stdin=None, stdout=None, stderr=None, need_reload=0) -> tuple: +def runcfv_py(cmd, stdin=None, stdout=None, stderr=None, need_reload: int=0) -> tuple: from io import BytesIO, TextIOWrapper obuf = BytesIO() obuftext = TextIOWrapper(obuf) @@ -209,14 +211,14 @@ def setcfv(fn=None, internal=None) -> None: get_version_flags() -def setenv(k, v) -> None: +def setenv(k: str, v: str) -> None: global cfvenv cfvenv = "%s=%s %s" % (k, v, cfvenv) os.environ[k] = v get_version_flags() -def my_import(name): +def my_import(name: str) -> ModuleType: mod = __import__(name) components = name.split('.') for comp in components[1:]: diff --git a/test/test.py b/test/test.py index 4fd5f7c..e0d2c20 100755 --- a/test/test.py +++ b/test/test.py @@ -46,15 +46,19 @@ from glob import glob import cfvtest +from io import BufferedReader, TextIOWrapper +from typing import List, Optional, Pattern, Tuple, Union +optlist: List[Tuple[str, str]] +args: List[str] if hasattr(locale, 'getpreferredencoding'): - preferredencoding = locale.getpreferredencoding() or 'ascii' + preferredencoding: str = locale.getpreferredencoding() or 'ascii' else: - preferredencoding = 'ascii' + preferredencoding: str = 'ascii' -def is_encodable(s, enc=preferredencoding): +def is_encodable(s, enc=preferredencoding) -> bool: try: s.encode(enc) return True @@ -100,35 +104,35 @@ def is_encodable(s, enc=preferredencoding): } -def fmt_hascrc(f): +def fmt_hascrc(f: str): return fmt_info[f][0] -def fmt_hassize(f): +def fmt_hassize(f: str): return fmt_info[f][1] -def fmt_cancreate(f): +def fmt_cancreate(f: str): return fmt_info[f][2] -def fmt_available(f): +def fmt_available(f: str): return fmt_info[f][3] -def fmt_istext(f): +def fmt_istext(f: str): return fmt_info[f][4] -def fmt_preferredencoding(f): +def fmt_preferredencoding(f: str): return fmt_info[f][5] -def fmt_iscoreutils(f): +def fmt_iscoreutils(f: str): return fmt_info[f][6] -def allfmts(): +def allfmts() -> List[str]: return list(fmt_info.keys()) @@ -145,7 +149,7 @@ def coreutilsfmts(): class rcurry(object): - def __init__(self, func, *args, **kw): + def __init__(self, func, *args, **kw) -> None: self.curry_func = func self.curry_args = args self.curry_kw = kw @@ -156,7 +160,7 @@ def __call__(self, *_args, **_kwargs): return self.curry_func(*(_args + self.curry_args), **kw) -def pathfind(p, path=os.environ.get('PATH', os.defpath).split(os.pathsep)): +def pathfind(p, path: List[str]=os.environ.get('PATH', os.defpath).split(os.pathsep)) -> Optional[int]: for d in path: if os.path.exists(os.path.join(d, p)): return 1 @@ -172,7 +176,7 @@ def pathjoin_and_mkdir(*components): return result -def readfile(fn, textmode=False): +def readfile(fn, textmode: bool=False): if textmode: mode = 't' else: @@ -182,13 +186,13 @@ def readfile(fn, textmode=False): return d -def writefile(fn, data): +def writefile(fn, data) -> None: with open(fn, 'wb') as f: if data: f.write(data) -def writefile_and_reopen(fn, data): +def writefile_and_reopen(fn, data) -> BufferedReader: """Write data to file, close, and then reopen readonly, and return the fd. This is for the benefit of windows, where you need to close and reopen the @@ -204,19 +208,19 @@ class stats(object): failed = 0 -def logr(text): +def logr(text) -> None: logfile.write(text) -def log(text): +def log(text) -> None: logr(text + '\n') -def test_log_start(cmd, kw): +def test_log_start(cmd, kw) -> None: log('*** testing ' + cmd + (kw and ' ' + str(kw) or '')) -def test_log_finish(cmd, s, r, output, kw): +def test_log_finish(cmd, s, r: int, output, kw) -> None: if r: stats.failed += 1 print('\n>>> failed test:', cmd, (kw and ' ' + str(kw) or '')) @@ -242,7 +246,7 @@ def test_log_finish(cmd, s, r, output, kw): log('') -def test_log_results(cmd, s, o, r, kw): +def test_log_results(cmd, s, o, r, kw) -> None: """ cmd=command being tested (info only) s=return status @@ -254,7 +258,7 @@ def test_log_results(cmd, s, o, r, kw): test_log_finish(cmd, s, r, o, kw) -def test_external(cmd, test): +def test_external(cmd: Union[bytes, str], test) -> None: # TODO: replace this with subprocess from subprocess import getstatusoutput s, o = getstatusoutput(cmd) @@ -262,7 +266,7 @@ def test_external(cmd, test): test_log_results(cmd, s, o, r, None) -def test_generic(cmd, test, **kw): +def test_generic(cmd, test, **kw) -> None: # s, o = cfvtest.runcfv(cmd) s, o = cfvtest.runcfv(*(cmd,), **kw) r = test(s, o) @@ -273,7 +277,7 @@ class cst_err(Exception): pass -def cfv_stdin_test(cmd, file): +def cfv_stdin_test(cmd, file) -> None: s1 = s2 = None o1 = o2 = '' r = 0 @@ -295,7 +299,7 @@ def cfv_stdin_test(cmd, file): test_log_results('stdin/out of ' + cmd + ' with file ' + file, (s1, s2), o1 + '\n' + o2, r, None) -def cfv_stdin_progress_test(t, file): +def cfv_stdin_progress_test(t, file) -> None: s1 = s2 = None o1 = o2 = c1 = c2 = '' r = 0 @@ -333,13 +337,13 @@ def cfv_stdin_progress_test(t, file): shutil.rmtree(dir) -def rx_test(pat, str): +def rx_test(pat, str) -> int: if re.search(pat, str): return 0 return 1 -def status_test(s, o, expected=0): +def status_test(s, o, expected: int=0) -> int: if s == expected: return 0 return 1 @@ -355,28 +359,28 @@ def status_test(s, o, expected=0): rx_cferror = r', (\d+) chksum file errors' rx_misnamed = r', (\d+) misnamed' rx_End = r'(, \d+ differing cases)?(, \d+ quoted filenames)?. [\d.]+ seconds, [\d.]+K(/s)?$' -rxo_TestingFrom = re.compile(r'^testing from .* \((.+?)\b.*\)[\n\r]*$', re.M) +rxo_TestingFrom: Pattern[str] = re.compile(r'^testing from .* \((.+?)\b.*\)[\n\r]*$', re.M) -def optionalize(s): +def optionalize(s) -> str: return '(?:%s)?' % s -rx_StatusLine = rx_Begin + ''.join(map(optionalize, [rx_badcrc, rx_badsize, rx_notfound, rx_ferror, rx_unv, rx_cferror, rx_misnamed])) + rx_End +rx_StatusLine: str = rx_Begin + ''.join(map(optionalize, [rx_badcrc, rx_badsize, rx_notfound, rx_ferror, rx_unv, rx_cferror, rx_misnamed])) + rx_End class OneOf(object): - def __init__(self, *possibilities): + def __init__(self, *possibilities) -> None: self.possible = possibilities - def __eq__(self, a): + def __eq__(self, a: object): return a in self.possible - def __repr__(self): + def __repr__(self) -> str: return 'OneOf' + repr(self.possible) -def intize(s): +def intize(s) -> int: return s and int(s) or 0 @@ -398,8 +402,8 @@ def tail(s): return '' -re_sfv_comment = re.compile('^; Generated by .* on .*$', re.M | re.I) -re_crc_comment = re.compile('^Generated at: .*$', re.M | re.I) +re_sfv_comment: Pattern[str] = re.compile('^; Generated by .* on .*$', re.M | re.I) +re_crc_comment: Pattern[str] = re.compile('^Generated at: .*$', re.M | re.I) def remove_varying_comments(t, text): @@ -410,28 +414,28 @@ def remove_varying_comments(t, text): return text -def cfv_test(s, o, op=operator.gt, opval=0): +def cfv_test(s, o, op=operator.gt, opval: int=0) -> int: x = re.search(rx_Begin + rx_End, tail(o)) if s == 0 and x and x.group(1) == x.group(2) and op(int(x.group(1)), opval): return 0 return 1 -def cfv_substatus_test(s, o, unv=0, notfound=0, badcrc=0, badsize=0, cferror=0, ferror=0): +def cfv_substatus_test(s, o, unv: int=0, notfound: int=0, badcrc: int=0, badsize: int=0, cferror: int=0, ferror: int=0) -> Union[int, str]: expected_status = (badcrc and 2) | (badsize and 4) | (notfound and 8) | (ferror and 16) | (unv and 32) | (cferror and 64) if s & expected_status == expected_status and not s & 1: return 0 return 'bad status expected %s got %s' % (expected_status, s) -def cfv_status_test(s, o, unv=0, notfound=0, badcrc=0, badsize=0, cferror=0, ferror=0): +def cfv_status_test(s, o, unv: int=0, notfound: int=0, badcrc: int=0, badsize: int=0, cferror: int=0, ferror: int=0) -> Union[int, str]: expected_status = (badcrc and 2) | (badsize and 4) | (notfound and 8) | (ferror and 16) | (unv and 32) | (cferror and 64) if s == expected_status: return 0 return 'bad status expected %s got %s' % (expected_status, s) -def cfv_all_test(s, o, files=-2, ok=0, unv=0, notfound=0, badcrc=0, badsize=0, cferror=0, ferror=0, misnamed=0): +def cfv_all_test(s, o, files: int=-2, ok: int=0, unv: int=0, notfound: int=0, badcrc: int=0, badsize: int=0, cferror: int=0, ferror: int=0, misnamed: int=0): x = re.search(rx_StatusLine, tail(o)) if x: if files == -2: @@ -447,7 +451,7 @@ def cfv_all_test(s, o, files=-2, ok=0, unv=0, notfound=0, badcrc=0, badsize=0, c return 'status line not found in output' -def cfv_unv_test(s, o, unv=1): +def cfv_unv_test(s, o, unv: int=1) -> int: x = re.search(rx_Begin + rx_unv + rx_End, tail(o)) if s != 0 and x and x.group(1) == x.group(2) and int(x.group(1)) > 0: if unv and int(x.group(3)) != unv: @@ -456,14 +460,14 @@ def cfv_unv_test(s, o, unv=1): return 1 -def cfv_unvonly_test(s, o, unv=1): +def cfv_unvonly_test(s, o, unv: int=1) -> int: x = re.search(rx_Begin + rx_unv + rx_End, tail(o)) if s != 0 and x and int(x.group(3)) == unv: return 0 return 1 -def cfv_notfound_test(s, o, unv=1): +def cfv_notfound_test(s, o, unv: int=1) -> int: x = re.search(rx_Begin + rx_notfound + rx_End, tail(o)) if s != 0 and x and int(x.group(2)) == 0 and int(x.group(1)) > 0: if int(x.group(3)) != unv: @@ -472,7 +476,7 @@ def cfv_notfound_test(s, o, unv=1): return 1 -def cfv_cferror_test(s, o, bad=1): +def cfv_cferror_test(s, o, bad: int=1) -> int: x = re.search(rx_Begin + rx_cferror + rx_End, tail(o)) if s != 0 and x and int(x.group(3)) > 0: if bad > 0 and int(x.group(3)) != bad: @@ -481,7 +485,7 @@ def cfv_cferror_test(s, o, bad=1): return 1 -def cfv_bad_test(s, o, bad=-1): +def cfv_bad_test(s, o, bad: int=-1) -> int: x = re.search(rx_Begin + rx_bad + rx_End, tail(o)) if s != 0 and x and int(x.group(1)) > 0 and int(x.group(3)) > 0: if bad > 0 and int(x.group(3)) != bad: @@ -490,7 +494,7 @@ def cfv_bad_test(s, o, bad=-1): return 1 -def cfv_typerestrict_test(s, o, t): +def cfv_typerestrict_test(s, o, t) -> int: matches = rxo_TestingFrom.findall(o) if not matches: return 1 @@ -500,7 +504,7 @@ def cfv_typerestrict_test(s, o, t): return 0 -def cfv_listdata_test(s, o): +def cfv_listdata_test(s, o) -> int: if s == 0 and re.search('^data1\0data2\0data3\0data4\0$', o, re.I): return 0 return 1 @@ -510,25 +514,25 @@ def joincurpath(f): return os.path.join(os.getcwd(), f) -def cfv_listdata_abs_test(s, o): +def cfv_listdata_abs_test(s, o) -> int: if s == 0 and re.search('^' + re.escape('\0'.join(map(joincurpath, ['data1', 'data2', 'data3', 'data4']))) + '\0$', o, re.I): return 0 return 1 -def cfv_listdata_unv_test(s, o): +def cfv_listdata_unv_test(s, o) -> int: if s == 32 and re.search('^testfix.csv\0unchecked.dat\0$', o, re.I): return 0 return 1 -def cfv_listdata_bad_test(s, o): +def cfv_listdata_bad_test(s, o) -> int: if s & 6 and not s & ~6 and re.search('^(d2.)?test4.foo\0test.ext.end\0test2.foo\0test3\0$', o, re.I): return 0 return 1 -def cfv_version_test(s, o): +def cfv_version_test(s, o) -> int: x = re.search(r'cfv v([\d.]+(?:\.dev\d+)?) -', o) with open(os.path.join(cfvtest.testpath, os.pardir, 'Changelog'), 'rt') as f: x3 = re.search(r' v([\d.]+(?:\.dev\d+)?):', f.readline()) @@ -548,7 +552,7 @@ def cfv_version_test(s, o): return 1 -def cfv_cftypehelp_test(s, o, expected): +def cfv_cftypehelp_test(s, o, expected) -> Union[int, str]: if s != expected: return 1 for tname in allfmts() + ['auto']: @@ -557,7 +561,7 @@ def cfv_cftypehelp_test(s, o, expected): return 0 -def cfv_nooutput_test(s, o, expected=0): +def cfv_nooutput_test(s, o, expected: int=0) -> Union[int, str]: if s != expected: return 1 if o: @@ -565,7 +569,7 @@ def cfv_nooutput_test(s, o, expected=0): return 0 -def T_test(f, extra=None): +def T_test(f, extra=None) -> None: cmd = cfvcmd if extra: cmd += ' ' + extra @@ -616,7 +620,7 @@ def noprogress_test(s, o): test_generic(cmd + ' -T --progress=no -f test' + f, noprogress_test) -def gzC_test(f, extra=None, verify=None, t=None, d=None): +def gzC_test(f, extra=None, verify=None, t=None, d=None) -> None: cmd = cfvcmd if not t: t = f @@ -656,7 +660,7 @@ def gzC_test(f, extra=None, verify=None, t=None, d=None): shutil.rmtree(tmpd) -def C_test(f, extra=None, verify=None, t=None, d='data?'): +def C_test(f, extra=None, verify=None, t=None, d: Union[os.PathLike[bytes], os.PathLike[str], bytes, str]='data?') -> None: gzC_test(f, extra=extra, t=t, d=d) cmd = cfvcmd if not t: @@ -706,7 +710,7 @@ def C_test_encoding(enc): C_test_encoding('utf-16') -def create_funkynames(t, d, chr, deep): +def create_funkynames(t, d, chr, deep) -> int: num = 0 for i in range(1, 256): n = chr(i) @@ -748,7 +752,7 @@ def create_funkynames(t, d, chr, deep): return num -def C_funkynames_test(t): +def C_funkynames_test(t) -> None: def fschr(i): return os.fsdecode(b'%c' % i) @@ -857,7 +861,7 @@ def is_fmtokfn(s): shutil.rmtree(d3) -def ren_test(f, extra=None, verify=None, t=None): +def ren_test(f, extra=None, verify=None, t=None) -> None: join = os.path.join dir = tempfile.mkdtemp() try: @@ -922,7 +926,7 @@ def flscmp(t, n, fls): shutil.rmtree(dir) -def search_test(t, test_nocrc=0, extra=None): +def search_test(t, test_nocrc: int=0, extra=None) -> None: cfn = os.path.join(os.getcwd(), 'test.' + t) hassize = fmt_hassize(t) if test_nocrc: @@ -1105,7 +1109,7 @@ def dont_find_dir_test(s, o): shutil.rmtree(d) -def quoted_search_test(): +def quoted_search_test() -> None: d = tempfile.mkdtemp() try: join = os.path.join @@ -1131,7 +1135,7 @@ def quoted_search_test(): shutil.rmtree(d) -def symlink_test(): +def symlink_test() -> None: dir = tempfile.mkdtemp() dir1 = 'd1' dir2 = 'd2' @@ -1176,7 +1180,7 @@ def r_unv_verbose_test(s, o): shutil.rmtree(dir) -def deep_unverified_test(): +def deep_unverified_test() -> None: dir = tempfile.mkdtemp() try: join = os.path.join @@ -1251,7 +1255,7 @@ def r_unv_verbose_test(s, o): shutil.rmtree(dir) -def test_encoding_detection(): +def test_encoding_detection() -> None: datad = tempfile.mkdtemp() d = tempfile.mkdtemp() try: @@ -1295,7 +1299,7 @@ def test_encoding_detection(): shutil.rmtree(datad) -def test_encoding2(): +def test_encoding2() -> None: """Non-trivial (actual non-ascii characters) encoding test. These tests will probably always fail unless you use a unicode locale and python 2.3+. """ @@ -1391,7 +1395,7 @@ def test_encoding2(): shutil.rmtree(d) -def largefile2GB_test(): +def largefile2GB_test() -> None: # hope you have sparse file support ;) fn = os.path.join('bigfile2', 'bigfile') f = open(fn, 'wb') @@ -1407,7 +1411,7 @@ def largefile2GB_test(): os.unlink(fn) -def largefile4GB_test(): +def largefile4GB_test() -> None: # hope you have sparse file support ;) fn = os.path.join('bigfile', 'bigfile') f = open(fn, 'wb') @@ -1425,7 +1429,7 @@ def largefile4GB_test(): os.unlink(fn) -def manyfiles_test(t): +def manyfiles_test(t) -> None: try: max_open = os.sysconf('SC_OPEN_MAX') except (AttributeError, ValueError, OSError): @@ -1448,7 +1452,7 @@ def manyfiles_test(t): shutil.rmtree(d) -def specialfile_test(cfpath): +def specialfile_test(cfpath: Union[os.PathLike[bytes], os.PathLike[str], bytes, str]) -> None: try: import threading except ImportError: @@ -1489,7 +1493,7 @@ def pusher(fpath): shutil.rmtree(d) -def unrecognized_cf_test(): +def unrecognized_cf_test() -> None: def cfv_unrectype(s, o): r = cfv_all_test(s, o, cferror=1) if r: @@ -1516,7 +1520,7 @@ def cfv_unrecenc(s, o): test_generic(cfvcmd + ' -T --encoding=utf-16 -f data1', cfv_unrecenc) -def private_torrent_test(): +def private_torrent_test() -> None: cmd = cfvcmd tmpd = tempfile.mkdtemp() try: @@ -1534,7 +1538,7 @@ def private_torrent_test(): shutil.rmtree(tmpd) -def all_unittest_tests(): +def all_unittest_tests() -> int: if not run_internal: return 0 test_log_start('all_unittests_suite', None) @@ -1557,7 +1561,7 @@ def all_unittest_tests(): run_exit_early = 0 -def show_help_and_exit(err=None): +def show_help_and_exit(err=None) -> None: if err: print('error:', err) print() @@ -1602,17 +1606,17 @@ def show_help_and_exit(err=None): cfvtest.setcfv(fn=args and args[0] or None, internal=run_internal) if run_unittests_only: - logfile = sys.stdout + logfile: TextIOWrapper = sys.stdout all_unittest_tests() sys.exit() # set everything to default in case user has different in config file cfvcmd = '-ZNVRMUI --unquote=no --fixpaths="" --strippaths=0 --showpaths=auto-relative --progress=no --announceurl=url --noprivate_torrent' -logfile = open(os.path.join(tempfile.gettempdir(), 'cfv_%s_test-%s.log' % (cfvtest.ver_cfv, time.strftime('%Y%m%dT%H%M%S'))), 'wt') +logfile: TextIOWrapper = open(os.path.join(tempfile.gettempdir(), 'cfv_%s_test-%s.log' % (cfvtest.ver_cfv, time.strftime('%Y%m%dT%H%M%S'))), 'wt') -def all_tests(): +def all_tests() -> int: stats.ok = stats.failed = 0 symlink_test() @@ -1853,7 +1857,7 @@ def sfvverify(f): return stats.failed -def copytree(src, dst, ignore=None): +def copytree(src, dst, ignore=None) -> None: if ignore is None: ignore = [] for name in os.listdir(src): @@ -1873,7 +1877,7 @@ def copytree(src, dst, ignore=None): # copy the testdata into a temp dir in order to avoid .svn dirs breaking some tests -tmpdatapath = tempfile.mkdtemp() +tmpdatapath: str = tempfile.mkdtemp() try: copytree(cfvtest.datapath, tmpdatapath, ignore=['.svn']) os.chdir(tmpdatapath) # do this after the setcfv, since the user may have specified a relative path diff --git a/test/test_caching.py b/test/test_caching.py index bc9692d..8233338 100644 --- a/test/test_caching.py +++ b/test/test_caching.py @@ -36,7 +36,7 @@ def tearDown(self) -> None: def mkpath(self, name): return os.path.join(self.tempdir, name) - def mkfile(self, name, contents): + def mkfile(self, name, contents: str): head, tail = os.path.split(name) fullhead = os.path.join(self.tempdir, head) if head and not os.path.exists(fullhead): @@ -60,7 +60,7 @@ def tearDown(self) -> None: def mkpath(self, name): return name - def mkfile(self, name, contents): + def mkfile(self, name, contents: str): head, tail = os.path.split(name) if head and not os.path.exists(head): os.makedirs(head) From fb24929af217e0836bb369fe1fd5a2e1ce7f77da Mon Sep 17 00:00:00 2001 From: mkoppmann Date: Fri, 16 Dec 2022 16:19:59 +0100 Subject: [PATCH 3/3] Add typing-extensions to setup requirements --- setup.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/setup.py b/setup.py index d834e03..4b0b795 100644 --- a/setup.py +++ b/setup.py @@ -7,6 +7,9 @@ RE_VERSION = r"^__version__\s*=\s*'([^']*)'$" +install_requires: list[str] = [ + "typing_extensions", +] def _read(path) -> str: with open(path, 'rt', encoding='utf8') as f: @@ -57,6 +60,7 @@ def _get_version(path): 'Original Project': 'http://cfv.sourceforge.net/', }, python_requires='>=3.5', + install_requires=install_requires, packages=find_packages('lib'), package_dir={'': 'lib'}, include_package_data=True,