Skip to content

Commit

Permalink
Updated tests for setDataBuffer method
Browse files Browse the repository at this point in the history
  • Loading branch information
LorenzzoQM committed Nov 22, 2023
1 parent 5b87a1a commit 228265c
Show file tree
Hide file tree
Showing 2 changed files with 376 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import inspect
import os
import pytest
import numpy as np

filename = inspect.getframeinfo(inspect.currentframe()).filename
path = os.path.dirname(os.path.abspath(filename))
Expand All @@ -26,31 +28,29 @@

# Import all of the modules that we are going to be called in this simulation
from Basilisk.utilities import SimulationBaseClass
from Basilisk.utilities import unitTestSupport # general support file with common unit test functions
from Basilisk.simulation import partitionedStorageUnit
from Basilisk.architecture import messaging
from Basilisk.utilities import macros

def test_module(show_plots):
# each test method requires a single assert method to be called
[testResults, testMessage] = check_storage_limits(show_plots)
assert testResults < 1, testMessage

params_storage_limits = [(1200, 1200, 2400, 2400),
(600, 1200, 3600, 3600),
(600, 600, 10000, 6000)]

def check_storage_limits(show_plots):
@pytest.mark.parametrize("baudRate_1, baudRate_2, storageCapacity, expectedStorage",
params_storage_limits)
def test_storage_limits(baudRate_1, baudRate_2, storageCapacity, expectedStorage):
"""
Tests:
1. Whether the partitionedStorageUnit can add multiple nodes (core base class functionality);
2. That the partitionedStorageUnit correctly evaluates how much stored data it should have given a pair of
1200 baud input messages.
1. Whether the partitionedStorageUnit can add multiple nodes (core base class
functionality);
2. That the partitionedStorageUnit correctly evaluates how much stored data it
should have given a pair of baud input messages.
:param show_plots: Not used; no plots to be shown.
:return:
"""

testFailCount = 0 # zero unit test result counter
testMessages = [] # create empty array to store test log messages
unitTaskName = "unitTask" # arbitrary name (don't change)
unitProcessName = "TestProcess" # arbitrary name (don't change)

Expand All @@ -63,15 +63,15 @@ def check_storage_limits(show_plots):
testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate))

test_storage_unit = partitionedStorageUnit.PartitionedStorageUnit()
test_storage_unit.storageCapacity = 2400. # bit capacity.
test_storage_unit.storageCapacity = storageCapacity # bit capacity.

dataMsg1 = messaging.DataNodeUsageMsgPayload()
dataMsg1.baudRate = 1200. # baud
dataMsg1.baudRate = baudRate_1 # baud
dataMsg1.dataName = "node_1_msg"
dat1Msg = messaging.DataNodeUsageMsg().write(dataMsg1)

dataMsg2 = messaging.DataNodeUsageMsgPayload()
dataMsg2.baudRate = 1200. # baud
dataMsg2.baudRate = baudRate_2 # baud
dataMsg2.dataName = "node_2_msg"
dat2Msg = messaging.DataNodeUsageMsg().write(dataMsg2)

Expand All @@ -93,36 +93,227 @@ def check_storage_limits(show_plots):
capacityLog = dataLog.storageCapacity
netBaudLog = dataLog.currentNetBaud

# Check 1 - is net baud rate equal to 2400.?
# Check 1 - is net baud rate correct?
for ind in range(0,len(netBaudLog)):
currentBaud = netBaudLog[ind]
if currentBaud !=2400.:
testFailCount +=1
testMessages.append("FAILED: PartitionedStorageUnit did not correctly log the net baud rate.")
np.testing.assert_allclose(currentBaud, baudRate_1 + baudRate_2, atol=1e-1,
err_msg=("FAILED: PartitionedStorageUnit did not correctly log baud rate."))

#print(netBaudLog)
# Check 2 - is used storage space correct?
np.testing.assert_allclose(storedDataLog[-1], expectedStorage, atol=1e-4,
err_msg=("FAILED: PartitionedStorageUnit did not track integrated data."))

if not unitTestSupport.isDoubleEqualRelative((2400.),storedDataLog[-1], 1e-8):
testFailCount+=1
testMessages.append("FAILED: PartitionedStorageUnit did not track integrated data. Returned "+str(storedDataLog[-1,1])+", expected "+str((2400.)))
# Check 3 - is the amount of data more than zero and less than the capacity?
for ind in range(0,len(storedDataLog)):
assert storedDataLog[ind] <= capacityLog[ind] or np.isclose(storedDataLog[ind],
capacityLog[ind]), (
"FAILED: PartitionedStorageUnit's stored data exceeded its capacity.")

