From e0a7d58b41703de29d4f8b0be82391e903712311 Mon Sep 17 00:00:00 2001 From: Xevion Date: Wed, 11 May 2022 03:18:10 -0500 Subject: [PATCH] Delete deprecated and unused Flask server code --- .flaskenv | 3 - server/__init__.py | 0 server/cli.py | 538 ------------------------------------------- server/config.py | 42 ---- server/create_app.py | 50 ---- server/data.py | 15 -- server/helpers.py | 74 ------ server/process.py | 124 ---------- 8 files changed, 846 deletions(-) delete mode 100644 .flaskenv delete mode 100644 server/__init__.py delete mode 100644 server/cli.py delete mode 100644 server/config.py delete mode 100644 server/create_app.py delete mode 100644 server/data.py delete mode 100644 server/helpers.py delete mode 100644 server/process.py diff --git a/.flaskenv b/.flaskenv deleted file mode 100644 index 13d2694..0000000 --- a/.flaskenv +++ /dev/null @@ -1,3 +0,0 @@ -FLASK_APP=server.create_app -FLASK_ENV=development -FLASK_DEBUG=1 \ No newline at end of file diff --git a/server/__init__.py b/server/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/server/cli.py b/server/cli.py deleted file mode 100644 index 39fc16e..0000000 --- a/server/cli.py +++ /dev/null @@ -1,538 +0,0 @@ -""" -cli.py - -CLI entrypoint for fetching, processing and compiling quote data. -""" -import logging -import os -import re -import sys -import time -from collections import OrderedDict, defaultdict -from pprint import pprint -from typing import Dict, List, Optional, Tuple, Union - -import click -import enlighten -import requests -from bs4 import BeautifulSoup -from lxml import etree - -sys.path[0] += '\\..' -from server.helpers import algolia_transform, character_id, clean_string -from server.process import DATA_DIR, get_appearances, get_episodes, get_filepath, load_file, \ - save_file, sleep_from, \ - verify_episode - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger('cli') -logger.setLevel(logging.DEBUG) -manager = enlighten.get_manager() - - -@click.group() -def cli(): - pass - - -@cli.group() -def misc(): - pass - - -@misc.command('characters') -@click.option('-s', '--season', type=int, help='Season to be processed for character names') -@click.option('-e', '--episode', type=int, help='Episode to be processed. Requires --season to be specified.') -@click.option('-a', '--all', is_flag=True, help='Process all episodes, regardless of previous specifications.') -@click.option('-i', '--individual', is_flag=True, - help='List characters from individual episodes instead of just compiling a masterlist') -def characters(season: int, episode: int, all: bool, individual: bool): - """ - Retrieves all characters from all quotes available. - Used in order to compile a list of characters for the character page, for scanning speakers for anomalies - and mistakes, as well as for compiling a list of verified 'main' characters. - """ - - if all: - episodes = list(get_episodes()) - elif season: - if episode: - if verify_episode(season, episode): - episodes = [(season, episode)] - else: - logger.error(f'Season {season}, Episode {episode} is not a valid combination.') - return - else: - episodes = list(get_episodes(season=season)) - logger.info(f'Fetching Season {season}...') - else: - if episode: - logger.info('You must specify more than just an episode.') - else: - logger.info('You must specify which episodes to process.') - logger.info('Check --help for more information on this command.') - return - - master = dict() - for _season, _episode in episodes: - appearances = get_appearances(_season, _episode) - - if not appearances: - continue - - if individual: - logger.info(' '.join(item['name'] for item in appearances)) - - for item in appearances: - if item['id'] in master.keys(): - master[item['id']]['appearances'] += item['appearances'] - else: - master[item['id']] = item - - # print(master) - logger.info( - ', '.join(item['name'] for item in sorted(master.values(), reverse=True, key=lambda item: item['appearances']))) - - -@cli.command('fetch') -@click.option('-s', '--season', type=int, - help='Season to be fetched. Without --episode, will download all episodes in a season.') -@click.option('-e', '--episode', type=int, help='Specific episode to be fetched. Requires --season to be specified.') -@click.option('-d', '--delay', type=float, default=0.5, help='Delay between each request') -@click.option('-a', '--all', is_flag=True, help='Fetch all episodes, regardless of previous specifications.') -@click.option('-o', '--overwrite', is_flag=True, help='Overwrite if a file already exists.') -@click.option('-ss', '--silent-skip', is_flag=True, help='Skip existing files silently') -def fetch(season: int, episode: int, delay: float, all: bool, overwrite: bool, silent_skip: bool): - """ - Downloads raw quote pages from 'officequotes.net'. - - Fetches quote pages, placing them in 'html' folder in unmodified UTF-8 HTML files. - """ - episodes: List[Tuple[int, int]] - - if all: - episodes = list(get_episodes()) - elif season: - if episode: - if verify_episode(season, episode): - episodes = [(season, episode)] - else: - logger.error(f'Season {season}, Episode {episode} is not a valid combination.') - return - else: - episodes = list(get_episodes(season=season)) - logger.info(f'Fetching Season {season}...') - else: - if episode: - logger.info('You must specify more than just an episode.') - else: - logger.info('You must specify which episodes to fetch.') - logger.info('Check --help for more information on this command.') - return - - logger.debug(f'Ready to start fetching {len(episodes)} quote page{"s" if len(episodes) > 1 else ""}') - session = requests.Session() - last_request = time.time() - delay - - with enlighten.Manager() as manager: - with manager.counter(total=len(episodes), desc='Fetching...', unit='episodes') as pbar: - for _season, _episode in episodes: - filepath = get_filepath(_season, _episode, 'html') - - # Check if HTML file exists - if not overwrite and os.path.exists(filepath): - if not silent_skip: - logger.debug(f'Skipping Season {_season}, Episode {_episode}: File already exists.') - else: - logger.info(f'Fetching Season {_season}, Episode {_episode}...') - - # Generate link, make request - link = f"http://officequotes.net/no{_season}-{str(_episode).zfill(2)}.php" - - sleep_from(delay, last_request, manager) # Sleep at least :delay: seconds. - - resp = session.get(link) - last_request = time.time() - if resp.ok: - # Write data to file - save_file(filepath, resp.text, False) - logger.debug('Successfully fetched & saved.') - else: - logger.error(f'Fetching failed. Erroneous response code {resp.status_code}.') - pbar.update() - logger.info('Fetching complete.') - - -@cli.command('preprocess') -@click.option('-s', '--season', type=int, - help='Season to be fetched. Without --episode, will download all episodes in a season.') -@click.option('-e', '--episode', type=int, help='Specific episode to be fetched. Requires --season to be specified.') -@click.option('-a', '--all', is_flag=True, help='Fetch all episodes, regardless of previous specifications.') -@click.option('-o', '--overwrite', is_flag=True, help='Overwrite if a file already exists.') -@click.option('-ss', '--silent-skip', is_flag=True, help='Skip missing/existing files silently') -@click.option('-ssm', '--silent-skip-missing', is_flag=True, help='Skip missing files silently') -@click.option('-sse', '--silent-skip-existing', is_flag=True, help='Skip overwrite skips silently') -def preprocess(season: int, episode: int, all: bool, overwrite: bool, silent_skip: bool, silent_skip_missing: bool, - silent_skip_existing: bool): - """ - Pre-processes raw HTML files into mangled custom quote data. - - Custom quote data requires manual inspection and formatting, making it a dangerous operation that may overwrite - precious quote data. - """ - print(silent_skip_existing) - episodes: List[Tuple[int, int]] - - if all: - episodes = list(get_episodes()) - elif season: - if episode: - if verify_episode(season, episode): - episodes = [(season, episode)] - else: - logger.error(f'Season {season}, Episode {episode} is not a valid combination.') - return - else: - episodes = list(get_episodes(season=season)) - logger.info(f'Preprocessing Season {season}...') - else: - if episode: - logger.info('You must specify more than just an episode.') - else: - logger.info('You must specify which episodes to pre-process.') - logger.info('Check --help for more information on this command.') - return - - for season, episode in episodes: - # Overwrite protection - save_path = get_filepath(season, episode, 'raw') - if os.path.exists(save_path) and not overwrite: - if (not silent_skip) or (not silent_skip_existing): - logger.info(f'Skipping Season {season}, Episode {episode}, file already exists. Skipping processing.') - continue - - try: - page_data = load_file(get_filepath(season, episode, 'html'), False) - except FileNotFoundError: - if not silent_skip or not silent_skip_missing: - logger.warning(f'No data for Season {season}, Episode {episode} available. Skipping processing.') - else: - soup = BeautifulSoup(page_data, "html.parser") - data = [] - - sections = soup.find_all(attrs={"class": "quote"}) - for section in sections: - for br in section.find_all('br'): - br.replace_with("\n" + br.text) - - for line in section.get_text().split('\n'): - data.append(line.strip()) - - data.append('-') - data.pop(-1) - - data = '\n'.join(data) - save_file(save_path, data, False) - - -@cli.command('process') -@click.option('-s', '--season', type=int, - help='Season to be fetched. Without --episode, will download all episodes in a season.') -@click.option('-e', '--episode', type=int, help='Specific episode to be fetched. Requires --season to be specified.') -@click.option('-a', '--all', 'all_', is_flag=True, help='Fetch all episodes, regardless of previous specifications.') -@click.option('-r', '--report', is_flag=True, help='Report quote statistics once processing completed.') -def process(season: Optional[int], episode: Optional[int], all_: bool, report: bool): - """ - Processes manually processed raw quote data into JSON. - """ - episodes: List[Tuple[int, int]] - - if all_: - episodes = list(get_episodes()) - elif season: - if episode: - if verify_episode(season, episode): - episodes = [(season, episode)] - else: - logger.error(f'Season {season}, Episode {episode} is not a valid combination.') - return - else: - episodes = list(get_episodes(season=season)) - logger.info(f'Processing Season {season}...') - else: - if episode: - logger.info('You must specify more than just an episode.') - else: - logger.info('You must specify which episodes to process.') - logger.info('Check --help for more information on this command.') - return - - speakers: Dict = load_file(os.path.join(DATA_DIR, 'speakers.json'), True) - speakers = {original: new for original, new in speakers.items() if original != new and type(new) == str} - - quote: Union[str, List[str]] - section_num: int - for _season, _episode in episodes: - sections = [] - try: - preprocessed_data = load_file(get_filepath(_season, _episode, 'raw')) - for section_num, raw_section in enumerate(re.split('^-', preprocessed_data, flags=re.MULTILINE), start=1): - section = { - 'quotes': [] - } - - section_data = list(raw_section.strip().split('\n')) - if section_data[0].startswith('!'): - section['deleted'] = int(re.search('!(\d+)', section_data.pop(0)).group(1)) - - for quote in section_data: - quote = quote.split('|', 1) - - section['quotes'].append( - { - 'speaker': clean_string(speakers.get(quote[0], quote[0])), - 'text': clean_string(quote[1]) - } - ) - sections.append(section) - except FileNotFoundError: - logger.info(f'Skipped Season {_season}, Episode {_episode}, no file found.') - continue - except: - logger.exception(f'Skipped Season {_season}, Episode {_episode}: Malformed data.') - if quote: - logger.info( - f'Last quote seen "{quote if type(quote) is str else "|".join(quote)}" in section {section_num}') - else: - # Save processed data - save_file(get_filepath(_season, _episode, 'processed'), sections, True) - - if report: - deleted_count = [0, set()] - quote_count = 0 - speakers = set() - - for section in sections: - quote_count += len(section['quotes']) - - if 'deleted' in section.keys(): - deleted_count[0] += 1 - deleted_count[1].add(section['deleted']) - - for quote in section['quotes']: - speakers.add(quote['speaker']) - - logger.debug(f'{quote_count} quotes.') - logger.debug(f'{deleted_count[0]} different deleted sections, {len(deleted_count[1])} unique.') - logger.info(f'{len(speakers)} Speakers:') - logger.info(', '.join(speakers)) - - -@cli.command('xml') -@click.option('-s', '--season', type=int, help='Season to be fetched. Without --episode, will download all episodes in a season.') -@click.option('-e', '--episode', type=int, help='Specific episode to be fetched. Requires --season to be specified.') -@click.option('-a', '--all', 'all_', is_flag=True, help='Fetch all episodes, regardless of previous specifications.') -@click.option('-r', '--report', is_flag=True, help='Report quote statistics once processing completed.') -def xml(season: Optional[int], episode: Optional[int], all_: bool, report: bool): - """ - Processes manually processed raw quote data into JSON. - """ - episodes: List[Tuple[int, int]] - - if all_: - episodes = list(get_episodes()) - elif season: - if episode: - if verify_episode(season, episode): - episodes = [(season, episode)] - else: - logger.error(f'Season {season}, Episode {episode} is not a valid combination.') - return - else: - episodes = list(get_episodes(season=season)) - logger.info(f'Processing Season {season}...') - else: - if episode: - logger.info('You must specify more than just an episode.') - else: - logger.info('You must specify which episodes to process.') - logger.info('Check --help for more information on this command.') - return - - for _season, _episode in episodes: - try: - processed_data = load_file(get_filepath(_season, _episode, 'processed'), True) - rootElement = etree.Element('SceneList') - for scene in processed_data: - sceneElement = etree.Element('Scene') - for quote in scene['quotes']: - charactersElement = etree.Element('Characters') - sceneElement.append(charactersElement) - - rootElement.append(sceneElement) - - save_file(get_filepath(_season, _episode, 'xml')) - except FileNotFoundError: - logger.info(f'Skipped Season {_season}, Episode {_episode}, no file found.') - continue - -@cli.command('truth') -def truth(): - """Modify""" - - -@cli.command('characters') -def characters(): - """Collects all characters from every single processed JSON file.""" - episodes = list(get_episodes()) - speakersList = OrderedDict() - - for _season, _episode in episodes: - try: - processed_data = load_file(get_filepath(_season, _episode, 'processed'), True) - for scene in processed_data: - for quote in scene['quotes']: - speakersList[quote['speaker']] = None - except FileNotFoundError: - logger.warning(f"Skipped {_season}-{_episode}, no file found.") - - speaker_data = OrderedDict([(item, item) for item in sorted(speakersList.keys())]) - print(f'{len(speaker_data)} speakers identified.') - - pprint(list(speaker_data.keys())) - save_file(os.path.join(DATA_DIR, 'speakers.json'), speaker_data, True) - - -@cli.group('build') -def build(): - """Build final data files used by Algolia and the backend API.""" - pass - - -@build.command('algolia') -@click.option('-ss', '--silent-skip', is_flag=True, help='Skip existing files silently') -@click.option('--process', 'process_', is_flag=True, help='Run processing before building final data.') -def algolia(silent_skip: bool, process_: bool): - """ - Generates algolia.json, a all encompassing file for Algolia's search index. - """ - if process_: - logger.info('Processing before building algolia.json') - try: - process(["--all", '--silent']) - except: - pass - - data = [] - episode_num_abs, section_num_abs, quote_num_abs = 0, 0, 0 - for season, episode in get_episodes(): - episode_num_abs += 1 - try: - episode_data = load_file(get_filepath(season, episode, 'processed'), True) - except FileNotFoundError: - if not silent_skip: - logger.warning(f'Skipping Season {season}, Episode {episode}. No episode data file found.') - else: - for section_num_rel, section in enumerate(episode_data, start=1): - section_num_abs += 1 - for quote_num_rel, quote in enumerate(section['quotes'], start=1): - quote_num_abs += 1 - - # Relative position - quote['quote_rel'] = quote_num_rel - quote['section_rel'] = section_num_rel - quote['episode_rel'] = episode - # Absolute position - quote['quote_abs'] = quote_num_abs - quote['section_abs'] = section_num_abs - quote['episode_abs'] = episode_num_abs - - quote['season'] = season - - quote['is_deleted'] = 'deleted' in section.keys() - quote['deleted_section'] = section.get('deleted') - - data.append(quote) - - logger.info(f'Saving {len(data):,} quotes to algolia.json') - save_file(os.path.join(DATA_DIR, 'algolia.json'), data, True) - - -@build.command('character') -def character(): - """ - Uses algolia.json to build a characters.json file, a masterlist of quotes separated by the speaker. - Speakers not considered 'main characters' are excluded from the list. - This file also pulls information to build character descriptions and other relevant information. - """ - data = load_file(os.path.join(DATA_DIR, 'algolia.json'), True) - descriptions = load_file(os.path.join(DATA_DIR, 'character_descriptions.json'), True) - - key_list = [('speaker',), ('text',), ('season',), ('episode_rel', 'episode'), ('section_rel', 'scene'), - ('quote_rel', 'quote')] - master = map(lambda item: algolia_transform(item, key_list), filter(lambda item: True, data)) - - # Separate the quotes based on speaker - char_data = defaultdict(list) - for quote in master: - char_data[character_id(quote['speaker'])].append(quote) - - final_data = {} - for character, quotes in char_data.items(): - final_data[character] = {'quotes': quotes, 'summary': None, 'name': None} - if character in descriptions.keys(): - for key in ['name', 'summary', 'actor']: - final_data[character][key] = descriptions[character].get(key) - - # Filter for main characters. - main_characters = list(map(character_id, load_file(os.path.join(DATA_DIR, 'main_characters.json'), True))) - for character in list(final_data.keys()): - if character not in main_characters: - del final_data[character] - - # Save to characters.json - save_file(os.path.join(DATA_DIR, 'characters.json'), final_data, True) - - -@build.command('final') -@click.option('-ss', '--silent-skip', is_flag=True, help='Skip existing files silently') -@click.option('--process', 'process_', is_flag=True, help='Run processing before building final data.') -def final(silent_skip: bool, process_: bool): - """Generates the latest application static data.json file, used by the backend API.""" - descriptions = load_file(os.path.join(DATA_DIR, 'descriptions.json'), True) - seasons = [{'season_id': season, 'episodes': []} for season in range(1, 10)] - - if process_: - logger.info('Processing before building final.json') - try: - process(["--all"]) - except: - pass - - for season_id, episode_id in get_episodes(): - # Load data file - try: - episode_data = load_file(get_filepath(season_id, episode_id, 'processed'), True) - except FileNotFoundError: - if not silent_skip: - logger.warning( - f'No data for Season {season_id}, Episode {episode_id} available. Null data inserted.') - episode_data = None - - description = descriptions[season_id - 1][episode_id - 1] - seasons[season_id - 1]['episodes'].append( - { - 'title': description['title'].strip(), - 'description': description['description'].strip(), - 'episode_id': episode_id, - 'characters': get_appearances(season_id, episode_id), - 'scenes': episode_data - } - ) - - logger.info('Saving to data.json') - save_file(os.path.join(DATA_DIR, 'data.json'), seasons, True) - - -if __name__ == "__main__": - cli() diff --git a/server/config.py b/server/config.py deleted file mode 100644 index 14403f3..0000000 --- a/server/config.py +++ /dev/null @@ -1,42 +0,0 @@ -""" -config.py - -Stores all configurations used by the application from database URLs to Secret keys to extension settings. -""" - -import os - -configs = { - 'development': 'server.config.DevelopmentConfig', - 'testing': 'server.config.TestingConfig', - 'production': 'server.config.ProductionConfig' -} - - -class Config: - """ - Base configuration. - """ - pass - - -class DevelopmentConfig(Config): - """ - Insecure and unrecommended config for use during development. - """ - SECRET_KEY = 'INSECURE' - - -class TestingConfig(DevelopmentConfig): - """ - Configuration used for testing the application. - """ - TESTING = True - WTF_CSRF_ENABLED = False - - -class ProductionConfig(Config): - """ - Configuration used for running in secure production environment. - """ - SECRET_KEY = os.getenv('SECRET_KEY') diff --git a/server/create_app.py b/server/create_app.py deleted file mode 100644 index 3aa94d6..0000000 --- a/server/create_app.py +++ /dev/null @@ -1,50 +0,0 @@ -""" -create_app.py - -The create_app function used to create and initialize the app with all of it's extensions and settings. -""" - -from flask import Flask, render_template -from flask_cors import CORS -from flask_wtf.csrf import CSRFProtect - -from server.config import configs - -csrf = CSRFProtect() -cors = CORS(resources={r'/api/*': {'origins': '*'}}) - - -def create_app(env=None): - """ - The create_app function used to create and initialize the app with all of it's extensions and settings. - """ - app = Flask(__name__, - static_folder="./../dist/static", - template_folder="./../dist" - ) - - # Load configuration values - if not env: - env = app.config['ENV'] - app.config.from_object(configs[env]) - - # Initialize Flask extensions - csrf.init_app(app) - cors.init_app(app) - - # CLI commands setup - @app.shell_context_processor - def shell_context(): - """Provides specific Flask components to the shell.""" - return {'app': app} - - with app.app_context(): - # noinspection PyUnresolvedReferences - from server import api - - @app.route('/', defaults={'path': ''}) - @app.route('/') - def catch_all(path): - return render_template("index.html") - - return app diff --git a/server/data.py b/server/data.py deleted file mode 100644 index 267ef7e..0000000 --- a/server/data.py +++ /dev/null @@ -1,15 +0,0 @@ -""" -data.py - -Manages API quote/character data, caching static responses and reloading from disk. -""" -import os -import json - -BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -with open(os.path.join(BASE_DIR, 'data', 'data.json'), 'r', encoding='utf-8') as file: - data = json.load(file) - -with open(os.path.join(BASE_DIR, 'data', 'characters.json'), 'r', encoding='utf-8') as file: - character_data = json.load(file) - diff --git a/server/helpers.py b/server/helpers.py deleted file mode 100644 index 8f757c7..0000000 --- a/server/helpers.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -helpers.py - - -""" -import random -import re -import string -import unicodedata -from collections import OrderedDict -from difflib import SequenceMatcher -from heapq import nlargest as _nlargest -from typing import List, Optional, Tuple - -import unidecode - -episode_counts = [6, 22, 23, 14, 26, 24, 24, 24, 23] - - -def check_validity(season: int, episode: int): - """Shorthand function for checking if a specific episode is valid.""" - return (1 <= season <= 9) and (1 <= episode <= episode_counts[season]) - - -def default(value, other): - """Value default, similar to dict.get, but better.""" - return value if value is not None else other - - -def get_neighbors(array: List, index: int, distance: int = 2) -> Tuple[List, List]: - """Get neighbors above and below a specific index in an array. Returns maximum number of items possible.""" - top, below = [], [] - for i in range(1, distance + 1): - top_index = index - i - below_index = index + i - if top_index >= 0: - top.append(array[top_index]) - if below_index < len(array): - below.append(array[below_index]) - return top[::-1], below - - -def algolia_transform(old_dictionary: dict, key_list: List[Tuple[str, Optional[str]]]) -> dict: - """ - Transforms a dictionary object of a quote (from algolia.json) into a API-ready quote. - Used for cli.character (i.e. characters.json) - :param old_dictionary: The original Algolia dictionary - :param key_list: A list of keys to keep in the dictionary in a tuple. One item tuple to keep the tuple's name, a - second item requests a 'rename' for the quote. - :return: The reformatted dictionary. - """ - - new_dictionary = {} - for keyItem in key_list: - if len(keyItem) > 1: - new_dictionary[keyItem[1]] = old_dictionary[keyItem[0]] - else: - new_dictionary[keyItem[0]] = old_dictionary[keyItem[0]] - - return new_dictionary - -def character_id(name: str) -> str: - return '-'.join(name.split(' ')).lower() - - -alphabet: str = string.ascii_letters + string.digits - - -def random_id(length: int = 8) -> str: - """Generate a random {length} character long string.""" - return ''.join(random.choices(alphabet, k=length)) - - - diff --git a/server/process.py b/server/process.py deleted file mode 100644 index 4f81791..0000000 --- a/server/process.py +++ /dev/null @@ -1,124 +0,0 @@ -""" -process.py - -Functions and shortcuts for loading/saving/extracting data for processing quote data. -""" - -import json -import os -import time -from collections import defaultdict -from math import ceil -from typing import Dict, Iterable, List, Optional, Tuple, Union - -import enlighten -import requests - -from server.helpers import character_id - -session = requests.Session() - -BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -DATA_DIR = os.path.join(BASE_DIR, 'data') - -folder_exts = {'html': 'html', 'processed': 'json', 'raw': 'txt'} -episode_counts = [6, 22, 23, 14, 26, 24, 24, 24, 23] - - -def get_filename(season: int, episode: int, extension: str) -> str: - """Get filename for any given episode in standardized format""" - return f'{season}-{str(episode).zfill(2)}.{extension}' - - -def get_filepath(season: int, episode: int, folder: str) -> str: - """Get full filepath for a episode's datafile for a given folder.""" - if folder: - return os.path.join(DATA_DIR, folder, get_filename(season, episode, folder_exts.get(folder, 'json'))) - return os.path.join(DATA_DIR, get_filename(season, episode, 'json')) - - -def load_file(filepath: str, json_decode: bool = False): - """Shortcut function for loading file from filepath, with JSON parsing flag.""" - if json_decode: - with open(filepath, 'r', encoding='utf-8') as file: - return json.load(file) - else: - with open(filepath, 'r', encoding='utf-8') as file: - return file.read() - - -def save_file(filepath: str, data, json_encode: bool): - """Shortcut function for saving data to a file, JSON encoding flag.""" - if json_encode: - with open(filepath, 'w', encoding='utf-8') as file: - json.dump(data, file, ensure_ascii=False, indent=4) - else: - with open(filepath, 'w', encoding='utf-8') as file: - file.write(data) - - -def get_episodes(season: int = None) -> Iterable[Tuple[int, int]]: - """ - Yields a list of Episode & Season tuples. - If Season is specified, it yields - """ - if season: - if 1 <= season <= 9: - for episode in range(1, episode_counts[season - 1]): - yield season, episode - else: - for season, ep_count in enumerate(episode_counts, start=1): - for episode in range(1, ep_count + 1): - yield season, episode - - -def verify_episode(season: int, episode: int = None) -> bool: - """ - Verifies that specific Season and/or Episode is valid. - """ - return 1 <= season <= 9 and (episode is None or 1 <= episode <= episode_counts[season - 1]) - - -def sleep_from(wait_time: float, moment: float, manager: enlighten.Manager = None) -> float: - """ - Sleeps for a specific amount of time, accordingly to a previous moment. - - :param wait_time: The minimum amount of time that must be waited since the specified moment. - :param moment: Epoch time. - :param manager: Progressbar Manager - """ - passed = time.time() - moment - time_slept = wait_time - passed - if time_slept > 0.01: - if manager: - time_slept = round(time_slept, 2) - total, delay = ceil(time_slept * 100), time_slept / 100 - bar = manager.counter(total=total, desc='Sleeping...', leave=False) - for _ in range(total): - time.sleep(delay) - bar.update() - bar.close() - else: - time.sleep(time_slept) - return time_slept - else: - return 0 - - -def get_appearances(season, episode) -> Optional[List[Dict[str, Union[int, str]]]]: - """ - Extracts all characters and their number of appearances from a specific episode. - Prepared in a list of dictionary, preferable storage/for loop method. - """ - filepath = get_filepath(season, episode, 'processed') - if not os.path.exists(filepath): - return - scenes = load_file(filepath, True) - - characters = defaultdict(int) - for scene in scenes: - for quote in scene.get('quotes', []): - characters[quote.get('speaker')] += 1 - characters = [{'name': character, 'appearances': appearances, 'id': character_id(character)} - for character, appearances in characters.items()] - return list(sorted(characters, key=lambda item: item['appearances'], reverse=True))