-
Notifications
You must be signed in to change notification settings - Fork 0
/
encoding.py
159 lines (135 loc) · 5.22 KB
/
encoding.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from utils import *
from consts import *
import log
import wave
import math
import time
from itertools import zip_longest
import numpy as np
import matplotlib.pyplot as plt
def hamming_7_4(bit_array, auto_pad=True):
    """Encode a bit sequence with a systematic Hamming(7,4) layout.

    The input is split into 4-bit groups with ``chunk``. Each group is
    emitted unchanged, followed by three parity bits, so every 4 input bits
    become 7 output bits. For data bits ``d0 d1 d2 d3`` the parity bits are::

        p1 = (d0 + d1 + d3) % 2
        p2 = (d0 + d2 + d3) % 2
        p3 = (d1 + d2 + d3) % 2

    Args:
        bit_array: Sequence of bits. Its length must be a multiple of 4
            unless ``auto_pad`` is true.
        auto_pad: When true, the input is first passed through
            ``pad(bit_array, 4)`` (presumably extending it to a multiple of
            4 bits -- confirm against ``utils.pad``).

    Returns:
        A 1-D numpy array holding the 7-bit groups back to back. When there
        are no groups the result is an empty boolean array.

    Raises:
        AssertionError: If the (possibly padded) length is not a multiple
            of 4.
    """
    if auto_pad:
        bit_array = pad(bit_array, 4)
    assert len(bit_array) % 4 == 0, 'For hamming 7:4 bit array must be multiple of 4'

    # Collect the coded groups and join them once at the end; concatenating
    # onto the running output inside the loop copies the whole output for
    # every group (quadratic in the number of groups).
    # The empty boolean seed keeps the result dtype and the empty-input
    # result the same as building the array up from an empty bool array.
    coded_groups = [np.empty_like([], dtype='bool')]
    for group in chunk(bit_array, 4):
        # NOTE(review): parity relies on integer addition. For numpy bool
        # elements `+` is a logical OR, so bits are assumed to be
        # integer-valued 0/1 -- confirm against callers.
        p1 = (group[0] + group[1] + group[3]) % 2
        p2 = (group[0] + group[2] + group[3]) % 2
        p3 = (group[1] + group[2] + group[3]) % 2
        coded_groups.append(np.concatenate((group, [p1, p2, p3])))
    return np.concatenate(coded_groups)
#
# def hamming_8_4(bit_array):
# hamming_7_4(bit_array)
# p4 = sum(bit_array) % 2
# return bit_array
def encode(data, compression, freqs, coding, modulation, **kwargs):
    """Turn a bit sequence (or an image file) into audio and save it as 'bin.wav'.

    Pipeline: optional "compression" step producing the bits to send, split
    of those bits into one stream per carrier frequency, optional Hamming
    coding of each stream, modulation of each stream onto its frequency, and
    finally mixing of the modulated streams by per-sample mean. The audio is
    framed by 500 ms of leading silence plus a sync pulse, and 500 ms of
    trailing silence.

    Args:
        data: Bits to send. ``len(data)`` is evaluated regardless of
            ``compression``, so it must be a sized object even in image mode.
        compression: ``'image'`` reads the bytes of ``kwargs['image_file']``
            and sends their bits instead of ``data``; any other value sends
            ``data`` unchanged.
        freqs: Carrier frequencies, one per stream.
        coding: ``'hamming'`` applies ``hamming_7_4`` to every stream; any
            other value leaves the streams uncoded.
        modulation: Either a single modulation name used for all
            frequencies, or a sequence with one name per frequency.
            Supported names are ``"psk"`` and ``"simple"``.
        **kwargs:
            image_file: Path of the file to send when ``compression`` is
                ``'image'``.
            plot_audio: When truthy, plot the final audio with matplotlib.

    Returns:
        None. The result is written to 'bin.wav' via ``save_wav``.

    Raises:
        ValueError: If a stream is assigned a modulation name other than
            ``"psk"`` or ``"simple"``.
    """
    log.info("Encoding")
    if len(data) < 64:
        log.info('Sending data bits: {}'.format(bits2string(data)))

    # Compress -> bits
    if compression == 'image':
        log.info("Compression: Image")
        with open(kwargs.get('image_file'), 'rb') as f:
            b = np.array(bytearray(f.read()))
        send_bits = np.unpackbits(b)
    else:
        log.info("Compression: None")
        send_bits = data

    # Freq Multiplex
    log.info('Frequencies: {}'.format(freqs))
    bit_rates = get_data_rates(freqs)
    log.info(f'Bit rates: {bit_rates}')
    bit_streams = split_data_into_streams(send_bits, bit_rates)

    # Coding
    if coding == 'hamming':
        log.info("Coding: Hamming")
        coded_bit_streams = [hamming_7_4(stream) for stream in bit_streams]
    else:
        log.info("Coding: None")
        coded_bit_streams = bit_streams

    # Modulate
    # Add leading silence and sync pulse
    audio = []
    audio = append_silence(audio, duration_milliseconds=500)
    audio.extend(get_sync_pulse())

    # If provided with single modulation, use that type for all freqs
    if isinstance(modulation, str):
        log.debug(f"Using modulation type {modulation} for all freqs")
        modulation = [modulation for _i in freqs]

    audio_streams = []
    for stream, freq, mod, rate in zip(coded_bit_streams, freqs, modulation, bit_rates):
        if mod == "psk":
            log.info("Modulation: PSK")
            audio_streams.append(modulate_psk(stream, freq, rate))
        elif mod == "simple":
            log.info("Modulation: Simple")
            audio_streams.append(modulate_simple(stream, freq, rate))
        else:
            # Report the offending entry, not the whole per-frequency list.
            raise ValueError("Invalid modulation type: {}".format(mod))

    # Mix the streams sample by sample. zip() stops at the shortest stream,
    # so any samples beyond that length in longer streams are not mixed in.
    audio.extend([np.mean(i) for i in zip(*audio_streams)])

    # Add trailing silence
    audio.extend(get_silence(duration_milliseconds=500))

    # Save file
    save_wav(audio, 'bin.wav')

    if kwargs.get('plot_audio'):
        plt.figure('audio')
        plt.plot(audio)
        plt.draw()
