-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcar.py
136 lines (114 loc) · 5.19 KB
/
car.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from stats import Statistics
from task import Task
import simpy
import random
import numpy as np
class Car:
    """A vehicle that generates tasks and processes tasks assigned to it.

    The car owns a single-capacity SimPy resource (``processor``), so at most
    one assigned task is computed at a time; further ``process_task`` runs
    queue on that resource.
    """

    def __init__(self, env, sim, speed = None, position: tuple[float, float] = None):
        """Create a car and draw its parameters from the simulation object.

        Args:
            env: SimPy environment; used for the clock (``env.now``), timeouts
                and the processor resource.
            sim: Simulation object providing ``set_car_id()`` and
                ``get_im_parameter(name)``; the latter returns a callable that
                is invoked here to obtain the parameter value.
            speed: Initial speed of the car (may be ``None``).
            position: Initial position of the car (may be ``None``).
        """
        self.env = env
        self.sim = sim

        # Parameters
        self.id = "c" + str(sim.set_car_id())
        self.processing_power = sim.get_im_parameter('car_processing_power')()
        self.num_tasks = sim.get_im_parameter('task_generation')()
        self.lambda_exp = sim.get_im_parameter('lambda_exp')()
        # Hard-coded; presumably the lifetime of a static vehicle (see
        # finish()) -- units to be confirmed against the caller.
        self.dwell_time = 10
        self.processor = simpy.Resource(self.env, capacity=1)
        # Not read inside this class; reset to True whenever a process_task()
        # run ends.
        self.idle = True
        self.current_task = None
        self.generated_tasks = []
        self.assigned_tasks = []
        # Processes that finish() interrupts. Nothing in this class adds to
        # this list, so it is expected to be filled by the code that starts
        # the processes.
        self.active_processes = []

        # Mobility
        self.speed = speed
        self.position = position

        # Statistics
        self.generated_tasks_count = 0
        self.successful_tasks = 0
        self.total_processing_time = 0
        self.processed_tasks_count = 0
        self.time_of_arrival = self.env.now

    def generate_tasks(self):
        """Generate tasks forever with exponentially distributed gaps.

        Generator intended to be run as a SimPy process. Each iteration waits
        ``random.expovariate(self.lambda_exp)`` time units, creates a ``Task``
        and appends it to ``generated_tasks``. The loop ends only when the
        process is interrupted.
        """
        try:
            while True:
                yield self.env.timeout(random.expovariate(self.lambda_exp))
                task = Task(self.env, self.sim, self)
                self.generated_tasks.append(task)
                print(f"Car {self.id} generated a Task: {task.__dict__}")
                self.generated_tasks_count += 1
        except simpy.Interrupt:
            print(f"Process generate_tasks() for car {self.id} interrupted!")

    def generate_tasks_static(self):
        """
        Tasks generated with this method will have the same time of arrival (TOA)

        Creates ``self.num_tasks`` tasks in one go, without advancing the
        simulation clock.
        """
        new_tasks = [Task(self.env, self.sim, self) for _ in range(self.num_tasks)]
        # Extend rather than rebind: tasks already waiting in the list must
        # not be dropped, since generated_tasks_count keeps counting them and
        # finish() reports whatever is left in the list.
        self.generated_tasks.extend(new_tasks)
        for task in new_tasks:
            print(f"Car {self.id} generated Task {task.id}: {task.__dict__}")
            self.generated_tasks_count += 1

    def process_task(self, selected_task):
        """Compute ``selected_task`` on this car's processor.

        Generator intended to be run as a SimPy process. It waits for the
        processor, pops the task from the head of ``assigned_tasks``, holds
        the processor for the task's processing time and then records the
        outcome: status 2 if the task finished within its deadline (measured
        from its time of arrival), status 6 otherwise. If the process is
        interrupted after it started computing, the task gets status 4.
        Task statistics are saved in every one of these cases.

        Args:
            selected_task: The task to compute; must be the first element of
                ``assigned_tasks`` when the processor is acquired.
        """
        # The task this very process is computing. It stays None while the
        # process is still queued for the processor, so that an interrupt
        # received while waiting never touches the task another process is
        # currently computing (doing so would record it as interrupted and
        # make the computing process fail on a cleared current_task).
        task = None
        try:
            with self.processor.request() as req:
                yield req  # Wait to acquire the requested resource

                # Housekeeping
                assert selected_task == self.assigned_tasks[0], \
                    "selected_task must be at the head of assigned_tasks"
                task = self.assigned_tasks.pop(0)
                self.current_task = task
                task.processing_start = self.env.now
                processing_time = self.calculate_processing_time(selected_task)

                # Start processing
                yield self.env.timeout(processing_time)

                # Finished processing
                # Update metrics
                self.total_processing_time += processing_time
                self.processed_tasks_count += 1
                if self.env.now - task.time_of_arrival <= task.deadline:
                    self.successful_tasks += 1
                    task.status = 2
                else:
                    task.status = 6
                print(f"@t={self.env.now}, Car {self.id} finished computing Task: {selected_task.id}!")
                task.processing_end = self.env.now
                Statistics.save_task_stats(task, self.id)
                self.current_task = None
        except simpy.Interrupt:
            print(f"Process process_task() for car {self.id} interrupted!")
            # Record the statistics for the task whose process was interrupted
            if task is not None:
                task.status = 4
                Statistics.save_task_stats(task, self.id)
                self.current_task = None
        finally:
            self.idle = True
            if self.env.active_process in self.active_processes:
                self.active_processes.remove(self.env.active_process)

    def calculate_waiting_time(self):
        """Return the total processing time of all tasks in ``assigned_tasks``.

        The task currently being computed is not part of ``assigned_tasks``
        and is therefore not included; see ``get_remaining_time``.
        """
        return sum(task.complexity / self.processing_power for task in self.assigned_tasks)

    def calculate_processing_time(self, task):
        """Return the time this car needs to compute ``task``."""
        return task.complexity / self.processing_power

    def get_remaining_time(self):
        """Return the time left until the current task is finished.

        Returns 0 when no task is being computed. The result is never
        negative: floating-point round-off at the completion instant could
        otherwise yield a tiny negative value.
        """
        if self.current_task is None:
            return 0
        elapsed = self.env.now - self.current_task.processing_start
        remaining_time = self.calculate_processing_time(self.current_task) - elapsed
        return max(0, remaining_time)

    def update(self, speed, position):
        """Overwrite the car's speed and position."""
        self.speed = speed
        self.position = position

    def finish(self):
        """
        This function is called at the end of the lifetime of a vehicle.
        For static vehicles: When the dwell time expires
        For dynamic vehicles: When they leave the scenario (traci)

        Saves the car statistics, saves every still-assigned and every
        still-listed generated task with car id "NA", empties both lists and
        interrupts all processes in ``active_processes`` that are alive.
        """
        Statistics.save_car_stats(self, self.env.now)

        for task in self.assigned_tasks:
            Statistics.save_task_stats(task, "NA")
        self.assigned_tasks.clear()

        for task in self.generated_tasks:
            Statistics.save_task_stats(task, "NA")
        self.generated_tasks.clear()

        # Interrupt the processes associated with this Car
        for process in list(self.active_processes):  # Copy the list to avoid modifying it during iteration
            if process.is_alive:
                process.interrupt()
        self.active_processes.clear()
# NOTE: Make the reporting of task statistics a method of the Task class