Skip to content

Commit

Permalink
Refactored tests
Browse files Browse the repository at this point in the history
  • Loading branch information
PointlessUser committed Oct 7, 2023
1 parent e429248 commit 56e7049
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 38 deletions.
5 changes: 1 addition & 4 deletions test/test_eps_hk_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@

def testAllCommandsToOBC():

i = 0
while i < 9:
for _ in range(9):
test.send('eps.time_management.get_eps_time')
test.send('eps.cli.general_telemetry')
time.sleep(1800)
i += 1

test.summary() # call when done to print summary of tests


Expand Down
65 changes: 31 additions & 34 deletions test/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self, dt : datetime):
self.cron["hr"] = dt.hour
self.cron["day"] = dt.day
self.cron["mon"] = dt.month
self.cron["yr"] = dt.year;
self.cron["yr"] = dt.year

def _setCron(self, dt : datetime):
if self.cron["msec"] != '*':
Expand All @@ -61,17 +61,14 @@ def _setCron(self, dt : datetime):
if self.cron["mon"] != '*':
self.cron["mon"] = dt.month
if self.cron["yr"] != '*':
self.cron["yr"] = dt.year;
self.cron["yr"] = dt.year

def inc(self, relTime : timedelta):
self.dt = self.dt + relTime
self._setCron(self.dt)

def __str__(self):
fmt = ""
for key in self.cron:
fmt += "{} ".format(self.cron[key])
return fmt
return "".join(f"{self.cron[key]} " for key in self.cron)

def test_time():
# Get the current satellite time and adjust it if necessary. By updating
Expand All @@ -86,20 +83,20 @@ def test_time():

response = transactObj.execute()
if response == {} or response['err'] != 0:
print("get_time error: {}".format(response))
print(f"get_time error: {response}")
return

print("now: {}, sat: {}".format(now, response))
print(f"now: {now}, sat: {response}")
sat = int(response['timestamp'])
dt = datetime.fromtimestamp(sat)
# Arbitrarily decide that a 10 second diference is "close enough"
if abs(sat - now) < 10:
print("satellite time {} (delta {})".format(dt, sat - now))
print(f"satellite time {dt} (delta {sat - now})")
return

print("adjusting satellite time {} (delta {})".format(dt, sat - now))
print(f"adjusting satellite time {dt} (delta {sat - now})")

cmd = "ex2.time_management.set_time({})".format(now)
cmd = f"ex2.time_management.set_time({now})"
transactObj = gs.interactive.getTransactionObject(cmd, gs.networkManager)
# set the satellite's time to this script's time
response = transactObj.execute()
Expand All @@ -113,7 +110,7 @@ def test_set_multiple():
# 2. use scedhuler.set_schedule to upload the schedule
# 3. wait until the scheduled task has executed
# 4. get the satellite log to check that the task was executed

dt = datetime.now(timezone.utc)
task1 = CronTime(dt)
task2 = CronTime(dt)
Expand All @@ -127,32 +124,32 @@ def test_set_multiple():
with open(schedFile, "w") as f:
# Note that the tasks are specified out of chronological order
task1.inc(timedelta(seconds = sleep_time))
f.write("{} {}\n".format(task1, "ex2.time_management.get_time"))
f.write(f"{task1} ex2.time_management.get_time\n")
task2.inc(timedelta(seconds = sleep_time - 10))
f.write("{} {}\n".format(task2, "ex2.time_management.get_time"))
f.write(f"{task2} ex2.time_management.get_time\n")
task3.inc(timedelta(seconds = sleep_time - 30))
f.write("{} {}\n".format(task3, "ex2.time_management.get_time"))
f.write(f"{task3} ex2.time_management.get_time\n")
task4.inc(timedelta(seconds = sleep_time - 20))
f.write("{} {}\n".format(task4, "ex2.time_management.get_time"))
f.write(f"{task4} ex2.time_management.get_time\n")

# Upload the schedule file to the satellite
cmd = "ex2.scheduler.set_schedule({})".format(schedFile)
cmd = f"ex2.scheduler.set_schedule({schedFile})"
transactObj = gs.interactive.getTransactionObject(cmd, gs.networkManager)

