Skip to content

Commit

Permalink
apply lint changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahad-10 committed Aug 2, 2024
1 parent e12be4b commit 3fbc76f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 39 deletions.
6 changes: 3 additions & 3 deletions wireguard/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ def manage_menu():

choice = input("Choose an option: ")

if choice == '1':
if choice == "1":
backend.new_client()
elif choice == '2':
elif choice == "2":
backend.list_clients()
elif choice == '3':
elif choice == "3":
break
else:
print("Invalid choice. Please try again.")
Expand Down
77 changes: 41 additions & 36 deletions wireguard/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import subprocess

CONFIG_FILE = 'config.json'
CONFIG_FILE = "config.json"


def is_root():
Expand All @@ -17,7 +17,7 @@ def get_home_dir(client_name):
if os.path.exists(home_dir):
return home_dir

sudo_user = os.getenv('SUDO_USER', None)
sudo_user = os.getenv("SUDO_USER", None)
if sudo_user is not None:
if sudo_user == "root":
return "/root"
Expand All @@ -33,17 +33,17 @@ def create_config():
print()

server_pub_ip = subprocess.check_output(["ip", "-4", "addr"]).decode().strip()
server_pub_ip = re.search(r'inet (\S+)/\d+ scope global', server_pub_ip)
server_pub_ip = re.search(r"inet (\S+)/\d+ scope global", server_pub_ip)
if server_pub_ip:
server_pub_ip = server_pub_ip.group(1)
else:
server_pub_ip = subprocess.check_output(["ip", "-6", "addr"]).decode().strip()
server_pub_ip = re.search(r'inet6 (\S+)/', server_pub_ip).group(1)
server_pub_ip = re.search(r"inet6 (\S+)/", server_pub_ip).group(1)

server_pub_ip = input(f"IPv4 or IPv6 public address: [{server_pub_ip}] ") or server_pub_ip

server_pub_nic = subprocess.check_output(["ip", "-4", "route", "ls"]).decode().strip()
server_pub_nic = re.search(r'dev (\S+)', server_pub_nic).group(1)
server_pub_nic = re.search(r"dev (\S+)", server_pub_nic).group(1)
server_pub_nic = input(f"Public interface: [{server_pub_nic}] ") or server_pub_nic

server_wg_nic = input("WireGuard interface name: [wg0] ") or "wg0"
Expand All @@ -57,8 +57,10 @@ def create_config():
client_dns_1 = input("First DNS resolver to use for the clients: [1.1.1.1] ") or "1.1.1.1"
client_dns_2 = input("Second DNS resolver to use for the clients (optional): [1.0.0.1] ") or "1.0.0.1"

allowed_ips = input(
"Allowed IPs list for generated clients (leave default to route everything): [0.0.0.0/0,::/0] ") or "0.0.0.0/0,::/0"
allowed_ips = (
input("Allowed IPs list for generated clients (leave default to route everything): [0.0.0.0/0,::/0] ")
or "0.0.0.0/0,::/0"
)

config = {
"server_pub_ip": server_pub_ip,
Expand All @@ -69,50 +71,53 @@ def create_config():
"server_port": server_port,
"client_dns_1": client_dns_1,
"client_dns_2": client_dns_2,
"allowed_ips": allowed_ips
"allowed_ips": allowed_ips,
}

with open(CONFIG_FILE, "w") as f:
f.write(json.dumps(config))


def read_config():
with open(CONFIG_FILE, 'r') as f:
with open(CONFIG_FILE, "r") as f:
return json.loads(f.read())


def generate_keys():
private_key = subprocess.check_output(["wg", "genkey"]).strip().decode('utf-8')
public_key = subprocess.check_output(["wg", "pubkey"], input=private_key.encode()).strip().decode('utf-8')
private_key = subprocess.check_output(["wg", "genkey"]).strip().decode("utf-8")
public_key = subprocess.check_output(["wg", "pubkey"], input=private_key.encode()).strip().decode("utf-8")
return private_key, public_key


def get_client_name(server_wg_nic):
while True:
client_name = input("Client name: ")
if re.match(r'^[a-zA-Z0-9_-]+$', client_name) and len(client_name) < 16:
if re.match(r"^[a-zA-Z0-9_-]+$", client_name) and len(client_name) < 16:
result = subprocess.run(
['grep', '-c', f'^### Client {client_name}$', f'/etc/wireguard/{server_wg_nic}.conf'],
capture_output=True, text=True)
if result.stdout.strip() == '0':
["grep", "-c", f"^### Client {client_name}$", f"/etc/wireguard/{server_wg_nic}.conf"],
capture_output=True,
text=True,
)
if result.stdout.strip() == "0":
return client_name
print("A client with the specified name was already created, please choose another name.")
else:
print(
"The client name must consist of alphanumeric characters, underscores, or dashes and can't exceed 15 chars.")
"The client name must consist of alphanumeric characters, "
"underscores, or dashes and can't exceed 15 chars."
)


