From c97ad0c18fe8da2b27ea9240485cdce6d2bfcf7e Mon Sep 17 00:00:00 2001 From: Andrew Walker Date: Mon, 20 Jan 2025 13:20:06 -0600 Subject: [PATCH] Add API to migrate from root user This commit adds an API endpoint to migrate root user to one with the specified username and password combination. If password is omitted then the one currently used for root is preserved for the new admin account. Various root account parameters are migrated to the new admin account such as: * ssh keys * password enabled status * two factor authentication configuration and secret * email address * shell * home directory --- .../bin/truenas-set-authentication-method.py | 4 +- .../middlewared/api/v25_04_0/user.py | 17 +- .../middlewared/plugins/account.py | 196 ++++++++++++------ 3 files changed, 153 insertions(+), 64 deletions(-) diff --git a/src/freenas/usr/local/bin/truenas-set-authentication-method.py b/src/freenas/usr/local/bin/truenas-set-authentication-method.py index c30d3a7579fe7..7a0ffdea007f2 100755 --- a/src/freenas/usr/local/bin/truenas-set-authentication-method.py +++ b/src/freenas/usr/local/bin/truenas-set-authentication-method.py @@ -4,13 +4,13 @@ import sqlite3 -from middlewared.plugins.account import ADMIN_UID, ADMIN_GID, crypted_password +from middlewared.plugins.account import ADMIN_UID, ADMIN_GID from middlewared.utils.db import FREENAS_DATABASE if __name__ == "__main__": authentication_method = json.loads(sys.stdin.read()) username = authentication_method["username"] - password = crypted_password(authentication_method["password"]) + password = authentication_method["password"] conn = sqlite3.connect(FREENAS_DATABASE) conn.row_factory = sqlite3.Row diff --git a/src/middlewared/middlewared/api/v25_04_0/user.py b/src/middlewared/middlewared/api/v25_04_0/user.py index 7446bf7a1846d..5b3859f0c9bc5 100644 --- a/src/middlewared/middlewared/api/v25_04_0/user.py +++ b/src/middlewared/middlewared/api/v25_04_0/user.py @@ -31,7 +31,8 @@ "UserTwofactorConfigArgs", "UserTwofactorConfigResult", "UserVerifyTwofactorTokenArgs", "UserVerifyTwofactorTokenResult", "UserUnset2faSecretArgs", "UserUnset2faSecretResult", - "UserRenew2faSecretArgs", "UserRenew2faSecretResult"] + "UserRenew2faSecretArgs", "UserRenew2faSecretResult", + "UserMigrateRootArgs", "UserMigrateRootResult",] DEFAULT_HOME_PATH = "/var/empty" @@ -281,3 +282,17 @@ class UserRenew2faSecretArgs(BaseModel): UserRenew2faSecretResult = single_argument_result(UserEntry, "UserRenew2faSecretResult") + + +@single_argument_args("migrate_root") +class UserMigrateRootArgs(BaseModel): + username: LocalUsername + """Username of new local user account to which to migration the root account. + NOTE: user account nust not exist.""" + password: Secret[str | None] = None + """Password to set for new account. If password is unspecified then the password + for the root account will be used.""" + + +class UserMigrateRootResult(BaseModel): + result: None diff --git a/src/middlewared/middlewared/plugins/account.py b/src/middlewared/middlewared/plugins/account.py index 985a1c0979da7..73833a6dab5d0 100644 --- a/src/middlewared/middlewared/plugins/account.py +++ b/src/middlewared/middlewared/plugins/account.py @@ -6,6 +6,7 @@ import shlex import shutil import stat +import subprocess import wbclient from pathlib import Path from collections import defaultdict @@ -46,6 +47,8 @@ UserShellChoicesResult, UserUpdateArgs, UserUpdateResult, + UserMigrateRootArgs, + UserMigrateRootResult ) from middlewared.service import CallError, CRUDService, ValidationErrors, pass_app, private, job from middlewared.service_exception import MatchNotFound @@ -1149,6 +1152,132 @@ async def has_local_administrator_set_up(self): """ return len(await self.middleware.call('privilege.local_administrators')) > 0 + @api_method( + UserMigrateRootArgs, UserMigrateRootResult, + roles=['ACCOUNT_WRITE'], audit='Migrate root account' + ) + @job(lock='migrate_root') + def migrate_root(self, job, data): + """ + Migrate from root user account to new one with UID 950 and the specified + `username`. If this account already exists then we consider migration to + have already happened and will fail with CallError and errno set to EEXIST. + """ + username = data['username'] + verrors = ValidationErrors() + pw_checkname(verrors, 'account_migrate_root.username', username) + verrors.check() + + root_user = self.middleware.call_sync('user.query', [['uid', '=', 0]], {'get': True}) + homedir = f'/home/{username}' + + if data['password'] is not None: + password_hash = crypted_password(data['password']) + else: + password_hash = root_user['unixhash'] + + try: + pwd_obj = self.middleware.call_sync('user.get_user_obj', {'uid': ADMIN_UID}) + raise CallError( + f'A {pwd_obj["source"].lower()} user with uid={ADMIN_UID} already exists, ' + 'setting up local administrator is not possible', + errno.EEXIST, + ) + except KeyError: + pass + + try: + pwd_obj = self.middleware.call_sync('user.get_user_obj', {'username': username}) + raise CallError(f'{username!r} {pwd_obj["source"].lower()} user already exists, ' + 'setting up local administrator is not possible', + errno.EEXIST) + except KeyError: + pass + + try: + grp_obj = self.middleware.call_sync('group.get_group_obj', {'gid': ADMIN_GID}) + raise CallError( + f'A {grp_obj["source"].lower()} group with gid={ADMIN_GID} already exists, ' + 'setting up local administrator is not possible', + errno.EEXIST, + ) + except KeyError: + pass + + try: + grp_obj = self.middleware.call_sync('group.get_group_obj', {'groupname': username}) + raise CallError(f'{username!r} {grp_obj["source"].lower()} group already exists, ' + 'setting up local administrator is not possible', + errno.EEXIST) + except KeyError: + pass + + # double-check our database in case we have for some reason failed to write to passwd + local_users = self.middleware.call_sync('user.query', [['local', '=', True]]) + local_groups = self.middleware.call_sync('group.query', [['local', '=', True]]) + + if filter_list(local_users, [['uid', '=', ADMIN_UID]]): + raise CallError( + f'A user with uid={ADMIN_UID} already exists, setting up local administrator is not possible', + errno.EEXIST, + ) + + if filter_list(local_users, [['username', '=', username]]): + raise CallError(f'{username!r} user already exists, setting up local administrator is not possible', + errno.EEXIST) + + if filter_list(local_groups, [['gid', '=', ADMIN_GID]]): + raise CallError( + f'A group with gid={ADMIN_GID} already exists, setting up local administrator is not possible', + errno.EEXIST, + ) + + if filter_list(local_groups, [['group', '=', username]]): + raise CallError(f'{username!r} group already exists, setting up local administrator is not possible', + errno.EEXIST) + + subprocess.run( + ['truenas-set-authentication-method.py'], + check=True, encoding='utf-8', errors='ignore', + input=json.dumps({'username': username, 'password': password_hash}) + ) + new_user = self.middleware.call_sync('user.query', [['uid', '=', ADMIN_UID]], {'get': True}) + + self.middleware.call_sync('failover.datastore.force_send') + self.middleware.call_sync('etc.generate', 'user') + + # Set up homedir for new admin user + try: + os.mkdir(homedir, 0o700) + except FileExistsError: + pass + + os.chown(homedir, ADMIN_UID, ADMIN_GID) + os.chmod(homedir, 0o700) + home_copy_job = self.middleware.call_sync('user.do_home_copy', '/root', homedir, '700', ADMIN_UID) + home_copy_job.wait_sync() + + # Update new user account with settings from root + self.middleware.call_sync('user.update', new_user['id'], { + 'ssh_password_enabled': root_user['ssh_password_enabled'], + 'sshpubkey': root_user['sshpubkey'], + 'email': root_user['email'], + 'shell': root_user['shell'], + }) + + # Preserve root twofactor settings + if root_user['twofactor_auth_configured']: + # get twofactor config for UID 0 and copy it over to 950 + twofactor_data = self.middleware.call_sync('datastore.query', 'account.twofactor_user_auth') + root_twofactor = filter_list(twofactor_data, [['user.bsdusr_uid', '=', 0]], {'get': True}) + target = filter_list(twofactor_data, [['user.bsdusr_uid', '=', ADMIN_UID]], {'get': True})['id'] + + self.middleware.call_sync('datastore.update', 'account.twofactor_user_auth', target, { + 'secret': root_twofactor['secret'], + 'otp_digits': root_twofactor['otp_digits'], + 'interval': root_twofactor['interval'], + }) + @api_method( UserSetupLocalAdministratorArgs, UserSetupLocalAdministratorResult, audit='Set up local administrator', @@ -1164,69 +1293,14 @@ async def setup_local_administrator(self, app, username, password, options): raise CallError('Local administrator is already set up', errno.EEXIST) if username == 'truenas_admin': - # first check based on NSS to catch collisions with AD / LDAP users - try: - pwd_obj = await self.middleware.call('user.get_user_obj', {'uid': ADMIN_UID}) - raise CallError( - f'A {pwd_obj["source"].lower()} user with uid={ADMIN_UID} already exists, ' - 'setting up local administrator is not possible', - errno.EEXIST, - ) - except KeyError: - pass - - try: - pwd_obj = await self.middleware.call('user.get_user_obj', {'username': username}) - raise CallError(f'{username!r} {pwd_obj["source"].lower()} user already exists, ' - 'setting up local administrator is not possible', - errno.EEXIST) - except KeyError: - pass - - try: - grp_obj = await self.middleware.call('group.get_group_obj', {'gid': ADMIN_GID}) - raise CallError( - f'A {grp_obj["source"].lower()} group with gid={ADMIN_GID} already exists, ' - 'setting up local administrator is not possible', - errno.EEXIST, - ) - except KeyError: - pass - - try: - grp_obj = await self.middleware.call('group.get_group_obj', {'groupname': username}) - raise CallError(f'{username!r} {grp_obj["source"].lower()} group already exists, ' - 'setting up local administrator is not possible', - errno.EEXIST) - except KeyError: - pass - - # double-check our database in case we have for some reason failed to write to passwd - local_users = await self.middleware.call('user.query', [['local', '=', True]]) - local_groups = await self.middleware.call('group.query', [['local', '=', True]]) - - if filter_list(local_users, [['uid', '=', ADMIN_UID]]): - raise CallError( - f'A user with uid={ADMIN_UID} already exists, setting up local administrator is not possible', - errno.EEXIST, - ) - - if filter_list(local_users, [['username', '=', username]]): - raise CallError(f'{username!r} user already exists, setting up local administrator is not possible', - errno.EEXIST) - - if filter_list(local_groups, [['gid', '=', ADMIN_GID]]): - raise CallError( - f'A group with gid={ADMIN_GID} already exists, setting up local administrator is not possible', - errno.EEXIST, - ) - - if filter_list(local_groups, [['group', '=', username]]): - raise CallError(f'{username!r} group already exists, setting up local administrator is not possible', - errno.EEXIST) + # This should be relatively invexpensive even though it's a job since we + # don't expect /root to have much in the way of contents. + migrate_job = await self.middleware.call('user.migrate_root', {'username': username, 'password': password}) + await migrate_job.wait(raise_error=True) + return await run('truenas-set-authentication-method.py', check=True, encoding='utf-8', errors='ignore', - input=json.dumps({'username': username, 'password': password})) + input=json.dumps({'username': username, 'password': crypted_password(password)})) await self.middleware.call('failover.datastore.force_send') await self.middleware.call('etc.generate', 'user')