forked from nlgranger/input-event-daemon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinput-event-daemon.py
145 lines (116 loc) · 4.48 KB
/
input-event-daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
import argparse
import asyncio
import configparser
import grp
import logging
import multiprocessing
import os
import pwd
import signal
import subprocess
import sys
import evdev
# Module-wide logger.  Its level is forced to DEBUG so that every message of
# this script is emitted; basicConfig() attaches the default handler to the
# root logger when none is configured yet.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logging.basicConfig()
def drop_privileges(uid_name='nobody', gid_name='nobody'):
    """Switch the current process to an unprivileged user and group.

    Does nothing when the process is not running as root.  Otherwise the
    supplementary groups are cleared, the group id and then the user id are
    changed, and a restrictive umask is installed.

    Adapted from https://stackoverflow.com/a/2699996/5786475

    :param uid_name: name of the user to switch to.
    :param gid_name: name of the group to switch to.
    :raises KeyError: if the user or group name is unknown.
    :raises OSError: if one of the credential changes is refused.
    """
    if os.getuid() != 0:
        # Not root: there is nothing to drop.
        return

    # Resolve both names before touching any credential, so an unknown name
    # fails while the process state is still unchanged.
    running_uid = pwd.getpwnam(uid_name).pw_uid
    running_gid = grp.getgrnam(gid_name).gr_gid

    # Remove group privileges
    os.setgroups([])

    # Set the new gid first, then the uid
    os.setgid(running_gid)
    os.setuid(running_uid)

    # Ensure a very conservative umask.  The value must be written in octal:
    # the decimal literal 77 equals 0o115, which would leave newly created
    # files world-writable instead of private to the owner.
    os.umask(0o077)
def run_commands(command_queue, user="nobody", group="nobody", timeout=1):
    """Worker loop executing the shell commands received on a queue.

    Privileges are dropped first, then commands are taken from
    ``command_queue`` one at a time and run through the shell until the
    ``None`` sentinel is received.

    :param command_queue: queue yielding command strings, or ``None`` to stop.
    :param user: user name passed to ``drop_privileges``.
    :param group: group name passed to ``drop_privileges``.
    :param timeout: maximum run time of each command, in seconds.
    """
    drop_privileges(user, group)

    # iter(callable, sentinel) keeps calling get() until it returns None.
    for cmd in iter(command_queue.get, None):
        try:
            # NOTE(review): shell=True is kept on purpose, the commands are
            # shell snippets taken verbatim from the configuration file.
            subprocess.run(cmd, timeout=timeout, shell=True, check=False)
        except subprocess.TimeoutExpired:
            # Without this handler a single slow command would terminate the
            # worker and no further command would ever be executed.
            logger.warning("Command timed out after %ss: %s", timeout, cmd)
        except OSError as e:
            logger.error("Failed to run %r: %s", cmd, e)
def is_ambiguous(keys, combinations):
    """Tell whether ``keys`` is the strict beginning of a longer combination.

    Returns True if at least one item of ``combinations`` starts with
    ``keys`` and has more elements than ``keys``; False otherwise.
    """
    typed = len(keys)
    for candidate in combinations:
        if candidate[:typed] == keys and len(candidate) > typed:
            return True
    return False
def _key_name(keycode, known_keys):
    """Reduce an evdev keycode to a single hashable key name.

    evdev reports a sequence of names for codes that have several aliases.
    The alias used in the bindings is preferred so that such keys can be
    matched; otherwise the first alias is returned.
    """
    if isinstance(keycode, str):
        return keycode
    for alias in keycode:
        if alias in known_keys:
            return alias
    return keycode[0]