assert storedDataLog[ind] >= 0., (
"FAILED: PartitionedStorageUnit's stored data was negative.")


params_set_data = [(1200, 1200, ['test'], [1200], 2400, 2400),
(600, 600, ['test'], [0], 10000, 6000),
(600, 600, ['test'], [4000], 10000, 10000),
(0, 0, ['test'], [1000], 2000, 1000),
(0, 0, ['test'], [-1000], 2000, 0),
(1000, 0, ['node_1_msg'], [-2000], 6000, 3000),
(0, 0, ['test'], [3000], 2000, 0),
(600, 0, ['node_1_msg'], [3000], 10000, 6000),
(300, 600, ['node_1_msg'], [3000], 10000, 7500),
(600, 600, ['node_1_msg', 'node_2_msg'], [1000, 1000], 10000, 8000),
(600, 300, ['test', 'node_2_msg'], [1000, 1000], 10000, 6500)]

@pytest.mark.parametrize(
"baudRate_1, baudRate_2, partitionName, add_data, storageCapacity, expectedStorage",
params_set_data)
def test_set_data_buffer(baudRate_1, baudRate_2, partitionName,
add_data, storageCapacity, expectedStorage):
"""
Tests:
1. Whether the partitionedStorageUnit properly adds data in different partitions
using the setDataBuffer method;
2. That the partitionedStorageUnit correctly evaluates how much stored data it
should have given a pair of input messages and using setDataBuffer.
:return:
"""

unitTaskName = "unitTask" # arbitrary name (don't change)
unitProcessName = "TestProcess" # arbitrary name (don't change)

# Create a sim module as an empty container
unitTestSim = SimulationBaseClass.SimBaseClass()

# Create test thread
testProcessRate = macros.sec2nano(0.1) # update process rate update time
testProc = unitTestSim.CreateNewProcess(unitProcessName)
testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate))

test_storage_unit = partitionedStorageUnit.PartitionedStorageUnit()
test_storage_unit.storageCapacity = storageCapacity # bit capacity.

dataMsg1 = messaging.DataNodeUsageMsgPayload()
dataMsg1.baudRate = baudRate_1 # baud
dataMsg1.dataName = "node_1_msg"
dat1Msg = messaging.DataNodeUsageMsg().write(dataMsg1)

dataMsg2 = messaging.DataNodeUsageMsgPayload()
dataMsg2.baudRate = baudRate_2 # baud
dataMsg2.dataName = "node_2_msg"
dat2Msg = messaging.DataNodeUsageMsg().write(dataMsg2)

# Test the addNodeToStorage method:
test_storage_unit.addDataNodeToModel(dat1Msg)
test_storage_unit.addDataNodeToModel(dat2Msg)

unitTestSim.AddModelToTask(unitTaskName, test_storage_unit)

dataLog = test_storage_unit.storageUnitDataOutMsg.recorder()
unitTestSim.AddModelToTask(unitTaskName, dataLog)

#print(storedDataLog)
# Initialize the partition
initData = [0 for i in range(0, len(partitionName))]
test_storage_unit.setDataBuffer(partitionName, initData)

unitTestSim.InitializeSimulation()
sim_time = 5.0
unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time - 1.0))
unitTestSim.ExecuteSimulation()

test_storage_unit.setDataBuffer(partitionName, add_data)
unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time))
unitTestSim.ExecuteSimulation()

storedDataLog = dataLog.storageLevel
capacityLog = dataLog.storageCapacity
netBaudLog = dataLog.currentNetBaud

# Check 1 - is net baud rate correct?
for ind in range(0,len(netBaudLog)):
currentBaud = netBaudLog[ind]
np.testing.assert_allclose(currentBaud, baudRate_1 + baudRate_2, atol=1e-4,
err_msg=("FAILED: PartitionedStorageUnit did not correctly log baud rate."))

# Check 2 - is used storage space correct?
np.testing.assert_allclose(storedDataLog[-1], expectedStorage, atol=1e-4,
err_msg=("FAILED: PartitionedStorageUnit did not track integrated data."))

# Check 3 - is the amount of data more than zero and less than the capacity?
for ind in range(0,len(storedDataLog)):
if storedDataLog[ind] > capacityLog[ind]:
testFailCount +=1
testMessages.append("FAILED: PartitionedStorageUnit's stored data exceeded its capacity.")
assert storedDataLog[ind] <= capacityLog[ind] or np.isclose(storedDataLog[ind],
capacityLog[ind]), (
"FAILED: PartitionedStorageUnit's stored data exceeded its capacity.")

