diff --git a/Pipfile b/Pipfile index 7d5ce53..4a06849 100644 --- a/Pipfile +++ b/Pipfile @@ -6,10 +6,12 @@ name = "pypi" [packages] requests = "*" python-dateutil = "*" -pytz = "*" -twitch-python = "*" +pytz = ">=2018.9" +twitch-python = ">=0.0.11" [dev-packages] +wheel = "*" +twine = "*" [requires] python_version = "3.7" diff --git a/Pipfile.lock b/Pipfile.lock index 62f3724..e4f293a 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "e3fc214df40ca53fe36b6711472078254e56e49642947dbaa9279b2d9520273b" + "sha256": "f4bd74580ae162bd2792ecb7e119d68e7b1db455f92e6c973ab1ef8ea6c921ae" }, "pipfile-spec": 6, "requires": { @@ -91,5 +91,122 @@ "version": "==1.24.1" } }, - "develop": {} + "develop": { + "bleach": { + "hashes": [ + "sha256:213336e49e102af26d9cde77dd2d0397afabc5a6bf2fed985dc35b5d1e285a16", + "sha256:3fdf7f77adcf649c9911387df51254b813185e32b2c6619f690b593a617e19fa" + ], + "version": "==3.1.0" + }, + "certifi": { + "hashes": [ + "sha256:59b7658e26ca9c7339e00f8f4636cdfe59d34fa37b9b04f6f9e9926b3cece1a5", + "sha256:b26104d6835d1f5e49452a26eb2ff87fe7090b89dfcaee5ea2212697e1e1d7ae" + ], + "version": "==2019.3.9" + }, + "chardet": { + "hashes": [ + "sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae", + "sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691" + ], + "version": "==3.0.4" + }, + "docutils": { + "hashes": [ + "sha256:02aec4bd92ab067f6ff27a38a38a41173bf01bed8f89157768c1573f53e474a6", + "sha256:51e64ef2ebfb29cae1faa133b3710143496eca21c530f3f71424d77687764274", + "sha256:7a4bd47eaf6596e1295ecb11361139febe29b084a87bf005bf899f9a42edc3c6" + ], + "version": "==0.14" + }, + "idna": { + "hashes": [ + "sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407", + "sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c" + ], + "version": "==2.8" + }, + "pkginfo": { + "hashes": [ + "sha256:7424f2c8511c186cd5424bbf31045b77435b37a8d604990b79d4e70d741148bb", + "sha256:a6d9e40ca61ad3ebd0b72fbadd4fba16e4c0e4df0428c041e01e06eb6ee71f32" + ], + "version": "==1.5.0.1" + }, + "pygments": { + "hashes": [ + "sha256:5ffada19f6203563680669ee7f53b64dabbeb100eb51b61996085e99c03b284a", + "sha256:e8218dd399a61674745138520d0d4cf2621d7e032439341bc3f647bff125818d" + ], + "version": "==2.3.1" + }, + "readme-renderer": { + "hashes": [ + "sha256:bb16f55b259f27f75f640acf5e00cf897845a8b3e4731b5c1a436e4b8529202f", + "sha256:c8532b79afc0375a85f10433eca157d6b50f7d6990f337fa498c96cd4bfc203d" + ], + "version": "==24.0" + }, + "requests": { + "hashes": [ + "sha256:502a824f31acdacb3a35b6690b5fbf0bc41d63a24a45c4004352b0242707598e", + "sha256:7bf2a778576d825600030a110f3c0e3e8edc51dfaafe1c146e39a2027784957b" + ], + "index": "pypi", + "version": "==2.21.0" + }, + "requests-toolbelt": { + "hashes": [ + "sha256:380606e1d10dc85c3bd47bf5a6095f815ec007be7a8b69c878507068df059e6f", + "sha256:968089d4584ad4ad7c171454f0a5c6dac23971e9472521ea3b6d49d610aa6fc0" + ], + "version": "==0.9.1" + }, + "six": { + "hashes": [ + "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c", + "sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73" + ], + "version": "==1.12.0" + }, + "tqdm": { + "hashes": [ + "sha256:d385c95361699e5cf7622485d9b9eae2d4864b21cd5a2374a9c381ffed701021", + "sha256:e22977e3ebe961f72362f6ddfb9197cc531c9737aaf5f607ef09740c849ecd05" + ], + "version": "==4.31.1" + }, + "twine": { + "hashes": [ + "sha256:0fb0bfa3df4f62076cab5def36b1a71a2e4acb4d1fa5c97475b048117b1a6446", + "sha256:d6c29c933ecfc74e9b1d9fa13aa1f87c5d5770e119f5a4ce032092f0ff5b14dc" + ], + "index": "pypi", + "version": "==1.13.0" + }, + "urllib3": { + "hashes": [ + "sha256:61bf29cada3fc2fbefad4fdf059ea4bd1b4a86d2b6d15e1c7c0b582b9752fe39", + "sha256:de9529817c93f27c8ccbfead6985011db27bd0ddfcdb2d86f3f663385c6a9c22" + ], + "version": "==1.24.1" + }, + "webencodings": { + "hashes": [ + "sha256:a0af1213f3c2226497a97e2b3aa01a7e4bee4f403f95be16fc9acd2947514a78", + "sha256:b36a1c245f2d304965eb4e0a82848379241dc04b865afcc4aab16748587e1923" + ], + "version": "==0.5.1" + }, + "wheel": { + "hashes": [ + "sha256:66a8fd76f28977bb664b098372daef2b27f60dc4d1688cfab7b37a09448f0e9d", + "sha256:8eb4a788b3aec8abf5ff68d4165441bc57420c9f64ca5f471f58c3969fe08668" + ], + "index": "pypi", + "version": "==0.33.1" + } + } } diff --git a/app.py b/app.py deleted file mode 100644 index c843b9b..0000000 --- a/app.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env python3 -from typing import List - -import app - - -def main(): - if app.arguments.format == 'all': - - # Whitelist and blacklist - whitelist: List[str] = [] - blacklist: List[str] = [] - - # Populate lists if configured in settings - if 'all' in app.settings['formats']: - if 'whitelist' in app.settings['formats']['all']: - whitelist = app.settings['formats']['all']['whitelist'] - - if 'blacklist' in app.settings['formats']['all']: - blacklist = app.settings['formats']['all']['blacklist'] - - # If not input, download JSON data form API and - # use it as input value for the other formats. - if app.arguments.input is None: - app.arguments.input = app.download(app.arguments.video, 'json') - - # Download all formats. Ignore 'all' and 'json'. - for format_name in app.settings['formats']: - if format_name not in ['all', 'json']: - - if (whitelist and format_name not in whitelist) or (blacklist and format_name in blacklist): - if app.arguments.verbose: - print('Skipping {format_name}'.format(format_name=format_name)) - continue - else: - app.download(app.arguments.video, format_name) - - else: - app.download(app.arguments.video, app.arguments.format) - - if app.arguments.verbose: - print('Done') - - -if __name__ == "__main__": - main() diff --git a/app/__init__.py b/app/__init__.py deleted file mode 100644 index 8e4650d..0000000 --- a/app/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .cli import arguments -from .config import settings -from .downloader import download, draw_progress diff --git a/app/cli.py b/app/cli.py deleted file mode 100644 index cd0048b..0000000 --- a/app/cli.py +++ /dev/null @@ -1,95 +0,0 @@ -import argparse - -import app.config - - -# Ask for video ID -def prompt_video_id() -> str: - return input('Video ID: ').strip('v').strip() - - -# Ask for Twitch client ID -def prompt_client_id(initialize: bool = False): - print('Twitch requires a client ID to use their API.' - '\nRegister an application on https://dev.twitch.tv/dashboard to get yours.') - app.config.settings['client_id'] = input('Client ID: ').strip() - if initialize: - app.config.save(app.config.SETTINGS_FILE, app.config.settings) - else: - answer: str = input('Save client ID? (Y/n): ').strip().lower() - if not answer.startswith('n'): - app.config.save(app.config.SETTINGS_FILE, app.config.settings) - - -# Arguments -parser: argparse.ArgumentParser = argparse.ArgumentParser( - description='Twitch Chat Downloader v{version}'.format(version=app.config.settings['version'])) - -parser.add_argument('-v', '--video', type=str, help='Video id') -# parser.add_argument('-c', '--channel', type=str, help='Channel name') -# parser.add_argument('--limit', type=int, help='Number of videos from channel') -parser.add_argument('--client_id', type=str, help='Twitch client id') -parser.add_argument('--verbose', action='store_true') -parser.add_argument('-q', '--quiet', action='store_true') -parser.add_argument('-o', '--output', type=str, help='Output folder', default='./output') -parser.add_argument('-f', '--format', type=str, help='Message format', default='default') -# parser.add_argument('--start', type=int, help='Start time in seconds from video start') -# parser.add_argument('--stop', type=int, help='Stop time in seconds from video start') -parser.add_argument('--timezone', type=str, help='Timezone name') -parser.add_argument('--init', action='store_true', help='Script setup') -parser.add_argument('--update', action='store_true', help='Update settings') -parser.add_argument('--version', action='store_true', help='Settings version') -parser.add_argument('--formats', action='store_true', help='List available formats') -parser.add_argument('--preview', action='store_true', help='Print chat lines') -parser.add_argument('--input', type=str, help='Read data from JSON file') - -arguments = parser.parse_args() - -# Turn format to lowercase -arguments.format = str(arguments.format).lower() - -# Initialize -if arguments.init: - prompt_client_id(initialize=True) - print('Twitch Chat Downloader has been initialized.') - exit(1) - -# Update -if arguments.update: - print('You are up to date with v{}'.format(app.config.settings['version'])) - exit(1) - -# Version -if arguments.version: - print('Twitch Chat Downloader v{version}'.format(version=str(app.config.settings['version']))) - exit(1) - -# List formats -if arguments.formats: - for format_name in app.config.settings['formats']: - print(format_name) - _format = app.config.settings['formats'][format_name] - if 'comments' in _format: - print('\tcomment: {}'.format(app.config.settings['formats'][format_name]['comments']['format'])) - if 'output' in _format: - print('\toutput: {}'.format(app.config.settings['formats'][format_name]['output']['format'])) - print('\n') - - exit(1) - -# Video ID -if arguments.video is None and arguments.input is None: - arguments.video = prompt_video_id() - -# Client ID -if app.config.settings['client_id'] is None and arguments.client_id is None: - prompt_client_id() - -# Client ID argument -if arguments.client_id: - if app.config.settings['client_id'] is not arguments.client_id: - app.config.settings['client_id'] = str(arguments.client_id).strip() - save: str = input('Save client ID? (Y/n): ').strip().lower() - if not save.startswith('n'): - app.config.save(app.config.SETTINGS_FILE, app.config.settings) - diff --git a/app/config.py b/app/config.py deleted file mode 100644 index c9b0df5..0000000 --- a/app/config.py +++ /dev/null @@ -1,69 +0,0 @@ -import json -import shutil -from pathlib import Path - -SETTINGS_EXAMPLE_FILE: str = 'settings.example.json' -SETTINGS_FILE: str = 'settings.json' - - -def read(filename: str) -> dict: - with open(filename, 'r', encoding='utf-8') as file: - return json.load(file) - - -def load(filename: str) -> dict: - # Copy example file if necessary - if not Path(filename).is_file(): - shutil.copyfile(SETTINGS_EXAMPLE_FILE, filename) - - # Load config files - config_example: dict = read(SETTINGS_EXAMPLE_FILE) - config: dict = read(filename) - - # Config versioning and updating - if config['version'] != config_example['version']: - return prompt_update(config, config_example) - - return config - - -def save(filename: str, data: dict): - """ - Convert config dictionary to file and save to file. - :param filename: Output filename - :param data: Config dictionary - :return: - """ - with open(filename, 'w') as file: - json.dump(data, file, indent=4, sort_keys=True) - - -def prompt_update(current_config: dict, new_config: dict) -> dict: - print('Your settings file is outdated ({}). Please update to {}'.format(current_config['version'], - new_config['version'])) - - answer = input('Update to new version? Existing settings will be backed up. (Y/n): ') - if answer.lower().startswith('n'): - exit(1) - else: - return update(current_config, new_config) - - -def update(current_config: dict, new_config: dict) -> dict: - save('settings.{}.backup.json'.format(current_config['version']), current_config) - - # Copy client id to new config file - new_config['client_id'] = current_config['client_id'] - - # Copy user-defined formats to new config file - for format_name, format_dictionary in dict(current_config['formats']).items(): - if format_name not in new_config['formats']: - new_config['formats'][format_name] = format_dictionary - - # Overwrite current config file with new config. - save(SETTINGS_FILE, new_config) - - return new_config - - -settings: dict = load('settings.json') diff --git a/app/downloader.py b/app/downloader.py deleted file mode 100644 index ce7925a..0000000 --- a/app/downloader.py +++ /dev/null @@ -1,57 +0,0 @@ -import json -import os -import sys - -import app.cli -import app.formats as formats -import app.twitch as twitch - - -def draw_progress(current: float, end: float, description: str = 'Downloading'): - sys.stdout.write('[{}] {}%\r'.format(description, '%.2f' % min(current * 10 / end * 10, 100.00))) - sys.stdout.flush() - - -def download_multiple_formats(): - pass - - -def download(video_id: str, format_name: str) -> str: - if app.cli.arguments.verbose: - print('Downloading {} initialized'.format(format_name)) - - # Get Video - video: twitch.Video = twitch.Video(video_id) - - # Format video comments and output - lines, output = formats.use(format_name, video) - - # Create output directory - if not os.path.exists(os.path.dirname(output)): - os.makedirs(os.path.dirname(output)) - - # Save to file - with open(output, 'w+', encoding='utf-8') as file: - - # Special case for saving JSON data - if format_name == 'json': - for data in lines: - json.dump(data, file, indent=4, sort_keys=True) - else: - # Save formatted comments/lines to file - for line, line_dictionary in lines: - if not app.cli.arguments.quiet and not app.cli.arguments.verbose: - if app.cli.arguments.preview: - print(line) - elif 'content_offset_seconds' in line_dictionary: - draw_progress(line_dictionary['content_offset_seconds'], video.metadata['length'], format_name) - - file.write('{}\n'.format(line)) - - # Print finished message - if not app.cli.arguments.quiet: - if app.cli.arguments.verbose: - print('Finished downloading {} to {}'.format(format_name, output)) - else: - print('[{}] {}'.format(format_name, output)) - return output diff --git a/app/formats/__init__.py b/app/formats/__init__.py deleted file mode 100644 index 131fd3b..0000000 --- a/app/formats/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .formats import use diff --git a/app/formats/custom.py b/app/formats/custom.py deleted file mode 100644 index 642b876..0000000 --- a/app/formats/custom.py +++ /dev/null @@ -1,19 +0,0 @@ -import app.twitch as twitch -import app.pipe as pipe -from typing import Generator, Tuple - - -def use(custom_format: dict, video: twitch.Video) -> Tuple[Generator[Tuple[str, dict], None, None], str]: - # Format comments - comments: Generator[Tuple[str, dict], None, None] = comment_generator(video.comments, custom_format['comments']) - - # Format output - output = pipe.output(video.metadata, custom_format['output']) - - return comments, output - - -def comment_generator(comments: Generator[dict, None, None], - comment_format: dict) -> Generator[Tuple[str, dict], None, None]: - for comment in comments: - yield pipe.comment(comment, comment_format), comment diff --git a/app/formats/formats.py b/app/formats/formats.py deleted file mode 100644 index 7ea93f8..0000000 --- a/app/formats/formats.py +++ /dev/null @@ -1,22 +0,0 @@ -from typing import Tuple, Generator, Union - -import app -import app.twitch as twitch -from app.formats import custom, srt, ssa, json as _json - - -def use(format_name: str, video: twitch.Video) -> Tuple[Generator[Union[Tuple[str, dict], dict], None, None], str]: - # Check if format name exists - if format_name not in app.config.settings['formats']: - print('Unknown format: {}'.format(format_name)) - exit() - - # Select format method - if format_name == 'json': - return _json.use(video) - if format_name == 'srt': - return srt.use(video) - if format_name == 'ssa': - return ssa.use(video) - else: - return custom.use(app.config.settings['formats'][format_name], video) diff --git a/app/formats/json.py b/app/formats/json.py deleted file mode 100644 index 5b210ea..0000000 --- a/app/formats/json.py +++ /dev/null @@ -1,32 +0,0 @@ -from typing import Tuple, Generator, List - -import app -import app.pipe as pipe -import app.twitch as twitch - - -def use(video: twitch.Video) -> Tuple[Generator[dict, None, None], str]: - # Send video through pipe to generate output - output: str = pipe.output(video.metadata, app.settings['formats']['json']['output']) - - json_object = dict() - json_object['video']: dict = video.metadata - json_object['comments']: List[dict] = [] - - # Download every comment and add to comments list - for comment in video.comments: - - # Draw progress - if not app.arguments.quiet and not app.arguments.verbose: - app.draw_progress(comment['content_offset_seconds'], video.metadata['length'], 'json') - - # Append to comments - json_object['comments'].append(comment) - - # Transform json object to a generator - return generator(json_object), output - - -# Simply yield the json object once -def generator(json_object: dict) -> Generator[dict, None, None]: - yield json_object diff --git a/app/formats/srt.py b/app/formats/srt.py deleted file mode 100644 index f0e632d..0000000 --- a/app/formats/srt.py +++ /dev/null @@ -1,34 +0,0 @@ -import datetime -from typing import Tuple, Generator - -import app -import app.pipe as pipe -import app.twitch as twitch -from app.utils import SafeDict - -irc_format: dict = app.settings['formats']['srt'] - - -def use(video: twitch.Video) -> Tuple[Generator[Tuple[str, dict], None, None], str]: - return subtitles(video.comments), pipe.output(video.metadata, irc_format['output']) - - -def subtitles(comments: Generator[dict, None, None]) -> Generator[Tuple[str, dict], None, None]: - for index, comment in enumerate(comments): - # Start and stop timestamps. Add a millisecond for timedelta to include millisecond digits. - start: datetime.timedelta = datetime.timedelta(seconds=comment['content_offset_seconds'], milliseconds=0.001) - stop: datetime.timedelta = start + datetime.timedelta(milliseconds=irc_format['duration']) - - # Format message - message = pipe.comment(comment, irc_format['comments']) - - # Subtitle variables - # Subtract the last three millisecond digits from timestamps, required by srt. - subtitle: dict = { - 'index': index + 1, - 'start': str(start).replace('.', ',')[:-3], - 'stop': str(stop).replace('.', ',')[:-3], - 'message': message - } - - yield '{index}\n{start} --> {stop}\n{message}\n'.format_map(SafeDict(subtitle)), comment diff --git a/app/formats/ssa.py b/app/formats/ssa.py deleted file mode 100644 index d0f3c5b..0000000 --- a/app/formats/ssa.py +++ /dev/null @@ -1,150 +0,0 @@ -import datetime -from itertools import chain -from typing import Tuple, Generator, List - -import app -import app.pipe as pipe -import app.twitch as twitch -from app.utils import SafeDict - -ssa_format: dict = app.settings['formats']['ssa'] - -SSA_OPEN: str = '[SSA_OPEN]' -SSA_CLOSE: str = '[SSA_CLOSE]' -SSA_SPECIAL: str = '♣' - - -def use(video: twitch.Video) -> Tuple[Generator[Tuple[str, dict], None, None], str]: - """ - Formatted according to https://www.matroska.org/technical/specs/subtitles/ssa.html - """ - output = pipe.output(video.metadata, ssa_format['output']) - - return generator(video), output - - -def generator(video: twitch.Video) -> Generator[Tuple[str, dict], None, None]: - for line in chain(prefix(video.metadata), dialogues(video.comments)): - yield line - - -def ssa_timestamp_formatter(time: datetime.timedelta) -> str: - """ - Convert timedelta to h:mm:ss.cc - https://www.matroska.org/technical/specs/subtitles/ssa.html - - :param time: Timedelta - :return: Formatted time string - """ - days, seconds = divmod(time.total_seconds(), 24 * 60 * 60) - hours, seconds = divmod(seconds, 60 * 60) - minutes, seconds = divmod(seconds, 60) - centiseconds = int((seconds - int(seconds)) * 100) - - # Floor seconds and merge days to hours - seconds = int(seconds) - hours += days * 24 - - return f'{int(hours):01d}:{int(minutes):02d}:{int(seconds):02d}.{centiseconds:02d}' - - -def dialogues(comments: Generator[dict, None, None]) -> Generator[Tuple[str, dict], None, None]: - for comment in comments: - start: datetime.timedelta = datetime.timedelta(seconds=comment['content_offset_seconds']) - end: datetime.timedelta = start + datetime.timedelta(milliseconds=ssa_format['duration']) - - # Avoid SSA variable conflicts with Python string formatting - # This is done by temporarily removing opening and closing curly brackets used by SSA. - # - # The main problem is detecting these curly brackets. We want to differentiate brackets that - # should be used by the Python string formatter, and those used by SSA. - # - # Opening curly brackets for SSA can easily be found by looking for "{\", however, - # closing curly brackets are used in the same way (just a "}") for both and requires a bit more effort. - # - # By incrementing a counter for opening brackets meant for Python formatting and decrementing for every - # closing bracket meant for Python formatting, we can define every closing bracket to belong to SSA whenever - # the counter is at zero. - - ssa_closing_brackets_indices: list = [] - open_bracket_counter: int = 0 - - # Loop through every character in formatting string - for index in range(len(ssa_format['comments']['format'])): - letter: str = ssa_format['comments']['format'][index] - - # Check if SSA bracket first, before altering the counter. - if letter is '}' and open_bracket_counter is 0: - ssa_closing_brackets_indices.append(index) - continue - - # Update counter - open_bracket_counter += { - '{': 1, # Bracket is opened - '\\': -1, # Bracket was meant for SSA, not for Python - '}': -1 # Closing bracket - }.get(letter, 0) - - # Multiple SSA commands within a curly brackets could make it negative - # Example: {\\c�&\\b1} will count 1, 0, -1, -2 - open_bracket_counter = max(0, open_bracket_counter) - - # Add a temporary special character for SSA closing curly brackets - for index in ssa_closing_brackets_indices: - ssa_format['comments']['format'] = ssa_format['comments']['format'][:index] + SSA_SPECIAL + \ - ssa_format['comments']['format'][index + 1:] - - ssa_format['comments']['format'] = ssa_format['comments']['format'].replace('{\\', SSA_OPEN).replace( - SSA_SPECIAL, SSA_CLOSE) - - # Format comment - comment_text = pipe.comment(comment, ssa_format['comments']) - - # Insert opening and closing curly brackets for SSA - comment_text = comment_text.replace(SSA_OPEN, '{\\').replace(SSA_CLOSE, '}') - - # Convert color code into SSA color code. - comment_text = comment_text.replace('\\c&#', '\\c&H').replace('\\c&H#', '\\c&H') - - dialogue: dict = { - 'start': ssa_timestamp_formatter(start), - 'end': ssa_timestamp_formatter(end), - 'comment': comment_text - } - dialogue.update(comment) - - yield ssa_format['events']['dialogue'].format_map(SafeDict(dialogue)), comment - - -def prefix(video_metadata: dict) -> Generator[Tuple[str, dict], None, None]: - lines: List[str] = list() - - # Script info - lines.append('[Script Info]') - lines.append('Title: {title}'.format_map(SafeDict(video_metadata))) - lines.append('ScriptType: v4.00') - lines.append('Collisions: Normal') - lines.append('PlayResX: {resolution[x]}'.format_map(SafeDict(ssa_format))) - lines.append('PlayResY: {resolution[y]}'.format_map(SafeDict(ssa_format))) - lines.append('PlayDepth: 0') - lines.append('Timer: 100,0000') - - # V4 Styles - lines.append('\n[V4 Styles]') - lines.append(ssa_format['styles']['format']) - lines.append(ssa_format['styles']['values']) - - # Fonts - lines.append('\n[Fonts]') - lines.append(ssa_format['fonts']) - - # Graphics - lines.append('\n[Graphics]') - lines.append(ssa_format['fonts']) - - # Events - lines.append('\n[Events]') - lines.append(ssa_format['events']['format']) - - for line in lines: - yield line, {} diff --git a/app/pipe/__init__.py b/app/pipe/__init__.py deleted file mode 100644 index 9f51ee9..0000000 --- a/app/pipe/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .pipe import comment, output diff --git a/app/pipe/filter.py b/app/pipe/filter.py deleted file mode 100644 index 35ad1f6..0000000 --- a/app/pipe/filter.py +++ /dev/null @@ -1,10 +0,0 @@ -import string - - -def output(video_metadata: dict, output_format: dict): - # Valid directory characters - valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits) - - # Strip illegal characters from title - if '{title}' in output_format['format']: - video_metadata['title'] = ''.join(c for c in video_metadata['title'] if c in valid_chars) diff --git a/app/pipe/mapper.py b/app/pipe/mapper.py deleted file mode 100644 index 0cf6e77..0000000 --- a/app/pipe/mapper.py +++ /dev/null @@ -1,116 +0,0 @@ -import hashlib -from typing import List - -import app -from app.pipe import timestamp - - -def use(dictionary: dict, format_dictionary: dict): - """ - Map new values onto dictionary - :param dictionary: input - :param format_dictionary: input format dictionary - :return nothing - """ - - # Timestamps - if 'timestamp' in format_dictionary and '{timestamp' in format_dictionary['format']: - - dictionary['timestamp'] = {} - - # Absolute timestamp - if 'absolute' in format_dictionary['timestamp'] and '{timestamp[absolute]}' in format_dictionary['format']: - - # Millisecond precision - remove $f (milliseconds) from time format - if 'millisecond_precision' in format_dictionary: - format_dictionary['timestamp']['absolute'] = str(format_dictionary['timestamp']['absolute']).replace( - '%f', '_MILLISECONDS_') - - # Format timestamp - dictionary['timestamp']['absolute'] = timestamp.use(format_dictionary['timestamp']['absolute'], - dictionary['created_at'], - app.arguments.timezone) - - # Millisecond precision - add milliseconds to timestamp - if 'millisecond_precision' in format_dictionary: - milliseconds: str = timestamp.use('%f', dictionary['created_at'], app.arguments.timezone) - milliseconds = milliseconds[:format_dictionary['millisecond_precision']] - dictionary['timestamp']['absolute'] = str(dictionary['timestamp']['absolute']).replace('_MILLISECONDS_', - milliseconds) - - # Relative timestamp - if '{timestamp[relative]}' in format_dictionary['format']: - # Todo: 'relative' in format_dictionary['timestamp'] when relative formatting is implemented. - dictionary['timestamp']['relative'] = timestamp.relative(float(dictionary['content_offset_seconds'])) - - # User colors - if 'message' in dictionary: - - # Set color - if 'user_color' not in dictionary['message']: - if 'default_user_color' in format_dictionary and format_dictionary['default_user_color'] not in ['random', - 'hash']: - dictionary['message']['user_color'] = format_dictionary['default_user_color'] - else: - # Assign color based on commenter's ID - sha256 = hashlib.sha256() - sha256.update(str.encode(dictionary['commenter']['_id'])) - - # Truncate hash and mod it by 0xffffff-1 for color hex. - color: str = hex(int(sha256.hexdigest()[:32], 16) % int(hex(0xffffff), 16)).lstrip('0x') - - # Add any missing digits - while len(color) < 6: - color = color + '0' - - dictionary['message']['user_color'] = '#{color}'.format(color=color[:6]) - - # SSA Color - if 'message[ssa_user_color]' in format_dictionary['format']: - dictionary['message']['ssa_user_color'] = '#{b}{g}{r}'.format( - b=dictionary['message']['user_color'][5:7], - g=dictionary['message']['user_color'][3:5], - r=dictionary['message']['user_color'][1:3]) - - # User badges - # The Twitch API returns an array of badges, ordered by their importance (descending). - if '{commenter[badge]}' in format_dictionary['format'] and 'message' in dictionary: - - # Add empty badge if no badge - if 'user_badges' not in dictionary['message']: - dictionary['message']['user_badges'] = [{'_id': '', 'version': 1}] - - # Default badges - if 'badges' not in format_dictionary: - format_dictionary['badges'] = { - 'turbo': '[turbo]', - 'premium': '[prime]', - 'bits': '[bits]', - 'subscriber': '[subscriber]', - 'moderator': '[moderator]', - 'global_mod': '[global mod]', - 'admin': '[admin]', - 'staff': '[staff]', - 'broadcaster': '[streamer]', - } - - # Default badges setting - if 'multiple_badges' not in format_dictionary: - format_dictionary['multiple_badges'] = False - - # Get badge display text - badges: List[str] = [] - for badge in dictionary['message']['user_badges']: - badges.append(format_dictionary['badges'].get(badge['_id'], '')) - - # Display multiple badges or not - if format_dictionary['multiple_badges']: - dictionary['commenter']['badge'] = ''.join(badges) - else: - dictionary['commenter']['badge'] = '' - - # Find first defined user badge - for badge in badges: - if badge != '': - dictionary['commenter']['badge'] = badge - break diff --git a/app/pipe/pipe.py b/app/pipe/pipe.py deleted file mode 100644 index 3512226..0000000 --- a/app/pipe/pipe.py +++ /dev/null @@ -1,20 +0,0 @@ -import app -from app.pipe import mapper, reducer, filter - - -# Formatting pipes - -def comment(comment_input: dict, comment_format: dict) -> str: - mapper.use(comment_input, comment_format) - - return reducer.use(comment_input, comment_format) - - -def output(video_metadata: dict, output_format: dict) -> str: - filter.output(video_metadata, output_format) - mapper.use(video_metadata, output_format) - - # Ignore video metadata from reducer output - output_string = reducer.use(video_metadata, output_format) - - return '{}/{}'.format(app.arguments.output.rstrip('/').rstrip('\\'), output_string) diff --git a/app/pipe/reducer.py b/app/pipe/reducer.py deleted file mode 100644 index a6a4313..0000000 --- a/app/pipe/reducer.py +++ /dev/null @@ -1,15 +0,0 @@ -from app.utils import SafeDict - - -def use(dictionary: dict, format_dictionary: dict) -> str: - """ - The reducer's job is to format an input to an output based on a format dictionary. - :param dictionary: - :param format_dictionary: - :return: formatted string - """ - # Action format - if 'action_format' in format_dictionary and 'is_action' in dictionary and bool(dictionary['is_action']): - return format_dictionary['action_format'].format_map(SafeDict(dictionary)) - - return format_dictionary['format'].format_map(SafeDict(dictionary)) diff --git a/app/pipe/timestamp.py b/app/pipe/timestamp.py deleted file mode 100644 index d255814..0000000 --- a/app/pipe/timestamp.py +++ /dev/null @@ -1,25 +0,0 @@ -from datetime import datetime, timedelta - -import dateutil.parser -from pytz import timezone - - -def parse_timestamp(value: str) -> datetime: - return dateutil.parser.parse(value) - - -def use(date_format: str, date_value: str, timezone_name: str = None) -> str: - date: datetime = parse_timestamp(date_value) - - # Convert to another timezone - if timezone_name is not None: - date = date.astimezone(timezone(timezone_name)) - - return date.strftime(date_format) - - -def relative(seconds: float) -> str: - # Todo: support formatting - delta = timedelta(seconds=seconds) - delta = delta - timedelta(microseconds=delta.microseconds) - return str(delta) diff --git a/app/twitch/__init__.py b/app/twitch/__init__.py deleted file mode 100644 index f78d3d2..0000000 --- a/app/twitch/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .video import Video diff --git a/app/twitch/api.py b/app/twitch/api.py deleted file mode 100644 index 7b15d33..0000000 --- a/app/twitch/api.py +++ /dev/null @@ -1,46 +0,0 @@ -from typing import Generator - -import requests - -import app.cli -import app.config - - -def get(path: str, params: dict = None, headers: dict = None) -> requests.Response: - params = {} if params is None else params - headers = {} if headers is None else headers - params['client_id'] = app.config.settings['client_id'] - - response: requests.Response = requests.get(url=str(app.config.settings['twitch_api']).format(path=path), - params=params, - headers=headers) - if response.status_code != requests.codes.ok: - print('\n[{}]'.format(response.json()['error'])) - print(response.json()['message']) - print('\nURL\t', response.url) - print('Params\t', params) - print('Headers\t', headers) - exit(1) - return response - - -def video(video_id: str) -> dict: - if app.cli.arguments.verbose: - print('Downloading video metadata from Twitch API') - return get('videos/{}'.format(video_id)).json() - - -def comment_fragment(video_id: str, cursor: str = '') -> dict: - return get('videos/{}/comments'.format(video_id), {'cursor': cursor}).json() - - -def comments(video_id: str) -> Generator[dict, None, None]: - if app.cli.arguments.verbose: - print('Downloading comments from Twitch API') - - fragment: dict = {'_next': ''} - - while '_next' in fragment: - fragment = comment_fragment(video_id, fragment['_next']) - for comment in fragment['comments']: - yield comment diff --git a/app/twitch/video.py b/app/twitch/video.py deleted file mode 100644 index 6327661..0000000 --- a/app/twitch/video.py +++ /dev/null @@ -1,69 +0,0 @@ -import json -from pathlib import Path -from typing import List, Generator - -import twitch -import twitch.helix as helix - -import app.cli -import app.config -import app.twitch.api as api - - -class Video: - - def __init__(self, video_id: str = None): - - # Check if data should be loaded from an input file or form the Twitch API - if app.cli.arguments.input: - if Path(app.cli.arguments.input).is_file(): - with open(app.cli.arguments.input, 'r', encoding='utf-8') as file: - json_data = json.load(file) - - # Check if JSON format is valid - if 'video' not in json_data or 'comments' not in json_data: - print('Error: Invalid JSON file.') - exit(1) - - # Set metadata and comments - self.metadata = json_data['video'] - self.comments = Video.comment_generator(json_data['comments']) - - if app.cli.arguments.verbose: - print('Loaded json data form input file') - else: - print('Error: Unable to find {}'.format(app.cli.arguments.input)) - exit(1) - - else: - # Download from Twitch API - helix = twitch.Helix(client_id=app.config.settings['client_id'], use_cache=True) - - try: - video: helix.Video = helix.video(video_id) - except KeyError: - print('Error: Invalid video or client id.') - - self.metadata: dict = api.video(video_id) - self.comments = self.comment_generator_from_api(video) - - - - def __str__(self): - return self.metadata['title'] - - def __eq__(self, other): - return self.id() == other.id() - - def id(self) -> str: - return self.metadata['_id'].strip('v') - - @staticmethod - def comment_generator_from_api(video: helix.Video) -> Generator[dict, None, None]: - for comment in video.comments(): - yield comment.data - - @staticmethod - def comment_generator(comments: List[dict]) -> Generator[dict, None, None]: - for comment in comments: - yield comment diff --git a/app/utils.py b/app/utils.py deleted file mode 100644 index d059582..0000000 --- a/app/utils.py +++ /dev/null @@ -1,4 +0,0 @@ -class SafeDict(dict): - # Return missing keys as string - def __missing__(self, key) -> str: - return '{' + key + '}' diff --git a/license b/license index d02cdef..04bb4a8 100644 --- a/license +++ b/license @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2018 Petter Kraabøl +Copyright (c) 2019 Petter Kraabøl Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/publish.sh b/publish.sh new file mode 100644 index 0000000..d94e16f --- /dev/null +++ b/publish.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +python setup.py sdist bdist_wheel +twine upload dist/* diff --git a/readme.md b/readme.md index 88b775f..c4c46d5 100644 --- a/readme.md +++ b/readme.md @@ -2,6 +2,8 @@ [![Discord](https://user-images.githubusercontent.com/7288322/34471967-1df7808a-efbb-11e7-9088-ed0b04151291.png)](https://discord.gg/wZJFeXC) +`pip install tcd` + A neat Python script to download chat messages from past broadcasts. ### Requirements @@ -9,26 +11,20 @@ A neat Python script to download chat messages from past broadcasts. * [Python 3.7 or newer](https://www.python.org/downloads/) * [A Twitch client ID](https://glass.twitch.tv/console/apps) -### Installation - -```bash -git clone https://github.com/PetterKraabol/Twitch-Chat-Downloader.git -cd Twitch-Chat-Downloader -pip install -r requirements.txt -``` - ### Usage ```bash -python app.py +tcd ``` ```bash -python app.py --help +# Download chat from VODs by video id +tcd --video 789654123,987456321 --format irc --output ~/Downloads ``` ```bash -python app.py -v 125936523 --format irc --output ~/Downloads +# Download chat from the first 10 VODs from multiple streamers +tcd --channel sodapoppin,nymn,lirik --first=10 ``` ### Features diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..368303f --- /dev/null +++ b/setup.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +import os + +from pipenv.project import Project +from pipenv.utils import convert_deps_to_pip +from setuptools import setup, find_packages + +this_directory = os.path.abspath(os.path.dirname(__file__)) +with open(os.path.join(this_directory, 'readme.md'), encoding='utf-8') as f: + readme = f.read() + +pipfile = Project(chdir=False).parsed_pipfile +requirements = convert_deps_to_pip(pipfile['packages'], r=False) +test_requirements = convert_deps_to_pip(pipfile['dev-packages'], r=False) +setup_requirements = ['pipenv', 'setuptools'] + +setup( + author='Petter Kraabøl', + author_email='petter.zarlach@gmail.com', + classifiers=[ + 'Development Status :: 4 - Beta', + 'Intended Audience :: End Users/Desktop', + 'License :: OSI Approved :: MIT License', + 'Natural Language :: English', + 'Programming Language :: Python :: 3.7', + ], + entry_points= + ''' + [console_scripts] + tcd=tcd:main + ''', + description='Twitch Chat Downloader', + install_requires=requirements, + license='MIT', + long_description=readme, + long_description_content_type='text/markdown', + include_package_data=True, + keywords='Twitch', + name='tcd', + packages=find_packages(), + python_requires=">=3.7", + setup_requires=setup_requirements, + test_suite='tests', + tests_require=test_requirements, + url='https://github.com/PetterKraabol/Twitch-Chat-Downloader', + package_data={'tcd': ['settings.reference.json']}, + version='3.0.1', + zip_safe=True, +) diff --git a/tcd/__init__.py b/tcd/__init__.py new file mode 100644 index 0000000..d0e30bf --- /dev/null +++ b/tcd/__init__.py @@ -0,0 +1,76 @@ +import argparse +import os +from pathlib import Path +from typing import List, Any + +from .arguments import Arguments +from .downloader import Downloader +from .logger import Logger, Log +from .settings import Settings + +__name__: str = 'tcd' +__all__: List[Any] = [Arguments, Settings, Downloader, Logger, Log] + + +def main(): + # Arguments + parser = argparse.ArgumentParser(description='Twitch Chat Downloader') + parser.add_argument('-v', f'--{Arguments.Name.VIDEO}', type=str, help='Video IDs separated by commas') + parser.add_argument('-c', f'--{Arguments.Name.CHANNEL}', type=str, help='Channel names separated by commas') + parser.add_argument(f'--{Arguments.Name.FIRST}', type=int, default=5, help='Download chat from the last n VODs') + parser.add_argument(f'--{Arguments.Name.CLIENT_ID}', type=str, help='Twitch client ID') + parser.add_argument(f'--{Arguments.Name.VERBOSE}', action='store_true', help='Verbose output') + parser.add_argument('-q', f'--{Arguments.Name.QUIET}', action='store_true') + parser.add_argument('-o', f'--{Arguments.Name.OUTPUT}', type=str, help='Output directory', default='./') + parser.add_argument('-f', f'--{Arguments.Name.FORMAT}', type=str, help='Message format', default='default') + parser.add_argument(f'--{Arguments.Name.TIMEZONE}', type=str, help='Timezone name') + parser.add_argument(f'--{Arguments.Name.INIT}', action='store_true', help='Script setup') + parser.add_argument(f'--{Arguments.Name.VERSION}', action='store_true', help='Settings version') + parser.add_argument(f'--{Arguments.Name.FORMATS}', action='store_true', help='List available formats') + parser.add_argument(f'--{Arguments.Name.PREVIEW}', action='store_true', help='Preview output') + parser.add_argument(f'--{Arguments.Name.SETTINGS_FILE}', type=str, + default=str(Path.home()) + '/.config/tcd/settings.json', + help='Settings file location') + parser.add_argument(f'--{Arguments.Name.DEBUG}', action='store_true', help='Print debug messages') + + Arguments(parser.parse_args().__dict__) + Settings(Arguments().settings_file, + reference_filepath=f'{os.path.dirname(os.path.abspath(__file__))}/settings.reference.json') + + # Print version number + if Arguments().print_version: + Logger().log('Twitch Chat Downloader {}'.format(Settings().config.get('version', '')), retain=False) + return + + # Client ID + Settings().config['client_id'] = Arguments().client_id or Settings().config.get('client_id', None) or input( + 'Twitch client ID: ').strip() + Settings().save() + + # List formats + if Arguments().print_formats: + for format_name in [f for f in Settings().config['formats'] if f not in ['all']]: + format_dictionary = Settings().config['formats'][format_name] + Logger().log(f'[{format_name}]', retain=False) + + if 'comments' in format_dictionary: + print('comment: {}'.format(Settings().config['formats'][format_name]['comments']['format'])) + + if 'output' in format_dictionary: + print('output: {}'.format(Settings().config['formats'][format_name]['output']['format'])) + + Logger().log('\n', retain=False) + return + + # Downloader + if Arguments().video_ids or Arguments().channels: + + if Arguments().video_ids: + Downloader().videos(Arguments().video_ids) + + if Arguments().channels: + Downloader().channels(Arguments().channels) + + return + + parser.print_help() diff --git a/tcd/__main__.py b/tcd/__main__.py new file mode 100644 index 0000000..abaa7bc --- /dev/null +++ b/tcd/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from . import main + +if __name__ == "__main__": + main() diff --git a/tcd/arguments.py b/tcd/arguments.py new file mode 100644 index 0000000..1afff09 --- /dev/null +++ b/tcd/arguments.py @@ -0,0 +1,66 @@ +from typing import Optional, Dict, Union, List + +from .singleton import Singleton + + +class Arguments(metaclass=Singleton): + """ + Arguments singleton + """ + + class Name: + SETTINGS_FILE: str = 'settings' + INIT: str = 'init' + VERBOSE: str = 'verbose' + QUIET: str = 'quiet' + PREVIEW: str = 'preview' + FORMATS: str = 'formats' + VERSION: str = 'version' + OUTPUT: str = 'output' + CLIENT_ID: str = 'client_id' + CHANNEL: str = 'channel' + FIRST: str = 'first' + VIDEO: str = 'video' + FORMAT: str = 'format' + TIMEZONE: str = 'timezone' + DEBUG: str = 'debug' + + def __init__(self, arguments: Optional[Dict[str, Union[str, bool, int]]] = None): + """ + Initialize arguments + :param arguments: Arguments from cli (Optional to call singleton instance without parameters) + """ + + if arguments is None: + print('Error: arguments were not provided') + exit() + + # Required arguments and booleans + self.settings_file: str = arguments[Arguments.Name.SETTINGS_FILE] + self.init: bool = arguments[Arguments.Name.INIT] + self.verbose: bool = arguments[Arguments.Name.VERBOSE] + self.debug: bool = arguments[Arguments.Name.DEBUG] + self.quiet: bool = arguments[Arguments.Name.QUIET] + self.preview: bool = arguments[Arguments.Name.PREVIEW] + self.print_formats: bool = arguments[Arguments.Name.FORMATS] + self.print_version: bool = arguments[Arguments.Name.VERSION] + self.output: str = arguments[Arguments.Name.OUTPUT] + + # Optional or prompted arguments + self.client_id: Optional[str] = arguments[Arguments.Name.CLIENT_ID] + self.first: Optional[int] = arguments[Arguments.Name.FIRST] + self.timezone: Optional[str] = arguments[Arguments.Name.TIMEZONE] + + # Arguments that require some formatting + self.video_ids: List[int] = [] + self.formats: List[str] = [] + self.channels: List[str] = [] + + if arguments[Arguments.Name.VIDEO]: + self.video_ids = [int(video_id) for video_id in arguments[Arguments.Name.VIDEO].lower().split(',')] + + if arguments[Arguments.Name.FORMAT]: + self.formats: Optional[List[str]] = arguments[Arguments.Name.FORMAT].lower().split(',') + + if arguments[Arguments.Name.CHANNEL]: + self.channels = arguments[Arguments.Name.CHANNEL].lower().split(',') diff --git a/tcd/downloader.py b/tcd/downloader.py new file mode 100644 index 0000000..3cbfa3a --- /dev/null +++ b/tcd/downloader.py @@ -0,0 +1,186 @@ +import datetime +import json +import os +import re +import sys +from typing import List + +import dateutil +import twitch + +from .arguments import Arguments +from .formatter import Formatter +from .logger import Logger, Log +from .pipe import Pipe +from .settings import Settings + + +class Downloader: + + def __init__(self): + self.helix_api = twitch.Helix(client_id=Settings().config['client_id'], use_cache=True) + + self.formats: List[str] = [] + self.whitelist: List[str] = [] + self.blacklist: List[str] = [] + + # Populate format list according to whitelist and blacklist + if 'all' in Arguments().formats and 'all' in Settings().config['formats']: + self.blacklist = Settings().config['formats']['all']['whitelist'] or [] + self.whitelist = Settings().config['formats']['all']['blacklist'] or [] + + # Append formats to list if they can be used + self.formats = [format_name for format_name in Settings().config['formats'].keys() if + self._can_use_format(format_name)] + + else: + self.formats = [format_name for format_name in Arguments().formats if self._can_use_format(format_name)] + + def _can_use_format(self, format_name: str) -> bool: + """ + Check if format name should be used based on whitelist and blacklist + :param format_name: Name of format + :return: If format should be used + """ + + # Lowercase format name + format_name = format_name.lower() + + # Reserved format names + if format_name in ['all']: + return False + + # Format does not exist + if format_name not in Settings().config['formats'].keys(): + return False + + # Whitelisted formats + if self.whitelist and format_name not in self.whitelist: + return False + + # Blacklisted formats + if self.blacklist and format_name in self.blacklist: + return False + + return True + + def video(self, video: twitch.helix.Video) -> None: + """ + Download chat from video + :param video: Video object + :return: None + """ + + # Parse video duration + regex = re.compile(r'((?P\d+?)h)?((?P\d+?)m)?((?P\d+?)s)?') + parts = regex.match(video.duration).groupdict() + + time_params = {} + for name, param in parts.items(): + if param: + time_params[name] = int(param) + + video_duration = datetime.timedelta(**time_params) + + formatter = Formatter(video) + + # Special case for JSON + # Build JSON object before writing it + if 'json' in self.formats: + output: str = Pipe(Settings().config['formats']['json']['output']).output(video.data) + os.makedirs(os.path.dirname(output), exist_ok=True) + + data: dict = { + 'video': video.data, + 'comments': [] + } + + for comment in video.comments(): + data['comments'].append(comment.data) + + # Ignore comments that were posted after the VOD finished + if Settings().config['formats']['json'].get('comments', {}).get('ignore_new_comments', False): + comment_date = dateutil.parser.parse(comment.created_at) + vod_finish_date = dateutil.parser.parse(video.created_at) + video_duration + + if comment_date > vod_finish_date: + continue + + if Logger().should_print(Log.PROGRESS): + self.draw_progress(current=comment.content_offset_seconds, + end=video_duration.seconds, + description='json') + + with open(output, 'w') as file: + json.dump(data, file, indent=4, sort_keys=True) + + Logger().log(f'[json] {output}') + + # For each format (ignore json this time) + for format_name in [x for x in self.formats if x not in ['json']]: + + # Get (formatted_comment, comment), output + comment_tuple, output = formatter.use(format_name) + + # Create output directory and write to file + os.makedirs(os.path.dirname(output), exist_ok=True) + with open(output, '+w', encoding='utf-8') as file: + + # For every comment in video + for formatted_comment, comment in comment_tuple: + + # Ignore comments that were posted after the VOD finished + if Settings().config['formats'][format_name].get('comments', {}).get('ignore_new_comments', False): + comment_date = dateutil.parser.parse(comment.created_at) + vod_finish_date = dateutil.parser.parse(video.created_at) + video_duration + + if comment_date > vod_finish_date: + continue + + # Draw progress + if comment and Logger().should_print(Log.PROGRESS): + self.draw_progress(current=comment.content_offset_seconds, + end=video_duration.seconds, + description=format_name) + + # Display preview + Logger().log(formatted_comment, Log.PREVIEW) + + # Write comment to file + file.write('{}\n'.format(formatted_comment)) + + Logger().log('[{}] {}'.format(format_name, output)) + + def videos(self, video_ids: List[int]) -> None: + """ + Download multiple video ids + :param video_ids: List of video ids + :return: None + """ + for video in self.helix_api.videos(video_ids): + Logger().log(format('\n{}'.format(video.title)), Log.REGULAR) + self.video(video) + + def channels(self, channels: List[str]) -> None: + """ + Download videos from multiple channels + :param channels: List of channel names + :return: None + """ + for channel, videos in self.helix_api.users(channels).videos(first=Arguments().first): + Logger().log(format('\n{}'.format(channel.display_name)), Log.REGULAR) + for video in videos: + Logger().log(format('\n{}'.format(video.title)), Log.REGULAR) + self.video(video) + + @staticmethod + def draw_progress(current: float, end: float, description: str = 'Downloading') -> None: + """ + Draw download progress + :param current: Current chat position (seconds) + :param end: End position (seconds) + :param description: Progress description + :return: + """ + sys.stdout.write('[{}] {}%\r'.format(description, '%.2f' % min(current * 10 / end * 10, 100.00))) + sys.stdout.flush() diff --git a/tcd/formats/__init__.py b/tcd/formats/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tcd/formats/custom.py b/tcd/formats/custom.py new file mode 100644 index 0000000..0fc850d --- /dev/null +++ b/tcd/formats/custom.py @@ -0,0 +1,29 @@ +from typing import Generator, Tuple + +import twitch + +from tcd.formats.format import Format +from tcd.pipe import Pipe + + +class Custom(Format): + + def __init__(self, video: twitch.helix.Video, format_name: str): + super().__init__(video, format_name) + + def use(self) -> Tuple[Generator[Tuple[str, twitch.v5.Comment], None, None], str]: + """ + Use this format + :return: tuple(formatted comment, comment), output format + """ + # Format comments + comments = self.comment_generator(self.video.comments()) + + # Format output + output: str = Pipe(self.format_dictionary['output']).output(self.video.data) + + return comments, output + + def comment_generator(self, comments: twitch.v5.Comments) -> Generator[Tuple[str, twitch.v5.Comment], None, None]: + for comment in comments: + yield Pipe(self.format_dictionary['comments']).comment(comment.data), comment diff --git a/tcd/formats/format.py b/tcd/formats/format.py new file mode 100644 index 0000000..80a7a38 --- /dev/null +++ b/tcd/formats/format.py @@ -0,0 +1,11 @@ +import twitch + +from tcd.settings import Settings + + +class Format: + + def __init__(self, video: twitch.helix.Video, format_name: str): + self.video: twitch.helix.Video = video + self.format_name: str = format_name + self.format_dictionary: dict = Settings().config['formats'][format_name] diff --git a/tcd/formats/srt.py b/tcd/formats/srt.py new file mode 100644 index 0000000..95fece3 --- /dev/null +++ b/tcd/formats/srt.py @@ -0,0 +1,70 @@ +import datetime +from typing import Tuple, Generator + +import twitch + +from tcd.formats.format import Format +from tcd.pipe import Pipe +from tcd.safedict import SafeDict + + +class SRT(Format): + + def __init__(self, video: twitch.helix.Video): + """ + Initialize SRT format + :param video: Video object + """ + super().__init__(video, format_name='srt') + + def use(self) -> Tuple[Generator[Tuple[str, twitch.v5.Comment], None, None], str]: + """ + Use SRT format + :return: Comment generator and output string + """ + return self.subtitles(self.video.comments()), Pipe(self.format_dictionary['output']).output(self.video.data) + + @staticmethod + def format_timestamp(time: datetime.timedelta) -> str: + """ + Convert timedelta to h:mm:ss.cc + https://www.matroska.org/technical/specs/subtitles/ssa.html + + :param time: Timedelta + :return: Formatted time string + """ + days, seconds = divmod(time.total_seconds(), 24 * 60 * 60) + hours, seconds = divmod(seconds, 60 * 60) + minutes, seconds = divmod(seconds, 60) + milliseconds = int((seconds - int(seconds)) * 1000) + + # Floor seconds and merge days to hours + seconds = int(seconds) + hours += days * 24 + + return f'{int(hours):01d}:{int(minutes):02d}:{int(seconds):02d},{milliseconds:03d}' + + def subtitles(self, comments: twitch.v5.Comments) -> Generator[Tuple[str, twitch.v5.Comment], None, None]: + """ + Subtitle generator + :param comments: Comments to turn into subtitles + :return: Generator with subtitles and subtitle data + """ + for index, comment in enumerate(comments): + # Stat and stop timestamps. Add a millisecond for timedelta to include millisecond digits + start = datetime.timedelta(seconds=comment.content_offset_seconds) + stop: datetime.timedelta = start + datetime.timedelta(milliseconds=self.format_dictionary['duration']) + + # Format message + message: str = Pipe(self.format_dictionary['comments']).comment(comment.data) + + # Subtitle variables + # Subtract the last three milliseconds form timestamp (required by SRT) + subtitle: dict = { + 'index': index + 1, + 'start': SRT.format_timestamp(start), + 'stop': SRT.format_timestamp(stop), + 'message': message + } + + yield '{index}\n{start} --> {stop}\n{message}\n'.format_map(SafeDict(subtitle)), comment diff --git a/tcd/formats/ssa.py b/tcd/formats/ssa.py new file mode 100644 index 0000000..7c79c95 --- /dev/null +++ b/tcd/formats/ssa.py @@ -0,0 +1,166 @@ +import datetime +from itertools import chain +from typing import Tuple, Generator, List, Optional + +import twitch + +from tcd.formats.format import Format +from tcd.pipe import Pipe +from tcd.safedict import SafeDict + + +class SSA(Format): + OPEN: str = '[SSA_OPEN]' + CLOSE: str = '[SSA_CLOSE]' + SPECIAL: str = '♣' + + def __init__(self, video: twitch.helix.Video): + super().__init__(video, format_name='ssa') + + def use(self) -> Tuple[Generator[Tuple[str, twitch.v5.Comment], None, None], str]: + """ + Use SSA format + :return: + """ + output: str = Pipe(self.format_dictionary['output']).output(self.video.data) + + return self.generator(), output + + def generator(self) -> Generator[Tuple[str, Optional[twitch.v5.Comment]], None, None]: + """ + Line generator + :return: + """ + for line in chain(self.prefix(), self.dialogues(self.video.comments())): + yield line + + @staticmethod + def format_timestamp(time: datetime.timedelta) -> str: + """ + Convert timedelta to h:mm:ss.cc + https://www.matroska.org/technical/specs/subtitles/ssa.html + + :param time: Timedelta + :return: Formatted time string + """ + days, seconds = divmod(time.total_seconds(), 24 * 60 * 60) + hours, seconds = divmod(seconds, 60 * 60) + minutes, seconds = divmod(seconds, 60) + centiseconds = int((seconds - int(seconds)) * 100) + + # Floor seconds and merge days to hours + seconds = int(seconds) + hours += days * 24 + + return f'{int(hours):01d}:{int(minutes):02d}:{int(seconds):02d}.{centiseconds:02d}' + + def dialogues(self, comments: twitch.v5.Comments) -> Generator[Tuple[str, twitch.v5.Comments], None, None]: + """ + Format comments as SSA dialogues + :param comments: Comment to format + :return: tuple(formatted comment, comment) + """ + for comment in comments: + start: datetime.timedelta = datetime.timedelta(seconds=comment.content_offset_seconds) + end: datetime.timedelta = start + datetime.timedelta(milliseconds=self.format_dictionary['duration']) + + # Avoid SSA variable conflicts with Python string formatting + # This is done by temporarily removing opening and closing curly brackets used by SSA. + # + # The main problem is detecting these curly brackets. We want to differentiate brackets that + # should be used by the Python string formatter, and those used by SSA. + # + # Opening curly brackets for SSA can easily be found by looking for "{\", however, + # closing curly brackets are used in the same way (just a "}") for both and requires a bit more effort. + # + # By incrementing a counter for opening brackets meant for Python formatting and decrementing for every + # closing bracket meant for Python formatting, we can define every closing bracket to belong to SSA whenever + # the counter is at zero. + + ssa_closing_brackets_indices: list = [] + open_bracket_counter: int = 0 + + # Loop through every character in formatting string + for index in range(len(self.format_dictionary['comments']['format'])): + letter: str = self.format_dictionary['comments']['format'][index] + + # Check if SSA bracket first, before altering the counter. + if letter is '}' and open_bracket_counter is 0: + ssa_closing_brackets_indices.append(index) + continue + + # Update counter + open_bracket_counter += { + '{': 1, # Bracket is opened + '\\': -1, # Bracket was meant for SSA, not for Python + '}': -1 # Closing bracket + }.get(letter, 0) + + # Multiple SSA commands within a curly brackets could make it negative + # Example: {\\c�&\\b1} will count 1, 0, -1, -2 + open_bracket_counter = max(0, open_bracket_counter) + + # Add a temporary special character for SSA closing curly brackets + for index in ssa_closing_brackets_indices: + self.format_dictionary['comments']['format'] = self.format_dictionary['comments']['format'][ + :index] + SSA.SPECIAL + \ + self.format_dictionary['comments']['format'][index + 1:] + + self.format_dictionary['comments']['format'] = self.format_dictionary['comments']['format'].replace('{\\', + SSA.OPEN).replace( + SSA.SPECIAL, SSA.CLOSE) + + # Format comment + comment_text = Pipe(self.format_dictionary['comments']).comment(comment.data) + + # Insert opening and closing curly brackets for SSA + comment_text = comment_text.replace(SSA.OPEN, '{\\').replace(SSA.CLOSE, '}') + + # Convert color code into SSA color code. + comment_text = comment_text.replace('\\c&#', '\\c&H').replace('\\c&H#', '\\c&H') + + dialogue: dict = { + 'start': SSA.format_timestamp(start), + 'end': SSA.format_timestamp(end), + 'comment': comment_text + } + dialogue.update(comment.data) + + yield self.format_dictionary['events']['dialogue'].format_map(SafeDict(dialogue)), comment + + def prefix(self) -> Generator[Tuple[str, None], None, None]: + """ + SSA file header + :return: Generator for header lines + """ + lines: List[str] = list() + + # Script info + lines.append('[Script Info]') + lines.append('Title: {title}'.format_map(SafeDict(self.video.data))) + lines.append('ScriptType: v4.00') + lines.append('Collisions: Normal') + lines.append('PlayResX: {resolution[x]}'.format_map(SafeDict(self.format_dictionary))) + lines.append('PlayResY: {resolution[y]}'.format_map(SafeDict(self.format_dictionary))) + lines.append('PlayDepth: 0') + lines.append('Timer: 100,0000') + + # V4 Styles + lines.append('\n[V4 Styles]') + lines.append(self.format_dictionary['styles']['format']) + lines.append(self.format_dictionary['styles']['values']) + + # Fonts + lines.append('\n[Fonts]') + lines.append(self.format_dictionary['fonts']) + + # Graphics + lines.append('\n[Graphics]') + lines.append(self.format_dictionary['graphics']) + + # Events + lines.append('\n[Events]') + lines.append(self.format_dictionary['events']['format']) + + for line in lines: + yield line, None diff --git a/tcd/formatter.py b/tcd/formatter.py new file mode 100644 index 0000000..2534257 --- /dev/null +++ b/tcd/formatter.py @@ -0,0 +1,31 @@ +from typing import Generator, Tuple + +import twitch + +from .formats.custom import Custom +from .formats.srt import SRT +from .formats.ssa import SSA +from .settings import Settings + + +class Formatter: + + def __init__(self, video: twitch.helix.Video): + self.video: twitch.helix.Video = video + + def use(self, format_name: str) -> Tuple[Generator[Tuple[str, twitch.v5.Comment], None, None], str]: + """ + Use format + :param format_name: + :return: tuple(Line, comment), output + """ + if format_name not in Settings().config['formats']: + print('Invalid format name') + exit(1) + + if format_name == 'srt': + return SRT(self.video).use() + elif format_name == 'ssa': + return SSA(self.video).use() + else: + return Custom(self.video, format_name).use() diff --git a/tcd/logger.py b/tcd/logger.py new file mode 100644 index 0000000..05ec6d5 --- /dev/null +++ b/tcd/logger.py @@ -0,0 +1,102 @@ +import time +from typing import List + +from .arguments import Arguments +from .singleton import Singleton + + +class Log: + DEBUG: str = 'debug' + ERROR: str = 'error' + REGULAR: str = 'regular' + CRITICAL: str = 'critical' + VERBOSE: str = 'verbose' + PREVIEW: str = 'preview' + PROGRESS: str = 'progress' + + def __init__(self, message: str = '', log_type: str = REGULAR): + self.message: str = message + self.type: str = log_type + self.timestamp: float = time.time() + + def __str__(self) -> str: + if self.type == Log.CRITICAL: + return f'[Critical]: {self.message}' + + if self.type == Log.DEBUG: + return f'[Debug]: {self.message}' + + return self.message + + def full(self) -> str: + """ + Return full log message with timestamp, type and message + :return: + """ + return '{} [{}]: {}'.format(self.timestamp, self.type, self.message) + + +class Logger(metaclass=Singleton): + + def __init__(self): + self.logs: List[Log] = [] + + def log(self, message: str = '', log_type: str = Log.REGULAR, retain: bool = True) -> None: + """ + Log a message + :param message: Log message + :param log_type: Log type + :param retain: Save log to memory + :return: None + """ + # Add log to + log = Log(message, log_type) + + # Save log entry to memory + if retain and log.type is not Log.PREVIEW: + self.logs.append(log) + + if self.should_print(log.type): + print(log) + + @staticmethod + def should_print(log_type: str) -> bool: + """ + Check if log should be printed + :param log_type: Log type + :return: Whether to print the log + """ + # Critical (always print) + if log_type == Log.CRITICAL: + return True + + # Quiet (only critical) + if Arguments().quiet: + return False + + # Progress - default output + if log_type == Log.PROGRESS and (Arguments().debug or Arguments().verbose or Arguments().preview): + return False + + # Debug + if log_type == Log.DEBUG and not Arguments().debug: + return False + + # Verbose + if log_type == Log.VERBOSE and not Arguments().verbose: + return False + + # Preview + if log_type == Log.PREVIEW and not Arguments().preview: + return False + + return True + + def save(self, filename: str = 'tcd.log') -> None: + """ + Save retained logs to file + :param filename: File to save to + :return: None + """ + with open(filename, 'w') as file: + [file.write('{}\n'.format(log.full())) for log in self.logs] diff --git a/tcd/pipe.py b/tcd/pipe.py new file mode 100644 index 0000000..2c4ae14 --- /dev/null +++ b/tcd/pipe.py @@ -0,0 +1,233 @@ +import hashlib +import string +from datetime import datetime, timedelta +from typing import List, Optional + +import dateutil.parser +from pytz import timezone + +from .arguments import Arguments +from .safedict import SafeDict + + +class Pipe: + """ + Pipe takes care of adding custom data fields and finally + format data into comment and output file strings + """ + + def __init__(self, format_dictionary: dict): + """ + Pipe + :param format_dictionary: Comment format + """ + self.format_dictionary: dict = format_dictionary + self.valid_directory_characters: str = "-_.() %s%s" % (string.ascii_letters, string.digits) + + # Combine regular format and action_format if provided. + self.combined_formats: str = '' + if 'format' in self.format_dictionary: + self.combined_formats += self.format_dictionary['format'] + if 'action_format' in self.format_dictionary: + self.combined_formats += self.format_dictionary['action_format'] + + def format(self, data: dict) -> str: + """ + Format comment + :param data: Input data + :return: + """ + self.filter(data) + self.mapper(data) + + return self.reduce(data) + + def comment(self, comment_data: dict) -> str: + """ + Format comment data to string + :param comment_data: Comment data + :return: Formatted comment line + """ + return self.format(comment_data) + + def output(self, video_data: dict) -> str: + """ + Format output path from data + :param video_data: Video data + :return: Output string + """ + output_string = self.format(video_data) + return '{}/{}'.format(Arguments().output.rstrip('/').rstrip('\\'), output_string) + + @staticmethod + def timestamp(date_format: str, date_value: str, timezone_name: Optional[str] = None) -> str: + """ + Parse timestamp, format it and change timezone if a timezone name is given + :param date_format: Wanted date format + :param date_value: Input value to be parsed + :param timezone_name: Timezone name + :return: Timestamp in string format + """ + date: datetime = dateutil.parser.parse(date_value) + + # Convert to another timezone + if timezone_name is not None: + date = date.astimezone(timezone(timezone_name)) + + return date.strftime(date_format) + + @staticmethod + def timestamp_relative(seconds: float) -> str: + # Todo: support formatting + delta = timedelta(seconds=seconds) + delta = delta - timedelta(microseconds=delta.microseconds) + return str(delta) + + def reduce(self, data: dict) -> str: + """ + Main formatting + + Map data dictionary to format string + :param data: Input data + :return: Formatted string + """ + + # If action format is defined and comment is an action + if 'action_format' in self.format_dictionary and 'is_action' in data and bool(data['is_action']): + try: + return str(self.format_dictionary['action_format']).format_map(SafeDict(data)) + except TypeError: + print('Invalid action format in settings file:', self.format_dictionary['is_action']) + exit(1) + else: + try: + return str(self.format_dictionary['format']).format_map(SafeDict(data)) + except TypeError: + print('Invalid format in settings file:', self.format_dictionary['format']) + exit(1) + + def filter(self, data: dict) -> dict: + """ + Remove or clean data + :param data: Input data + :return: Data (input data is muted) + """ + if '{title}' in self.combined_formats: + data['title'] = ''.join(c for c in data['title'] if c in self.valid_directory_characters) + + return data + + def mapper(self, data: dict) -> dict: + """ + Make custom changes to the input data according to the format dictionary + :param data: Input data + :return: Data (input data dict is mutated) + """ + + # Timestamps + if 'timestamp' in self.format_dictionary and '{timestamp' in self.combined_formats: + + data['timestamp'] = {} + + # Absolute timestamp + if 'absolute' in self.format_dictionary['timestamp'] and '{timestamp[absolute]}' in self.combined_formats: + + # Millisecond precision - remove $f (milliseconds) from time format + if 'millisecond_precision' in self.format_dictionary: + self.format_dictionary['timestamp']['absolute'] = str( + self.format_dictionary['timestamp']['absolute']).replace( + '%f', '_MILLISECONDS_') + + # Format timestamp + data['timestamp']['absolute'] = self.timestamp(self.format_dictionary['timestamp']['absolute'], + data['created_at'], + Arguments().timezone) + + # Millisecond precision - add milliseconds to timestamp + if 'millisecond_precision' in self.format_dictionary: + milliseconds: str = self.timestamp('%f', data['created_at'], Arguments().timezone) + milliseconds = milliseconds[:self.format_dictionary['millisecond_precision']] + data['timestamp']['absolute'] = str(data['timestamp']['absolute']).replace( + '_MILLISECONDS_', + milliseconds) + + # Relative timestamp + if '{timestamp[relative]}' in self.combined_formats: + # Todo: 'relative' in self.format_dictionary['timestamp'] when relative formatting is implemented. + data['timestamp']['relative'] = self.timestamp_relative( + float(data['content_offset_seconds'])) + + # User colors + if 'message' in data: + + # Set color + if 'user_color' not in data['message']: + if 'default_user_color' in self.format_dictionary and self.format_dictionary[ + 'default_user_color'] not in ['random', + 'hash']: + data['message']['user_color'] = self.format_dictionary['default_user_color'] + else: + # Assign color based on commenter's ID + sha256 = hashlib.sha256() + sha256.update(str.encode(data['commenter']['_id'])) + + # Truncate hash and mod it by 0xffffff-1 for color hex. + color: str = hex(int(sha256.hexdigest()[:32], 16) % int(hex(0xffffff), 16)).lstrip('0x') + + # Add any missing digits + while len(color) < 6: + color = color + '0' + + data['message']['user_color'] = '#{color}'.format(color=color[:6]) + + # SSA Color + if 'message[ssa_user_color]' in self.combined_formats: + data['message']['ssa_user_color'] = '#{b}{g}{r}'.format( + b=data['message']['user_color'][5:7], + g=data['message']['user_color'][3:5], + r=data['message']['user_color'][1:3]) + + # User badges + # The Twitch API returns an array of badges, ordered by their importance (descending). + if '{commenter[badge]}' in self.combined_formats and 'message' in data: + + # Add empty badge if no badge + if 'user_badges' not in data['message']: + data['message']['user_badges'] = [{'_id': '', 'version': 1}] + + # Default badges + if 'badges' not in self.format_dictionary: + self.format_dictionary['badges'] = { + 'turbo': '[turbo]', + 'premium': '[prime]', + 'bits': '[bits]', + 'subscriber': '[subscriber]', + 'moderator': '[moderator]', + 'global_mod': '[global mod]', + 'admin': '[admin]', + 'staff': '[staff]', + 'broadcaster': '[streamer]', + } + + # Default badges setting + if 'multiple_badges' not in self.format_dictionary: + self.format_dictionary['multiple_badges'] = False + + # Get badge display text + badges: List[str] = [] + for badge in data['message']['user_badges']: + badges.append(self.format_dictionary['badges'].get(badge['_id'], '')) + + # Display multiple badges or not + if self.format_dictionary['multiple_badges']: + data['commenter']['badge'] = ''.join(badges) + else: + data['commenter']['badge'] = '' + + # Find first defined user badge + for badge in badges: + if badge != '': + data['commenter']['badge'] = badge + break + + return data diff --git a/tcd/safedict.py b/tcd/safedict.py new file mode 100644 index 0000000..2d8add5 --- /dev/null +++ b/tcd/safedict.py @@ -0,0 +1,12 @@ +class SafeDict(dict): + """ + SafeDict retains keys that do not exist + """ + + def __missing__(self, key) -> str: + """ + Return missing key as string + :param key: + :return: + """ + return '{' + key + '}' diff --git a/tcd/settings.py b/tcd/settings.py new file mode 100644 index 0000000..4c9cc89 --- /dev/null +++ b/tcd/settings.py @@ -0,0 +1,107 @@ +import json +import pathlib +from typing import Optional, Dict, Any + +from .logger import Logger, Log +from .singleton import Singleton + + +class Settings(metaclass=Singleton): + + def __init__(self, filepath: Optional[str] = None, reference_filepath: Optional[str] = None): + """ + Initialize settings with filepath and reference filepath + :param filepath: Path to settings file + :param reference_filepath: Path to reference settings file + """ + if filepath is None: + print('Settings filepath was not provided') + exit(1) + + self.filepath = pathlib.Path(filepath) + self.directory: pathlib.Path = self.filepath.parent + + self.reference_filepath = pathlib.Path(reference_filepath) + + self.config: Dict[str, Any] = self.load(filepath) + + # Update + if self.out_of_date(): + self.update() + Logger().log('Updated to version {}'.format(self.config.get('version'))) + + def load(self, filepath: str) -> Dict[str, Any]: + """ + Load dictionary from json file + :param filepath: filepath to load from + :return: Configuration dictionary + """ + + # Create settings file from reference file if necessary + if not self.filepath.exists(): + self.directory.mkdir(exist_ok=True) + + # Missing reference file + if not self.reference_filepath.exists(): + Logger().log(f'Could not find {self.reference_filepath}', Log.CRITICAL) + exit(1) + + # Load config from reference settings + with open(self.reference_filepath, 'r') as file: + config = json.load(file) + + Settings.write(self.filepath, data=config) + + return config + + # Load from settings file + try: + with open(filepath, 'r') as file: + return json.load(file) + except json.JSONDecodeError: + print('Invalid settings format') + exit(1) + + @staticmethod + def write(filepath: str, data: dict) -> None: + """ + Save configuration to settings file + :param filepath: Filepath to save to + :param data: Configuration dictionary to save + :return: None + """ + with open(filepath, 'w') as file: + json.dump(data, file, indent=4, sort_keys=True) + + def save(self) -> None: + """ + Save settings to file + :return: None + """ + Settings.write(self.filepath, self.config) + + def out_of_date(self) -> bool: + reference: dict = self.load(self.reference_filepath) + + return self.config.get('version') != reference.get('version') + + def update(self) -> None: + """ + Update configuration settings and file using reference settings. + :return: None + """ + Settings.write(pathlib.Path('{}/settings.{}.backup.json'.format(self.directory, self.config['version'])), + self.config) + new_config: dict = self.load(self.reference_filepath) + + # Copy client ID to new config file + new_config['client_id'] = self.config.get('client_id', None) + + # Copy user-defined formats to new config file + for format_name, format_dictionary in dict(self.config['formats']).items(): + if format_name not in new_config['formats']: + new_config['formats'][format_name] = format_dictionary + + # Overwrite current config with new + Settings.write(self.filepath, new_config) + self.config = new_config diff --git a/settings.example.json b/tcd/settings.reference.json similarity index 88% rename from settings.example.json rename to tcd/settings.reference.json index d4e9a78..62bd765 100644 --- a/settings.example.json +++ b/tcd/settings.reference.json @@ -10,10 +10,11 @@ "format": "[{timestamp[relative]}] <{commenter[display_name]}> {message[body]}", "timestamp": { "relative": "%X" - } + }, + "ignore_new_messages": false }, "output": { - "format": "{channel[name]}/{_id}.txt", + "format": "{id}.txt", "timestamp": { "absolute": "%x" } @@ -40,12 +41,12 @@ } }, "output": { - "format": "{channel[name]}/{_id}.log" + "format": "{id}.log" } }, "json": { "output": { - "format": "{channel[name]}/{_id}.json" + "format": "{id}.json" } }, "srt": { @@ -54,7 +55,7 @@ }, "duration": 2000, "output": { - "format": "{channel[name]}/{_id}.srt" + "format": "{id}.srt" } }, "ssa": { @@ -70,7 +71,7 @@ "fonts": "", "graphics": "", "output": { - "format": "{channel[name]}/{_id}.ssa", + "format": "{id}.ssa", "timestamp": { "absolute": "%x" } @@ -85,6 +86,5 @@ } } }, - "twitch_api": "https://api.twitch.tv/v5/{path}", - "version": "2.0.10" + "version": "3.0.1" } diff --git a/tcd/singleton.py b/tcd/singleton.py new file mode 100644 index 0000000..5236b56 --- /dev/null +++ b/tcd/singleton.py @@ -0,0 +1,13 @@ +class Singleton(type): + """ + Abstract class for singletons + """ + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super().__call__(*args, **kwargs) + return cls._instances[cls] + + def get_instance(cls, *args, **kwargs): + cls.__call__(*args, **kwargs)