From 463f0c5e6d3ae8b8da6bbb82b0f3967af4500650 Mon Sep 17 00:00:00 2001 From: TrustyJAID Date: Sun, 31 Mar 2024 18:05:37 -0600 Subject: [PATCH] Add support for timeouts to Mutes cog (#5604) Co-authored-by: Michael Oliveira <34169552+Flame442@users.noreply.github.com> --- redbot/cogs/mutes/models.py | 19 + redbot/cogs/mutes/mutes.py | 740 ++++++++++++++++++-------------- redbot/cogs/mutes/voicemutes.py | 22 +- 3 files changed, 436 insertions(+), 345 deletions(-) create mode 100644 redbot/cogs/mutes/models.py diff --git a/redbot/cogs/mutes/models.py b/redbot/cogs/mutes/models.py new file mode 100644 index 00000000000..fb3f1e8e9b7 --- /dev/null +++ b/redbot/cogs/mutes/models.py @@ -0,0 +1,19 @@ +from typing import Optional, Dict + +import discord + +from dataclasses import dataclass + + +@dataclass +class MuteResponse: + success: bool + reason: Optional[str] + user: discord.Member + + +@dataclass +class ChannelMuteResponse(MuteResponse): + channel: discord.abc.GuildChannel + old_overs: Optional[Dict[str, bool]] + voice_mute: bool diff --git a/redbot/cogs/mutes/mutes.py b/redbot/cogs/mutes/mutes.py index ce3b3731ce4..f103fc94d56 100644 --- a/redbot/cogs/mutes/mutes.py +++ b/redbot/cogs/mutes/mutes.py @@ -1,14 +1,11 @@ import asyncio import contextlib -import discord import logging - from abc import ABC -from typing import cast, Optional, Dict, List, Tuple, Literal, Union from datetime import datetime, timedelta, timezone +from typing import Dict, List, Literal, Optional, Tuple, Union, cast -from .converters import MuteTime -from .voicemutes import VoiceMutes +import discord from redbot.core.bot import Red from redbot.core import commands, i18n, modlog, Config @@ -24,13 +21,17 @@ from redbot.core.utils.menus import start_adding_reactions from redbot.core.utils.predicates import MessagePredicate, ReactionPredicate +from .converters import MuteTime +from .models import ChannelMuteResponse, MuteResponse +from .voicemutes import VoiceMutes + T_ = i18n.Translator("Mutes", __file__) _ = lambda s: s MUTE_UNMUTE_ISSUES = { - "already_muted": _("That user is already muted in this channel."), - "already_unmuted": _("That user is not muted in this channel."), + "already_muted": _("That user is already muted in {location}."), + "already_unmuted": _("That user is not muted in {location}."), "hierarchy_problem": _( "I cannot let you do that. You are not higher than the user in the role hierarchy." ), @@ -43,8 +44,13 @@ "permission and the user I'm muting must be " "lower than myself in the role hierarchy." ), + "permissions_issue_guild": _( + "Failed to mute or unmute user. I need the Timeout Members " + "permission and the user I'm muting must be " + "lower than myself in the role hierarchy." + ), "permissions_issue_channel": _( - "Failed to mute or unmute user. I need the Manage Permissions permission." + "Failed to mute or unmute user. I need the Manage Permissions permission in {location}." ), "left_guild": _("The user has left the server while applying an overwrite."), "unknown_channel": _("The channel I tried to mute or unmute the user in isn't found."), @@ -52,6 +58,8 @@ "voice_mute_permission": _( "Because I don't have the Move Members permission, this will take into effect when the user rejoins." ), + "mute_is_too_long": _("Timeouts cannot be longer than 28 days."), + "timeouts_require_time": _("You must provide a time for the timeout to end."), "is_not_voice_mute": _( "That user is channel muted in their current voice channel, not just voice muted." " If you want to fully unmute this user in the channel," @@ -84,7 +92,6 @@ def __init__(self, bot: Red): self.bot = bot self.config = Config.get_conf(self, 49615220001, force_registration=True) default_guild = { - "sent_instructions": False, "mute_role": None, "notification_channel": None, "muted_users": {}, @@ -92,13 +99,7 @@ def __init__(self, bot: Red): "dm": False, "show_mod": False, } - # Tbh I would rather force everyone to use role mutes. - # I also honestly think everyone would agree they're the - # way to go. If for whatever reason someone wants to - # enable channel overwrite mutes for their bot they can. - # Channel overwrite logic still needs to be in place - # for channel mutes methods. - self.config.register_global(force_role_mutes=True, schema_version=0) + self.config.register_global(schema_version=0) self.config.register_guild(**default_guild) self.config.register_member(perms_cache={}) self.config.register_channel(muted_users={}) @@ -327,11 +328,11 @@ async def _auto_unmute_user(self, guild: discord.Guild, data: dict): del muted_users[str(data["member"])] del self._server_mutes[guild.id][data["member"]] return - success = await self.unmute_user(guild, author, member, _("Automatic unmute")) + result = await self.unmute_user(guild, author, member, _("Automatic unmute")) async with self.config.guild(guild).muted_users() as muted_users: if str(member.id) in muted_users: del muted_users[str(member.id)] - if success["success"]: + if result.success: await modlog.create_case( self.bot, guild, @@ -354,7 +355,7 @@ async def _auto_unmute_user(self, guild: discord.Guild, data: dict): return error_msg = _( "I am unable to unmute {user} for the following reason:\n{reason}" - ).format(user=member, reason=success["reason"]) + ).format(user=member, reason=result.reason) try: await notification_channel.send(error_msg) except discord.errors.Forbidden: @@ -504,13 +505,13 @@ async def _auto_channel_unmute_user( ): del self._channel_mutes[channel.id][data["member"]] return None - success = await self.channel_unmute_user( + result = await self.channel_unmute_user( channel.guild, channel, author, member, _("Automatic unmute") ) async with self.config.channel(channel).muted_users() as muted_users: if str(member.id) in muted_users: del muted_users[str(member.id)] - if success["success"]: + if result.success: if create_case: if data.get("voice_mute", False): unmute_type = "vunmute" @@ -536,7 +537,7 @@ async def _auto_channel_unmute_user( else: error_msg = _( "I am unable to unmute {user} in {channel} for the following reason:\n{reason}" - ).format(user=member, channel=channel.mention, reason=success["reason"]) + ).format(user=member, channel=channel.mention, reason=result.reason) if create_case: chan_id = await self.config.guild(channel.guild).notification_channel() notification_channel = channel.guild.get_channel(chan_id) @@ -550,7 +551,7 @@ async def _auto_channel_unmute_user( log.info(error_msg) return None else: - return (member, channel, success["reason"]) + return (member, channel, result.reason) async def _send_dm_notification( self, @@ -765,9 +766,7 @@ async def on_member_join(self, member: discord.Member): return mute_role = await self.config.guild(guild).mute_role() if not mute_role: - # channel overwrite mutes would quickly allow a malicious - # user to globally rate limit the bot therefore we are not - # going to support re-muting users via channel overwrites + # timeouts already restore on rejoin return await i18n.set_contextual_locales_from_guild(self.bot, guild) if guild.id in self._server_mutes: @@ -821,18 +820,6 @@ async def showmoderator(self, ctx, true_or_false: bool): ) ) - @muteset.command(name="forcerole") - @commands.is_owner() - async def force_role_mutes(self, ctx: commands.Context, true_or_false: bool): - """ - Whether or not to force role only mutes on the bot - """ - await self.config.force_role_mutes.set(true_or_false) - if true_or_false: - await ctx.send(_("Okay I will enforce role mutes before muting users.")) - else: - await ctx.send(_("Okay I will allow channel overwrites for muting users.")) - @muteset.command(name="settings", aliases=["showsettings"]) @commands.mod_or_permissions(manage_channels=True) async def show_mutes_settings(self, ctx: commands.Context): @@ -889,8 +876,8 @@ async def notification_channel_set( async def mute_role(self, ctx: commands.Context, *, role: discord.Role = None): """Sets the role to be applied when muting a user. - If no role is setup the bot will attempt to mute a user by setting - channel overwrites in all channels to prevent the user from sending messages. + If no role is setup the bot will attempt to mute a user + by utilizing server timeouts. Note: If no role is setup a user may be able to leave the server and rejoin no longer being muted. @@ -899,10 +886,7 @@ async def mute_role(self, ctx: commands.Context, *, role: discord.Role = None): await self.config.guild(ctx.guild).mute_role.set(None) if ctx.guild.id in self.mute_role_cache: del self.mute_role_cache[ctx.guild.id] - await self.config.guild(ctx.guild).sent_instructions.set(False) - # reset this to warn users next time they may have accidentally - # removed the mute role - await ctx.send(_("Channel overwrites will be used for mutes instead.")) + await ctx.send(_("Discord Timeouts will be used for mutes instead.")) else: if role >= ctx.author.top_role: await ctx.send( @@ -1038,7 +1022,7 @@ async def _check_for_mute_role(self, ctx: commands.Context) -> bool: command_1 = f"{ctx.clean_prefix}muteset role" command_2 = f"{ctx.clean_prefix}muteset makerole" msg = _( - "This server does not have a mute role setup. " + "This server does not have a mute role setup and I do not have permission to timeout users. " " You can setup a mute role with {command_1} or" " {command_2} if you just want a basic role created setup.\n\n" ).format( @@ -1047,69 +1031,12 @@ async def _check_for_mute_role(self, ctx: commands.Context) -> bool: ) mute_role_id = await self.config.guild(ctx.guild).mute_role() mute_role = ctx.guild.get_role(mute_role_id) - sent_instructions = await self.config.guild(ctx.guild).sent_instructions() - force_role_mutes = await self.config.force_role_mutes() - if force_role_mutes and not mute_role: + timeout_perms = ctx.channel.permissions_for(ctx.me).moderate_members + if not timeout_perms and not mute_role: await ctx.send(msg) return False - if mute_role or sent_instructions: - return True - else: - msg += _( - "Channel overwrites for muting users can get expensive on Discord's API " - "as such we recommend that you have an admin setup a mute role instead. " - "Channel overwrites will also not re-apply on guild join, so a user " - "who has been muted may leave and re-join and no longer be muted. " - "Role mutes do not have this issue.\n\n" - "Are you sure you want to continue with channel overwrites? " - ) - can_react = can_user_react_in(ctx.me, ctx.channel) - if can_react: - msg += _( - "Reacting with \N{WHITE HEAVY CHECK MARK} will continue " - "the mute with overwrites and stop this message from appearing again, " - "Reacting with \N{NEGATIVE SQUARED CROSS MARK} will end the mute attempt." - ) - else: - msg += _( - "Saying {response_1} will continue " - "the mute with overwrites and stop this message from appearing again, " - "saying {response_2} will end the mute attempt." - ).format( - response_1=inline("yes"), - response_2=inline("no"), - ) - query: discord.Message = await ctx.send(msg) - if can_react: - # noinspection PyAsyncCall - start_adding_reactions(query, ReactionPredicate.YES_OR_NO_EMOJIS) - pred = ReactionPredicate.yes_or_no(query, ctx.author) - event = "reaction_add" - else: - pred = MessagePredicate.yes_or_no(ctx) - event = "message" - try: - await ctx.bot.wait_for(event, check=pred, timeout=30) - except asyncio.TimeoutError: - with contextlib.suppress(discord.NotFound): - await query.delete() - return False - - if not pred.result: - if can_react: - with contextlib.suppress(discord.NotFound): - await query.delete() - else: - await ctx.send(_("OK then.")) - - return False - else: - if can_react: - with contextlib.suppress(discord.Forbidden): - await query.clear_reactions() - await self.config.guild(ctx.guild).sent_instructions.set(True) - return True + return True @commands.command() @commands.guild_only() @@ -1132,17 +1059,24 @@ async def activemutes(self, ctx: commands.Context): else: user_str = user.mention if mutes["until"]: - time_left = timedelta( - seconds=mutes["until"] - datetime.now(timezone.utc).timestamp() - ) - time_str = humanize_timedelta(timedelta=time_left) + timestamp = int(mutes["until"]) + time_str = discord.utils.format_dt(datetime.fromtimestamp(timestamp)) else: time_str = "" msg += f"{user_str} " if time_str: - msg += _("__Remaining__: {time_left}\n").format(time_left=time_str) + msg += _("__Until__: {time_left}\n").format(time_left=time_str) else: msg += "\n" + added_timeouts = False + for member in ctx.guild.members: + if member.is_timed_out(): + if not added_timeouts: + msg += _("__Server Timeouts__\n") + added_timeouts = True + msg += f"{member.mention}" + time_str = discord.utils.format_dt(member.timed_out_until) + msg += _("__Until__: {time_left}\n").format(time_left=time_str) for channel_id, mutes_data in self._channel_mutes.items(): if not mutes_data: continue @@ -1157,17 +1091,16 @@ async def activemutes(self, ctx: commands.Context): else: user_str = user.mention if mutes["until"]: - time_left = timedelta( - seconds=mutes["until"] - datetime.now(timezone.utc).timestamp() - ) - time_str = humanize_timedelta(timedelta=time_left) + timestamp = int(mutes["until"]) + time_str = discord.utils.format_dt(datetime.fromtimestamp(timestamp)) else: time_str = "" msg += f"{user_str} " if time_str: - msg += _("__Remaining__: {time_left}\n").format(time_left=time_str) + msg += _("__Until__: {time_left}\n").format(time_left=time_str) else: msg += "\n" + if msg: for page in pagify(msg): await ctx.maybe_send_embed(page) @@ -1176,7 +1109,89 @@ async def activemutes(self, ctx: commands.Context): @commands.command(usage=" [time_and_reason]") @commands.guild_only() - @commands.mod_or_permissions(manage_roles=True) + @commands.mod_or_permissions(moderate_members=True) + @commands.bot_has_permissions(moderate_members=True) + async def timeout( + self, + ctx: commands.Context, + users: commands.Greedy[discord.Member], + *, + time_and_reason: MuteTime = {}, + ): + """Timeout users. + + `` is a space separated list of usernames, ID's, or mentions. + `[time_and_reason]` is the time to mute for and reason. Time is + any valid time length such as `30 minutes` or `2 days`. If nothing + is provided the mute will use the set default time or indefinite if not set. + + Examples: + `[p]mute @member1 @member2 spam 5 hours` + `[p]mute @member1 3 days` + + """ + if not users: + return await ctx.send_help() + if ctx.me in users: + return await ctx.send(_("You cannot mute me.")) + if ctx.author in users: + return await ctx.send(_("You cannot mute yourself.")) + duration = time_and_reason.get("duration", None) + if duration and duration > timedelta(days=28): + await ctx.send(_(MUTE_UNMUTE_ISSUES["mute_is_too_long"])) + return + reason = time_and_reason.get("reason", None) + time = "" + until = None + if duration: + until = datetime.now(timezone.utc) + duration + length = humanize_timedelta(timedelta=duration) + time = _(" for {length} until {duration}").format( + length=length, duration=discord.utils.format_dt(until) + ) + + else: + default_duration = await self.config.guild(ctx.guild).default_time() + if default_duration: + until = datetime.now(timezone.utc) + timedelta(seconds=default_duration) + length = humanize_timedelta(seconds=default_duration) + time = _(" for {length} until {duration}").format( + length=length, duration=discord.utils.format_dt(until) + ) + + success_list = [] + issues_list = [] + for member in users: + ret = MuteResponse(success=False, reason=None, user=member) + if member.guild_permissions >= ctx.author.guild_permissions: + ret.reason = _(MUTE_UNMUTE_ISSUES["hierarchy_problem"]) + issues_list.append(ret) + continue + if member.guild_permissions.administrator: + ret.reason = _(MUTE_UNMUTE_ISSUES["is_admin"]) + issues_list.append(ret) + continue + + try: + await member.edit(timed_out_until=until, reason=reason) + success_list.append(member) + except Exception: + pass + if success_list: + msg = _("{users} has been timed out in this server{time}.") + if len(success_list) > 1: + msg = _("{users} have been timed out in this server{time}.") + await ctx.send( + msg.format(users=humanize_list([f"`{u}`" for u in success_list]), time=time) + ) + else: + await ctx.send(_("None of the users provided could be muted properly.")) + if issues_list: + await self.handle_issues(ctx, issues_list) + + @commands.command(usage=" [time_and_reason]") + @commands.guild_only() + @commands.mod_or_permissions(manage_roles=True, moderate_members=True) async def mute( self, ctx: commands.Context, @@ -1212,26 +1227,29 @@ async def mute( until = None if duration: until = datetime.now(timezone.utc) + duration - time = _(" for {duration}").format(duration=humanize_timedelta(timedelta=duration)) + length = humanize_timedelta(timedelta=duration) + time = _(" for {length} until {duration}").format( + length=length, duration=discord.utils.format_dt(until) + ) + else: default_duration = await self.config.guild(ctx.guild).default_time() if default_duration: until = datetime.now(timezone.utc) + timedelta(seconds=default_duration) - time = _(" for {duration}").format( - duration=humanize_timedelta(timedelta=timedelta(seconds=default_duration)) + length = humanize_timedelta(seconds=default_duration) + time = _(" for {length} until {duration}").format( + length=length, duration=discord.utils.format_dt(until) ) + author = ctx.message.author guild = ctx.guild audit_reason = get_audit_reason(author, reason, shorten=True) success_list = [] issue_list = [] for user in users: - success = await self.mute_user(guild, author, user, until, audit_reason) - if success["success"]: + response = await self.mute_user(guild, author, user, until, audit_reason) + if response.success: success_list.append(user) - if success["channels"]: - # incase we only muted a user in 1 channel not all - issue_list.append(success) await modlog.create_case( self.bot, guild, @@ -1247,7 +1265,7 @@ async def mute( user, author, guild, _("Server mute"), reason, duration ) else: - issue_list.append(success) + issue_list.append(response) if success_list: if ctx.guild.id not in self._server_mutes: self._server_mutes[ctx.guild.id] = {} @@ -1255,39 +1273,38 @@ async def mute( if len(success_list) > 1: msg = _("{users} have been muted in this server{time}.") await ctx.send( - msg.format(users=humanize_list([f"{u}" for u in success_list]), time=time) + msg.format(users=humanize_list([f"`{u}`" for u in success_list]), time=time) ) if issue_list: await self.handle_issues(ctx, issue_list) - def parse_issues(self, issue_list: dict) -> str: - reasons = {} - reason_msg = issue_list["reason"] + "\n" if issue_list["reason"] else None - channel_msg = "" - error_msg = _("{member} could not be (un)muted for the following reasons:\n").format( - member=issue_list["user"] - ) - if issue_list["channels"]: - for channel, reason in issue_list["channels"]: - if reason not in reasons: - reasons[reason] = [channel] - else: - reasons[reason].append(channel) + def parse_issues(self, issues: List[Union[MuteResponse, ChannelMuteResponse]]) -> str: + users = set(issue.user for issue in issues) + error_msg = "" + + for user in users: + error_msg += _("{member} could not be (un)muted for the following reasons:\n").format( + member=f"`{user}`" + ) + # I would like to replace this with a user mention but send_interactive + # does not support supressing mentions at this time. So in order to keep + # this formatting consistent the username is escaped in a code block. + for issue in issues: + if issue.user.id != user.id: + continue + if issue.reason: + error_msg += f"- {issue.reason}\n" - for reason, channel_list in reasons.items(): - channel_msg += _("- {reason} In the following channels: {channels}\n").format( - reason=reason, - channels=humanize_list([c.mention for c in channel_list]), - ) - error_msg += reason_msg or channel_msg return error_msg - async def handle_issues(self, ctx: commands.Context, issue_list: List[dict]) -> None: + async def handle_issues( + self, ctx: commands.Context, issue_list: List[Union[MuteResponse, ChannelMuteResponse]] + ) -> None: """ This is to handle the various issues that can return for each user/channel """ message = _( - "Some users could not be properly muted. Would you like to see who, where, and why?" + "Some users could not be properly muted or unmuted. Would you like to see who, where, and why?" ) can_react = can_user_react_in(ctx.me, ctx.channel) @@ -1320,7 +1337,7 @@ async def handle_issues(self, ctx: commands.Context, issue_list: List[dict]) -> if can_react: with contextlib.suppress(discord.Forbidden): await query.clear_reactions() - issue = "\n".join(self.parse_issues(issue) for issue in issue_list) + issue = self.parse_issues(issue_list) resp = pagify(issue) await ctx.send_interactive(resp) @@ -1360,14 +1377,12 @@ async def channel_mute( until = None if duration: until = datetime.now(timezone.utc) + duration - time = _(" for {duration}").format(duration=humanize_timedelta(timedelta=duration)) + time = _(" until {duration}").format(duration=discord.utils.format_dt(until)) else: default_duration = await self.config.guild(ctx.guild).default_time() if default_duration: until = datetime.now(timezone.utc) + timedelta(seconds=default_duration) - time = _(" for {duration}").format( - duration=humanize_timedelta(timedelta=timedelta(seconds=default_duration)) - ) + time = _(" until {duration}").format(duration=discord.utils.format_dt(until)) author = ctx.message.author channel = ctx.message.channel if isinstance(channel, discord.Thread): @@ -1377,12 +1392,14 @@ async def channel_mute( issue_list = [] success_list = [] for user in users: - success = await self.channel_mute_user( + response = await self.channel_mute_user( guild, channel, author, user, until, audit_reason ) - if success["success"]: + if response.success: success_list.append(user) - + if response.reason: + # This is incase we couldn't move the user from voice channels + issue_list.append(response) await modlog.create_case( self.bot, guild, @@ -1398,21 +1415,21 @@ async def channel_mute( user, author, guild, _("Channel mute"), reason, duration ) async with self.config.member(user).perms_cache() as cache: - cache[channel.id] = success["old_overs"] + cache[channel.id] = response.old_overs else: - issue_list.append((user, success["reason"])) + issue_list.append(response) if success_list: msg = _("{users} has been muted in this channel{time}.") if len(success_list) > 1: msg = _("{users} have been muted in this channel{time}.") await ctx.send( - msg.format(users=humanize_list([f"{u}" for u in success_list]), time=time) + msg.format(users=humanize_list([f"`{u}`" for u in success_list]), time=time) ) if issue_list: - msg = _("The following users could not be muted\n") - for user, issue in issue_list: - msg += f"{user}: {issue}\n" + msg = _("The following users could not be muted:\n") + for issue in issue_list: + msg += f"- `{issue.user}`: {issue.reason}\n" await ctx.send_interactive(pagify(msg)) @commands.command(usage=" [reason]") @@ -1449,9 +1466,12 @@ async def unmute( else: self._channel_mute_events[guild.id] = asyncio.Event() for user in users: - success = await self.unmute_user(guild, author, user, audit_reason) + response = await self.unmute_user(guild, author, user, audit_reason) - if success["success"]: + if response.success: + if response.reason: + # This is incase we couldn't move the user from voice channels + issue_list.append(response) success_list.append(user) await modlog.create_case( self.bot, @@ -1467,7 +1487,7 @@ async def unmute( user, author, guild, _("Server unmute"), reason ) else: - issue_list.append(success) + issue_list.append(response) self._channel_mute_events[guild.id].set() if success_list: if ctx.guild.id in self._server_mutes and self._server_mutes[ctx.guild.id]: @@ -1478,7 +1498,81 @@ async def unmute( await self.config.guild(ctx.guild).muted_users.clear() await ctx.send( _("{users} unmuted in this server.").format( - users=humanize_list([f"{u}" for u in success_list]) + users=humanize_list([f"`{u}`" for u in success_list]) + ) + ) + if issue_list: + await self.handle_issues(ctx, issue_list) + + @commands.command(usage=" [reason]", hidden=True) + @commands.guild_only() + @commands.mod_or_permissions(manage_roles=True) + async def forceunmute( + self, + ctx: commands.Context, + users: commands.Greedy[discord.Member], + *, + reason: Optional[str] = None, + ): + """Force Unmute users who have had channel overwrite mutes in every channel. + + `` is a space separated list of usernames, ID's, or mentions. + `[reason]` is the reason for the unmute. + """ + if not users: + return await ctx.send_help() + if ctx.me in users: + return await ctx.send(_("You cannot unmute me.")) + if ctx.author in users: + return await ctx.send(_("You cannot unmute yourself.")) + async with ctx.typing(): + guild = ctx.guild + author = ctx.author + audit_reason = get_audit_reason(author, reason, shorten=True) + issue_list = [] + success_list = [] + if guild.id in self._channel_mute_events: + self._channel_mute_events[guild.id].clear() + else: + self._channel_mute_events[guild.id] = asyncio.Event() + for user in users: + tasks = [] + for channel in guild.channels: + tasks.append( + self.channel_unmute_user(guild, channel, author, user, audit_reason) + ) + results = await bounded_gather(*tasks) + for result in results: + if not result.success: + issue_list.append(result) + if any(t.success for t in results): + success_list.append(user) + await modlog.create_case( + self.bot, + guild, + ctx.message.created_at, + "sunmute", + user, + author, + reason, + until=None, + ) + await self._send_dm_notification( + user, author, guild, _("Server unmute"), reason + ) + await self.config.member(user).clear() + + self._channel_mute_events[guild.id].set() + if success_list: + if ctx.guild.id in self._server_mutes and self._server_mutes[ctx.guild.id]: + await self.config.guild(ctx.guild).muted_users.set( + self._server_mutes[ctx.guild.id] + ) + else: + await self.config.guild(ctx.guild).muted_users.clear() + await ctx.send( + _("{users} unmuted in this server.").format( + users=humanize_list([f"`{u}`" for u in success_list]) ) ) if issue_list: @@ -1515,11 +1609,11 @@ async def unmute_channel( success_list = [] issue_list = [] for user in users: - success = await self.channel_unmute_user( + response = await self.channel_unmute_user( guild, channel, author, user, audit_reason ) - if success["success"]: + if response.success: success_list.append(user) await modlog.create_case( self.bot, @@ -1536,7 +1630,7 @@ async def unmute_channel( user, author, guild, _("Channel unmute"), reason ) else: - issue_list.append((user, success["reason"])) + issue_list.append(response) if success_list: if channel.id in self._channel_mutes and self._channel_mutes[channel.id]: await self.config.channel(channel).muted_users.set(self._channel_mutes[channel.id]) @@ -1544,13 +1638,13 @@ async def unmute_channel( await self.config.channel(channel).muted_users.clear() await ctx.send( _("{users} unmuted in this channel.").format( - users=humanize_list([f"{u}" for u in success_list]) + users=humanize_list([f"`{u}`" for u in success_list]) ) ) if issue_list: - msg = _("The following users could not be unmuted\n") - for user, issue in issue_list: - msg += f"{user}: {issue}\n" + msg = _("The following users could not be unmuted:\n") + for issue in issue_list: + msg += f"- `{issue.user}`: {issue.reason}\n" await ctx.send_interactive(pagify(msg)) async def mute_user( @@ -1560,43 +1654,31 @@ async def mute_user( user: discord.Member, until: Optional[datetime] = None, reason: Optional[str] = None, - ) -> Dict[ - str, Optional[Union[List[Tuple[discord.abc.GuildChannel, str]], discord.Member, bool, str]] - ]: + ) -> MuteResponse: """ Handles muting users """ permissions = user.guild_permissions - ret: Dict[ - str, - Union[bool, Optional[str], List[Tuple[discord.abc.GuildChannel, str]], discord.Member], - ] = { - "success": False, - "reason": None, - "channels": [], - "user": user, - } - # TODO: This typing is ugly and should probably be an object on its own - # along with this entire method and some other refactorization - # v1.0.0 is meant to look ugly right :') + ret: MuteResponse = MuteResponse(success=False, reason=None, user=user) + if permissions.administrator: - ret["reason"] = _(MUTE_UNMUTE_ISSUES["is_admin"]) + ret.reason = _(MUTE_UNMUTE_ISSUES["is_admin"]) return ret if not await self.is_allowed_by_hierarchy(guild, author, user): - ret["reason"] = _(MUTE_UNMUTE_ISSUES["hierarchy_problem"]) + ret.reason = _(MUTE_UNMUTE_ISSUES["hierarchy_problem"]) return ret mute_role = await self.config.guild(guild).mute_role() if mute_role: role = guild.get_role(mute_role) if not role: - ret["reason"] = _(MUTE_UNMUTE_ISSUES["role_missing"]) + ret.reason = _(MUTE_UNMUTE_ISSUES["role_missing"]) return ret if author != guild.owner and role >= author.top_role: - ret["reason"] = _(MUTE_UNMUTE_ISSUES["assigned_role_hierarchy_problem"]) + ret.reason = _(MUTE_UNMUTE_ISSUES["assigned_role_hierarchy_problem"]) return ret if not guild.me.guild_permissions.manage_roles or role >= guild.me.top_role: - ret["reason"] = _(MUTE_UNMUTE_ISSUES["permissions_issue_role"]) + ret.reason = _(MUTE_UNMUTE_ISSUES["permissions_issue_role"]) return ret # This is here to prevent the modlog case from happening on role updates # we need to update the cache early so it's there before we receive the member_update event @@ -1614,24 +1696,32 @@ async def mute_user( except discord.errors.Forbidden: if guild.id in self._server_mutes and user.id in self._server_mutes[guild.id]: del self._server_mutes[guild.id][user.id] - ret["reason"] = _(MUTE_UNMUTE_ISSUES["permissions_issue_role"]) + ret.reason = _(MUTE_UNMUTE_ISSUES["permissions_issue_role"]) return ret - ret["success"] = True + if user.voice: + try: + await user.move_to(user.voice.channel) + except discord.HTTPException: + # catch all discord errors because the result will be the same + # we successfully muted by this point but can't move the user + ret.reason = _(MUTE_UNMUTE_ISSUES["voice_mute_permission"]) + ret.success = True return ret else: - perms_cache = {} - tasks = [] - for channel in guild.channels: - tasks.append(self.channel_mute_user(guild, channel, author, user, until, reason)) - task_result = await bounded_gather(*tasks) - for task in task_result: - if not task["success"]: - ret["channels"].append((task["channel"], task["reason"])) - else: - chan_id = task["channel"].id - perms_cache[str(chan_id)] = task.get("old_overs") - ret["success"] = True - await self.config.member(user).perms_cache.set(perms_cache) + if until and (until - datetime.now(tz=timezone.utc)) > timedelta(days=28): + ret.reason = _(MUTE_UNMUTE_ISSUES["mute_is_too_long"]) + return ret + if not until: + ret.reason = _(MUTE_UNMUTE_ISSUES["timeouts_require_time"]) + return ret + if guild.me.guild_permissions.moderate_members: + try: + await user.edit(timed_out_until=until, reason=reason) + ret.success = True + except Exception: + ret.reason = _(MUTE_UNMUTE_ISSUES["permissions_issue_guild"]) + else: + ret.reason = _("I lack the moderate members permission.") return ret async def unmute_user( @@ -1640,58 +1730,49 @@ async def unmute_user( author: discord.Member, user: discord.Member, reason: Optional[str] = None, - ) -> Dict[ - str, - Union[bool, Optional[str], List[Tuple[discord.abc.GuildChannel, str]], discord.Member], - ]: + ) -> MuteResponse: """ Handles unmuting users """ - ret: Dict[ - str, - Union[bool, Optional[str], List[Tuple[discord.abc.GuildChannel, str]], discord.Member], - ] = { - "success": False, - "reason": None, - "channels": [], - "user": user, - } - mute_role = await self.config.guild(guild).mute_role() + ret: MuteResponse = MuteResponse(success=False, reason=None, user=user) + + mute_role_id = await self.config.guild(guild).mute_role() if not await self.is_allowed_by_hierarchy(guild, author, user): - ret["reason"] = _(MUTE_UNMUTE_ISSUES["hierarchy_problem"]) + ret.reason = _(MUTE_UNMUTE_ISSUES["hierarchy_problem"]) return ret - if mute_role: - role = guild.get_role(mute_role) - if not role: - ret["reason"] = _(MUTE_UNMUTE_ISSUES["role_missing"]) - return ret + reasons = [] + mute_role = guild.get_role(mute_role_id) + if mute_role and mute_role in user.roles: if guild.id in self._server_mutes: if user.id in self._server_mutes[guild.id]: del self._server_mutes[guild.id][user.id] - if not guild.me.guild_permissions.manage_roles or role >= guild.me.top_role: - ret["reason"] = _(MUTE_UNMUTE_ISSUES["permissions_issue_role"]) - return ret - try: - await user.remove_roles(role, reason=reason) - except discord.errors.Forbidden: - ret["reason"] = _(MUTE_UNMUTE_ISSUES["permissions_issue_role"]) - return ret - ret["success"] = True - return ret - else: - tasks = [] - for channel in guild.channels: - tasks.append(self.channel_unmute_user(guild, channel, author, user, reason)) - results = await bounded_gather(*tasks) - for task in results: - if not task["success"]: - ret["channels"].append((task["channel"], task["reason"])) - else: - ret["success"] = True - await self.config.member(user).clear() - return ret + if not guild.me.guild_permissions.manage_roles or mute_role >= guild.me.top_role: + reasons.append(_(MUTE_UNMUTE_ISSUES["permissions_issue_role"])) + else: + try: + await user.remove_roles(mute_role, reason=reason) + ret.success = True + except discord.errors.Forbidden: + reasons.append(_(MUTE_UNMUTE_ISSUES["permissions_issue_role"])) + + if user.is_timed_out(): + if guild.me.guild_permissions.moderate_members: + try: + await user.edit(timed_out_until=None, reason=reason) + ret.success = True + except Exception: + reasons.append(_(MUTE_UNMUTE_ISSUES["permissions_issue_guild"])) + else: + reasons.append(_("I lack the timeout members permission.")) + + if not reasons and not ret.success: + ret.reason = _(MUTE_UNMUTE_ISSUES["already_unmuted"]).format(location=_("this server")) + elif reasons: + ret.reason = "\n".join(reasons) + + return ret async def channel_mute_user( self, @@ -1703,32 +1784,34 @@ async def channel_mute_user( reason: Optional[str] = None, *, voice_mute: bool = False, - ) -> Dict[str, Optional[Union[discord.abc.GuildChannel, str, bool]]]: + ) -> ChannelMuteResponse: """Mutes the specified user in the specified channel""" overwrites = channel.overwrites_for(user) permissions = channel.permissions_for(user) + ret = ChannelMuteResponse( + success=False, + channel=channel, + reason=None, + user=user, + old_overs={}, + voice_mute=voice_mute, + ) + if permissions.administrator: - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["is_admin"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["is_admin"]) + return ret move_channel = False - send_reason = None if user.voice and user.voice.channel == channel: if channel.permissions_for(guild.me).move_members: move_channel = True else: - send_reason = _(MUTE_UNMUTE_ISSUES["voice_mute_permission"]) + ret.reason = _(MUTE_UNMUTE_ISSUES["voice_mute_permission"]) if not await self.is_allowed_by_hierarchy(guild, author, user): - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["hierarchy_problem"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["hierarchy_problem"]) + return ret if channel.id not in self._channel_mutes: self._channel_mutes[channel.id] = {} @@ -1741,11 +1824,8 @@ async def channel_mute_user( # We want to continue if this is a new mute or a mute upgrade, # otherwise we should return with failure. if current_mute is not None and not is_mute_upgrade: - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["already_muted"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["already_muted"]).format(location=channel.mention) + return ret new_overs: Dict[str, Optional[bool]] = {"speak": False} if not voice_mute: new_overs.update( @@ -1761,13 +1841,14 @@ async def channel_mute_user( perms_cache = await self.config.member(user).perms_cache() if "speak" in perms_cache: old_overs["speak"] = perms_cache["speak"] + ret.old_overs = old_overs overwrites.update(**new_overs) if not channel.permissions_for(guild.me).manage_permissions: - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["permissions_issue_channel"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["permissions_issue_channel"]).format( + location=channel.mention + ) + return ret + self._channel_mutes[channel.id][user.id] = { "author": author.id, "guild": guild.id, @@ -1788,41 +1869,35 @@ async def channel_mute_user( and user.id in self._channel_mutes[channel.id] ): del self._channel_mutes[channel.id][user.id] - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["unknown_channel"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["unknown_channel"]) + return ret + elif e.code == 10009: if ( channel.id in self._channel_mutes and user.id in self._channel_mutes[channel.id] ): del self._channel_mutes[channel.id][user.id] - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["left_guild"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["left_guild"]) + return ret + except discord.Forbidden: - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["permissions_issue_channel"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["permissions_issue_channel"]).format( + location=channel.mention + ) + return ret + if move_channel: try: await user.move_to(channel) except discord.HTTPException: # catch all discord errors because the result will be the same # we successfully muted by this point but can't move the user - return { - "success": True, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["voice_mute_permission"]), - "old_overs": old_overs, - } - return {"success": True, "channel": channel, "old_overs": old_overs, "reason": send_reason} + ret.reason = _(MUTE_UNMUTE_ISSUES["voice_mute_permission"]) + ret.success = True + return ret + ret.success = True + return ret async def channel_unmute_user( self, @@ -1833,11 +1908,20 @@ async def channel_unmute_user( reason: Optional[str] = None, *, voice_mute: bool = False, - ) -> Dict[str, Optional[Union[discord.abc.GuildChannel, str, bool]]]: + ) -> ChannelMuteResponse: """Unmutes the specified user in a specified channel""" overwrites = channel.overwrites_for(user) perms_cache = await self.config.member(user).perms_cache() + ret = ChannelMuteResponse( + success=False, + reason=None, + user=user, + channel=channel, + old_overs={}, + voice_mute=voice_mute, + ) + move_channel = False if channel.id in perms_cache: old_values = perms_cache[channel.id] @@ -1857,35 +1941,28 @@ async def channel_unmute_user( move_channel = True if not await self.is_allowed_by_hierarchy(guild, author, user): - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["hierarchy_problem"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["hierarchy_problem"]) + return ret overwrites.update(**old_values) if channel.id in self._channel_mutes and user.id in self._channel_mutes[channel.id]: current_mute = self._channel_mutes[channel.id].pop(user.id) else: - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["already_unmuted"]), - } - if not current_mute.get("voice_mute", False) and voice_mute: - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["is_not_voice_mute"]).format( - command=inline("unmutechannel") - ), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["already_unmuted"]).format(location=channel.mention) + return ret + + if not current_mute["voice_mute"] and voice_mute: + ret.reason = _(MUTE_UNMUTE_ISSUES["is_not_voice_mute"]).format( + command=inline("unmutechannel") + ) + return ret + if not channel.permissions_for(guild.me).manage_permissions: - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["permissions_issue_channel"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["permissions_issue_channel"]).format( + location=channel.mention + ) + return ret + try: if overwrites.is_empty(): await channel.set_permissions( @@ -1898,26 +1975,21 @@ async def channel_unmute_user( del muted_users[str(user.id)] except discord.NotFound as e: if e.code == 10003: - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["unknown_channel"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["unknown_channel"]) + return ret + elif e.code == 10009: - return { - "success": False, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["left_guild"]), - } + ret.reason = _(MUTE_UNMUTE_ISSUES["left_guild"]) + return ret + if move_channel: try: await user.move_to(channel) except discord.HTTPException: # catch all discord errors because the result will be the same # we successfully muted by this point but can't move the user - return { - "success": True, - "channel": channel, - "reason": _(MUTE_UNMUTE_ISSUES["voice_mute_permission"]), - } - return {"success": True, "channel": channel, "reason": None} + ret.success = True + ret.reason = _(MUTE_UNMUTE_ISSUES["voice_mute_permission"]) + return ret + ret.success = True + return ret diff --git a/redbot/cogs/mutes/voicemutes.py b/redbot/cogs/mutes/voicemutes.py index dec9e8a43e0..075c5339d1f 100644 --- a/redbot/cogs/mutes/voicemutes.py +++ b/redbot/cogs/mutes/voicemutes.py @@ -122,13 +122,13 @@ async def voice_mute( channel = user_voice_state.channel audit_reason = get_audit_reason(author, reason, shorten=True) - success = await self.channel_mute_user( + result = await self.channel_mute_user( guild, channel, author, user, until, audit_reason, voice_mute=True ) - if success["success"]: - if "reason" in success and success["reason"]: - issue_list.append((user, success["reason"])) + if result.success: + if result.reason: + issue_list.append((user, result.reason)) else: success_list.append(user) await modlog.create_case( @@ -146,9 +146,9 @@ async def voice_mute( user, author, guild, _("Voice mute"), reason, duration ) async with self.config.member(user).perms_cache() as cache: - cache[channel.id] = success["old_overs"] + cache[channel.id] = result.old_overs else: - issue_list.append((user, success["reason"])) + issue_list.append((user, result.reason)) if success_list: msg = _("{users} has been muted in this channel{time}.") @@ -198,13 +198,13 @@ async def unmute_voice( channel = user_voice_state.channel audit_reason = get_audit_reason(author, reason, shorten=True) - success = await self.channel_unmute_user( + result = await self.channel_unmute_user( guild, channel, author, user, audit_reason, voice_mute=True ) - if success["success"]: - if "reason" in success and success["reason"]: - issue_list.append((user, success["reason"])) + if result.success: + if result.reason: + issue_list.append((user, result.reason)) else: success_list.append(user) await modlog.create_case( @@ -222,7 +222,7 @@ async def unmute_voice( user, author, guild, _("Voice unmute"), reason ) else: - issue_list.append((user, success["reason"])) + issue_list.append((user, result.reason)) if success_list: if channel.id in self._channel_mutes and self._channel_mutes[channel.id]: await self.config.channel(channel).muted_users.set(self._channel_mutes[channel.id])