Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocol/driver: add DigitalInputProtocol, GpioDigitalInputDriver #1458

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2174,9 +2174,33 @@ Implements:
Arguments:
- delay (float, default=2.0): delay in seconds between off and on

GpioDigitalInputDriver
~~~~~~~~~~~~~~~~~~~~~~
The :any:`GpioDigitalInputDriver` reads a digital signal from a GPIO line.

This driver configures GPIO lines via `the sysfs kernel interface <https://www.kernel.org/doc/html/latest/gpio/sysfs.html>`
as an input.


Binds to:
gpio:
- `SysfsGPIO`_
- `MatchedSysfsGPIO`_
- NetworkSysfsGPIO

Implements:
- :any:`DigitalInputProtocol`

.. code-block:: yaml

GpioDigitalInputDriver: {}

Arguments:
- None

GpioDigitalOutputDriver
~~~~~~~~~~~~~~~~~~~~~~~
The :any:`GpioDigitalOutputDriver` writes a digital signal to a GPIO line.
The :any:`GpioDigitalOutputDriver` reads and writes a digital signal from and to a GPIO line.

This driver configures GPIO lines via `the sysfs kernel interface <https://www.kernel.org/doc/html/latest/gpio/sysfs.html>`.
While the driver automatically exports the GPIO, it does not configure it in any other way than as an output.
Expand Down
2 changes: 1 addition & 1 deletion labgrid/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from .sigrokdriver import SigrokDriver, SigrokPowerDriver, SigrokDmmDriver
from .usbstoragedriver import USBStorageDriver, NetworkUSBStorageDriver, Mode
from .resetdriver import DigitalOutputResetDriver
from .gpiodriver import GpioDigitalOutputDriver
from .gpiodriver import GpioDigitalInputDriver, GpioDigitalOutputDriver
from .filedigitaloutput import FileDigitalOutputDriver
from .serialdigitaloutput import SerialPortDigitalOutputDriver
from .xenadriver import XenaDriver
Expand Down
33 changes: 32 additions & 1 deletion labgrid/driver/gpiodriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,44 @@
import attr

from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..protocol import DigitalInputProtocol, DigitalOutputProtocol
from ..resource.remote import NetworkSysfsGPIO
from ..step import step
from .common import Driver
from ..util.agentwrapper import AgentWrapper


@target_factory.reg_driver
@attr.s(eq=False)
class GpioDigitalInputDriver(Driver, DigitalInputProtocol):

bindings = {
"gpio": {"SysfsGPIO", "NetworkSysfsGPIO"},
}

def __attrs_post_init__(self):
super().__attrs_post_init__()
self.wrapper = None

def on_activate(self):
if isinstance(self.gpio, NetworkSysfsGPIO):
host = self.gpio.host
else:
host = None
self.wrapper = AgentWrapper(host)
self.proxy = self.wrapper.load('sysfsgpioin')

def on_deactivate(self):
self.wrapper.close()
self.wrapper = None
self.proxy = None

@Driver.check_active
@step(result=True)
def get(self):
return self.proxy.get(self.gpio.index)


@target_factory.reg_driver
@attr.s(eq=False)
class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol):
Expand Down
1 change: 1 addition & 0 deletions labgrid/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .filetransferprotocol import FileTransferProtocol
from .infoprotocol import InfoProtocol
from .digitaloutputprotocol import DigitalOutputProtocol
from .digitalinputprotocol import DigitalInputProtocol
from .mmioprotocol import MMIOProtocol
from .filesystemprotocol import FileSystemProtocol
from .resetprotocol import ResetProtocol
Expand Down
10 changes: 10 additions & 0 deletions labgrid/protocol/digitalinputprotocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import abc


class DigitalInputProtocol(abc.ABC):
"""Abstract class providing the DigitalInputProtocol interface"""

@abc.abstractmethod
def get(self):
"""Implementations should return the status of the digital input."""
raise NotImplementedError
11 changes: 4 additions & 7 deletions labgrid/protocol/digitaloutputprotocol.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import abc
from .digitalinputprotocol import DigitalInputProtocol


class DigitalOutputProtocol(abc.ABC):
"""Abstract class providing the DigitalOutputProtocol interface"""

@abc.abstractmethod
def get(self):
"""Implementations should return the status of the digital output."""
raise NotImplementedError
class DigitalOutputProtocol(DigitalInputProtocol):
"""Abstract class providing the DigitalOutputProtocol interface.
Implies that the set output can be read as well, so requires DigitalInputProtocol"""

@abc.abstractmethod
def set(self, status):
Expand Down
72 changes: 72 additions & 0 deletions labgrid/util/agents/sysfsgpioin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
This module implements reading GPIOs via sysfs GPIO kernel interface.

Takes an integer property 'index' which refers to the already exported GPIO device.

"""
import logging
import os

class GpioDigitalInput:
_gpio_sysfs_path_prefix = '/sys/class/gpio'

@staticmethod
def _assert_gpio_line_is_exported(index):
gpio_sysfs_path = os.path.join(GpioDigitalInput._gpio_sysfs_path_prefix,
f'gpio{index}')
# Deprecated: the exporter can export on acquire, we are leaving this
# in for now to support exporters which have not been updated yet.
if not os.path.exists(gpio_sysfs_path):
export_sysfs_path = os.path.join(GpioDigitalInput._gpio_sysfs_path_prefix, 'export')
with open(export_sysfs_path, mode='wb') as export:
export.write(str(index).encode('utf-8'))
if not os.path.exists(gpio_sysfs_path):
raise ValueError("Device not found")

def __init__(self, index):
self._logger = logging.getLogger("Device: ")
GpioDigitalInput._assert_gpio_line_is_exported(index)
gpio_sysfs_path = os.path.join(GpioDigitalInput._gpio_sysfs_path_prefix,
f'gpio{index}')

gpio_sysfs_direction_path = os.path.join(gpio_sysfs_path, 'direction')
with open(gpio_sysfs_direction_path, 'rb') as direction_fd:
literal_value = direction_fd.read(2)
if literal_value != b"in":
self._logger.debug("Configuring GPIO %d as input.", index)
with open(gpio_sysfs_direction_path, 'wb') as direction_fd:
direction_fd.write(b'in')

gpio_sysfs_value_path = os.path.join(gpio_sysfs_path, 'value')
self.gpio_sysfs_value_fd = os.open(gpio_sysfs_value_path, flags=(os.O_RDONLY | os.O_SYNC))

def __del__(self):
os.close(self.gpio_sysfs_value_fd)
self.gpio_sysfs_value_fd = None

def get(self):
os.lseek(self.gpio_sysfs_value_fd, 0, os.SEEK_SET)
literal_value = os.read(self.gpio_sysfs_value_fd, 1)
if literal_value == b'0':
return False
elif literal_value == b'1':
return True
raise ValueError("GPIO value is out of range.")


_gpios = {}

def _get_gpio_line(index):
if index not in _gpios:
_gpios[index] = GpioDigitalInput(index=index)
return _gpios[index]


def handle_get(index):
gpio_line = _get_gpio_line(index)
return gpio_line.get()


methods = {
'get': handle_get,
}
3 changes: 3 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ def test_all_modules():
methods = aw.list()
assert 'sysfsgpio.set' in methods
assert 'sysfsgpio.get' in methods
aw.load('sysfsgpioin')
methods = aw.list()
assert 'sysfsgpio.get' in methods
aw.load('usb_hid_relay')
methods = aw.list()
assert 'usb_hid_relay.set' in methods
Expand Down
Loading