forked from loblab/rfask
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ask_device.py
112 lines (100 loc) · 3.33 KB
/
ask_device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Copyright 2017 loblab
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import RPi.GPIO as GPIO
import time
from ask_wave import *
class Transmitter:
def __init__(self, pin_tx):
GPIO.setup(pin_tx, GPIO.OUT)
self.pin_tx = pin_tx
def lock(self):
GPIO.output(self.pin_tx, 1)
def unlock(self):
GPIO.output(self.pin_tx, 0)
def send(self, wave):
b = wave.startbit
ts = wave.timestamp
t1 = time.time()
GPIO.output(self.pin_tx, b)
t1 -= ts[0]
for t in ts[1:-1]:
b = 1 - b
wait = t1 + t - time.time()
if wait > 0:
time.sleep(wait)
GPIO.output(self.pin_tx, b)
wait = t1 + ts[-1] - time.time()
if wait > 0:
time.sleep(wait)
class Receiver:
def __init__(self, pin_rx, pin_en, max_len=2048, sample_period=0.05, min_gap=3, max_gap=10):
# Don't enable GPIO.PUD_DOWN nor GPIO.PUD_UP due to the small receiver current
GPIO.setup(pin_rx, GPIO.IN)
if pin_en > 0:
GPIO.setup(pin_en, GPIO.OUT)
GPIO.output(pin_en, GPIO.HIGH)
self.pin_rx = pin_rx
self.max_len = max_len
self.sample_period = 1e-3 * sample_period
self.min_gap = min_gap
self.max_gap = max_gap
def get_sample(self):
self.sample_time += self.sample_period
now = time.time()
wait = self.sample_time - now
if wait > 0:
time.sleep(wait)
b = GPIO.input(self.pin_rx)
now = time.time()
return (b, now)
def receive(self):
wave = BitWave()
ts = wave.timestamp
b = GPIO.input(self.pin_rx)
now = time.time()
ch = GPIO.wait_for_edge(self.pin_rx, GPIO.BOTH, timeout=self.min_gap)
if ch is not None:
return None
b0 = b
t0 = now
ts.append(t0)
ch = GPIO.wait_for_edge(self.pin_rx, GPIO.BOTH, timeout=self.max_gap-self.min_gap)
b = GPIO.input(self.pin_rx)
now = time.time()
if ch is None:
return None
if b == b0:
return None
wave.startbit = b0
self.bit = b
self.sample_time = now
self.edge_time = now
ts.append(now)
min_gap = 1e-3 * self.min_gap
while True:
(b, now) = self.get_sample()
if b == self.bit:
if now - self.edge_time > min_gap:
if b == 0:
ts.append(now)
return wave if len(ts) > 5 else None
else:
if now - self.edge_time < self.sample_period:
return None
self.edge_time = now
self.bit = b
ts.append(now)
if len(ts) > self.max_len:
return None
return wave