Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

T6013: Add support for AuthorizedPrincipalsFile to trusted_user_ca_key #4266

Draft
wants to merge 6 commits into
base: current
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions data/templates/ssh/sshd_config.j2
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,9 @@ RekeyLimit {{ rekey.data }}M {{ rekey.time + 'M' if rekey.time is vyos_defined }
{% if trusted_user_ca_key is vyos_defined %}
TrustedUserCAKeys /etc/ssh/trusted_user_ca_key
{% endif %}

{% if trusted_user_ca_key is vyos_defined and trusted_user_ca_key.bind_user is vyos_defined %}
AuthorizedPrincipalsFile /etc/ssh/authorized_principals/%u
{% elif trusted_user_ca_key is vyos_defined %}
AuthorizedPrincipalsFile none
{% endif %}
19 changes: 19 additions & 0 deletions interface-definitions/service_ssh.xml.in
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,25 @@
</properties>
<children>
#include <include/pki/ca-certificate.xml.i>
<tagNode name="bind-user">
<properties>
<help>user-name</help>
<constraint>
#include <include/constraint/login-username.xml.i>
</constraint>
</properties>
<children>
<leafNode name="principal">
<properties>
<help>principal-name</help>
<constraint>
#include <include/constraint/login-username.xml.i>
</constraint>
<multi/>
</properties>
</leafNode>
</children>
</tagNode>
</children>
</node>
#include <include/vrf-multi.xml.i>
Expand Down
89 changes: 88 additions & 1 deletion smoketest/scripts/cli/test_service_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
key_dsa = '/etc/ssh/ssh_host_dsa_key'
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'
trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key'
authorized_principals_dir = '/etc/ssh/authorized_principals'


def get_config_value(key):
Expand Down Expand Up @@ -380,18 +381,104 @@ def test_ssh_trusted_user_ca_key(self):

trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config)
authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile')
self.assertIn('none', authorize_principals_file_config)

with open(trusted_user_ca_key, 'r') as file:
ca_key_contents = file.read()
self.assertIn(ca_root_cert_data, ca_key_contents)

self.cli_delete(base_path + ['trusted-user-ca-key'])
self.cli_delete(
base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
)
self.cli_delete(['pki', 'ca', ca_cert_name])
self.cli_commit()

# Verify the CA key is removed
trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
self.assertNotIn(trusted_user_ca_key, trusted_user_ca_key_config)
authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile')
self.assertNotIn('none', authorize_principals_file_config)

def test_ssh_trusted_user_ca_key_and_bind_user_with_principal(self):
ca_cert_name = 'test_ca'
bind_user = 'test_user'
principals = ['test_principal_alice', 'test_principal_bob']
test_user = 'ssh_test'
test_pass = 'v2i57DZs8idUwMN3VC92'

# Create a test user
self.cli_set(
[
'system',
'login',
'user',
test_user,
'authentication',
'plaintext-password',
test_pass,
]
)

# set pki ca <ca_cert_name> certificate <ca_key_data>
# set service ssh trusted-user-ca-key ca-certificate <ca_cert_name>
# set service ssh trusted-user-ca-key bind-user <bind_user> principal <principals>
self.cli_set(
pki_path
+ [
'ca',
ca_cert_name,
'certificate',
ca_root_cert_data.replace('\n', ''),
]
)
self.cli_set(
base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
)
for principal in principals:
self.cli_set(
base_path
+ [
'trusted-user-ca-key',
'bind-user',
bind_user,
'principal',
principal,
]
)
self.cli_commit()

trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config)
authorized_principals_file = f'{authorized_principals_dir}/{bind_user}'
self.assertTrue(os.path.exists(authorized_principals_file))

with open(authorized_principals_file, 'r') as file:
authorized_principals = file.read()
for principal in principals:
self.assertIn(principal, authorized_principals)

for principal in principals:
self.cli_delete(
base_path
+ [
'trusted-user-ca-key',
'bind-user',
bind_user,
'principal',
principal,
]
)

self.cli_delete(
base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
)
self.cli_delete(['pki', 'ca', ca_cert_name])
self.cli_delete(['system', 'login', 'user', test_user])
self.cli_commit()

