Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/506 set data buffer method in storage unit #507

Merged
merged 4 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import inspect
import os
import pytest
import numpy as np

filename = inspect.getframeinfo(inspect.currentframe()).filename
path = os.path.dirname(os.path.abspath(filename))
Expand All @@ -26,31 +28,29 @@

# Import all of the modules that we are going to be called in this simulation
from Basilisk.utilities import SimulationBaseClass
from Basilisk.utilities import unitTestSupport # general support file with common unit test functions
from Basilisk.simulation import partitionedStorageUnit
from Basilisk.architecture import messaging
from Basilisk.utilities import macros

def test_module(show_plots):
# each test method requires a single assert method to be called
[testResults, testMessage] = check_storage_limits(show_plots)
assert testResults < 1, testMessage

params_storage_limits = [(1200, 1200, 2400, 2400),
(600, 1200, 3600, 3600),
(600, 600, 10000, 6000)]

def check_storage_limits(show_plots):
@pytest.mark.parametrize("baudRate_1, baudRate_2, storageCapacity, expectedStorage",
params_storage_limits)
def test_storage_limits(baudRate_1, baudRate_2, storageCapacity, expectedStorage):
"""
Tests:

1. Whether the partitionedStorageUnit can add multiple nodes (core base class functionality);
2. That the partitionedStorageUnit correctly evaluates how much stored data it should have given a pair of
1200 baud input messages.
1. Whether the partitionedStorageUnit can add multiple nodes (core base class
functionality);
2. That the partitionedStorageUnit correctly evaluates how much stored data it
should have given a pair of baud input messages.

:param show_plots: Not used; no plots to be shown.
:return:
"""

testFailCount = 0 # zero unit test result counter
testMessages = [] # create empty array to store test log messages
unitTaskName = "unitTask" # arbitrary name (don't change)
unitProcessName = "TestProcess" # arbitrary name (don't change)

Expand All @@ -63,15 +63,15 @@ def check_storage_limits(show_plots):
testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate))

test_storage_unit = partitionedStorageUnit.PartitionedStorageUnit()
test_storage_unit.storageCapacity = 2400. # bit capacity.
test_storage_unit.storageCapacity = storageCapacity # bit capacity.

dataMsg1 = messaging.DataNodeUsageMsgPayload()
dataMsg1.baudRate = 1200. # baud
dataMsg1.baudRate = baudRate_1 # baud
dataMsg1.dataName = "node_1_msg"
dat1Msg = messaging.DataNodeUsageMsg().write(dataMsg1)

dataMsg2 = messaging.DataNodeUsageMsgPayload()
dataMsg2.baudRate = 1200. # baud
dataMsg2.baudRate = baudRate_2 # baud
dataMsg2.dataName = "node_2_msg"
dat2Msg = messaging.DataNodeUsageMsg().write(dataMsg2)

Expand All @@ -93,36 +93,227 @@ def check_storage_limits(show_plots):
capacityLog = dataLog.storageCapacity
netBaudLog = dataLog.currentNetBaud

# Check 1 - is net baud rate equal to 2400.?
# Check 1 - is net baud rate correct?
for ind in range(0,len(netBaudLog)):
currentBaud = netBaudLog[ind]
if currentBaud !=2400.:
testFailCount +=1
testMessages.append("FAILED: PartitionedStorageUnit did not correctly log the net baud rate.")
np.testing.assert_allclose(currentBaud, baudRate_1 + baudRate_2, atol=1e-1,
err_msg=("FAILED: PartitionedStorageUnit did not correctly log baud rate."))

#print(netBaudLog)
# Check 2 - is used storage space correct?
np.testing.assert_allclose(storedDataLog[-1], expectedStorage, atol=1e-4,
err_msg=("FAILED: PartitionedStorageUnit did not track integrated data."))

if not unitTestSupport.isDoubleEqualRelative((2400.),storedDataLog[-1], 1e-8):
testFailCount+=1
testMessages.append("FAILED: PartitionedStorageUnit did not track integrated data. Returned "+str(storedDataLog[-1,1])+", expected "+str((2400.)))
# Check 3 - is the amount of data more than zero and less than the capacity?
for ind in range(0,len(storedDataLog)):
assert storedDataLog[ind] <= capacityLog[ind] or np.isclose(storedDataLog[ind],
capacityLog[ind]), (
"FAILED: PartitionedStorageUnit's stored data exceeded its capacity.")

assert storedDataLog[ind] >= 0., (
"FAILED: PartitionedStorageUnit's stored data was negative.")


