Skip to content

Commit

Permalink
Update code
Browse files Browse the repository at this point in the history
  • Loading branch information
fepitre committed Jan 30, 2024
1 parent 2cc9d04 commit 47443c8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 47 deletions.
2 changes: 2 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ disable=
duplicate-code,
fixme,
locally-disabled,
missing-function-docstring,
missing-class-docstring

[REPORTS]

Expand Down
87 changes: 40 additions & 47 deletions qubesbridgedevice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@

"""qubes-core-admin extension for handling Bridge Device"""

import qubes.ext
import qubesdb
import asyncio
import ipaddress
import random
import re
import lxml
import string
import random
import ipaddress
import asyncio

import lxml
import qubes.ext

name_re = re.compile(r"^[a-z0-9-]{1,12}$")


def rand_mac():
return "00:16:3e:%02x:%02x:%02x" % (
random.randint(0, 255),
random.randint(0, 255),
random.randint(0, 255),
return (
f"00:16:3e:"
f"{random.randint(0, 255):02x}:"
f"{random.randint(0, 255):02x}:"
f"{random.randint(0, 255):02x}"
)


Expand All @@ -57,38 +58,36 @@ def check_ip(ip):
def get_netmask_from_prefix(prefix):
try:
network_conf = ipaddress.IPv4Interface("0.0.0.0" + "/" + prefix)
except ipaddress.NetmaskValueError:
raise qubes.exc.QubesValueError("Invalid prefix: " + prefix)
except ipaddress.NetmaskValueError as e:
raise qubes.exc.QubesValueError("Invalid prefix: " + prefix) from e

return str(network_conf.network.netmask)


def get_prefix_from_netmask(netmask):
try:
network_conf = ipaddress.IPv4Interface("0.0.0.0" + "/" + netmask)
except ipaddress.NetmaskValueError:
raise qubes.exc.QubesValueError("Invalid netmask: " + netmask)
except ipaddress.NetmaskValueError as e:
raise qubes.exc.QubesValueError("Invalid netmask: " + netmask) from e

return str(network_conf.network.prefixlen)


def get_subnet(ip, netmask):
try:
network_conf = ipaddress.IPv4Interface(ip + "/" + netmask)
except (ipaddress.AddressValueError, ipaddress.NetmaskValueError):
except (ipaddress.AddressValueError, ipaddress.NetmaskValueError) as e:
raise qubes.exc.QubesValueError(
"Invalid ip/netmask: " + ip + "/" + netmask
)
) from e

return str(network_conf.network.network_address)


class BridgeDevice(qubes.devices.DeviceInfo):
# pylint: disable=too-few-public-methods
def __init__(self, backend_domain, ident):
super(BridgeDevice, self).__init__(
backend_domain=backend_domain, ident=ident
)
super().__init__(backend_domain=backend_domain, ident=ident)
self._description = None

@property
Expand All @@ -102,7 +101,7 @@ def description(self):
for c in string.ascii_letters + string.digits + "()+,-.:=_/ "
}
untrusted_desc = self.backend_domain.untrusted_qdb.read(
"/qubes-bridge-devices/{}/desc".format(self.ident)
f"/qubes-bridge-devices/{self.ident}/desc"
)
if not untrusted_desc:
return ""
Expand All @@ -114,7 +113,7 @@ def description(self):


class BridgeDeviceExtension(qubes.ext.Extension):
# pylint: disable=unused-argument,no-self-use,unused-variable
# pylint: disable=unused-argument,unused-variable
@qubes.ext.handler("domain-init", "domain-load")
def on_domain_init_load(self, vm, event):
"""Initialize watching for changes"""
Expand All @@ -140,11 +139,9 @@ def on_device_list_bridge(self, vm, event):

for untrusted_ident in untrusted_idents:
if not name_re.match(untrusted_ident):
msg = (
"%s vm's device path name contains unsafe characters. "
"Skipping it."
)
vm.log.warning(msg % vm.name)
msg = (f"{vm.name} vm's device path name contains"
f" unsafe characters. Skipping it.")
vm.log.warning(msg)
continue

ident = untrusted_ident
Expand Down Expand Up @@ -213,7 +210,7 @@ def on_device_list_attached(self, vm, event, **kwargs):
if gateway:
options["gateway"] = gateway

yield (BridgeDevice(backend_domain, ident), options)
yield BridgeDevice(backend_domain, ident), options

@qubes.ext.handler("device-pre-attach:bridge")
def on_device_pre_attach_bridge(self, vm, event, device, options):
Expand All @@ -231,15 +228,14 @@ def on_device_pre_attach_bridge(self, vm, event, device, options):
)
else:
raise qubes.exc.QubesValueError(
"Unsupported option {}".format(option)
f"Unsupported option '{option}'"
)

if not device.backend_domain.is_running():
raise qubes.exc.QubesVMNotRunningError(
device.backend_domain,
"Domain {} needs to be running to attach device from it".format(
device.backend_domain.name
),
f"Domain {device.backend_domain.name} needs"
f" to be running to attach device from it",
)

if "mac" not in options:
Expand Down Expand Up @@ -282,39 +278,37 @@ def on_device_detach_bridge(self, vm, event, device):
break

@qubes.ext.handler("domain-pre-start")
@asyncio.coroutine
def on_domain_pre_start(self, vm, event, start_guid, **kwargs):
async def on_domain_pre_start(self, vm, event, start_guid, **kwargs):
for bridge in vm.devices["bridge"].assignments():
backenddomain = None
try:
backenddomain = vm.app.domains[bridge.backend_domain.name]
except KeyError:
msg = (
"Cannot find backend domain '%s'"
% bridge.backend_domain.name
f"Cannot find backend domain '{bridge.backend_domain.name}'"
)
vm.log.error(msg)

if backenddomain.qid != 0:
if backenddomain and backenddomain.qid != 0:
if not backenddomain.is_running():
yield from backenddomain.start(
await backenddomain.start(
start_guid=start_guid, notify_function=None
)

wait_count = 0
vm.log.info(
"Waiting for {}:{} being available".format(
bridge.backend_domain.name, bridge.ident
)
f"Waiting for {bridge.backend_domain.name}:"
f"{bridge.ident} being available"
)
while not self.device_get(backenddomain, bridge.ident):
wait_count += 1
if wait_count > 60:
vm.log.error(
"Timeout while waiting for {}"
" to be available".format(bridge.ident)
f"Timeout while waiting for {bridge.ident}"
f" to be available"
)
continue
yield from asyncio.sleep(0.1)
await asyncio.sleep(0.1)

@qubes.ext.handler("domain-spawn")
def on_domain_spawn(self, vm, event, start_guid, **kwargs):
Expand All @@ -340,9 +334,8 @@ def on_domain_pre_shutdown(self, vm, event, **kwargs):
if attached_vms and not kwargs.get("force", False):
raise qubes.exc.QubesVMError(
self,
"There are bridges attached to this VM: {}".format(
", ".join(vm.name for vm in attached_vms)
),
f"There are bridges attached to this VM:"
f" {', '.join(vm.name for vm in attached_vms)}",
)

@staticmethod
Expand All @@ -354,7 +347,7 @@ def device_get(vm, ident):
:returns BridgeDevice"""

untrusted_qubes_device_attrs = vm.untrusted_qdb.list(
"/qubes-bridge-devices/{}/".format(ident)
f"/qubes-bridge-devices/{ident}/"
)
if not untrusted_qubes_device_attrs:
return None
Expand Down

0 comments on commit 47443c8

Please sign in to comment.