-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
executable file
·89 lines (60 loc) · 2.1 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/python
from __future__ import print_function
import os
import signal
import sys
import RPi.GPIO as GPIO
import MFRC522
# import pygame
import handlers
import time
import asyncio
from functools import wraps, partial
from lib import RFID, CredentialsProvider, CollisionException, Playback, Cache
AWS_IOT_CREDS_URL = os.environ['AWS_IOT_CREDS_URL']
CERT_PEM_PATH = os.environ['CERT_PEM_PATH']
CERT_KEY_PATH = os.environ['CERT_KEY_PATH']
def sigterm_handler(signo, stack_frame):
print("Stopping...")
sys.exit(0)
def async_wrap(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run
async def main():
driver = MFRC522.MFRC522()
driver.MFRC522_Init()
rfid = RFID(driver)
awsHelper = CredentialsProvider(AWS_IOT_CREDS_URL, (CERT_PEM_PATH, CERT_KEY_PATH))
playback = Cache(awsHelper, Playback(awsHelper, None), asyncio.get_event_loop()) # pygame.mixer.music)
# Event handers
# rfid.onNewCardDetected += lambda sender, uid, data: print("New card has detected: %s (%s)" % (uid, data))
rfid.onNewCardDetected += handlers.playHandler(playback)
# rfid.onCardRemoved += lambda sender, uid: print("Card %s has removed!" % uid)
rfid.onCardRemoved += handlers.stopHandler(playback)
# rfid.onCardStillPresent += lambda sender, uid: print("Card %s is still there!" % uid)
print("Waiting for tag ...")
await playback.play('ta-da-chimes-sound-effect.mp3')
await asyncio.sleep(2)
await playback.stop()
await rfid.start()
if __name__ == "__main__":
signal.signal(signal.SIGTERM, sigterm_handler)
try:
start = time.time()
asyncio.run(main())
end = time.time()
print(f"Time elapse: {end-start}")
except CollisionException as e:
print(e)
except KeyboardInterrupt:
exit(0)
except Exception:
raise
finally:
GPIO.cleanup()
# pygame.mixer.music.stop()