Skip to content

Commit

Permalink
T6013: Add support for AuthorizedPrincipalsFile to trusted_user_ca_key
Browse files Browse the repository at this point in the history
This commit introduces the bind-user subcommand to support
AuthorizedPrincipalsFile. With this addition, it is now possible to
associate SSH login users with their respective principals.
  • Loading branch information
takehaya committed Dec 28, 2024
1 parent 77d5b09 commit df6d110
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 19 deletions.
5 changes: 5 additions & 0 deletions data/templates/ssh/sshd_config.j2
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,9 @@ RekeyLimit {{ rekey.data }}M {{ rekey.time + 'M' if rekey.time is vyos_defined }

{% if trusted_user_ca_key is vyos_defined %}
TrustedUserCAKeys /etc/ssh/trusted_user_ca_key
{% if trusted_user_ca_key.bind-user is vyos_defined %}
AuthorizedPrincipalsFile /etc/ssh/authorized_principals/%u
{% else %}
AuthorizedPrincipalsFile none
{% endif %}
{% endif %}
18 changes: 18 additions & 0 deletions interface-definitions/service_ssh.xml.in
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,24 @@
</properties>
<children>
#include <include/pki/ca-certificate.xml.i>
<tagNode name="bind-user">
<properties>
<help>user-name</help>
<constraint>
#include <include/constraint/login-username.xml.i>
</constraint>
</properties>
<children>
<leafNode name="principal">
<properties>
<help>principal-name</help>
</properties>
<constraint>
#include <include/constraint/login-username.xml.i>
</constraint>
</leafNode>
</children>
</leafNode>
</children>
</node>
#include <include/vrf-multi.xml.i>
Expand Down
73 changes: 71 additions & 2 deletions smoketest/scripts/cli/test_service_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
key_dsa = '/etc/ssh/ssh_host_dsa_key'
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'
trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key'

authorized_principals = '/etc/ssh/authorized_principals'

def get_config_value(key):
tmp = read_file(SSHD_CONF)
Expand Down Expand Up @@ -380,19 +380,88 @@ def test_ssh_trusted_user_ca_key(self):

trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config)
authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile')
self.assertIn("none", authorize_principals_file_config)

with open(trusted_user_ca_key, 'r') as file:
ca_key_contents = file.read()
self.assertIn(ca_root_cert_data, ca_key_contents)

self.cli_delete(base_path + ['trusted-user-ca-key'])
self.cli_delete(base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name])
self.cli_delete(['pki', 'ca', ca_cert_name])
self.cli_commit()

# Verify the CA key is removed
trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
self.assertNotIn(trusted_user_ca_key, trusted_user_ca_key_config)
authorize_principals_file_config = get_config_value('AuthorizedPrincipalsFile')
self.assertNotIn("none", authorize_principals_file_config)

def test_ssh_trusted_user_ca_key_and_bind_user_with_principal(self):
ca_cert_name = 'test_ca'
bind_user = 'test_user'
principals = ['test_principal_alice', 'test_principal_bob']
test_user = 'ssh_test'
test_pass = 'v2i57DZs8idUwMN3VC92'

# Create a test user
self.cli_set(
[
'system',
'login',
'user',
test_user,
'authentication',
'plaintext-password',
test_pass,
]
)

# set pki ca <ca_cert_name> certificate <ca_key_data>
# set service ssh trusted-user-ca-key ca-certificate <ca_cert_name>
# set service ssh trusted-user-ca-key bind-user <bind_user> principal <principals>
self.cli_set(
pki_path
+ [
'ca',
ca_cert_name,
'certificate',
ca_root_cert_data.replace('\n', ''),
]
)
self.cli_set(
base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name]
)
for principal in principals:
self.cli_set(
base_path
+ ['trusted-user-ca-key', 'bind-user', bind_user, 'principal', principal]
)
self.cli_commit()

trusted_user_ca_key_config = get_config_value('TrustedUserCAKeys')
self.assertIn(trusted_user_ca_key, trusted_user_ca_key_config)
authorized_principals_file = f'{authorized_principals}/{bind_user}'
self.assertTrue(os.path.exists(authorized_principals_file))

with open(authorized_principals_file, 'r') as file:
authorized_principals = file.read()
for principal in principals:
self.assertIn(principal, authorized_principals)

for principal in principals:
self.cli_delete(
base_path
+ ['trusted-user-ca-key', 'bind-user', bind_user, 'principal', principal]
)

self.cli_delete(base_path + ['trusted-user-ca-key', 'ca-certificate', ca_cert_name])
self.cli_delete(['pki', 'ca', ca_cert_name])
self.cli_delete(['system', 'login', 'user', test_user])
self.cli_commit()

