Skip to content

Commit

Permalink
We now only process packets received with the current sequence number.
Browse files Browse the repository at this point in the history
We increase the number of packets per cycle to 200.
  • Loading branch information
fdojurado committed Jun 11, 2022
1 parent 69315a2 commit 6bd2980
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 20 deletions.
9 changes: 7 additions & 2 deletions controller/controller/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def exit_process(signal_number, frame):
routing_input_queue = mp.Queue()
routing_output_queue = mp.Queue()
routing_alg_queue = mp.Queue()
sequence_number = mp.Queue()
# NC Queues
nc_input_queue = mp.Queue()
nc_output_queue = mp.Queue()
Expand All @@ -92,7 +93,7 @@ def exit_process(signal_number, frame):
# can be change at run time
if rl:
rp = SDWSN_RL(ServerConfig.from_json_file(config), verbose,
routing_input_queue, routing_output_queue, nc_input_queue, nc_output_queue)
routing_input_queue, routing_output_queue, nc_input_queue, nc_output_queue, sequence_number)
else:
rp = Routing(ServerConfig.from_json_file(config), verbose, "dijkstra",
routing_input_queue, routing_output_queue, routing_alg_queue)
Expand Down Expand Up @@ -129,12 +130,16 @@ def exit_process(signal_number, frame):
interval = ServerConfig.from_json_file(config).routing.time
# timeout = time.time()+int(120.0)
while True:
# Read latest sequence number from RL
if not sequence_number.empty():
sequence = sequence_number.get()
globals.sequence = sequence
# look for incoming request from the serial interface
if not serial_output_queue.empty():
data = serial_output_queue.get()
handle_serial_packet(data, ack_queue)
# Run the routing protocol?
if globals.num_packets_period > 150:
if globals.num_packets_period > 200:
G = load_wsn_links("rssi")
routing_input_queue.put(G)
globals.num_packets_period = 0
Expand Down
28 changes: 16 additions & 12 deletions controller/controller/deep_reinforcement_learning/sdwsn_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ class sdwsnEnv(gym.Env):
metadata = {'render.modes': ['human']}

def __init__(self, num_nodes, max_channel_offsets, max_slotframe_size,
nc_job_queue, input_queue, job_completion):
nc_job_queue, input_queue, job_completion, sequence_number):
super(sdwsnEnv, self).__init__()
self.nc_job_queue = nc_job_queue
self.input_queue = input_queue
self.job_completion = job_completion
self.sequence_number = sequence_number
self.num_nodes = num_nodes
self.max_channel_offsets = max_channel_offsets
self.max_slotframe_size = max_slotframe_size
Expand Down Expand Up @@ -104,8 +105,10 @@ def step(self, action):
# Send job with id and wait for reply
self.send_job(schedules_json, sequence)
# We now wait for the cycle to complete
self.sequence_number.put(sequence)
self.input_queue.get()
print("process reward")
sleep(1)
# Build observations
user_requirements = np.array([alpha, beta, delta])
observation = np.append(user_requirements, last_ts_in_schedule)
Expand Down Expand Up @@ -192,7 +195,7 @@ def send_job(self, data, job_id):
result = 0
while True:
try:
result, job = self.job_completion.get(timeout=0.1)
result, job = self.job_completion.get(timeout=0.5)
if job == job_id and result == 1:
print("job completion successful")
result = 1
Expand All @@ -201,7 +204,7 @@ def send_job(self, data, job_id):
print("job not completed yet")
# We stop sending the current NC packet if
# we reached the max RTx or we received ACK
if(rtx >= 7):
if(rtx >= 17):
print("Job didn't complete")
break
# We wait until max queue readings < 7
Expand Down Expand Up @@ -368,7 +371,7 @@ def get_last_power_consumption(self, node, power_samples):
{"$unwind": "$energy"},
{"$match": {
"energy.cycle_seq": {
"$gte": sequence
"$eq": sequence
}
}
},
Expand All @@ -388,10 +391,9 @@ def get_last_power_consumption(self, node, power_samples):
energy = 0
for doc in db:
energy = doc['ewma_energy']
# print("last energy sample")
# print(energy)
power_samples.append((node, energy))
# Calculate the avg delay
print("last energy sample")
print(energy)
# Calculate the avg delay
if energy > 0:
power_samples.append((node, energy))
else:
Expand All @@ -410,7 +412,7 @@ def get_network_power_consumption(self, init_time, nodes):
power_samples = []
# We first loop through all sensor nodes
for node in nodes:
# print(f"printing power for node {node}")
print(f"printing power for node {node}")
# Get all samples from the start of the network configuration
self.get_last_power_consumption(
node, power_samples)
Expand Down Expand Up @@ -513,7 +515,7 @@ def get_avg_delay(self, node, delay_samples):
if num_rcv > 0:
avg_delay = sum_delay/num_rcv
else:
avg_delay = 3000
avg_delay = 2500
delay_samples.append((node, avg_delay))
return

