Skip to content

Commit

Permalink
feature: rework plex class
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroquinc committed May 30, 2024
1 parent 82202f1 commit a25f06a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 107 deletions.
169 changes: 64 additions & 105 deletions api/plex/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import json
import discord

from config.globals import PLEX_ICON, PLEX_PLAYING, PLEX_CONTENT
from src.discord.embed import EmbedBuilder
Expand All @@ -9,35 +8,28 @@ class PlexWebhookHandler:
def __init__(self, payload, discord_bot):
self.payload = payload
self.discord_bot = discord_bot
self.extract_details()

# Global variables for the class to use in the embed creation
self.media_type = self.payload.get('source_metadata_details', {}).get('media_type', 'N/A').capitalize()
self.year = self.payload.get('source_metadata_details', {}).get('year', 'N/A')
self.title = self.payload.get('source_metadata_details', {}).get('title', 'N/A')
self.summary = self.payload.get('source_metadata_details', {}).get('summary', 'N/A')
self.quality = self.payload.get('source_metadata_details', {}).get('video_full_resolution', 'N/A')
self.air_date = self.payload.get('source_metadata_details', {}).get('air_date', 'N/A')
self.genres = self.payload.get('source_metadata_details', {}).get('genres', 'N/A')
self.release_date = self.payload.get('source_metadata_details', {}).get('release_date', 'N/A')
self.season_number = self.payload.get('source_metadata_details', {}).get('season_num00', 'N/A')
self.episode_number = self.payload.get('source_metadata_details', {}).get('episode_num00', 'N/A')
self.episode_count = self.payload.get('source_metadata_details', {}).get('episode_count', 'N/A')
self.poster_url = self.payload.get('source_metadata_details', {}).get('poster_url', 'N/A')
self.imdb_url = self.payload.get('source_metadata_details', {}).get('imdb_url', 'N/A')
self.tvdb_url = self.payload.get('source_metadata_details', {}).get('thetvdb_url', 'N/A')
self.trakt_url = self.payload.get('source_metadata_details', {}).get('trakt_url', 'N/A')
self.critic_rating = self.payload.get('source_metadata_details', {}).get('critic_rating', 'N/A')
self.audience_rating = self.payload.get('source_metadata_details', {}).get('audience_rating', 'N/A')
self.rating = self.payload.get('source_metadata_details', {}).get('rating', 'N/A')
self.username = self.payload.get('stream_details', {}).get('username', 'N/A')
self.platform = self.payload.get('stream_details', {}).get('platform', 'N/A')
self.player = self.payload.get('stream_details', {}).get('player', 'N/A')
self.product = self.payload.get('stream_details', {}).get('product', 'N/A')
self.video_decision = self.payload.get('stream_details', {}).get('video_decision', 'N/A').capitalize()
self.remaining_time = self.payload.get('stream_details', {}).get('remaining_time', 'N/A')
self.duration_time = self.payload.get('stream_details', {}).get('duration_time', 'N/A')
self.server_name = self.payload.get('server_info', {}).get('server_name', 'N/A')
self.webhook_type = self.payload.get('server_info', {}).get('webhook_type', 'N/A')
def extract_details(self):
details = self.payload.get('source_metadata_details', {})
stream_details = self.payload.get('stream_details', {})
server_info = self.payload.get('server_info', {})
fields = [
'media_type', 'year', 'title', 'summary', 'quality', 'air_date', 'genres',
'release_date', 'season_num00', 'episode_num00', 'episode_count', 'poster_url',
'imdb_url', 'tvdb_url', 'trakt_url', 'plex_url', 'critic_rating', 'audience_rating',
'rating', 'username', 'platform', 'player', 'product', 'video_decision',
'remaining_time', 'duration_time', 'server_name', 'webhook_type'
]
for field in fields:
field_value = 'N/A'
if field in details:
field_value = details.get(field)
elif field in stream_details:
field_value = stream_details.get(field)
elif field in server_info:
field_value = server_info.get(field)
setattr(self, field, field_value)

async def handle_webhook(self):
logger.info(f"Processing Plex webhook payload for event type: {self.webhook_type}")
Expand All @@ -47,97 +39,64 @@ async def handle_webhook(self):
def determine_channel_id(self):
channel_ids = {
'nowplaying': PLEX_PLAYING,
'nowresuming': PLEX_PLAYING,
'finished': PLEX_PLAYING,
#'nowresuming': PLEX_PLAYING,
#'finished': PLEX_PLAYING,
'newcontent_episode': PLEX_CONTENT,
'newcontent_season': PLEX_CONTENT,
'newcontent_movie': PLEX_CONTENT,
}
return channel_ids.get(self.webhook_type, 'default_channel_id')

def generate_embed(self):
# Use a dictionary to map event types to their respective embed creation methods
embed_creators = {
'nowplaying': self.embed_for_nowplaying,
'nowresuming': self.embed_for_nowresuming,
'finished': self.embed_for_finished,
'newcontent_episode': self.embed_for_newcontent_episode,
'newcontent_season': self.embed_for_newcontent_season,
'newcontent_movie': self.embed_for_newcontent_movie,
'nowplaying': self.embed_for_event,
#'nowresuming': self.embed_for_event,
#'finished': self.embed_for_event,
'newcontent_episode': self.embed_for_newcontent,
'newcontent_season': self.embed_for_newcontent,
'newcontent_movie': self.embed_for_newcontent,
}
return embed_creators.get(self.webhook_type)()

# Get the embed creation method for the event type
generate_embed = embed_creators.get(self.webhook_type)