# Verify the authorized principals file is removed
self.assertFalse(os.path.exists(authorized_principals_file))


if __name__ == '__main__':
Expand Down
100 changes: 83 additions & 17 deletions src/conf_mode/service_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,77 @@
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'

trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key'
authorized_principals = '/etc/ssh/authorized_principals'


def cleanup_authorized_principals_dir(valid_users: list[str]):
if not os.path.isdir(authorized_principals):
return

# Check the files (user name) under the directory and delete unnecessary ones.
for filename in os.listdir(authorized_principals):
file_path = os.path.join(authorized_principals, filename)
if os.path.isfile(file_path) and filename not in valid_users:
os.remove(file_path)

# If the directory is empty, delete it too
if not os.listdir(authorized_principals):
os.rmdir(authorized_principals)


def handle_trusted_user_ca_key(ssh: dict):
if 'trusted_user_ca_key' not in ssh:
if os.path.exists(trusted_user_ca_key):
os.unlink(trusted_user_ca_key)

# remove authorized_principals directory if it exists
cleanup_authorized_principals_dir([])
return

# trusted_user_ca_key is present
ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
pki_ca_cert = ssh['pki']['ca'][ca_key_name]

loaded_ca_cert = load_certificate(pki_ca_cert['certificate'])
loaded_ca_certs = {
load_certificate(c['certificate'])
for c in ssh['pki']['ca'].values()
if 'certificate' in c
}

ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs)
write_file(
trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain)
)

if 'bind-user' not in ssh['trusted_user_ca_key']:
# remove authorized_principals directory if it exists
cleanup_authorized_principals_dir([])
return

# bind-user is present
configured_users = []
for bind_user, bind_user_config in ssh['trusted_user_ca_key']['bind-user'].items():
if bind_user not in ssh['login_users']:
raise ConfigError(f"User '{bind_user}' not found in system login users")

if 'principal' not in bind_user_config:
raise ConfigError(f"Principal not found for user '{bind_user}'")

principals = bind_user_config['principal']
if isinstance(principals, str):
principals = [principals]

if not os.path.isdir(authorized_principals):
os.makedirs(authorized_principals, exist_ok=True)

principal_file = os.path.join(authorized_principals, bind_user)
contents = '\n'.join(principals) + '\n'
write_file(principal_file, contents)
configured_users.append(bind_user)

# remove unnecessary files under authorized_principals directory
cleanup_authorized_principals_dir(configured_users)


def get_config(config=None):
Expand All @@ -59,7 +130,15 @@ def get_config(config=None):
ssh = conf.get_config_dict(
base, key_mangling=('-', '_'), get_first_key=True, with_pki=True
)
login_users_base = ['system', 'login', 'user']
login_users = conf.get_config_dict(
login_users_base,
key_mangling=('-', '_'),
no_tag_node_value_mangle=True,
get_first_key=True,
)

# create a list of all users, cli and users
tmp = is_node_changed(conf, base + ['vrf'])
if tmp:
ssh.update({'restart_required': {}})
Expand All @@ -71,6 +150,9 @@ def get_config(config=None):
# pass config file path - used in override template
ssh['config_file'] = config_file

# use for trusted ca
ssh['login_users'] = login_users

# Ignore default XML values if config doesn't exists
# Delete key from dict
if not conf.exists(base + ['dynamic-protection']):
Expand Down Expand Up @@ -119,23 +201,7 @@ def generate(ssh):
syslog(LOG_INFO, 'SSH ed25519 host key not found, generating new key!')
call(f'ssh-keygen -q -N "" -t ed25519 -f {key_ed25519}')

if 'trusted_user_ca_key' in ssh:
ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
pki_ca_cert = ssh['pki']['ca'][ca_key_name]

loaded_ca_cert = load_certificate(pki_ca_cert['certificate'])
loaded_ca_certs = {
load_certificate(c['certificate'])
for c in ssh['pki']['ca'].values()
if 'certificate' in c
}

ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs)
write_file(
trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain)
)
elif os.path.exists(trusted_user_ca_key):
os.unlink(trusted_user_ca_key)
handle_trusted_user_ca_key(ssh)

render(config_file, 'ssh/sshd_config.j2', ssh)

Expand Down
Loading