Skip to content

Commit

Permalink
feat(rpi-gpio): support GPIO inputs and add module README
Browse files Browse the repository at this point in the history
  • Loading branch information
activeshadow committed Aug 2, 2024
1 parent 5787355 commit cf72567
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
37 changes: 37 additions & 0 deletions src/python/otsim/rpi_gpio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Raspberry Pi GPIO Module

This OT-sim module leverages the [RPi.GPIO](https://pypi.org/project/RPi.GPIO/)
Python module to associate OT-sim tags with input and output pins.

Please note the following about this module:

1. This module will only work when OT-sim is run on a Raspberry Pi. To avoid
long compilation times, the suggested way to do so is to run OT-sim on a
Raspberry Pi using the OT-sim Docker image, which is a multi-architecture build.
1. Each input and output is used to setup the corresponding GPIO channel,
defined by the `pin` assignment.
1. This module only supports boolean inputs and outputs (`PWM` is not
supported).
1. This module only subscribes to `update` messages from the OT-sim message bus,
and only publishes `status` messages to the OT-sim message bus. GPIO inputs are
mapped to input tags, and output tags are mapped to GPIO outputs.
1. On exit, `GPIO.cleanup()` is called, which will set all used channels back to
inputs with no pull up/down. This may cause anything physically connected to the
channels to be affected.

## Example Configuration

```
<rpi-gpio name="rpi-hil" mode="board"> <!-- mode can be "board" or "bcm" -->
<period>1</period> <!-- how often, in seconds, to read GPIO inputs; defaults to 5 -->
<output pin="11">
<tag>led-11</tag>
</output>
<output pin="13">
<tag>relay-1-3</tag>
</output>
<input pin="17">
<tag>switch-17</tag>
</input>
</rpi-gpio>
```
48 changes: 45 additions & 3 deletions src/python/otsim/rpi_gpio/rpi_gpio.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
from __future__ import annotations

import json, logging, signal, sys, threading, typing
import logging, signal, sys, threading, time, typing

import otsim.msgbus.envelope as envelope
import xml.etree.ElementTree as ET

from otsim.msgbus.envelope import Envelope
from otsim.msgbus.envelope import Envelope, Point
from otsim.msgbus.pusher import Pusher
from otsim.msgbus.subscriber import Subscriber

import RPi.GPIO as GPIO


class RPiGPIO:
def __init__(self: RPiGPIO, pub: str, pull: str, el: ET.Element):
# map pin numbers --> tag
self.inputs: typing.Dict[int, str] = {}
# map tags --> pin number
self.outputs: typing.Dict[str, int] = {}

self.monitoring = True

self.name = el.get('name', default='ot-sim-rpi-gpio')
mode = el.get('mode', default='BOARD')

GPIO.setwarnings(False)

if mode.upper() == 'BOARD':
GPIO.setmode(GPIO.BOARD)
elif mode.upper() == 'BCM':
Expand All @@ -27,8 +34,20 @@ def __init__(self: RPiGPIO, pub: str, pull: str, el: ET.Element):
print(f"unknown GPIO mode '{mode}' - defaulting to 'BOARD' mode")
GPIO.setmode(GPIO.BOARD)

pub_endpoint = el.findtext('pub-endpoint', default=pub)
pub_endpoint = el.findtext('pub-endpoint', default=pub)
pull_endpoint = el.findtext('pull-endpoint', default=pull)

self.subscriber = Subscriber(pub_endpoint)
self.pusher = Pusher(pull_endpoint)

self.period = float(el.findtext('period', default=5))

for o in el.findall('input'):
pin = int(o.get('pin'))
tag = o.findtext('tag')

self.inputs[pin] = tag
GPIO.setup(pin, GPIO.IN)

for o in el.findall('output'):
pin = int(o.get('pin'))
Expand All @@ -43,12 +62,35 @@ def __init__(self: RPiGPIO, pub: str, pull: str, el: ET.Element):
def start(self: RPiGPIO):
self.subscriber.start('RUNTIME')

# run GPIO monitor in a thread
self.monitor_thread = threading.Thread(target=self.monitor, daemon=True).start()


def stop(self: RPiGPIO):
self.monitoring = False
self.monitor_thread.join(self.period)

GPIO.cleanup()
self.subscriber.stop()


def monitor(self: RPiGPIO):
if len(self.inputs) == 0:
return

while self.monitoring:
points: typing.List[Point] = []

for pin, tag in self.inputs.items():
val = GPIO.input(pin)
points.append({'tag': tag, 'value': float(val), 'ts': 0})

env = envelope.new_status_envelope(self.name, {'measurements': points})
self.pusher.push('RUNTIME', env)

time.sleep(self.period)


def handle_msgbus_update(self: RPiGPIO, env: Envelope):
update = envelope.update_from_envelope(env)

Expand Down

0 comments on commit cf72567

Please sign in to comment.