-
Notifications
You must be signed in to change notification settings - Fork 0
/
RTOS.py
103 lines (82 loc) · 3.16 KB
/
RTOS.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from printer import TaskSetPrinter as Printer
from schedular import *
class RTOS:
    """Real-Time Operating System simulator.

    Drives a ``Scheduler`` over a task set one time unit at a time, either
    preemptively or non-preemptively, and prints the resulting schedule
    together with summary statistics.
    """

    def __init__(self, task_set, mode=RM_MODE, preemptive=False):
        """Initialize the RTOS instance.

        Args:
            task_set (TaskSet): The task set to run on the operating system.
            mode (int): Selects the scheduling algorithm; compared against
                ``RM_MODE``, ``DM_MODE`` and ``EDF_MODE``.
            preemptive (bool): Whether the algorithm is preemptive or not.
        """
        self.task_set = task_set
        self.scheduler = Scheduler(task_set, mode=mode)
        self.printer = Printer()
        self.preemptive = preemptive
        # Only consulted in non-preemptive mode: True while a task holds the CPU.
        self.busy = False

        # Human-readable algorithm label used in the report printed by run().
        if mode == RM_MODE:
            self.mode = "RM"
        elif mode == DM_MODE:
            self.mode = "DM"
        elif mode == EDF_MODE:
            self.mode = "EDF"
        else:
            # Always define the attribute so run() can print its report
            # instead of failing with AttributeError on an unrecognised mode.
            self.mode = f"UNKNOWN({mode})"

    def compute_missed_tasks(self):
        """Count the number of missed tasks.

        Returns:
            int: Number of tasks in the task set whose ``is_missed()`` is truthy.
        """
        return sum(1 for task in self.task_set.get_all() if task.is_missed())

    def run(self, duration):
        """Run the task set on the operating system for a specified duration.

        Each time unit ``i`` in ``range(duration)`` is simulated in turn, then
        the schedule and a summary report are printed.

        Args:
            duration (int): The number of time units to run the task set for.

        Returns:
            float: CPU utilization as a percentage (the value printed in the
                report); ``0.0`` when ``duration`` is not positive.
        """
        completed_tasks = []
        task = None
        cpu_busy_time = 0

        # The running task is local to this call, so any "busy" flag left over
        # from a previous run() no longer refers to a task we can resume.
        self.busy = False

        for i in range(duration):
            if not self.preemptive:
                if not self.busy:
                    # CPU is free: ask the scheduler for a task to start.
                    task = self.scheduler.schedule(i)
                    if task is not None:
                        self.busy = True
                elif task.done():
                    # The running task has finished: release the CPU.
                    task = None
                    self.busy = False

                if task is not None and not task.do(i):
                    # do() reported a falsy result: release the CPU and
                    # record this time unit as idle.
                    self.busy = False
                    task = None
            else:
                # Preemptive: re-evaluate the scheduling decision every
                # time unit and execute whichever task was selected.
                task = self.scheduler.schedule(i)
                if task is not None:
                    task.do(i)

            # Update CPU busy time.
            if task is not None:
                cpu_busy_time += 1
            # One entry per time unit; None marks an idle slot.
            completed_tasks.append(task)

        # Print tasks.
        self.printer.print_schedule(completed_tasks)

        # Compute missed tasks.
        missed = self.compute_missed_tasks()

        # Guard against division by zero for an empty simulation window.
        utilization = 100 * cpu_busy_time / duration if duration > 0 else 0.0

        # Print extra information.
        print()
        print("\n==========================================\n")
        print(f'Total time: {duration}s')
        print(f'CPU Utilization: {utilization}')
        print(f'Total tasks: {self.task_set.status()}')
        print(f'Missed tasks: {missed}')
        print('Scheduling Algorithm:')
        print(f'\tFeasible: {missed == 0}')
        print(f'\tPreemptive: {self.preemptive}')
        print(f'\tMode: {self.mode}')
        print("\n==========================================\n")
        print()

        return utilization