Skip to content

Commit

Permalink
feat: updated store_role
Browse files Browse the repository at this point in the history
  • Loading branch information
Tbaile committed Sep 29, 2023
1 parent 9e57093 commit fbcaa08
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 31 deletions.
45 changes: 26 additions & 19 deletions src/nethsec/mwan/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,19 @@ def __store_member(e_uci: EUci, interface_name: str, metric: int, weight: int) -


def store_rule(e_uci: EUci, name: str, policy: str, protocol: str = None,
source_addresses: str = None, source_ports: str = None,
destination_addresses: str = None, destination_ports: str = None) -> str:
source_address: str = None, source_port: str = None,
destination_address: str = None, destination_port: str = None) -> str:
"""
Stores a rule for mwan3
Args:
e_uci: EUci instance
name: name of the rule, must be unique
policy: policy to use for the rule, must be already set
protocol: must be one of tcp, udp, icmp or all, defaults to 'all'
source_addresses: source addresses to match
source_ports: source ports to match or range
destination_addresses: destination addresses to match
destination_ports: destination ports to match or range
source_address: source addresses to match
source_port: source ports to match or range
destination_address: destination addresses to match
destination_port: destination ports to match or range
Returns:
name of the rule created
Expand All @@ -136,6 +136,7 @@ def store_rule(e_uci: EUci, name: str, policy: str, protocol: str = None,
ValidationError if name is not unique or policy is not valid
"""
rule_config_name = utils.get_id(name.lower(), 15)
rules = utils.get_all_by_type(e_uci, 'mwan3', 'rule').keys()
if rule_config_name in e_uci.get('mwan3').keys():
raise ValidationError('name', 'unique', name)
if policy not in utils.get_all_by_type(e_uci, 'mwan3', 'policy').keys():
Expand All @@ -146,16 +147,17 @@ def store_rule(e_uci: EUci, name: str, policy: str, protocol: str = None,
e_uci.set('mwan3', rule_config_name, 'use_policy', policy)
if protocol is not None:
e_uci.set('mwan3', rule_config_name, 'proto', protocol)
if source_addresses is not None:
e_uci.set('mwan3', rule_config_name, 'src_ip', source_addresses)
if source_ports is not None:
e_uci.set('mwan3', rule_config_name, 'src_port', source_ports)
if destination_addresses is not None:
e_uci.set('mwan3', rule_config_name, 'dest_ip', destination_addresses)
if destination_ports is not None:
e_uci.set('mwan3', rule_config_name, 'dest_port', destination_ports)
if source_address is not None:
e_uci.set('mwan3', rule_config_name, 'src_ip', source_address)
if source_port is not None:
e_uci.set('mwan3', rule_config_name, 'src_port', source_port)
if destination_address is not None:
e_uci.set('mwan3', rule_config_name, 'dest_ip', destination_address)
if destination_port is not None:
e_uci.set('mwan3', rule_config_name, 'dest_port', destination_port)

e_uci.save('mwan3')
order_rules(e_uci, [rule_config_name] + list(rules))
return f'mwan3.{rule_config_name}'


Expand Down Expand Up @@ -198,13 +200,14 @@ def store_policy(e_uci: EUci, name: str, interfaces: list[dict]) -> list[str]:

def __fetch_interface_status(interface_name: str) -> str:
try:
output = subprocess.check_output([
output = (subprocess.run([
'ubus',
'call',
'mwan3',
'status',
'{"section": "interfaces"}'
]).decode('utf-8')
], capture_output=True)
.stdout.decode('utf-8'))
decoded_output = json.JSONDecoder().decode(output)
return decoded_output['interfaces'][interface_name]['status']
except:
Expand Down Expand Up @@ -343,9 +346,13 @@ def index_rules(e_uci: EUci) -> list[dict]:
if 'proto' in rule_value:
rule_data['protocol'] = rule_value['proto']
if 'src_ip' in rule_value:
rule_data['source_addresses'] = rule_value['src_ip']
rule_data['source_address'] = rule_value['src_ip']
if 'src_port' in rule_value:
rule_data['source_port'] = rule_value['src_port']
if 'dest_ip' in rule_value:
rule_data['destination_addresses'] = rule_value['dest_ip']
rule_data['destination_address'] = rule_value['dest_ip']
if 'dest_port' in rule_value:
rule_data['destination_port'] = rule_value['dest_port']

data.append(rule_data)
return data
Expand All @@ -364,7 +371,7 @@ def order_rules(e_uci: EUci, rules: list[str]) -> list[str]:

order.extend(rules)

subprocess.check_output([
subprocess.run([
'ubus',
'call',
'uci',
Expand Down
50 changes: 38 additions & 12 deletions tests/test_mwan.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ def test_create_member(e_uci):
assert mwan.__store_member(e_uci, 'RED_1', 1, 100) == ('ns_RED_1_M1_W100', True)


def test_create_default_mwan(e_uci):
def test_create_default_mwan(e_uci, mocker):
mocker.patch('subprocess.run')
assert mwan.store_policy(e_uci, 'default', [
{
'name': 'RED_1',
Expand All @@ -165,7 +166,8 @@ def test_create_default_mwan(e_uci):
'ns_RED_1_M10_W200', 'ns_RED_2_M20_W100')


def test_create_unique_mwan(e_uci):
def test_create_unique_mwan(e_uci, mocker):
mocker.patch('subprocess.run')
mwan.store_policy(e_uci, 'this', [])
with pytest.raises(ValueError):
mwan.store_policy(e_uci, 'this', [])
Expand All @@ -178,7 +180,8 @@ def test_metric_generation(e_uci):
assert mwan.__generate_metric(e_uci, [4, 3, 1]) == 2


def test_list_policies(e_uci):
def test_list_policies(e_uci, mocker):
mocker.patch('subprocess.run')
mwan.store_policy(e_uci, 'backup', [
{
'name': 'RED_1',
Expand Down Expand Up @@ -263,44 +266,64 @@ def test_list_policies(e_uci):
assert index[2]['members'][20][0]['weight'] == '100'


def test_store_rule(e_uci):
def test_store_rule(e_uci, mocker):
mocker.patch('subprocess.run')
mwan.store_policy(e_uci, 'default', [
{
'name': 'RED_1',
'metric': '20',
'weight': '100',
}
])
assert mwan.store_rule(e_uci, 'additional rule', 'ns_default') == 'mwan3.ns_additional_r'
assert mwan.store_rule(e_uci, 'additional rule', 'ns_default', 'udp', '192.168.1.1/24', '1:1024', '10.0.0.2/12',
'22,443') == 'mwan3.ns_additional_r'
assert e_uci.get('mwan3', 'ns_additional_r') == 'rule'
assert e_uci.get('mwan3', 'ns_additional_r', 'label') == 'additional rule'
assert e_uci.get('mwan3', 'ns_additional_r', 'use_policy') == 'ns_default'
assert e_uci.get('mwan3', 'ns_additional_r', 'proto') == 'udp'
assert e_uci.get('mwan3', 'ns_additional_r', 'src_ip') == '192.168.1.1/24'
assert e_uci.get('mwan3', 'ns_additional_r', 'src_port') == '1:1024'
assert e_uci.get('mwan3', 'ns_additional_r', 'dest_ip') == '10.0.0.2/12'
assert e_uci.get('mwan3', 'ns_additional_r', 'dest_port') == '22,443'


def test_unique_rule(e_uci):
def test_unique_rule(e_uci, mocker):
mocker.patch('subprocess.run')
mwan.store_policy(e_uci, 'default', [
{
'name': 'RED_1',
'metric': '20',
'weight': '100',
}
])
with pytest.raises(ValueError) as e:
with pytest.raises(ValidationError) as e:
mwan.store_rule(e_uci, 'additional rule', 'ns_default')
mwan.store_rule(e_uci, 'additional rule', 'ns_default')

assert e.value.args[0] == 'name'
assert e.value.args[1] == 'unique'


def test_delete_non_existent_policy(e_uci):
def test_missing_policy_rule(e_uci):
with pytest.raises(ValidationError) as e:
mwan.store_rule(e_uci, 'cool rule', 'ns_default')

assert e.value.args[0] == 'policy'
assert e.value.args[1] == 'invalid'
assert e.value.args[2] == 'ns_default'


def test_delete_non_existent_policy(e_uci, mocker):
mocker.patch('subprocess.run')
with pytest.raises(ValidationError) as e:
mwan.delete_policy(e_uci, 'ns_default')
assert e.value.args[0] == 'name'
assert e.value.args[1] == 'invalid'
assert e.value.args[2] == 'ns_default'


def test_delete_policy(e_uci):
def test_delete_policy(e_uci, mocker):
mocker.patch('subprocess.run')
mwan.store_policy(e_uci, 'default', [
{
'name': 'RED_1',
Expand All @@ -312,7 +335,8 @@ def test_delete_policy(e_uci):
assert e_uci.get('mwan3', 'ns_default', default=None) is None


def test_edit_policy(e_uci):
def test_edit_policy(e_uci, mocker):
mocker.patch('subprocess.run')
mwan.store_policy(e_uci, 'default', [
{
'name': 'RED_1',
Expand Down Expand Up @@ -342,15 +366,17 @@ def test_edit_policy(e_uci):
assert mwan.index_policies(e_uci)[0]['type'] == 'backup'


def test_missing_policy(e_uci):
def test_missing_policy(e_uci, mocker):
mocker.patch('subprocess.run')
with pytest.raises(ValidationError) as e:
mwan.edit_policy(e_uci, 'dummy', '', [])
assert e.value.args[0] == 'name'
assert e.value.args[1] == 'invalid'
assert e.value.args[2] == 'dummy'


def test_index_rules(e_uci):
def test_index_rules(e_uci, mocker):
mocker.patch('subprocess.run')
mwan.store_policy(e_uci, 'default', [
{
'name': 'RED_1',
Expand Down

0 comments on commit fbcaa08

Please sign in to comment.