Expand All @@ -522,7 +524,7 @@ def get_network_delay(self, init_time, nodes):
# Min power
delay_min = SLOT_DURATION
# Max power
delay_max = 3000
delay_max = 2500
# Get the time when the last network configuration was deployed
timestamp = init_time
# Variable to keep track of the number of delay samples
Expand Down Expand Up @@ -1081,7 +1083,7 @@ def reset(self):
delay = [0.1, 0.8, 0.1]
reliability = [0.1, 0.1, 0.8]
user_req = [balanced, energy, delay, reliability]
select_user_req = energy
select_user_req = delay
# select_user_req = random.choice(user_req)
# Let's prepare the schedule information in the json format
schedules_json = self.schedule.schedule_toJSON(slotframe_size)
Expand Down Expand Up @@ -1113,6 +1115,7 @@ def reset(self):
sequence = self.increase_sequence()
# Send job with id and wait for reply
self.send_job(routes_json, sequence)
self.sequence_number.put(sequence)
# Wait for the network to settle
sleep(0.5)
# We now save all the observations
Expand All @@ -1127,6 +1130,7 @@ def reset(self):
_, last_ts = self.build_link_schedules_matrix_obs()
# We now save the observations with reward None
# observation = np.zeros(self.n_observations).astype(np.float32)
slotframe_size = slotframe_size + 15
observation = np.append(user_requirements, last_ts)
observation = np.append(observation, slotframe_size)
self.save_observations(
Expand Down
5 changes: 3 additions & 2 deletions controller/controller/deep_reinforcement_learning/sdwsn_rl.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ def _on_training_end(self) -> None:


class SDWSN_RL(mp.Process):
def __init__(self, config, verbose, input_queue, output_queue, nc_job_queue, nc_job_completion):
def __init__(self, config, verbose, input_queue, output_queue, nc_job_queue, nc_job_completion, sequence_number):
mp.Process.__init__(self)
self.input_queue = input_queue
self.output_queue = output_queue
self.nc_job_queue = nc_job_queue
self.nc_job_completion = nc_job_completion
self.sequence_number = sequence_number
self.verbose = verbose
self.first_time_run = 0
self.max_channel_offsets = config.tsch.num_of_channels
Expand All @@ -99,7 +100,7 @@ def configure_env(self):
# Get last index of sensor
N = get_last_index_wsn()+1
self.env = sdwsnEnv(N, self.max_channel_offsets,
self.max_slotframe_size, self.nc_job_queue, self.input_queue, self.nc_job_completion)
self.max_slotframe_size, self.nc_job_queue, self.input_queue, self.nc_job_completion, self.sequence_number)
print('Number of states: {}'.format(self.env.observation_space))
print('Number of actions: {}'.format(self.env.action_space))
self.model = DQN('MlpPolicy', self.env, verbose=2)
Expand Down
3 changes: 2 additions & 1 deletion controller/controller/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
def globals_initialize():
print("initializing globals")
global routes_matrix, nbr_rssi_matrix, nbr_etx_matrix, \
link_schedules_matrices, elapse_time, num_packets_period
link_schedules_matrices, elapse_time, num_packets_period, sequence
routes_matrix = np.array([])
nbr_rssi_matrix = np.array([])
nbr_etx_matrix = np.array([])
link_schedules_matrices = []
elapse_time = 0
num_packets_period = 0
sequence = 0
2 changes: 1 addition & 1 deletion controller/controller/network_config/network_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def send(self, data, serial_pkt):
result = 0
while True:
try:
ack_pkt = self.ack_queue.get(timeout=0.5)
ack_pkt = self.ack_queue.get(timeout=1.2)
if (ack_pkt.reserved0 == serial_pkt.reserved0+1):
print("correct ACK received")
result = 1
Expand Down
14 changes: 12 additions & 2 deletions controller/controller/serial/serial_packet_dissector.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,17 @@ def handle_serial_packet(data, ack_queue):
"bad NA packet"
return
# Add to number of pkts received during this period
if not na_pkt.cycle_seq == globals.sequence:
return
print(repr(pkt))
print(repr(na_pkt))
globals.num_packets_period += 1
print(f"num seq (NA): {globals.num_packets_period}")
# We now build the energy DB
save_energy(pkt, na_pkt)
# We now build the neighbors DB
save_neighbors(pkt, na_pkt)
# return
return
# case sdn_protocols.SDN_PROTO_NC_ROUTE:
# # rt_pkt = process_nc_route_packet(pkt.payload, pkt.tlen-SDN_IPH_LEN)
# ack_queue.put(pkt)
Expand All @@ -81,12 +86,17 @@ def handle_serial_packet(data, ack_queue):
"bad Data packet"
return
# Add to number of pkts received during this period
if not data_pkt.cycle_seq == globals.sequence:
return
print(repr(pkt))
print(repr(data_pkt))
globals.num_packets_period += 1
print(f"num seq (data): {globals.num_packets_period}")
# We now build the pdr DB
save_pdr(pkt, data_pkt)
# We now build the delay DB
save_delay(pkt, data_pkt)
# return
return
case _:
print("sdn IP packet type not found")
return
Expand Down

0 comments on commit 6bd2980

Please sign in to comment.