async def event_handler(device, bindings, command_queue):
    """Read key releases from ``device`` and enqueue the bound commands.

    Runs forever.  Each key release starts a sequence; while the sequence is
    the strict beginning of a longer bound combination, further events are
    awaited for up to one second each.  The commands bound to the resulting
    sequence, if any, are pushed to ``command_queue``.

    :param device: evdev input device providing ``async_read_one()``.
    :param bindings: dict mapping tuples of key names to lists of commands.
    :param command_queue: queue on which the commands are put.
    """
    import queue  # local import: only needed for the queue.Full exception

    known_keys = {name for combo in bindings for name in combo}

    def released_key(raw_event):
        # Name of the released key, or None for any other kind of event.
        event = evdev.categorize(raw_event)
        if (isinstance(event, evdev.KeyEvent)
                and event.keystate == evdev.KeyEvent.key_up):
            return _key_name(event.keycode, known_keys)
        return None

    while True:
        # Read key
        name = released_key(await device.async_read_one())
        if name is None:
            continue
        prefix = (name,)

        # Read extra keys if this might be the start of a combo
        try:
            while is_ambiguous(prefix, bindings.keys()):
                raw_event = await asyncio.wait_for(
                    device.async_read_one(), timeout=1)
                name = released_key(raw_event)
                if name is not None:
                    prefix += (name,)
        except asyncio.TimeoutError:
            # No further event in time: use the keys collected so far.
            pass

        # Process command
        if prefix not in bindings:
            continue  # erroneous input, discard

        logger.debug("handling %s on %s", '+'.join(prefix), device.path)
        for cmd in bindings[prefix]:
            try:
                command_queue.put_nowait(cmd)
            except queue.Full:
                # Dropping one command is better than letting the exception
                # end this handler task.
                logger.warning("Command queue full, dropping: %s", cmd)
def main():
    """Parse the command line and configuration, then run the daemon.

    With ``--list`` the available input devices are printed and the function
    returns.  Otherwise the command worker process is started, one event
    handler task is created per device section of the configuration file
    (every section except ``commands``) and the event loop runs until the
    process receives SIGINT or SIGTERM.

    Exits with status 1 if the configuration file cannot be read or a
    configured device cannot be opened.
    """
    argparser = argparse.ArgumentParser()
    argparser.add_argument("--list", action='store_true',
                           help="list available devices and quit")
    argparser.add_argument("--config", "-c",
                           default="/etc/input-event-daemon.conf",
                           help="configuration file")
    args = argparser.parse_args()

    if args.list:
        # The option was advertised in the help text but never acted upon.
        for path in evdev.list_devices():
            device = evdev.InputDevice(path)
            print("{}\t{}\t{}".format(device.path, device.name, device.phys))
            device.close()
        return

    # Parse configuration
    config = configparser.RawConfigParser()
    if not config.read(args.config):
        # read() silently skips unreadable files and returns the list of
        # files it parsed; an empty list means nothing was loaded.
        logger.error("Failed to read configuration {}".format(args.config))
        sys.exit(1)

    # Start command runner
    command_queue = multiprocessing.Queue(maxsize=100)
    user = config.get('commands', 'user', fallback="nobody")
    group = config.get('commands', 'group', fallback="nobody")
    print("user: {} - group: {}".format(user, group))
    timeout = config.getfloat('commands', 'timeout', fallback=1)
    command_worker = multiprocessing.Process(
        target=run_commands, args=[command_queue, user, group, timeout])

    def stop_worker():
        # The None sentinel makes run_commands return.
        command_queue.put_nowait(None)
        command_worker.join()
        command_queue.close()

    def cleanup(*kargs):
        stop_worker()
        logger.info("Exiting")
        sys.exit(0)

    # prevent children from capturing sigint
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    command_worker.start()

    # make sure to cleanly terminate when killed
    signal.signal(signal.SIGINT, cleanup)
    signal.signal(signal.SIGTERM, cleanup)

    # Use an explicit event loop instead of relying on an implicitly created
    # one, which is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = []  # keep references so the handler tasks are not collected

    # Start input event handlers
    for device_name in (s for s in config.sections() if s != "commands"):
        # store commands: one key combination per option, one command per
        # non-empty line of its value
        bindings = {}
        for key, commands in config[device_name].items():
            combo = tuple(k.upper().strip() for k in key.split("+"))
            bindings[combo] = [cmd.strip() for cmd in commands.splitlines()
                               if cmd.strip() != ""]

        # set-up handler loop
        try:
            device = evdev.InputDevice(device_name)
        except IOError as e:
            logger.error("Failed to open {}: {}".format(device_name, str(e)))
            # Stop the worker first: it is a non-daemon process blocked on
            # the queue, and the interpreter would wait for it at exit.
            stop_worker()
            sys.exit(1)
        else:
            logger.info("Opened device {}".format(device_name))

        tasks.append(loop.create_task(
            event_handler(device, bindings, command_queue)))

    loop.run_forever()
# Run the daemon only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()