forked from emidan19/deep-tempest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfolder_simulation.py
229 lines (167 loc) · 7.78 KB
/
folder_simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Dec 20 2022
@author: Emilio Martínez <[email protected]>
Script that reads all images in folder and simulates HDMI tempest capture
"""
# =============================================================================
# Imports
# =============================================================================
import os
import json
import time as time
import numpy as np
from skimage.io import imread
from scipy import signal
from PIL import Image
from utils.DTutils import TMDS_encoding_original, TMDS_serial
import logging
from utils import utils_logger
from datetime import datetime
# Currently supporting png, jpg, jpeg, tif, tiff, gif and bmp extensions only
def get_images_names_from_folder(folder):
    """Return the names of the image files found directly inside ``folder``.

    An entry counts as an image when its name ends with one of the supported
    extensions (png, jpg, jpeg, tif, tiff, gif, bmp). The comparison ignores
    case, so ``PHOTO.JPG`` is picked up just like ``photo.jpg``.

    Parameters
    ----------
    folder : str
        Directory to scan (not recursive).

    Returns
    -------
    list of str
        Matching entry names (not full paths), in ``os.listdir`` order.
    """
    supported_extensions = ('.png', '.jpg', '.jpeg', '.tif', '.tiff', '.gif', '.bmp')
    # str.endswith accepts a tuple, so a single call covers every extension.
    return [name for name in os.listdir(folder)
            if name.lower().endswith(supported_extensions)]
def get_subfolders_names_from_folder(folder):
    """List the names of the directories located directly inside ``folder``.

    Only the entry names are returned (not full paths), in the order given
    by ``os.listdir``.
    """
    directory_names = []
    for entry in os.listdir(folder):
        entry_path = os.path.join(folder, entry)
        if os.path.isdir(entry_path):
            directory_names.append(entry)
    return directory_names
def image_transmition_simulation(I, blanking=False):
    """Simulate the TMDS transmission of image ``I``.

    The image is first encoded with ``TMDS_encoding_original`` (forwarding
    the ``blanking`` flag) and the encoded array is then serialized with
    ``TMDS_serial``.

    Returns
    -------
    tuple
        ``(serialized_signal, encoded_shape)`` where ``encoded_shape`` is the
        ``.shape`` of the TMDS-encoded array.
    """
    # TMDS encoding of the image
    encoded_image = TMDS_encoding_original(I, blanking=blanking)

    # Pixel bits serialization (channel signals are summed by the helper)
    serialized_signal = TMDS_serial(encoded_image)

    return serialized_signal, encoded_image.shape
def image_capture_simulation(I_Tx, h_total, v_total, N_harmonic, sdr_rate = 50e6,
                             noise_std=0, fps=60, freq_error=0, phase_error=0,
                             interpolator=None, diff_signaling=False):
    """Simulate the SDR capture of a serialized signal and return it as an image.

    The signal is repeated ``interpolator`` times per sample, optionally
    differentiated and corrupted with complex Gaussian noise, multiplied by a
    complex exponential at the chosen pixel-rate harmonic, resampled to the
    SDR rate and finally resampled to ``h_total*v_total`` samples.

    Parameters
    ----------
    I_Tx : array_like
        One-dimensional serialized signal. It is treated as sampled at the
        bit rate (10 samples per pixel period) -- TODO confirm against
        ``TMDS_serial``.
    h_total, v_total : int
        Width and height of the output image; also used, together with
        ``fps``, to compute the pixel rate.
    N_harmonic : int
        Pixel-rate harmonic used as carrier frequency.
    sdr_rate : float
        SDR sampling rate; truncated to ``int`` for the resampling ratio.
    noise_std : float
        When positive, complex Gaussian noise of standard deviation
        ``noise_std/sqrt(255)`` per component is added.
    fps : float
        Frames per second.
    freq_error : float
        Offset added to the carrier frequency.
    phase_error : float
        Phase offset of the carrier, in radians.
    interpolator : int or None
        Samples generated per input sample. When falsy it is derived from
        the harmonic as ``ceil(N_harmonic/5)`` (at least 1).
    diff_signaling : bool
        When true and ``interpolator != 1``, the interpolated signal is
        replaced by its first difference.

    Returns
    -------
    numpy.ndarray
        Complex array of shape ``(v_total, h_total)``.
    """
    # Compute pixelrate and bitrate
    px_rate = h_total*v_total*fps
    bit_rate = 10*px_rate

    # Continuous samples (interpolate)
    if not interpolator:
        # Condition for sampling rate; never below one sample per bit
        interpolator = max(1, int(np.ceil(N_harmonic/5)))
    # The sampling rate must be defined for both the explicit and the derived
    # interpolation factor (it used to be unbound in the derived case).
    sample_rate = interpolator*bit_rate

    if interpolator > 1:
        I_Tx_continuous = np.repeat(I_Tx,interpolator)
    else:
        I_Tx_continuous = I_Tx

    # Differential signaling
    if (diff_signaling) and (interpolator != 1):
        I_Tx_continuous = np.diff(I_Tx_continuous)

    Nsamples = len(I_Tx_continuous)

    # Add Gaussian noise
    if noise_std > 0:
        noise_sigma = noise_std/15.968719423 # sqrt(255)~15.968719423
        I_Tx_noisy = I_Tx_continuous + np.random.normal(0, noise_sigma, Nsamples) + 1j*np.random.normal(0, noise_sigma,Nsamples)
    else:
        I_Tx_noisy = I_Tx_continuous

    # Continuous time array
    t_continuous = np.arange(Nsamples)/sample_rate

    # AM modulation frequency according to pixel harmonic
    harm = N_harmonic*px_rate

    # Harmonic oscillator (including frequency and phase error)
    baseband_exponential = np.exp(2j*np.pi*(harm+freq_error)*t_continuous + 1j*phase_error)

    # AM modulation and SDR sampling
    I_Rx = signal.resample_poly(I_Tx_noisy*baseband_exponential,up=int(sdr_rate), down=sample_rate)

    # Reshape signal to the image size
    I_capture = signal.resample(I_Rx, h_total*v_total).reshape(v_total,h_total)

    return I_capture
def save_simulation_image(I,path_and_name):
    """Save a complex capture as an 8-bit, three-channel image.

    The real part goes to channel 0 and the imaginary part to channel 1,
    both scaled to 0..255 with a common minimum and maximum taken over the
    two parts. Channel 2 is left at zero.

    Parameters
    ----------
    I : numpy.ndarray
        Two-dimensional array (rows, columns) of the simulated capture.
    path_and_name : str
        Destination file passed to ``PIL.Image.save``.
    """
    v_total,h_total = I.shape

    I_real = np.real(I)
    I_imag = np.imag(I)

    # Common range over both components
    min_value = min(np.min(I_real), np.min(I_imag))
    max_value = max(np.max(I_real), np.max(I_imag))
    value_span = max_value - min_value

    I_save = np.zeros((v_total,h_total,3))

    # A constant capture has zero span: keep the channels at zero instead of
    # dividing by zero and casting NaN to uint8.
    if value_span > 0:
        I_save[:,:,0] = 255*(I_real-min_value)/value_span
        I_save[:,:,1] = 255*(I_imag-min_value)/value_span

    im = Image.fromarray(I_save.astype('uint8'))
    im.save(path_and_name)
def main(simulation_options_path = 'options/tempest_simulation.json'):
    """Simulate a tempest capture for every image of the configured folder.

    Reads the JSON options file, sets up a timestamped log file under
    ``logfiles/``, and for each image of the input folder that does not
    already have a same-named file in the output folder: draws a random
    frequency error, phase error and pixel-rate harmonic, runs the
    transmission and capture simulations and saves the result.

    Parameters
    ----------
    simulation_options_path : str
        Path to the JSON options file. It must provide the ``paths`` and
        ``options`` entries read below.

    Raises
    ------
    SystemExit
        With status 1 when the input folder does not exist.
    """
    # Load JSON options file (the context manager closes the handle)
    with open(simulation_options_path, encoding='utf-8') as options_file:
        options = json.load(options_file)

    logs_dir = 'logfiles/'
    # Create logs directory if not exist
    os.makedirs(logs_dir, exist_ok=True)

    # Get input and output dirs
    input_folder = options['paths']['folder_original_images']

    logger_name = 'simulations_'+datetime.now().strftime("%d-%m-%Y_%H:%M:%S")
    utils_logger.logger_info(logger_name, os.path.join(logs_dir,logger_name+'.log'))
    logger = logging.getLogger(logger_name)

    message = f'Tempest capture simulation for image folder {input_folder}\n'
    logger.info(message)

    # Check input directory exists
    if not os.path.exists(input_folder):
        message = f'No input folder {input_folder} was found. Exiting\n'
        logger.info(message)
        # exit() is a site-module helper meant for interactive sessions;
        # raise SystemExit directly and report failure to the caller.
        raise SystemExit(1)

    # Create output simulation directory if not exists
    output_folder = options['paths']['folder_simulated_images']
    if not os.path.exists(output_folder):
        # makedirs also creates missing parent directories
        os.makedirs(output_folder, exist_ok=True)
        message = f'Created simulation directory at {output_folder}\n'
        logger.info(message)

    message = f'Tempest options:\n {options} \n'
    logger.info(message)

    # Get tempest options
    blanking = options['options']['blanking']
    fps = options['options']['frames_per_second']
    sdr_rate = options['options']['sdr_rate']
    interpolator = options['options']['interpolator']
    differential_signaling = options['options']['differential_signaling']
    harmonics = options['options']['random']['harmonics']
    freq_error_range = options['options']['random']['freq_error']
    phase_error_range = options['options']['random']['phase_error']
    sigma = options['options']['random']['sigma']

    # Process possible sigma types.
    # NOTE: a [low, high) range is sampled once here, so every image of the
    # run shares the same noise level.
    if isinstance(sigma, list):
        sigma = np.random.randint(sigma[0],sigma[1])
    elif sigma is None:
        sigma = 0

    # Get images names from input folder
    images = get_images_names_from_folder(input_folder)

    # Names already present in the output folder (set for O(1) membership)
    already_simulated = set(get_images_names_from_folder(output_folder))

    # Initialize processing time
    t_all_images = 0

    for image in images:

        # Check if image already simulated
        if image in already_simulated:
            continue

        # timestamp for simulation starting
        t1_image = time.time()

        # Read image
        image_path = os.path.join(input_folder,image)
        I = imread(image_path)

        # Random channel effects (phase range is given in multiples of pi)
        freq_error = np.random.randint(freq_error_range[0], freq_error_range[1])
        phase_error = np.random.uniform(phase_error_range[0], phase_error_range[1])*np.pi

        # Choose random pixel rate harmonic number
        N_harmonic = np.random.choice(harmonics)

        message = f'Initiate simulation for image "{image}" with {N_harmonic} pixel harmonic frequency, {freq_error} Hz and {phase_error} rads error.'
        logger.info(message)

        # TMDS coding and bit serialization
        I_Tx, resolution = image_transmition_simulation(I, blanking=blanking)
        v_res, h_res, _ = resolution

        I_capture = image_capture_simulation(I_Tx, h_res, v_res, N_harmonic, sdr_rate,
                                             sigma, fps, freq_error, phase_error,
                                             interpolator, differential_signaling)

        path = os.path.join(output_folder,image)
        save_simulation_image(I_capture,path)

        # timestamp for simulation ending
        t2_image = time.time()

        t_image = t2_image-t1_image
        t_all_images += t_image

        message = 'Processing time: {:.2f}'.format(t_image)+'s\n'
        logger.info(message)

    message = 'Total processing time for {} images: {:.2f}s \n'.format(len(images),t_all_images)
    logger.info(message)
# Script entry point: run the simulation with the default options file.
if __name__ == "__main__":
    main()