diff --git a/cli/__init__.py b/cli/__init__.py index f370ca6..c44e66b 100644 --- a/cli/__init__.py +++ b/cli/__init__.py @@ -186,12 +186,29 @@ def deactivate_crc_users( user_ou, root_dn, ): - cli.ldap.user.deactivate_crc_users( + cli.ldap_cmds.user.deactivate_crc_users( user_ou, root_dn, ) +@click.command() +@click.option( + "-u", + "--user-ou", + help="OU to add users to, defaults to ou=Users", + default="ou=Users", +) +@click.option( + "-r", + "--root-dn", + help="Root DN to add users to, defaults to dc=moj,dc=com", + default="dc=moj,dc=com", +) +def user_expiry(user_ou, root_dn): + cli.ldap_cmds.user.user_expiry(user_ou=user_ou, root_dn=root_dn) + + # from cli.ldap import test main_group.add_command(add_roles_to_users) @@ -199,6 +216,7 @@ def deactivate_crc_users( main_group.add_command(update_user_home_areas) main_group.add_command(update_user_roles) main_group.add_command(deactivate_crc_users) +main_group.add_command(user_expiry) logger.configure_logging() diff --git a/cli/ldap_cmds/user.py b/cli/ldap_cmds/user.py index 410a801..18b65dd 100644 --- a/cli/ldap_cmds/user.py +++ b/cli/ldap_cmds/user.py @@ -14,6 +14,7 @@ ) from ldap3 import ( MODIFY_REPLACE, + MODIFY_DELETE, DEREF_NEVER, ) @@ -45,9 +46,7 @@ def change_home_areas( env.secrets.get("LDAP_BIND_PASSWORD"), ) - search_filter = ( - f"(&(objectclass={object_class})(userHomeArea={old_home_area})(!(cn={old_home_area}))(!(endDate=*)))" - ) + search_filter = f"(&(objectclass={object_class})(userHomeArea={old_home_area})(!(cn={old_home_area}))(!(endDate=*)))" ldap_connection.search( ",".join( [ @@ -79,7 +78,9 @@ def change_home_areas( if ldap_connection.result["result"] == 0: log.info(f"Successfully updated {attribute} for {dn}") else: - log.error(f"Failed to update {attribute} for {dn}: {ldap_connection.result}") + log.error( + f"Failed to update {attribute} for {dn}: {ldap_connection.result}" + ) ######################################### @@ -95,7 +96,10 @@ def parse_user_role_list( # and the roles are separated by a semi-colon: # username1,role1;role2;role3|username2,role1;role2 - return {user.split(",")[0]: user.split(",")[1].split(";") for user in user_role_list.split("|")} + return { + user.split(",")[0]: user.split(",")[1].split(";") + for user in user_role_list.split("|") + } def add_roles_to_user( @@ -205,7 +209,13 @@ def update_roles( log.exception("Failed to search for users") raise e - users_found = sorted([entry.cn.value for entry in ldap_connection_user_filter.entries if entry.cn.value]) + users_found = sorted( + [ + entry.cn.value + for entry in ldap_connection_user_filter.entries + if entry.cn.value + ] + ) log.debug("users found from user filter") log.debug(users_found) ldap_connection_user_filter.unbind() @@ -215,9 +225,7 @@ def update_roles( # create role filter if len(roles_filter_list) > 0: - full_role_filter = ( - f"(&(objectclass=NDRoleAssociation)(|{''.join(['(cn=' + role + ')' for role in roles_filter_list])}))" - ) + full_role_filter = f"(&(objectclass=NDRoleAssociation)(|{''.join(['(cn=' + role + ')' for role in roles_filter_list])}))" else: full_role_filter = "(&(objectclass=NDRoleAssociation)(cn=*))" @@ -250,7 +258,12 @@ def update_roles( raise e roles_found = sorted( - set({entry.entry_dn.split(",")[1].split("=")[1] for entry in ldap_connection_role_filter.entries}) + set( + { + entry.entry_dn.split(",")[1].split("=")[1] + for entry in ldap_connection_role_filter.entries + } + ) ) log.debug("users found from roles filter: ") log.debug(roles_found) @@ -309,7 +322,9 @@ def update_roles( log.e(f"Failed to add role '{item[1]}' to user '{item[0]}'") log.debug(ldap_connection_action.result) elif remove: - ldap_connection_action.delete(f"cn={item[1]},cn={item[0]},{user_ou},{root_dn}") + ldap_connection_action.delete( + f"cn={item[1]},cn={item[0]},{user_ou},{root_dn}" + ) if ldap_connection_action.result["result"] == 0: log.info(f"Successfully removed role '{item[1]}' from user '{item[0]}'") elif ldap_connection_action.result["result"] == 32: @@ -388,7 +403,9 @@ def deactivate_crc_users( env.secrets.get("LDAP_BIND_PASSWORD"), ) - user_filter = "(userSector=private)(!(userSector=public))(!(endDate=*))(objectclass=NDUser)" + user_filter = ( + "(userSector=private)(!(userSector=public))(!(endDate=*))(objectclass=NDUser)" + ) home_areas = [ [ @@ -463,9 +480,7 @@ def deactivate_crc_users( connection = cli.database.connection() for user_dn in all_users: try: - update_sql = ( - f"UPDATE USER_ SET END_DATE=TRUNC(CURRENT_DATE) WHERE UPPER(DISTINGUISHED_NAME)=UPPER(:user_dn)" - ) + update_sql = f"UPDATE USER_ SET END_DATE=TRUNC(CURRENT_DATE) WHERE UPPER(DISTINGUISHED_NAME)=UPPER(:user_dn)" update_cursor = connection.cursor() update_cursor.execute( update_sql, @@ -478,3 +493,88 @@ def deactivate_crc_users( except: log.exception(f"Failed to update END_DATE for user {user_dn}") connection.close() + + +def user_expiry( + user_ou, + root_dn, +): + date_str = f"{datetime.now().strftime('%Y%m%d')}000000Z" + log.info(f"Expiring users with end date {date_str}") + + ldap_connection_lock = ldap_connect( + env.vars.get("LDAP_HOST"), + env.vars.get("LDAP_USER"), + env.secrets.get("LDAP_BIND_PASSWORD"), + ) + try: + ldap_connection_lock.search( + ",".join( + [ + user_ou, + root_dn, + ] + ), + f"(&(!(pwdAccountLockedTime=*))(|(&(endDate=*)(!(endDate>={date_str})))(&(startDate=*)(!(startDate<={date_str})))))", + attributes=["cn"], + ) + except Exception as e: + log.exception(f"Failed to search for users \n Exception: {e}") + + found_users = [entry.entry_dn for entry in ldap_connection_lock.entries] + log.debug(found_users) + for user in found_users: + try: + ldap_connection_lock.modify( + user, + { + "pwdAccountLockedTime": [ + ( + MODIFY_REPLACE, + ["000001010000Z"], + ) + ] + }, + ) + log.info(f"Locked user {user}") + except Exception as e: + log.exception(f"Failed to unlock user {user} \n Exception: {e}") + + ldap_connection_unlock = ldap_connect( + env.vars.get("LDAP_HOST"), + env.vars.get("LDAP_USER"), + env.secrets.get("LDAP_BIND_PASSWORD"), + ) + + try: + ldap_connection_unlock.search( + ",".join( + [ + user_ou, + root_dn, + ] + ), + f"(&(pwdAccountLockedTime=000001010000Z)(|(!(endDate=*))(endDate>={date_str}))(|(!(startDate=*))(startDate<={date_str})))", + attributes=["cn"], + ) + except Exception as e: + log.exception(f"Failed to search for users \n Exception: {e}") + + found_users = [entry.entry_dn for entry in ldap_connection_unlock.entries] + log.debug(found_users) + for user in found_users: + try: + ldap_connection_unlock.modify( + user, + { + "pwdAccountLockedTime": [ + ( + MODIFY_DELETE, + ["000001010000Z"], + ) + ] + }, + ) + log.info(f"Unlocked user {user}") + except Exception as e: + log.exception(f"Failed to unlock user {user} \n Exception: {e}")