# Verify the authorized principals file is removed
self.assertFalse(os.path.exists(authorized_principals_file))

if __name__ == '__main__':
unittest.main(verbosity=2)
98 changes: 81 additions & 17 deletions src/conf_mode/service_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,76 @@
key_ed25519 = '/etc/ssh/ssh_host_ed25519_key'

trusted_user_ca_key = '/etc/ssh/trusted_user_ca_key'
authorized_principal = '/etc/ssh/authorized_principal'


def cleanup_authorized_principal_dir(valid_users: list[str]):
if not os.path.isdir(authorized_principal):
return

# Check the files (user name) under the directory and delete unnecessary ones.
for filename in os.listdir(authorized_principal):
file_path = os.path.join(authorized_principal, filename)
if os.path.isfile(file_path) and filename not in valid_users:
os.remove(file_path)

# If the directory is empty, delete it too
if not os.listdir(authorized_principal):
os.rmdir(authorized_principal)

def handle_trusted_user_ca_key(ssh: dict):
if 'trusted_user_ca_key' not in ssh:
if os.path.exists(trusted_user_ca_key):
os.unlink(trusted_user_ca_key)

# remove authorized_principal directory if it exists
cleanup_authorized_principal_dir([])
return

# trusted_user_ca_key is present
ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
pki_ca_cert = ssh['pki']['ca'][ca_key_name]

loaded_ca_cert = load_certificate(pki_ca_cert['certificate'])
loaded_ca_certs = {
load_certificate(c['certificate'])
for c in ssh['pki']['ca'].values()
if 'certificate' in c
}

ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs)
write_file(
trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain)
)

if 'bind-user' not in ssh['trusted_user_ca_key']:
# remove authorized_principal directory if it exists
cleanup_authorized_principal_dir([])
return

# bind-user is present
configured_users = []
for bind_user, bind_user_config in ssh['trusted_user_ca_key']['bind-user'].items():
if bind_user not in ssh['login_users']:
raise ConfigError(f"User '{bind_user}' not found in system login users")

if 'principal' not in bind_user_config:
raise ConfigError(f"Principal not found for user '{bind_user}'")

principals = bind_user_config['principal']
if isinstance(principals, str):
principals = [principals]

if not os.path.isdir(authorized_principal):
os.makedirs(authorized_principal, exist_ok=True)

principal_file = os.path.join(authorized_principal, bind_user)
contents = '\n'.join(principals) + '\n'
write_file(principal_file, contents)
configured_users.append(bind_user)

# remove unnecessary files under authorized_principal directory
cleanup_authorized_principal_dir(configured_users)


def get_config(config=None):
Expand All @@ -59,7 +129,14 @@ def get_config(config=None):
ssh = conf.get_config_dict(
base, key_mangling=('-', '_'), get_first_key=True, with_pki=True
)
login_users_base = ['system', 'login', 'user']
login_users = conf.get_config_dict(
login_users_base, key_mangling=('-', '_'),
no_tag_node_value_mangle=True,
get_first_key=True,
)

# create a list of all users, cli and users
tmp = is_node_changed(conf, base + ['vrf'])
if tmp:
ssh.update({'restart_required': {}})
Expand All @@ -71,6 +148,9 @@ def get_config(config=None):
# pass config file path - used in override template
ssh['config_file'] = config_file

# use for trusted ca
ssh['login_users'] = login_users

# Ignore default XML values if config doesn't exists
# Delete key from dict
if not conf.exists(base + ['dynamic-protection']):
Expand Down Expand Up @@ -119,23 +199,7 @@ def generate(ssh):
syslog(LOG_INFO, 'SSH ed25519 host key not found, generating new key!')
call(f'ssh-keygen -q -N "" -t ed25519 -f {key_ed25519}')

if 'trusted_user_ca_key' in ssh:
ca_key_name = ssh['trusted_user_ca_key']['ca_certificate']
pki_ca_cert = ssh['pki']['ca'][ca_key_name]

loaded_ca_cert = load_certificate(pki_ca_cert['certificate'])
loaded_ca_certs = {
load_certificate(c['certificate'])
for c in ssh['pki']['ca'].values()
if 'certificate' in c
}

ca_full_chain = find_chain(loaded_ca_cert, loaded_ca_certs)
write_file(
trusted_user_ca_key, '\n'.join(encode_certificate(c) for c in ca_full_chain)
)
elif os.path.exists(trusted_user_ca_key):
os.unlink(trusted_user_ca_key)
handle_trusted_user_ca_key(ssh)

render(config_file, 'ssh/sshd_config.j2', ssh)

Expand Down

0 comments on commit df6d110

Please sign in to comment.