Skip to content

Commit

Permalink
Feature: Bumps (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
holedaemon authored Apr 13, 2024
1 parent d307415 commit 86b0bb7
Show file tree
Hide file tree
Showing 9 changed files with 553 additions and 12 deletions.
2 changes: 2 additions & 0 deletions cogs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import TYPE_CHECKING
from .debug import DebugCog
from .player import PlayerCog
from .bumps import BumpCog
if TYPE_CHECKING:
from utils.blanco import BlancoBot

Expand All @@ -17,3 +18,4 @@ def setup(bot: 'BlancoBot'):
# Add cogs
bot.add_cog(DebugCog(bot))
bot.add_cog(PlayerCog(bot))
bot.add_cog(BumpCog(bot))
228 changes: 228 additions & 0 deletions cogs/bumps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
"""
BumpCog: Cog for guild bumps.
"""

from typing import TYPE_CHECKING

from nextcord import (Color, Permissions, Interaction, SlashOption, slash_command)
from nextcord.ext.commands import Cog

from dataclass.bump import Bump

from utils.url import check_url
from utils.embeds import CustomEmbed, create_error_embed, create_success_embed
from utils.logger import create_logger
from utils.paginator import Paginator, list_chunks

if TYPE_CHECKING:
from utils.blanco import BlancoBot


class BumpCog(Cog):
"""
Cog for guild bumps.
"""
def __init__(self, bot: 'BlancoBot'):
"""
Constructor for BumpCog.
"""
self._bot = bot
self._logger = create_logger(self.__class__.__name__)
self._logger.info('Loaded BumpCog')

@slash_command(
name='bump',
dm_permission=False,
default_member_permissions=Permissions(manage_guild=True)
)
async def bump(self, itx: Interaction):
"""
Base slash command for bumps.
"""


@bump.subcommand(name='toggle', description='Toggle the playback of bumps.')
async def bump_toggle(
self,
itx: Interaction,
toggle: bool = SlashOption(
name='toggle',
description='Turn bumps on or off?',
required=False
)
):
"""
Subcommand for toggling bumps.
"""
if itx.guild is None:
raise RuntimeError('[bump::toggle] itx.guild is None')

if toggle is None:
enabled = self._bot.database.get_bumps_enabled(itx.guild.id)
status = "Bump playback is currently enabled." if enabled \
else "Bump playback is currently disabled."
return await itx.response.send_message(
embed=create_success_embed(
title="Bumps status",
body=status,
)
)

self._bot.database.set_bumps_enabled(itx.guild.id, toggle)
status = "Bump playback has been enabled." if toggle \
else "Bump playback has been disabled."
return await itx.response.send_message(
embed=create_success_embed(
title="Bumps toggled",
body=status,
)
)

@bump.subcommand(name='add', description='Add a bump.')
async def bump_add(
self,
itx: Interaction,
title: str = SlashOption(name='title', description='Title of bump.', required=True),
author: str = SlashOption(name='author', description='Author of bump.', required=True),
url: str = SlashOption(name='url', description='URL to add.', required=True),
):
"""
Subcommand for adding a bump.
"""
if itx.guild is None:
raise RuntimeError('[bump::add] itx.guild is None')

if len(title) > 32 or len(author) > 32:
return await itx.response.send_message(
embed=create_error_embed(
message='Titles/authors cannot exceed 32 characters in length.'
)
)

if not check_url(url):
return await itx.response.send_message(
embed=create_error_embed(
message='The given URL is not valid.'
)
)

bump = self._bot.database.get_bump_by_url(itx.guild.id, url)
if bump is not None:
return await itx.response.send_message(
embed=create_error_embed(
message='A bump with the given URL already exists.'
)
)

self._bot.database.add_bump(itx.guild.id, url, title, author)
return await itx.response.send_message(
embed=create_success_embed(
title='Bump added',
body='Bump has been successfully added to the database.'
)
)

@bump.subcommand(name='remove', description='Remove a bump.')
async def bump_remove(
self,
itx: Interaction,
idx: int = SlashOption(name='index', description='Index of bump.', required=True)
):
"""
Subcommand for removing a bump.
"""
if itx.guild is None:
raise RuntimeError('[bump::remove] itx.guild is None')

bump = self._bot.database.get_bump(itx.guild.id, idx)
if bump is None:
return await itx.response.send_message(
embed=create_error_embed(
message='There is no bump at that index for this guild.'
)
)

self._bot.database.delete_bump(itx.guild.id, idx)
return await itx.response.send_message(
embed=create_success_embed(
title='Bump removed',
body='Bump has successfully been removed from the database.'
)
)

@bump.subcommand(name='list', description='List every bump.')
async def bump_list(
self,
itx: Interaction,
):
"""
Subcommand for listing bumps.
"""
if itx.guild is None:
raise RuntimeError('[bump::list] itx.guild is None')
await itx.response.defer()

bumps = self._bot.database.get_bumps(itx.guild.id)
if bumps is None:
return await itx.response.send_message(
embed=create_error_embed(
message='This guild has no bumps.'
)
)

pages = []
count = 1
for _, chunk in enumerate(list_chunks(bumps)):
chunk_bumps = []

bump: Bump
for bump in chunk:
line = f'{bump.idx} :: [{bump.title}]({bump.url}) by {bump.author}'
chunk_bumps.append(line)
count += 1

embed = CustomEmbed(
title=f'Bumps for {itx.guild.name}',
description='\n'.join(chunk_bumps),
color=Color.lighter_gray()
)
pages.append(embed.get())

paginator = Paginator(itx)
return await paginator.run(pages)

@bump.subcommand(name='interval', description='Set or get the bump interval.')
async def bump_interval(
self,
itx: Interaction,
interval: int = SlashOption(
name='interval',
description='The new interval bumps will play at',
required=False,
min_value=1,
max_value=60
)
):
"""
Subcommand for changing/checking the bump interval.
"""

if itx.guild is None:
raise RuntimeError('[bump::interval] itx.guild is None')

if interval is None:
curr_interval = self._bot.database.get_bump_interval(itx.guild.id)
return await itx.response.send_message(
embed=create_success_embed(
title='Current Interval',
body=f'A bump will play once at least every {curr_interval} minute(s).'
)
)

self._bot.database.set_bump_interval(itx.guild.id, interval)
return await itx.response.send_message(
embed=create_success_embed(
title='Interval Changed',
body=f'The bump interval has been set to {interval} minute(s).'
)
)
11 changes: 1 addition & 10 deletions cogs/player/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

from asyncio import TimeoutError as AsyncioTimeoutError
from itertools import islice
from typing import TYPE_CHECKING, Any, Generator, List, Optional

from mafic import PlayerNotConnected
Expand All @@ -20,7 +19,7 @@
from utils.exceptions import (EmptyQueueError, EndOfQueueError, JockeyError,
JockeyException, SpotifyNoResultsError)
from utils.logger import create_logger
from utils.paginator import Paginator
from utils.paginator import Paginator, list_chunks
from utils.player_checks import check_mutual_voice
from views.spotify_dropdown import SpotifyDropdownView

Expand All @@ -31,14 +30,6 @@
from utils.blanco import BlancoBot


def list_chunks(data: List[Any]) -> Generator[List[Any], Any, Any]:
"""
Yield 10-element chunks of a list. Used for pagination.
"""
for i in range(0, len(data), 10):
yield list(islice(data, i, i + 10))


class PlayerCog(Cog):
"""
Cog for creating, controlling, and destroying music players for guilds.
Expand Down
51 changes: 50 additions & 1 deletion cogs/player/jockey.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from utils.constants import UNPAUSE_THRESHOLD
from utils.embeds import create_error_embed
from utils.exceptions import (EndOfQueueError, JockeyError, JockeyException,
LavalinkSearchError, SpotifyNoResultsError)
LavalinkSearchError, SpotifyNoResultsError,
BumpError, BumpNotReadyError, BumpNotEnabledError)
from utils.musicbrainz import annotate_track
from utils.time import human_readable_time
from views.now_playing import NowPlayingView
Expand Down Expand Up @@ -547,6 +548,18 @@ async def skip(self, *, forward: bool = True, index: int = -1, auto: bool = True
# to prevent the user from spamming them.
await self._edit_np_controls(show_controls=False)

try:
await self.play_bump()
return
except (JockeyException, SpotifyNoResultsError) as err:
self._logger.error('Error parsing bump into track: %s', err)
except BumpError as err:
self._logger.error('Error playing bump: %s', err)
except BumpNotEnabledError:
self._logger.debug('Bumps are not enabled in this guild.')
except BumpNotReadyError:
self._logger.debug('Not ready to play a bump yet.')

# If index is specified, use that instead
if index != -1:
try:
Expand Down Expand Up @@ -632,3 +645,39 @@ async def update_now_playing(self):
self.guild.name,
exc
)

async def play_bump(self):
"""
Check and attempt to play a bump if it's been long enough.
"""

enabled = self._db.get_bumps_enabled(self.guild.id)
if not enabled:
raise BumpNotEnabledError

interval = self._db.get_bump_interval(self.guild.id) * 60
last_bump = self._db.get_last_bump(self.guild.id)

if last_bump == 0:
self._db.set_last_bump(self.guild.id)
raise BumpNotReadyError

if int(time()) - last_bump < interval:
raise BumpNotReadyError

bump = self._db.get_random_bump(self.guild.id)
if bump is None:
raise BumpError('Guild has no bumps.')

requester = self._bot.user.id if self._bot.user is not None else self.guild.me.id

try:
tracks = await parse_query(self.node, self._bot.spotify, bump.url, requester)
except (JockeyException, SpotifyNoResultsError):
raise

if len(tracks) == 0:
raise BumpError('Unable to parse bump URL into tracks.')

await self._play(tracks[0])
self._db.set_last_bump(self.guild.id)
Loading

0 comments on commit 86b0bb7

Please sign in to comment.