params_set_data = [(1200, 1200, ['test'], [1200], 2400, 2400),
(600, 600, ['test'], [0], 10000, 6000),
(600, 600, ['test'], [4000], 10000, 10000),
(0, 0, ['test'], [1000], 2000, 1000),
(0, 0, ['test'], [-1000], 2000, 0),
(1000, 0, ['node_1_msg'], [-2000], 6000, 3000),
(0, 0, ['test'], [3000], 2000, 0),
(600, 0, ['node_1_msg'], [3000], 10000, 6000),
(300, 600, ['node_1_msg'], [3000], 10000, 7500),
(600, 600, ['node_1_msg', 'node_2_msg'], [1000, 1000], 10000, 8000),
(600, 300, ['test', 'node_2_msg'], [1000, 1000], 10000, 6500)]

@pytest.mark.parametrize(
"baudRate_1, baudRate_2, partitionName, add_data, storageCapacity, expectedStorage",
params_set_data)
def test_set_data_buffer(baudRate_1, baudRate_2, partitionName,
add_data, storageCapacity, expectedStorage):
"""
Tests:

1. Whether the partitionedStorageUnit properly adds data in different partitions
using the setDataBuffer method;
2. That the partitionedStorageUnit correctly evaluates how much stored data it
should have given a pair of input messages and using setDataBuffer.

:return:
"""

unitTaskName = "unitTask" # arbitrary name (don't change)
unitProcessName = "TestProcess" # arbitrary name (don't change)

# Create a sim module as an empty container
unitTestSim = SimulationBaseClass.SimBaseClass()

# Create test thread
testProcessRate = macros.sec2nano(0.1) # update process rate update time
testProc = unitTestSim.CreateNewProcess(unitProcessName)
testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate))

test_storage_unit = partitionedStorageUnit.PartitionedStorageUnit()
test_storage_unit.storageCapacity = storageCapacity # bit capacity.

dataMsg1 = messaging.DataNodeUsageMsgPayload()
dataMsg1.baudRate = baudRate_1 # baud
dataMsg1.dataName = "node_1_msg"
dat1Msg = messaging.DataNodeUsageMsg().write(dataMsg1)

dataMsg2 = messaging.DataNodeUsageMsgPayload()
dataMsg2.baudRate = baudRate_2 # baud
dataMsg2.dataName = "node_2_msg"
dat2Msg = messaging.DataNodeUsageMsg().write(dataMsg2)

# Test the addNodeToStorage method:
test_storage_unit.addDataNodeToModel(dat1Msg)
test_storage_unit.addDataNodeToModel(dat2Msg)

unitTestSim.AddModelToTask(unitTaskName, test_storage_unit)

dataLog = test_storage_unit.storageUnitDataOutMsg.recorder()
unitTestSim.AddModelToTask(unitTaskName, dataLog)

#print(storedDataLog)
# Initialize the partition
initData = [0 for i in range(0, len(partitionName))]
test_storage_unit.setDataBuffer(partitionName, initData)

unitTestSim.InitializeSimulation()
sim_time = 5.0
unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time - 1.0))
unitTestSim.ExecuteSimulation()

test_storage_unit.setDataBuffer(partitionName, add_data)
unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time))
unitTestSim.ExecuteSimulation()

storedDataLog = dataLog.storageLevel
capacityLog = dataLog.storageCapacity
netBaudLog = dataLog.currentNetBaud

# Check 1 - is net baud rate correct?
for ind in range(0,len(netBaudLog)):
currentBaud = netBaudLog[ind]
np.testing.assert_allclose(currentBaud, baudRate_1 + baudRate_2, atol=1e-4,
err_msg=("FAILED: PartitionedStorageUnit did not correctly log baud rate."))

# Check 2 - is used storage space correct?
np.testing.assert_allclose(storedDataLog[-1], expectedStorage, atol=1e-4,
err_msg=("FAILED: PartitionedStorageUnit did not track integrated data."))

# Check 3 - is the amount of data more than zero and less than the capacity?
for ind in range(0,len(storedDataLog)):
if storedDataLog[ind] > capacityLog[ind]:
testFailCount +=1
testMessages.append("FAILED: PartitionedStorageUnit's stored data exceeded its capacity.")
assert storedDataLog[ind] <= capacityLog[ind] or np.isclose(storedDataLog[ind],
capacityLog[ind]), (
"FAILED: PartitionedStorageUnit's stored data exceeded its capacity.")

assert storedDataLog[ind] >= 0., (
"FAILED: PartitionedStorageUnit's stored data was negative.")