def find_available_ip(base_ip, conf_file):
for dot_ip in range(2, 255):
result = subprocess.run(['grep', '-c', f'{base_ip[:-1]}{dot_ip}', conf_file], capture_output=True,
text=True)
if result.stdout.strip() == '0':
result = subprocess.run(["grep", "-c", f"{base_ip[:-1]}{dot_ip}", conf_file], capture_output=True, text=True)
if result.stdout.strip() == "0":
return dot_ip
raise RuntimeError("The subnet configured supports only 253 clients.")


def generate_preshared_key():
preshared_key = subprocess.check_output(["wg", "genpsk"]).strip().decode('utf-8')
preshared_key = subprocess.check_output(["wg", "genpsk"]).strip().decode("utf-8")
return preshared_key


Expand All @@ -125,27 +130,27 @@ def new_client():
config = read_config()

server_pub_ip = config["server_pub_ip"]
server_port = config['server_port']
server_wg_nic = config['server_wg_nic']
server_wg_ipv4 = config['server_wg_ipv4']
server_wg_ipv6 = config['server_wg_ipv6']
client_dns_1 = config['client_dns_1']
client_dns_2 = config['client_dns_2']
server_port = config["server_port"]
server_wg_nic = config["server_wg_nic"]
server_wg_ipv4 = config["server_wg_ipv4"]
server_wg_ipv6 = config["server_wg_ipv6"]
client_dns_1 = config["client_dns_1"]
client_dns_2 = config["client_dns_2"]
private_key, public_key = generate_keys()
server_pub_key = public_key
allowed_ips = config['allowed_ips']
allowed_ips = config["allowed_ips"]

if ':' in server_pub_ip and not (server_pub_ip.startswith('[') and server_pub_ip.endswith(']')):
if ":" in server_pub_ip and not (server_pub_ip.startswith("[") and server_pub_ip.endswith("]")):
server_pub_ip = f"[{server_pub_ip}]"

endpoint = f"{server_pub_ip}:{server_port}"

client_name = get_client_name(server_wg_nic)

dot_ip = find_available_ip(server_wg_ipv4, f'/etc/wireguard/{server_wg_nic}.conf')
dot_ip = find_available_ip(server_wg_ipv4, f"/etc/wireguard/{server_wg_nic}.conf")
client_wg_ipv4 = f"{server_wg_ipv4.rsplit('.', 1)[0]}.{dot_ip}"

dot_ip = find_available_ip(server_wg_ipv6, f'/etc/wireguard/{server_wg_nic}.conf')
dot_ip = find_available_ip(server_wg_ipv6, f"/etc/wireguard/{server_wg_nic}.conf")
client_wg_ipv6 = f"{server_wg_ipv6.split('::')[0]}::{dot_ip}"

client_priv_key = private_key
Expand All @@ -168,7 +173,7 @@ def new_client():
"""
client_dir = os.path.join(home_dir, f"{client_name}.conf")

with open(client_dir, 'w') as f:
with open(client_dir, "w") as f:
f.write(client_conf)

server_conf = f"""
Expand All @@ -178,10 +183,10 @@ def new_client():
PresharedKey = {client_pre_shared_key}
AllowedIPs = {client_wg_ipv4}/32,{client_wg_ipv6}/128
"""
with open(f"/etc/wireguard/{server_wg_nic}.conf", 'a') as f:
with open(f"/etc/wireguard/{server_wg_nic}.conf", "a") as f:
f.write(server_conf)

subprocess.run(['wg', 'syncconf', server_wg_nic, f"<(wg-quick strip {server_wg_nic})"], shell=True)
subprocess.run(["wg", "syncconf", server_wg_nic, f"<(wg-quick strip {server_wg_nic})"], shell=True)

qr_file = f"{client_dir}.png"
subprocess.run(["qrencode", "-o", qr_file, "-t", "png", server_conf], check=True)
Expand All @@ -193,7 +198,7 @@ def new_client():
def list_clients():
is_root()

wg_dir = '/etc/wireguard'
wg_dir = "/etc/wireguard"

if not os.path.exists(wg_dir):
print("WireGuard directory does not exist.")
Expand All @@ -208,13 +213,13 @@ def list_clients():
config_file = f"{wg_dir}/{server_wg_nic}.conf"

try:
with open(config_file, 'r') as file:
with open(config_file, "r") as file:
lines = file.readlines()
except FileNotFoundError:
print(f"Configuration file {config_file} not found.")
sys.exit(1)

clients = [line.split(' ')[2].strip() for line in lines if line.startswith('### Client')]
clients = [line.split(" ")[2].strip() for line in lines if line.startswith("### Client")]

if len(clients) == 0:
print("\nYou have no existing clients!")
Expand Down

0 comments on commit 3fbc76f

Please sign in to comment.