# Call the embed creation method and return the embed
return generate_embed()

def embed_for_nowplaying(self):
title = f"{self.title} ({self.year})" if self.media_type == "Movie" else f"{self.title} (S{self.season_number}E{self.episode_number})"
embed = EmbedBuilder(title=title, color=0xe5a00d)
poster_path = self.poster_url
if poster_path:
embed.set_thumbnail(url=poster_path)
embed.set_author(name="Playing on Plex", icon_url=PLEX_ICON)
embed.set_footer(text=f"{self.username}{self.video_decision}{self.product}")
return embed

def embed_for_nowresuming(self):
title = f"{self.title} ({self.year})" if self.media_type == "Movie" else f"{self.title} (S{self.season_number}E{self.episode_number})"
embed = EmbedBuilder(title=title, color=0xe5a00d)
poster_path = self.poster_url
if poster_path:
embed.set_thumbnail(url=poster_path)
embed.set_author(name="Resuming on Plex", icon_url=PLEX_ICON)
embed.set_footer(text=f"{self.username}{self.video_decision}{self.product}")
return embed

def embed_for_finished(self):
title = f"{self.title} ({self.year})" if self.media_type == "Movie" else f"{self.title} (S{self.season_number}E{self.episode_number})"
embed = EmbedBuilder(title=title, color=0xe5a00d)
poster_path = self.poster_url
if poster_path:
embed.set_thumbnail(url=poster_path)
embed.set_author(name="Finished on Plex", icon_url=PLEX_ICON)
embed.set_footer(text=f"{self.username}{self.video_decision}{self.product}")
def embed_for_event(self):
title = f"{self.title} ({self.year})" if self.media_type == "Movie" else f"{self.title} (S{self.season_num00}E{self.episode_num00})"
embed = EmbedBuilder(title=title, url=self.plex_url, color=0xe5a00d)
if self.poster_url:
embed.set_thumbnail(url=self.poster_url)
embed.set_author(name="Now Playing on Plex", icon_url=PLEX_ICON)
embed.set_footer(text=f"{self.username.capitalize()}{self.video_decision.title()}{self.product}")
return embed

def embed_for_newcontent_episode(self):
title = f"{self.title} (S{self.season_number}E{self.episode_number})"
embed = EmbedBuilder(title=title, color=0xe5a00d)
poster_path = self.poster_url
if poster_path:
embed.set_thumbnail(url=poster_path)
embed.set_author(name="New Episode added to Plex", icon_url=PLEX_ICON)
embed.set_footer(text=f"{self.server_name}")
return embed

def embed_for_newcontent_season(self):
title = f"{self.title}"
embed = EmbedBuilder(title=title, color=0xe5a00d)
poster_path = self.poster_url
if poster_path:
embed.set_thumbnail(url=poster_path)
embed.add_field(name="Season", value=f"Season {self.season_number}", inline=True)
embed.add_field(name="Episodes", value=f"{self.episode_count}", inline=True)
embed.set_author(name="New Season added to Plex", icon_url=PLEX_ICON)
embed.set_footer(text=f"{self.server_name}")
return embed

def embed_for_newcontent_movie(self):
title = f"{self.title} ({self.year})"
embed = EmbedBuilder(title=title, color=0xe5a00d)
poster_path = self.poster_url
if poster_path:
embed.set_thumbnail(url=poster_path)
embed.set_author(name="New Movie added to Plex", icon_url=PLEX_ICON)
embed.set_footer(text=f"{self.genres}")

def embed_for_newcontent(self):
embed = EmbedBuilder(title=self.get_newcontent_title(), description=self.get_description(), url=self.plex_url, color=0xe5a00d)
if self.poster_url:
embed.set_thumbnail(url=self.poster_url)
if self.webhook_type == 'newcontent_season':
embed.add_field(name="Season", value=f"Season {self.season_num00}", inline=True)
embed.add_field(name="Episodes", value=f"{self.episode_count}", inline=True)
embed.set_author(name=f"New {self.media_type.capitalize()} added to Plex", icon_url=PLEX_ICON)
return embed


def get_newcontent_title(self):
titles = {
'newcontent_episode': f"{self.title} (S{self.season_num00}E{self.episode_num00})",
'newcontent_season': f"{self.title} - Season {self.season_num00}",
'newcontent_movie': f"{self.title} ({self.year})"
}
return titles.get(self.webhook_type, self.title)

def get_description(self):
descriptions = {
'newcontent_episode': '',
'newcontent_season': self.genres,
'newcontent_movie': self.genres,
}
return descriptions.get(self.webhook_type, '')

async def dispatch_embed(self):
embed = self.generate_embed()
channel_id = self.determine_channel_id()
channel = self.discord_bot.bot.get_channel(channel_id) # Get the Channel object using the ID
channel = self.discord_bot.bot.get_channel(channel_id)
if channel:
await embed.send_embed(channel)
else:
Expand Down
4 changes: 2 additions & 2 deletions src/discord/embed.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import discord

class EmbedBuilder:
def __init__(self, title='', description='', color=discord.Color.default()):
self.embed = discord.Embed(title=title, description=description, color=color)
def __init__(self, title='', description='', color=discord.Color.default(), url=''):
self.embed = discord.Embed(title=title, description=description, color=color, url=url)

def set_author(self, name, icon_url=None, url=None):
self.embed.set_author(name=name, icon_url=icon_url, url=url)
Expand Down

0 comments on commit a25f06a

Please sign in to comment.