params_set_data = [(600, 600, ['test'], [0], 1E4),
(0, 0, ['test'], [1000], 2000),
(1000, 0, ['node_1_msg'], [-2000], 6000),
(600, 0, ['node_1_msg'], [3000], 1e4),
(600, 600, ['node_1_msg'], [3000], 10000),
(600, 600, ['node_1_msg', 'node_2_msg'], [1e3, 1e3], 10000),
(600, 300, ['test', 'node_1_msg', 'node_2_msg'], [1500, 1e3, 1000], 10000)]

@pytest.mark.parametrize(
"baudRate_1, baudRate_2, partitionName, add_data, storageCapacity",
params_set_data)
def test_set_data_buffer_partition(baudRate_1, baudRate_2, partitionName,
add_data, storageCapacity):

"""
Tests:

1. Whether the partitionedStorageUnit manages the data in already existing
and new partitions correctly;

:return:
"""

unitTaskName = "unitTask" # arbitrary name (don't change)
unitProcessName = "TestProcess" # arbitrary name (don't change)

# Create a sim module as an empty container
unitTestSim = SimulationBaseClass.SimBaseClass()

# Create test thread
testProcessRate = macros.sec2nano(0.1) # update process rate update time
testProc = unitTestSim.CreateNewProcess(unitProcessName)
testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, testProcessRate))

test_storage_unit = partitionedStorageUnit.PartitionedStorageUnit()
test_storage_unit.storageCapacity = storageCapacity # bit capacity.

dataMsg1 = messaging.DataNodeUsageMsgPayload()
dataMsg1.baudRate = baudRate_1 # baud
dataMsg1.dataName = "node_1_msg"
dat1Msg = messaging.DataNodeUsageMsg().write(dataMsg1)

dataMsg2 = messaging.DataNodeUsageMsgPayload()
dataMsg2.baudRate = baudRate_2 # baud
dataMsg2.dataName = "node_2_msg"
dat2Msg = messaging.DataNodeUsageMsg().write(dataMsg2)

# Test the addNodeToStorage method:
test_storage_unit.addDataNodeToModel(dat1Msg)
test_storage_unit.addDataNodeToModel(dat2Msg)

unitTestSim.AddModelToTask(unitTaskName, test_storage_unit)

dataLog = test_storage_unit.storageUnitDataOutMsg.recorder()
unitTestSim.AddModelToTask(unitTaskName, dataLog)

# Initialize the partition
initData = [0 for i in range(0, len(partitionName))]
test_storage_unit.setDataBuffer(partitionName, initData)

unitTestSim.InitializeSimulation()
sim_time = 5.0
unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time - 1.0))
unitTestSim.ExecuteSimulation()

test_storage_unit.setDataBuffer(partitionName, add_data)
unitTestSim.ConfigureStopTime(macros.sec2nano(sim_time))
unitTestSim.ExecuteSimulation()

dataNameVec = dataLog.storedDataName
dataVec = dataLog.storedData

# Check 1 - are the partition names in the data name vector?
for partName in partitionName:
assert partName in list(dataNameVec[-1]), (
"FAILED: PartitionedStorageUnit did not add the new partition.")

if storedDataLog[ind] < 0.:
testFailCount +=1
testMessages.append("FAILED: PartitionedStorageUnit's stored data was negative.")
partIndex = list(dataNameVec[-1]).index(partName)
dataIndex = list(partitionName).index(partName)
if partName == "test":
baudRate = 0.0
elif partName == "node_1_msg":
baudRate = baudRate_1
elif partName == "node_2_msg":
baudRate = baudRate_2

if testFailCount:
print(testMessages)
else:
print("Passed")
return [testFailCount, ''.join(testMessages)]
# Check 2 - if partition exists, does it added data to the correct partition?
np.testing.assert_allclose(dataVec[-1][partIndex],
add_data[dataIndex] + baudRate*sim_time,
err_msg = (
"FAILED: PartitionedStorageUnit did not use the correct partition."))


if __name__ == "__main__":
print(test_module(False))
baudRate_1 = 1200
baudRate_2 = 1200
storageCapacity = 2400
expectedStorage = 2400
test_storage_limits(baudRate_1, baudRate_2, storageCapacity, expectedStorage)
add_data = [1200, 200]
partitionName = ["test", "node_1_msg"]
test_set_data_buffer(baudRate_1, baudRate_2, partitionName,
add_data, storageCapacity, expectedStorage)
storageCapacity = 20000
test_set_data_buffer_partition(baudRate_1, baudRate_2, partitionName,
add_data, storageCapacity)
Loading