assert storedDataLog[ind] >= 0., (
"FAILED: PartitionedStorageUnit's stored data was negative.")


params_set_data = [(600, 600, ['test'], [0], 1E4),
(0, 0, ['test'], [1000], 2000),
(1000, 0, ['node_1_msg'], [-2000], 6000),
(600, 0, ['node_1_msg'], [3000], 1e4),
(600, 600, ['node_1_msg'], [3000], 10000),
(600, 600, ['node_1_msg', 'node_2_msg'], [1e3, 1e3], 10000),
(600, 300, ['test', 'node_1_msg', 'node_2_msg'], [1500, 1e3, 1000], 10000)]

@pytest.mark.parametrize(
"baudRate_1, baudRate_2, partitionName, add_data, storageCapacity",
params_set_data)
def test_set_data_buffer_partition(baudRate_1, baudRate_2, partitionName,
add_data, storageCapacity):

"""
Tests:
1. Whether the partitionedStorageUnit manages the data in already existing
and new partitions correctly;
:return:
"""

unitTaskName = "unitTask" # arbitrary name (don't change)
unitProcessName = "TestProcess" # arbitrary name (don't change)

# Create a sim module as an empty container
unitTestSim = SimulationBaseClass.SimBaseClass()

# Create test thread
testProcessRate = macros.sec2nano(0.1) # update process rate update time
testProc = unitTestSim.CreateNewProcess(unitProcessName)
testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate))

test_storage_unit = partitionedStorageUnit.PartitionedStorageUnit()
test_storage_unit.storageCapacity = storageCapacity # bit capacity.

dataMsg1 = messaging.DataNodeUsageMsgPayload()
dataMsg1.baudRate = baudRate_1 # baud
dataMsg1.dataName = "node_1_msg"
dat1Msg = messaging.DataNodeUsageMsg().write(dataMsg1)

dataMsg2 = messaging.DataNodeUsageMsgPayload()
dataMsg2.baudRate = baudRate_2 # baud
dataMsg2.dataName = "node_2_msg"
dat2Msg = messaging.DataNodeUsageMsg().write(dataMsg2)

# Test the addNodeToStorage method:
test_storage_unit.addDataNodeToModel(dat1Msg)
test_storage_unit.addDataNodeToModel(dat2Msg)

unitTestSim.AddModelToTask(unitTaskName, test_storage_unit)

dataLog = test_storage_unit.storageUnitDataOutMsg.recorder()
unitTestSim.AddModelToTask(unitTaskName, dataLog)

# Initialize the partition
initData = [0 for i in range(0, len(partitionName))]
test_storage_unit.setDataBuffer(partitionName, initData)

unitTestSim.InitializeSimulation()
sim_time = 5.0
unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time - 1.0))
unitTestSim.ExecuteSimulation()

test_storage_unit.setDataBuffer(partitionName, add_data)
unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time))
unitTestSim.ExecuteSimulation()

dataNameVec = dataLog.storedDataName
dataVec = dataLog.storedData

# Check 1 - are the partition names in the data name vector?
for partName in partitionName:
assert partName in list(dataNameVec[-1]), (
"FAILED: PartitionedStorageUnit did not add the new partition.")

if storedDataLog[ind] < 0.:
testFailCount +=1
testMessages.append("FAILED: PartitionedStorageUnit's stored data was negative.")
partIndex = list(dataNameVec[-1]).index(partName)
dataIndex = list(partitionName).index(partName)
if partName == "test":
baudRate = 0.0
elif partName == "node_1_msg":
baudRate = baudRate_1
elif partName == "node_2_msg":
baudRate = baudRate_2

if testFailCount:
print(testMessages)
else:
print("Passed")
return [testFailCount, ''.join(testMessages)]
# Check 2 - if partition exists, does it added data to the correct partition?
np.testing.assert_allclose(dataVec[-1][partIndex],
add_data[dataIndex] + baudRate*sim_time,
err_msg = (
"FAILED: PartitionedStorageUnit did not use the correct partition."))


if __name__ == "__main__":
print(test_module(False))
baudRate_1 = 1200
baudRate_2 = 1200
storageCapacity = 2400
expectedStorage = 2400
test_storage_limits(baudRate_1, baudRate_2, storageCapacity, expectedStorage)
add_data = [1200, 200]
partitionName = ["test", "node_1_msg"]
test_set_data_buffer(baudRate_1, baudRate_2, partitionName,
add_data, storageCapacity, expectedStorage)
storageCapacity = 20000
test_set_data_buffer_partition(baudRate_1, baudRate_2, partitionName,
add_data, storageCapacity)
Loading

0 comments on commit 228265c

Please sign in to comment.