# set the schedule
response = transactObj.execute()
print("set_schedule response: {}".format(response))
response = transactObj.execute()
print(f"set_schedule response: {response}")

assert response != {}, "set_schedule - no response"
assert response['err'] == 0, "set_schedule error {}".format(response['err'])
assert response['err'] == 0, f"set_schedule error {response['err']}"

cmds = get_schedule()
for c in cmds:
cmd_dt = datetime.fromtimestamp(c['next'], timezone.utc)
diff = cmd_dt - dt
print("next: {} ({}), diff {}".format(cmd_dt, c['next'], diff.total_seconds()))
print(f"next: {cmd_dt} ({c['next']}), diff {diff.total_seconds()}")

print("sleeping for {} seconds".format(sleep_time))
print(f"sleeping for {sleep_time} seconds")
time.sleep(sleep_time*2)

# Thes task should have executed now. There are a couple of ways to read
Expand All @@ -175,7 +172,7 @@ def test_set_multiple():
if line.find("get_time") != -1:
fields = line.split(":")
timestamp = int(fields[1].strip())
print("executed at {} vs desired {}".format(timestamp, cmds[i]['next']))
print(f"executed at {timestamp} vs desired {cmds[i]['next']}")
i = i+1
# print("longest_len {}".format(longest_len))

Expand All @@ -187,7 +184,7 @@ def test_set_periodic():
# 2. use scedhuler.set_schedule to upload the schedule
# 3. wait until the scheduled task has executed
# 4. get the satellite log to check that the task was executed

dt = datetime.now(timezone.utc)
task1 = CronTime(dt)
task2 = CronTime(dt)
Expand All @@ -204,26 +201,26 @@ def test_set_periodic():

schedFile = "periodic-schedule.txt"
with open(schedFile, "w") as f:
f.write("{} {}\n".format(task1, "ex2.time_management.get_time"))
f.write("{} {}\n".format(task2, "ex2.time_management.get_time"))
f.write(f"{task1} ex2.time_management.get_time\n")
f.write(f"{task2} ex2.time_management.get_time\n")

# Upload the schedule file to the satellite
cmd = "ex2.scheduler.set_schedule({})".format(schedFile)
cmd = f"ex2.scheduler.set_schedule({schedFile})"
transactObj = gs.interactive.getTransactionObject(cmd, gs.networkManager)
# set the schedule
response = transactObj.execute()
print("set_schedule response: {}".format(response))
response = transactObj.execute()
print(f"set_schedule response: {response}")

assert response != {}, "set_schedule - no response"
assert response['err'] == 0, "set_schedule error {}".format(response['err'])
assert response['err'] == 0, f"set_schedule error {response['err']}"

cmds = get_schedule()
for c in cmds:
cmd_dt = datetime.fromtimestamp(c['next'], timezone.utc)
diff = cmd_dt - dt
print("next: {} ({}), diff {}".format(cmd_dt, c['next'], diff.total_seconds()))
print(f"next: {cmd_dt} ({c['next']}), diff {diff.total_seconds()}")

print("sleeping for {} seconds".format(sleep_time))
print(f"sleeping for {sleep_time} seconds")
time.sleep(sleep_time*2)

cmd = "ex2.logger.get_file"
Expand All @@ -240,7 +237,7 @@ def test_set_periodic():
if line.find("get_time") != -1:
fields = line.split(":")
timestamp = int(fields[1].strip())
print("{}: executed at {}".format(i, timestamp))
print(f"{i}: executed at {timestamp}")
i = i+1

def get_schedule():
Expand Down Expand Up @@ -281,7 +278,7 @@ def test_scheduler_delete():
transactObj = gs.interactive.getTransactionObject(cmd, gs.networkManager)

response = transactObj.execute()
print("delete_schedule response: {}".format(response))
print(f"delete_schedule response: {response}")

# Send the delete schedule command. It shouldn't matter if there is a
# schedule present or not.
Expand Down

0 comments on commit 56e7049

Please sign in to comment.