def modulate_psk(bit_array, frequency, data_rate):
    """Modulate bits onto ``frequency`` two bits per symbol, using four waveforms.

    The transmitted bit sequence is, in order:

    1. an 8-bit preamble ``1 1 1 0 0 1 0 0`` whose pairs (11, 10, 01, 00)
       emit each of the four symbols once;
    2. the 16-bit length of the padded payload, coded with ``hamming_7_4``;
    3. the payload, passed through ``pad(bit_array, 2)``.

    Each bit pair selects a waveform:

        1 1 -> get_neg_coswave      1 0 -> get_neg_sinewave
        0 1 -> get_coswave          0 0 -> get_sinewave

    Args:
        bit_array: Payload bits.
        frequency: Carrier frequency handed to the waveform helpers.
        data_rate: Symbols per second; each symbol is requested with a
            duration of ``1000 / data_rate`` (presumably milliseconds --
            confirm against the waveform helpers).

    Returns:
        A list of audio samples.

    Raises:
        AssertionError: If the padded payload has 65536 bits or more.
    """
    symbol_duration = 1000 / data_rate
    # Fractional part of the samples-per-symbol count. E.g. a value of 0.1
    # means 40.1 samples are due per symbol but only 40 get appended, so the
    # shortfall is accumulated and repaid with an extra sample now and then.
    lag_per_symbol = SAMPLE_RATE / data_rate % 1

    # Pad for PSK
    payload = pad(bit_array, 2)

    # Prepend a 16-bit frame carrying the payload length
    assert len(payload) < 65536, "ERROR maximum packet size exceeded"
    length_frame = "{:0>16b}".format(len(payload))
    log.debug("Bit count frame: {}".format(length_frame))
    assert len(length_frame) == 16
    coded_length_frame = hamming_7_4(
        [int(digit) for digit in length_frame], auto_pad=False
    )
    log.debug("Coded bit count frame: {}".format(''.join([str(i) for i in coded_length_frame])))
    payload = np.insert(payload, 0, coded_length_frame)
    log.debug("len(bit_array): {}".format(len(payload)))

    # Prepend the 8 bits describing the symbol space
    payload = np.insert(payload, 0, [1, 1, 1, 0, 0, 1, 0, 0])

    samples = []
    total_lag = 0
    for pair in chunk(payload, 2):
        first, second = pair
        if first:
            make_wave = get_neg_coswave if second else get_neg_sinewave
        else:
            make_wave = get_coswave if second else get_sinewave
        samples.extend(make_wave(frequency, symbol_duration))

        # Repay one sample once more than a whole sample of lag has built up.
        total_lag += lag_per_symbol
        if total_lag > 1:
            samples.append(samples[-1])
            total_lag -= 1
    return samples
def modulate_simple(bit_array, frequency, data_rate):
    """Modulate bits as tone bursts: a sine wave for a truthy bit, silence otherwise.

    Args:
        bit_array: Iterable of bits.
        frequency: Frequency handed to ``get_sinewave`` for truthy bits.
        data_rate: Bits per second; each bit is requested with a duration
            of ``1000 / data_rate`` (milliseconds, going by
            ``get_silence``'s ``duration_milliseconds`` parameter).

    Returns:
        A list of audio samples.
    """
    bit_duration = 1000 / data_rate
    # Fractional part of the samples-per-bit count. E.g. a value of 0.1
    # means 40.1 samples are due per bit but only 40 get appended, so the
    # shortfall is accumulated and repaid with an extra sample now and then.
    lag_per_bit = SAMPLE_RATE / data_rate % 1

    samples = []
    total_lag = 0
    for bit in bit_array:
        burst = (
            get_sinewave(frequency, bit_duration)
            if bit
            else get_silence(bit_duration)
        )
        samples.extend(burst)

        # Repay one sample once more than a whole sample of lag has built up.
        total_lag += lag_per_bit
        if total_lag > 1:
            samples.append(samples[-1])
            total_lag -= 1
    return samples