Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) Rpi arm64 #61

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
10 changes: 3 additions & 7 deletions confconsole.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,17 @@
import os
import sys
import dialog
import ipaddr
from string import Template

import ifutil
import netinfo
import getopt

import conf

from io import StringIO
import traceback
import subprocess
from subprocess import PIPE, CalledProcessError
import shlex

import netinfo

from libconfconsole import ipaddr, ifutil, conf
import plugin

PLUGIN_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)),
Expand Down
11 changes: 6 additions & 5 deletions debian/confconsole.install
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
plugins.d/ /usr/lib/confconsole/
*.py /usr/lib/confconsole/
conf/* /etc/confconsole/
share/* /usr/share/confconsole/
add-water/* /lib/systemd/system
plugins.d/ /usr/lib/confconsole/
*.py /usr/lib/confconsole/
libconfconsole/ /usr/lib/python3/dist-packages/
conf/* /etc/confconsole/
share/* /usr/share/confconsole/
add-water/* /lib/systemd/system
Empty file added libconfconsole/__init__.py
Empty file.
File renamed without changes.
151 changes: 89 additions & 62 deletions ifutil.py → libconfconsole/ifutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

import os
from time import sleep

import subprocess
from typing import Optional, Callable

from netinfo import InterfaceInfo
from netinfo import get_hostname

Expand All @@ -27,8 +28,8 @@ class EtcNetworkInterfaces:
def __init__(self):
self.read_conf()

def read_conf(self):
self.conf = {}
def read_conf(self) -> None:
self.conf: dict[str, str] = {}
self.unconfigured = False

ifname = None
Expand All @@ -42,13 +43,13 @@ def read_conf(self):
if not line or line.startswith("#"):
continue

if line.startswith("auto"):
if line.startswith("auto") or line.startswith("allow-hotplug"):
ifname = line.split()[1]
self.conf[ifname] = line + "\n"
elif ifname:
self.conf[ifname] += line + "\n"

def _get_iface_opts(self, ifname):
def _get_iface_opts(self, ifname: str) -> list[str]:
iface_opts = ('pre-up', 'up', 'post-up',
'pre-down', 'down', 'post-down')
if ifname not in self.conf:
Expand All @@ -59,7 +60,7 @@ def _get_iface_opts(self, ifname):
for line in ifconf.splitlines()
if line.strip().split()[0] in iface_opts]

def _get_bridge_opts(self, ifname):
def _get_bridge_opts(self, ifname: str) -> list:
bridge_opts = ('bridge_ports', 'bridge_ageing', 'bridge_bridgeprio',
'bridge_fd', 'bridge_gcinit', 'bridge_hello',
'bridge_hw', 'bridge_maxage', 'bridge_maxwait',
Expand All @@ -73,7 +74,7 @@ def _get_bridge_opts(self, ifname):
for line in ifconf.splitlines()
if line.strip().split()[0] in bridge_opts]

def write_conf(self, ifname, ifconf):
def write_conf(self, ifname: str, ifconf: str) -> None:
self.read_conf()
if not self.unconfigured:
raise IfError(f"refusing to write to {self.CONF_FILE}\n"
Expand All @@ -89,58 +90,79 @@ def write_conf(self, ifname, ifconf):

with open(self.CONF_FILE, "w") as fob:
fob.write(self.HEADER_UNCONFIGURED+'\n')
fob.write("# remove the above line if you edit this file\n\n")

fob.write("auto lo\n")
fob.write("iface lo inet loopback\n\n")

fob.write(ifconf+'\n')

for c in self.conf:
if c in ('lo', ifname):
continue

fob.write(self.conf[c] + '\n')

def set_dhcp(self, ifname):
fob.write("# remove the above line if you edit this file")

for iface in self.conf.keys():
if iface:
fob.write('\n\n')
if iface == ifname:
fob.writelines(ifconf.rstrip())
else:
fob.writelines(self.conf[iface].rstrip())
fob.write('\n')

@staticmethod
def _preproc_if(ifname_conf: str) -> list[str]:
lines = ifname_conf.splitlines()
if len(lines) == 2:
return lines
new_lines = []
hostname = get_hostname()
ifconf = [f"auto {ifname}\niface {ifname} inet dhcp"]

if hostname:
ifconf.append(f" hostname {hostname}")
for line in lines:
_line = line.lstrip()
if (_line.startswith('allow-hotplug')
or _line.startswith('auto')
or _line.startswith('iface')
or _line.startswith('wpa-conf')):
new_lines.append(line)
elif _line.startswith('hostname'):
if hostname:
new_lines.append(f' hostname {hostname}')
else:
continue
elif (_line.startswith('address')
or _line.startswith('netmask')
or _line.startswith('gateway')
or _line.startswith('dns-nameserver')):
continue
else:
raise IfError(f'Unexpected config line: {line}')
return new_lines

ifconf = "\n".join(ifconf)
self.write_conf(ifname, ifconf)
def set_dhcp(self, ifname: str) -> None:
ifconf = self._preproc_if(self.conf[ifname])
ifconf[1] = f'iface {ifname} inet dhcp'

def set_manual(self, ifname):
ifconf = f"auto {ifname}\niface {ifname} inet manual"
self.write_conf(ifname, ifconf)
ifconf_str = "\n".join(ifconf)
self.write_conf(ifname, ifconf_str)

def set_static(self, ifname, addr, netmask, gateway=None, nameservers=[]):
hostname = get_hostname()
ifconf = [f"auto {ifname}",
f"iface {ifname} inet static"]
def set_manual(self, ifname: str) -> None:
ifconf = self._preproc_if(self.conf[ifname])
ifconf[1] = f'iface {ifname} inet manual'
ifconf_str = "\n".join(ifconf)
self.write_conf(ifname, ifconf_str)

if hostname:
ifconf.append(f" hostname {hostname}")
def set_static(self, ifname: str, addr: str, netmask: str,
gateway: str = None, nameservers: list = None
) -> None:
ifconf = self._preproc_if(self.conf[ifname])
ifconf[1] = f'iface {ifname} inet static'

ifconf.extend([f" address {addr}",
f" netmask {netmask}"])

if gateway:
ifconf.append(f" gateway {gateway}")

if nameservers:
ifconf.append(f" dns-nameservers {' '.join(nameservers)}")

ifconf = "\n".join(ifconf)
self.write_conf(ifname, ifconf)
ifconf_str = "\n".join(ifconf)
self.write_conf(ifname, ifconf_str)


class EtcNetworkInterface:
"""enumerate interface information from /etc/network/interfaces"""

def __init__(self, ifname):
def __init__(self, ifname: str):
self.ifname = ifname

interfaces = EtcNetworkInterfaces()
Expand All @@ -149,7 +171,7 @@ def __init__(self, ifname):
if ifname in interfaces.conf:
self.conflines = interfaces.conf[ifname].splitlines()

def _parse_attr(self, attr):
def _parse_attr(self, attr: str) -> list[str]:
for line in self.conflines:

vals = line.strip().split()
Expand All @@ -172,7 +194,7 @@ def method(self):
def dns_nameservers(self):
return self._parse_attr('dns-nameservers')[1:]

def __getattr__(self, attrname):
def __getattr__(self, attrname: str) -> list[str]:
# attributes with multiple values will be returned in an array
# exception: dns-nameservers always returns in array (expected)

Expand All @@ -181,19 +203,19 @@ def __getattr__(self, attrname):
if len(values) > 2:
return values[1:]
elif len(values) > 1:
return values[1]
return [values[1]]

return
return []


def get_nameservers(ifname):
def get_nameservers(ifname: str) -> list[str]:

# /etc/network/interfaces (static)
interface = EtcNetworkInterface(ifname)
if interface.dns_nameservers:
return interface.dns_nameservers

def parse_resolv(path):
def parse_resolv(path: str) -> list[str]:
nameservers = []
with open(path, 'r') as fob:
for line in fob:
Expand All @@ -220,26 +242,29 @@ def parse_resolv(path):
return []


def ifup(ifname):
return subprocess.check_output(["ifup", ifname])
def ifup(ifname: str) -> str:
return subprocess.check_output(["ifup", ifname], text=True)


def ifdown(ifname):
return subprocess.check_output(["ifdown", ifname])
def ifdown(ifname: str) -> str:
return subprocess.check_output(["ifdown", ifname], text=True)


def unconfigure_if(ifname):
def unconfigure_if(ifname: str) -> Optional[str]:
try:
ifdown(ifname)
interfaces = EtcNetworkInterfaces()
interfaces.set_manual(ifname)
subprocess.check_output(['ifconfig', ifname, '0.0.0.0'])
ifup(ifname)
return None
except subprocess.CalledProcessError as e:
return str(e)


def set_static(ifname, addr, netmask, gateway, nameservers):
def set_static(ifname: str, addr: str, netmask: str,
gateway: str, nameservers: list[str]
) -> Optional[str]:
try:
ifdown(ifname)
interfaces = EtcNetworkInterfaces()
Expand All @@ -252,13 +277,13 @@ def set_static(ifname, addr, netmask, gateway, nameservers):

net = InterfaceInfo(ifname)
if not net.addr:
raise IfError('Error obtaining IP address\n\n%s' % output)

raise IfError(f'Error obtaining IP address\n\n{output}')
return None
except Exception as e:
return str(e)


def set_dhcp(ifname):
def set_dhcp(ifname: str) -> Optional[str]:
try:
ifdown(ifname)
interfaces = EtcNetworkInterfaces()
Expand All @@ -267,18 +292,20 @@ def set_dhcp(ifname):

net = InterfaceInfo(ifname)
if not net.addr:
raise IfError('Error obtaining IP address\n\n%s' % output)

raise IfError(f'Error obtaining IP address\n\n{output}')
return None
except Exception as e:
return str(e)


def get_ipconf(ifname, error=False):
def get_ipconf(ifname: str, error: bool = False
) -> tuple[Callable[[], Optional[str]],
Optional[str], Optional[str], list[str]]:
net = InterfaceInfo(ifname)
return (net.addr, net.netmask,
net.get_gateway(error), get_nameservers(ifname))
return (net.addr, net.netmask, net.get_gateway(error),
get_nameservers(ifname))


def get_ifmethod(ifname):
def get_ifmethod(ifname: str) -> str:
interface = EtcNetworkInterface(ifname)
return interface.method
Loading