diff --git a/.github/workflows/test_and_release.yml b/.github/workflows/test_and_release.yml index ed7b54f..42843b4 100644 --- a/.github/workflows/test_and_release.yml +++ b/.github/workflows/test_and_release.yml @@ -41,7 +41,7 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - python-version: ['3.5', '3.6', '3.7'] + python-version: ['3.6', '3.7'] steps: - uses: actions/checkout@v3 diff --git a/setup.py b/setup.py index 664c384..c969d5f 100755 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ "setproctitle": ["setproctitle"], "dnspython": ["dnspython"], }, - python_requires=">=3.5", + python_requires=">=3.6", license='GPL v3 or later', url="https://github.com/dlenski/vpn-slice", packages=["vpn_slice"], diff --git a/vpn_slice/__main__.py b/vpn_slice/__main__.py index f341bbe..7b90072 100755 --- a/vpn_slice/__main__.py +++ b/vpn_slice/__main__.py @@ -68,7 +68,7 @@ def get_default_providers(): ) else: return dict( - platform = OSError('Your platform, {}, is unsupported'.format(platform)) + platform = OSError(f'Your platform, {platform}, is unsupported') ) @@ -118,19 +118,19 @@ def do_disconnect(env, args): try: pid = int(open(pidfile).read()) except (OSError, ValueError): - print("WARNING: could not read pid from %s" % pidfile, file=stderr) + print(f"WARNING: could not read pid from {pidfile}", file=stderr) else: try: providers.process.kill(pid) except OSError as e: - print("WARNING: could not kill pid %d from %s: %s" % (pid, pidfile, str(e)), file=stderr) + print(f"WARNING: could not kill pid {pid} from {pidfile}: {e!s}", file=stderr) else: if args.verbose: - print("Killed pid %d from %s" % (pid, pidfile), file=stderr) + print(f"Killed pid {pid} from {pidfile}", file=stderr) if 'hosts' in providers: removed = providers.hosts.write_hosts({}, args.name) if args.verbose: - print("Removed %d hosts from /etc/hosts" % removed, file=stderr) + print(f"Removed {removed} hosts from /etc/hosts", file=stderr) # delete all explicitly created routes: # 1. Explicit route to gateway (won't get deleted automatically because @@ -144,7 +144,7 @@ def do_disconnect(env, args): try: providers.route.remove_route(dest) except CalledProcessError: - print("WARNING: could not delete %s (%s)" % (desc, dest), file=stderr) + print(f"WARNING: could not delete {desc} ({dest})", file=stderr) else: providers.route.flush_cache() @@ -153,7 +153,7 @@ def do_disconnect(env, args): try: providers.firewall.deconfigure_firewall(env.tundev) except CalledProcessError: - print("WARNING: failed to deconfigure firewall for VPN interface (%s)" % env.tundev, file=stderr) + print(f"WARNING: failed to deconfigure firewall for VPN interface ({env.tundev})", file=stderr) if args.vpn_domains is not None: try: @@ -178,7 +178,7 @@ def do_connect(env, args): if args.verbose > 1: print("Set explicit route to VPN gateway %s (%s)" % (env.gateway, ', '.join('%s %s' % kv for kv in gwr.items())), file=stderr) else: - print("WARNING: no route to VPN gateway found %s; cannot set explicit route to it." % env.gateway) + print(f"WARNING: no route to VPN gateway found {env.gateway}; cannot set explicit route to it.") # drop incoming traffic from VPN if not args.incoming: @@ -205,10 +205,10 @@ def do_connect(env, args): if dev_mtu: mtu = int(dev_mtu) - 88 if mtu: - print("WARNING: guessing MTU is %d (the MTU of %s - 88)" % (mtu, dev), file=stderr) + print(f"WARNING: guessing MTU is {mtu} (the MTU of {dev} - 88)", file=stderr) else: mtu = 1412 - print("WARNING: guessing default MTU of %d (couldn't determine MTU of %s)" % (mtu, dev), file=stderr) + print(f"WARNING: guessing default MTU of {mtu} (couldn't determine MTU of {dev})", file=stderr) providers.route.set_link_info(env.tundev, state='up', mtu=mtu) # set IPv4, IPv6 addresses for tunnel device @@ -224,18 +224,18 @@ def do_connect(env, args): if r: exc_subnets.append((dest, r)) else: - print("WARNING: Ignoring unroutable split-exclude %s" % dest, file=stderr) + print(f"WARNING: Ignoring unroutable split-exclude {dest}", file=stderr) # set up routes to the DNS and Windows name servers, subnets, and local aliases ns = env.dns + env.dns6 + (env.nbns if args.nbns else []) for dest, tag in chain(tagged(ns, "nameserver"), tagged(args.subnets, "subnet"), tagged(args.aliases, "alias")): if args.verbose > 1: - print("Adding route to %s %s through %s." % (tag, dest, env.tundev), file=stderr) + print(f"Adding route to {tag} {dest} through {env.tundev}.", file=stderr) providers.route.replace_route(dest, dev=env.tundev) else: providers.route.flush_cache() if args.verbose: - print("Added routes for %d nameservers, %d subnets, %d aliases." % (len(ns), len(args.subnets), len(args.aliases)), file=stderr) + print(f"Added routes for {len(ns)} nameservers, {len(args.subnets)} subnets, {len(args.aliases)} aliases.", file=stderr) # restore routes to excluded subnets for dest, exc_route in exc_subnets: @@ -245,7 +245,7 @@ def do_connect(env, args): else: providers.route.flush_cache() if args.verbose: - print("Restored routes for %d excluded subnets." % len(exc_subnets), file=stderr) + print(f"Restored routes for {len(exc_subnets)} excluded subnets.", file=stderr) # Use vpn dns for provided domains if args.vpn_domains is not None: @@ -263,14 +263,14 @@ def do_post_connect(env, args): host_map = [] if args.ns_hosts: - ns_names = [ (ip, ('dns%d.%s' % (ii, args.name),)) for ii, ip in enumerate(env.dns + env.dns6) ] + ns_names = [ (ip, (f'dns{ii}.{args.name}',)) for ii, ip in enumerate(env.dns + env.dns6) ] if args.nbns: - ns_names += [ (ip, ('nbns%d.%s' % (ii, args.name),)) for ii, ip in enumerate(env.nbns) ] + ns_names += [ (ip, (f'nbns{ii}.{args.name}',)) for ii, ip in enumerate(env.nbns) ] host_map += ns_names if args.verbose: - print("Adding /etc/hosts entries for %d nameservers..." % len(ns_names), file=stderr) + print(f"Adding /etc/hosts entries for {len(ns_names)} nameservers...", file=stderr) for ip, names in ns_names: - print(" %s = %s" % (ip, ', '.join(map(str, names))), file=stderr) + print(f" {ip} = {', '.join(map(str, names))}", file=stderr) if args.hosts or args.prevent_idle_timeout or args.kerberos_dc: providers.dns.configure(dns_servers=(env.dns + env.dns6), search_domains=args.domain, bind_addresses=env.myaddrs) @@ -278,30 +278,30 @@ def do_post_connect(env, args): kdc_hosts = [] if args.kerberos_dc: if args.verbose: - print("Looking up Kerberos5 DC hosts for realm %r using VPN DNS servers..." % args.kerberos_dc, file=stderr) + print(f"Looking up Kerberos5 DC hosts for realm {args.kerberos_dc!r} using VPN DNS servers...", file=stderr) try: - kdc_hosts = providers.dns.lookup_srv('_kerberos._tcp.%s' % args.kerberos_dc) + kdc_hosts = providers.dns.lookup_srv(f'_kerberos._tcp.{args.kerberos_dc}') except Exception as e: - print("WARNING: Lookup for Kerberos5 DC hosts for realm %r on VPN DNS servers failed:\n\t%s" % (args.kerberos_dc, e), file=stderr) + print(f"WARNING: Lookup for Kerberos5 DC hosts for realm {args.kerberos_dc!r} on VPN DNS servers failed:\n\t{e}", file=stderr) else: if args.verbose: - print("Got %d Kerberos5 DC hosts." % len(kdc_hosts), file=stderr) + print(f"Got {len(kdc_hosts)} Kerberos5 DC hosts.", file=stderr) hosts_to_lookup = list(chain(tagged(args.hosts, 'host'), tagged(kdc_hosts, 'kdc'))) if hosts_to_lookup: if args.verbose: - print("Looking up %d hosts using VPN DNS servers..." % len(hosts_to_lookup), file=stderr) + print(f"Looking up {len(hosts_to_lookup)} hosts using VPN DNS servers...", file=stderr) for host, why in hosts_to_lookup: try: ips = providers.dns.lookup_host(host) except Exception as e: - print("WARNING: Lookup for %s on VPN DNS servers failed:\n\t%s" % (host, e), file=stderr) + print(f"WARNING: Lookup for {host} on VPN DNS servers failed:\n\t{e}", file=stderr) else: if ips is None: - print("WARNING: Lookup for %s on VPN DNS servers returned nothing." % host, file=stderr) + print(f"WARNING: Lookup for {host} on VPN DNS servers returned nothing.", file=stderr) else: if args.verbose: - print(" %s = %s" % (host, ', '.join(map(str, ips))), file=stderr) + print(f" {host} = {', '.join(map(str, ips))}", file=stderr) ip_routes.update(ips) if why == 'kdc': host_map.extend((ip, [host]) for ip in ips) @@ -315,35 +315,35 @@ def do_post_connect(env, args): if host_map: providers.hosts.write_hosts(host_map, args.name) if args.verbose: - print("Added hostnames and aliases for %d addresses to /etc/hosts." % len(host_map), file=stderr) + print(f"Added hostnames and aliases for {len(host_map)} addresses to /etc/hosts.", file=stderr) # add routes to hosts for ip in ip_routes: if args.verbose > 1: - print("Adding route to %s (for named hosts) through %s." % (ip, env.tundev), file=stderr) + print(f"Adding route to {ip} (for named hosts) through {env.tundev}.", file=stderr) providers.route.replace_route(ip, dev=env.tundev) else: providers.route.flush_cache() if args.verbose: - print("Added %d routes for named hosts." % len(ip_routes), file=stderr) + print(f"Added {len(ip_routes)} routes for named hosts.", file=stderr) # run DNS queries in background to prevent idle timeout if args.prevent_idle_timeout: dns = env.dns + env.dns6 idle_timeout = env.idle_timeout - setproctitle('vpn-slice --prevent-idle-timeout --name %s' % args.name) + setproctitle(f'vpn-slice --prevent-idle-timeout --name {args.name}') if args.verbose: - print("Continuing in background as PID %d, attempting to prevent idle timeout every %d seconds." % (providers.process.pid(), idle_timeout)) + print(f"Continuing in background as PID {providers.process.pid()}, attempting to prevent idle timeout every {idle_timeout} seconds.") while True: delay = randint(2 * idle_timeout // 3, 9 * idle_timeout // 10) if args.verbose > 1: - print("Sleeping %d seconds until we issue a DNS query to prevent idle timeout..." % delay, file=stderr) + print(f"Sleeping {delay} seconds until we issue a DNS query to prevent idle timeout...", file=stderr) sleep(delay) # FIXME: netlink(7) may be a much better way to poll here if not providers.process.is_alive(args.ppid): - print("Caller (PID %d) has terminated; idle preventer exiting." % args.ppid, file=stderr) + print(f"Caller (PID {args.ppid}) has terminated; idle preventer exiting.", file=stderr) break # pick random host or IP to look up without leaking any new information @@ -353,11 +353,11 @@ def do_post_connect(env, args): dummy = choice(pool) shuffle(dns) if args.verbose > 1: - print("Issuing DNS lookup of %s to prevent idle timeout..." % dummy, file=stderr) + print(f"Issuing DNS lookup of {dummy} to prevent idle timeout...", file=stderr) providers.dns.lookup_host(dummy, keep_going=False) elif args.verbose: - print("Connection setup done, child process %d exiting." % providers.process.pid()) + print(f"Connection setup done, child process {providers.process.pid()} exiting.") ######################################## @@ -397,7 +397,7 @@ def parse_env(environ=os.environ): if envar in environ: try: val = maker(environ[envar]) except Exception as e: - print('Exception while setting %s from environment variable %s=%r' % (var, envar, environ[envar]), file=stderr) + print(f'Exception while setting {var} from environment variable {envar}={environ[envar]!r}', file=stderr) raise elif default: val, = default else: val = None @@ -408,7 +408,7 @@ def parse_env(environ=os.environ): orig_netaddr = env.network env.network = IPv4Network(env.network).supernet(new_prefix=env.netmasklen) if env.network.network_address != orig_netaddr: - print("WARNING: IPv4 network %s/%d has host bits set, replacing with %s" % (orig_netaddr, env.netmasklen, env.network), file=stderr) + print(f"WARNING: IPv4 network {orig_netaddr}/{env.netmasklen} has host bits set, replacing with {env.network}", file=stderr) assert env.network.netmask == env.netmask, \ "IPv4 network (INTERNAL_IP4_{{NETADDR,NETMASK}}) {ad}/{nm} does not match INTERNAL_IP4_NETMASKLEN={nml} (implies /{nmi})".format( ad=orig_netaddr, nm=env.netmask, nml=env.netmasklen, nmi=env.network.netmask) @@ -435,9 +435,9 @@ def parse_env(environ=os.environ): env.splitexc = [] for pfx, n in chain((('INC', n) for n in range(env.nsplitinc)), (('EXC', n) for n in range(env.nsplitexc))): - ad = IPv4Address(environ['CISCO_SPLIT_%s_%d_ADDR' % (pfx, n)]) - nm = IPv4Address(environ['CISCO_SPLIT_%s_%d_MASK' % (pfx, n)]) - nml = int(environ['CISCO_SPLIT_%s_%d_MASKLEN' % (pfx, n)]) + ad = IPv4Address(environ[f'CISCO_SPLIT_{pfx}_{n}_ADDR']) + nm = IPv4Address(environ[f'CISCO_SPLIT_{pfx}_{n}_MASK']) + nml = int(environ[f'CISCO_SPLIT_{pfx}_{n}_MASKLEN']) net = IPv4Network(ad).supernet(new_prefix=nml) if net.network_address != ad: print("WARNING: IPv4 split network (CISCO_SPLIT_%s_%d_{ADDR,MASK}) %s/%d has host bits set, replacing with %s" % (pfx, n, ad, nml, net), file=stderr) @@ -448,8 +448,8 @@ def parse_env(environ=os.environ): for pfx, n in chain((('INC', n) for n in range(env.nsplitinc6)), (('EXC', n) for n in range(env.nsplitexc6))): - ad = IPv6Address(environ['CISCO_IPV6_SPLIT_%s_%d_ADDR' % (pfx, n)]) - nml = int(environ['CISCO_IPV6_SPLIT_%s_%d_MASKLEN' % (pfx, n)]) + ad = IPv6Address(environ[f'CISCO_IPV6_SPLIT_{pfx}_{n}_ADDR']) + nml = int(environ[f'CISCO_IPV6_SPLIT_{pfx}_{n}_MASKLEN']) net = IPv6Network(ad).supernet(new_prefix=nml) if net.network_address != ad: print("WARNING: IPv6 split network (CISCO_IPV6_SPLIT_%s_%d_{ADDR,MASKLEN}) %s/%d has host bits set, replacing with %s" % (pfx, n, ad, nml, net), file=stderr) @@ -554,7 +554,7 @@ def main(args=None, environ=os.environ): raise pv providers[pn] = pv() except Exception as e: - print("WARNING: Couldn't configure {} provider: {}".format(pn, e), file=stderr) + print(f"WARNING: Couldn't configure {pn} provider: {e}", file=stderr) # Fail if necessary providers are missing required = {'route', 'process'} @@ -574,7 +574,7 @@ def main(args=None, environ=os.environ): required.add('dns') missing_required = {p for p in required if p not in providers} if missing_required: - raise RuntimeError("Aborting because providers for %s are required; use --help for more information" % ' '.join(missing_required)) + raise RuntimeError(f"Aborting because providers for {' '.join(missing_required)} are required; use --help for more information") # Finalize arguments that depend on providers finalize_args_and_env(args, env) @@ -594,12 +594,12 @@ def main(args=None, environ=os.environ): raise SystemExit() if env.myaddr6 or env.netmask6: - print('WARNING: IPv6 address or netmask set. Support for IPv6 in %s should be considered BETA-QUALITY.' % p.prog, file=stderr) + print(f'WARNING: IPv6 address or netmask set. Support for IPv6 in {p.prog} should be considered BETA-QUALITY.', file=stderr) if args.dump: exe = providers.process.pid2exe(args.ppid) - caller = '%s (PID %d)' % (exe, args.ppid) if exe else 'PID %d' % args.ppid + caller = f'{exe} (PID {args.ppid})' if exe else f'PID {args.ppid}' - print('Called by %s with environment variables for vpnc-script:' % caller, file=stderr) + print(f'Called by {caller} with environment variables for vpnc-script:', file=stderr) width = max((len(envar) for var, envar, *rest in vpncenv if envar in environ), default=0) for var, envar, *rest in vpncenv: if envar in environ: @@ -642,7 +642,7 @@ def main(args=None, environ=os.environ): # https://github.com/dlenski/vpn-slice/pull/14#issuecomment-488129621 if args.verbose: - print('WARNING: %s ignores reason=%s' % (p.prog, env.reason.name), file=stderr) + print(f'WARNING: {p.prog} ignores reason={env.reason.name}', file=stderr) elif env.reason == reasons.connect: do_connect(env, args) diff --git a/vpn_slice/freebsd.py b/vpn_slice/freebsd.py index 6ea6add..03e9401 100644 --- a/vpn_slice/freebsd.py +++ b/vpn_slice/freebsd.py @@ -6,7 +6,7 @@ class ProcfsProvider(PosixProcessProvider): def pid2exe(self, pid): try: - return os.readlink('/proc/%d/file' % pid) + return os.readlink(f'/proc/{pid}/file') except OSError: return None @@ -14,6 +14,6 @@ def ppid_of(self, pid=None): if pid is None: return os.getppid() try: - return int(next(open('/proc/%d/status' % pid)).split()[3]) + return int(next(open(f'/proc/{pid}/status')).split()[3]) except (OSError, ValueError): return None diff --git a/vpn_slice/linux.py b/vpn_slice/linux.py index bfc8acd..fa3a325 100644 --- a/vpn_slice/linux.py +++ b/vpn_slice/linux.py @@ -10,7 +10,7 @@ class ProcfsProvider(PosixProcessProvider): def pid2exe(self, pid): try: - return os.readlink('/proc/%d/exe' % pid) + return os.readlink(f'/proc/{pid}/exe') except OSError: return None @@ -18,7 +18,7 @@ def ppid_of(self, pid=None): if pid is None: return os.getppid() try: - return int(next(open('/proc/%d/stat' % pid)).split()[3]) + return int(next(open(f'/proc/{pid}/stat')).split()[3]) except (OSError, ValueError): return None diff --git a/vpn_slice/mac.py b/vpn_slice/mac.py index 073b212..c9176b0 100644 --- a/vpn_slice/mac.py +++ b/vpn_slice/mac.py @@ -119,14 +119,14 @@ def configure_domain_vpn_dns(self, domains, nameservers): if not os.path.exists('/etc/resolver'): os.makedirs('/etc/resolver') for domain in domains: - resolver_file_name = "/etc/resolver/{0}".format(domain) + resolver_file_name = f"/etc/resolver/{domain}" with open(resolver_file_name, "w") as resolver_file: for nameserver in nameservers: - resolver_file.write("nameserver {}\n".format(nameserver)) + resolver_file.write(f"nameserver {nameserver}\n") def deconfigure_domain_vpn_dns(self, domains, nameservers): for domain in domains: - resolver_file_name = "/etc/resolver/{0}".format(domain) + resolver_file_name = f"/etc/resolver/{domain}" if os.path.exists(resolver_file_name): os.remove(resolver_file_name) if not len(os.listdir('/etc/resolver')): @@ -167,10 +167,10 @@ def configure_firewall(self, device): if not enable_token: print("WARNING: failed to get pf enable reference token, packet filter might not shutdown correctly") - anchor = '{}/{}'.format(self._PF_ANCHOR, device) + anchor = f'{self._PF_ANCHOR}/{device}' # add anchor to generate rules with with open(self._PF_CONF_FILE, 'a') as file: - file.write('anchor "{}" # vpn-slice-{} AUTOCREATED {}\n'.format(anchor, device, enable_token)) + file.write(f'anchor "{anchor}" # vpn-slice-{device} AUTOCREATED {enable_token}\n') # reload config file self._reload_conf() @@ -191,7 +191,7 @@ def configure_firewall(self, device): def deconfigure_firewall(self, device): # disable anchor - anchor = '{}/{}'.format(self._PF_ANCHOR, device) + anchor = f'{self._PF_ANCHOR}/{device}' subprocess.check_call([self.pfctl, '-a', anchor, '-F', 'all']) with open(self._PF_CONF_FILE, 'r') as file: diff --git a/vpn_slice/posix.py b/vpn_slice/posix.py index b64138f..ca267cd 100644 --- a/vpn_slice/posix.py +++ b/vpn_slice/posix.py @@ -20,7 +20,7 @@ def lookup_host(self, hostname, keep_going=True): search_domains = self.search_domains if not bind_addresses: - some_cls = [self.base_cl + ['@{!s}'.format(dns) for dns in dns_servers]] + some_cls = [self.base_cl + [f'@{dns!s}' for dns in dns_servers]] field_requests = [hostname, 'A', hostname, 'AAAA'] else: some_cls = [] @@ -32,7 +32,7 @@ def lookup_host(self, hostname, keep_going=True): field_requests.extend([hostname, ('AAAA' if bind.version == 6 else 'A')]) # We can only do a lookup via DNS-over-IPv[X] if we have an IPv[X] address to bind to. - matching_dns = ['@{!s}'.format(dns) for dns in dns_servers if dns.version == bind.version] + matching_dns = [f'@{dns!s}' for dns in dns_servers if dns.version == bind.version] if matching_dns: some_cls.append(self.base_cl + ['-b', str(bind)] + matching_dns) @@ -43,7 +43,7 @@ def lookup_host(self, hostname, keep_going=True): all_cls = [] if search_domains: for cl in some_cls: - all_cls.extend(cl + ['+domain={!s}'.format(sd)] + field_requests for sd in search_domains) + all_cls.extend(cl + [f'+domain={sd!s}'] + field_requests for sd in search_domains) else: for cl in some_cls: all_cls.extend([cl + field_requests]) @@ -71,10 +71,10 @@ def lookup_srv(self, query): bind_addresses = self.bind_addresses if not bind_addresses: - all_cls = [self.base_cl + ['@{!s}'.format(dns) for dns in dns_servers] + [query, 'SRV']] + all_cls = [self.base_cl + [f'@{dns!s}' for dns in dns_servers] + [query, 'SRV']] else: all_cls = [self.base_cl + ['-b', str(bind)] + - ['@{!s}'.format(n) for n in dns_servers if n.version == bind.version] + + [f'@{n!s}' for n in dns_servers if n.version == bind.version] + [query, 'SRV'] for bind in bind_addresses] result = set() @@ -95,18 +95,18 @@ class HostsFileProvider(HostsProvider): def __init__(self, path): self.path = path if not os.access(path, os.R_OK | os.W_OK): - raise OSError('Cannot read/write {}'.format(path)) + raise OSError(f'Cannot read/write {path}') def write_hosts(self, host_map, name): - tag = 'vpn-slice-{} AUTOCREATED'.format(name) + tag = f'vpn-slice-{name} AUTOCREATED' with open(self.path, 'r+') as hostf: fcntl.flock(hostf, fcntl.LOCK_EX) # POSIX only, obviously lines = hostf.readlines() - keeplines = [l for l in lines if not l.endswith('# %s\n' % tag)] + keeplines = [l for l in lines if not l.endswith(f'# {tag}\n')] hostf.seek(0, 0) hostf.writelines(keeplines) for ip, names in host_map: - print('%s %s\t\t# %s' % (ip, ' '.join(names), tag), file=hostf) + print(f"{ip} {' '.join(names)}\t\t# {tag}", file=hostf) hostf.truncate() return len(host_map) or len(lines) - len(keeplines) diff --git a/vpn_slice/util.py b/vpn_slice/util.py index 7100d61..7af65fe 100644 --- a/vpn_slice/util.py +++ b/vpn_slice/util.py @@ -6,7 +6,7 @@ def get_executable(path): path = which(os.path.basename(path)) or path if not os.access(path, os.X_OK): - raise OSError('cannot execute {}'.format(path)) + raise OSError(f'cannot execute {path}') return path