From d8279ddc1572832e9a7a8c9b397c583408f236f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leonard=20G=C3=B6hrs?= Date: Mon, 26 Jun 2023 15:55:41 +0200 Subject: [PATCH] httpdigitaloutput: support generic digital outputs via HTTP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support and documentation for a generic digital output driver that works via HTTP. The driver allows setting an URL to PUT/POST/PATCH a new output status to and an URL to GET the current status from. The message body to send when asserting/deasserting the output can be specified independently. The status returned by a GET request can either be matched exactly against the same body used to set the state or more fuzzily using regular expressions. Signed-off-by: Leonard Göhrs --- doc/configuration.rst | 54 ++++++++++++++++ labgrid/driver/__init__.py | 1 + labgrid/driver/httpdigitaloutput.py | 71 +++++++++++++++++++++ labgrid/remote/client.py | 4 +- labgrid/resource/__init__.py | 1 + labgrid/resource/httpdigitalout.py | 34 ++++++++++ tests/test_httpdigitalout.py | 97 +++++++++++++++++++++++++++++ 7 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 labgrid/driver/httpdigitaloutput.py create mode 100644 labgrid/resource/httpdigitalout.py create mode 100644 tests/test_httpdigitalout.py diff --git a/doc/configuration.rst b/doc/configuration.rst index 1a1ecba3e..0b0193750 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -483,6 +483,39 @@ Arguments: Used by: - `HIDRelayDriver`_ +HttpDigitalOutput ++++++++++++++++++ +A ``HttpDigitalOutput`` resource describes a generic digital output that can be +controlled via HTTP. + +.. code-block:: yaml + + HttpDigitalOutput: + url: http://host.example/some/endpoint + body_asserted: "On" + body_deasserted: "Off" + +The example assumes a simple scenario where the same URL is used for PUT +requests that set the output state and GET requests to get the current state. +It also assumes that the returned state matches either "On" or "Off" exactly. + +The `HttpDigitalOutputDriver`_ also supports more advanced use cases where the +current state is fetched from another URL and is interpreted using regular +expressions. + +Arguments: + - url (str): URL to use for setting a new state + - body_asserted (str): Request body to send to assert the output + - body_deasserted (str): Request body to send to de-assert the output + - method (str, default="PUT"): HTTP method to set a new state + + - url_get (str): URL to use instead of ``url`` for getting the state + - body_get_asserted (str): Regular Expression that matches an asserted response body + - body_get_deasserted (str): Regular Expression that matches a de-asserted response body + +Used by: + - `HttpDigitalOutputDriver`_ + NetworkHIDRelay +++++++++++++++ A NetworkHIDRelay describes an `HIDRelay`_ exported over the network. @@ -2763,6 +2796,27 @@ Implements: Arguments: - None + +HttpDigitalOutputDriver +~~~~~~~~~~~~~~~~~~~~~~~ +A HttpDigitalOutputDriver binds to a `HttpDigitalOutput` to set and get a +digital output state via HTTP. + +Binds to: + http: + - `HttpDigitalOutput`_ + +.. code-block:: yaml + + HttpDigitalOutputDriver: {} + +Implements: + - :any:`DigitalOutputProtocol` + +Arguments: + - None + + PyVISADriver ~~~~~~~~~~~~ The PyVISADriver uses a PyVISADevice resource to control test equipment manageable by PyVISA. diff --git a/labgrid/driver/__init__.py b/labgrid/driver/__init__.py index 471eb0078..cd18cc47b 100644 --- a/labgrid/driver/__init__.py +++ b/labgrid/driver/__init__.py @@ -46,3 +46,4 @@ from .usbtmcdriver import USBTMCDriver from .deditecrelaisdriver import DeditecRelaisDriver from .dediprogflashdriver import DediprogFlashDriver +from .httpdigitaloutput import HttpDigitalOutputDriver diff --git a/labgrid/driver/httpdigitaloutput.py b/labgrid/driver/httpdigitaloutput.py new file mode 100644 index 000000000..32a387aac --- /dev/null +++ b/labgrid/driver/httpdigitaloutput.py @@ -0,0 +1,71 @@ +import re +from importlib import import_module + +import attr + +from ..factory import target_factory +from ..protocol import DigitalOutputProtocol +from ..step import step +from ..util.proxy import proxymanager +from .common import Driver +from .exception import ExecutionError + + +@target_factory.reg_driver +@attr.s(eq=False) +class HttpDigitalOutputDriver(Driver, DigitalOutputProtocol): + bindings = { "http": "HttpDigitalOutput" } + + def __attrs_post_init__(self): + super().__attrs_post_init__() + self._requests = import_module("requests") + + def on_activate(self): + self._url_set = proxymanager.get_url( + self.http.url, + default_port=(443 if self.http.url.startswith("https") else 80), + ) + + if self.http.url_get: + self._url_get = proxymanager.get_url( + self.http.url_get, + default_port=(443 if self.http.url_get.startswith("https") else 80), + ) + + else: + self._url_get = self._url_set + + @Driver.check_active + @step(args=["status"]) + def set(self, status): + method = self.http.method or "PUT" + body = self.http.body_asserted if status else self.http.body_deasserted + + res = self._requests.request(method, self._url_set, data=body) + res.raise_for_status() + + @Driver.check_active + @step(result=["True"]) + def get(self): + res = self._requests.get(self._url_get) + res.raise_for_status() + + # Check if the response body matches an asserted state + if self.http.body_get_asserted: + if re.fullmatch(self.http.body_get_asserted, res.text) is not None: + return True + + elif res.text == self.http.body_asserted: + return True + + # Check if the response body matches a de-asserted state + if self.http.body_get_deasserted: + if re.fullmatch(self.http.body_get_deasserted, res.text) is not None: + return False + + elif res.text == self.http.body_deasserted: + return False + + raise ExecutionError( + f'response does not match asserted or deasserted state: "{res.text}"' + ) diff --git a/labgrid/remote/client.py b/labgrid/remote/client.py index a1f8d9825..3fe534e9a 100755 --- a/labgrid/remote/client.py +++ b/labgrid/remote/client.py @@ -762,7 +762,7 @@ def digital_io(self): action = self.args.action name = self.args.name target = self._get_target(place) - from ..resource import ModbusTCPCoil, OneWirePIO + from ..resource import ModbusTCPCoil, OneWirePIO, HttpDigitalOutput from ..resource.remote import (NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay) @@ -775,6 +775,8 @@ def digital_io(self): drv = self._get_driver_or_new(target, "ModbusCoilDriver", name=name) elif isinstance(resource, OneWirePIO): drv = self._get_driver_or_new(target, "OneWirePIODriver", name=name) + elif isinstance(resource, HttpDigitalOutput): + drv = self._get_driver_or_new(target, "HttpDigitalOutputDriver", name=name) elif isinstance(resource, NetworkDeditecRelais8): drv = self._get_driver_or_new(target, "DeditecRelaisDriver", name=name) elif isinstance(resource, NetworkSysfsGPIO): diff --git a/labgrid/resource/__init__.py b/labgrid/resource/__init__.py index 1820cad5d..77672b2a5 100644 --- a/labgrid/resource/__init__.py +++ b/labgrid/resource/__init__.py @@ -23,3 +23,4 @@ from .mqtt import TasmotaPowerPort from .httpvideostream import HTTPVideoStream from .dediprogflasher import DediprogFlasher, NetworkDediprogFlasher +from .httpdigitalout import HttpDigitalOutput diff --git a/labgrid/resource/httpdigitalout.py b/labgrid/resource/httpdigitalout.py new file mode 100644 index 000000000..6bcaa4074 --- /dev/null +++ b/labgrid/resource/httpdigitalout.py @@ -0,0 +1,34 @@ +import attr + +from ..factory import target_factory +from .common import Resource + + +@target_factory.reg_resource +@attr.s(eq=False) +class HttpDigitalOutput(Resource): + """This resource describes a generic HTTP-controlled output pin. + + Args: + url (str): URL to use for setting a new state + body_asserted (str): Request body to send to assert the output + body_deasserted (str): Request body to send to de-assert the output + method (str): HTTP method to use instead of PUT (the default) to set a new state + + url_get (str): URL to use for getting the state + body_get_asserted (str): Regular Expression that matches an asserted response body + body_get_deasserted (str): Regular Expression that matches a de-asserted response body + """ + + url = attr.ib(validator=attr.validators.instance_of(str)) + body_asserted = attr.ib(validator=attr.validators.instance_of(str)) + body_deasserted = attr.ib(validator=attr.validators.instance_of(str)) + method = attr.ib(default="PUT", validator=attr.validators.instance_of(str)) + + url_get = attr.ib(default="", validator=attr.validators.instance_of(str)) + body_get_asserted = attr.ib( + default="", validator=attr.validators.instance_of(str) + ) + body_get_deasserted = attr.ib( + default="", validator=attr.validators.instance_of(str) + ) diff --git a/tests/test_httpdigitalout.py b/tests/test_httpdigitalout.py new file mode 100644 index 000000000..9b2d20bcb --- /dev/null +++ b/tests/test_httpdigitalout.py @@ -0,0 +1,97 @@ +import pytest +import requests + +from labgrid.driver import HttpDigitalOutputDriver +from labgrid.resource import HttpDigitalOutput + + +@pytest.fixture(scope="function") +def mock_server(mocker): + state = '"Unknown"' + + def request(method, url, data=None): + nonlocal state + state = data + return mocker.MagicMock() + + def get(url): + r = mocker.MagicMock() + r.text = state + return r + + mock_request = mocker.patch("requests.request") + mock_request.side_effect = request + mock_get = mocker.patch("requests.get") + mock_get.side_effect = get + + return (mock_request, mock_get) + + +def _make_http_driver(target, with_tls, with_regex, separate_get, match_error): + scheme = "https" if with_tls else "http" + url = f"{scheme}://host.example/set" + url_get = f"{scheme}://host.example/get" if separate_get else "" + + body_get_asserted = ".*n.*" if with_regex else "" + body_get_deasserted = ".*ff.*" if with_regex else "" + + if match_error: + body_get_asserted = "--- DOES NOT MATCH ---" + body_get_deasserted = "--- DOES NOT MATCH EITHER ---" + + dig_out_res = HttpDigitalOutput( + target, + name=None, + url=url, + body_asserted='"On"', + body_deasserted='"Off"', + method="PUT", + url_get=url_get, + body_get_asserted=body_get_asserted, + body_get_deasserted=body_get_deasserted, + ) + + http_driver = HttpDigitalOutputDriver(target, name=None) + target.activate(http_driver) + + return http_driver + + +@pytest.mark.parametrize( + "asserted,with_tls,with_regex,separate_get", + [ + (False, False, False, False), + (True, False, False, False), + (True, True, False, False), + (True, False, True, False), + (False, False, True, False), + (True, False, False, True), + (True, True, True, True), + ], +) +def test_set_get(asserted, with_tls, with_regex, separate_get, target, mock_server): + http_driver = _make_http_driver(target, with_tls, with_regex, separate_get, False) + mock_request, mock_get = mock_server + + data = '"On"' if asserted else '"Off"' + scheme = "https" if with_tls else "http" + port = 443 if with_tls else 80 + get_endpoint = "get" if separate_get else "set" + + set_url = f"{scheme}://host.example:{port}/set" + get_url = f"{scheme}://host.example:{port}/{get_endpoint}" + + http_driver.set(asserted) + mock_request.assert_called_once_with("PUT", set_url, data=data) + + assert http_driver.get() == asserted + mock_get.assert_called_once_with(get_url) + + +def test_match_exception(target, mock_server): + http_driver = _make_http_driver(target, False, False, False, True) + mock_request, mock_get = mock_server + + http_driver.set(True) + with pytest.raises(Exception): + http_driver.get()