mirror of
https://github.com/Xevion/the-office.git
synced 2025-12-15 08:13:29 -06:00
Delete deprecated and unused Flask server code
.flaskenv
@@ -1,3 +0,0 @@
FLASK_APP=server.create_app
FLASK_ENV=development
FLASK_DEBUG=1
server/cli.py
@@ -1,538 +0,0 @@
"""
cli.py

CLI entrypoint for fetching, processing and compiling quote data.
"""
import logging
import os
import re
import sys
import time
from collections import OrderedDict, defaultdict
from pprint import pprint
from typing import Dict, List, Optional, Tuple, Union

import click
import enlighten
import requests
from bs4 import BeautifulSoup
from lxml import etree

# Make the parent directory importable (the original appended a Windows-only '\\..' suffix).
sys.path[0] = os.path.join(sys.path[0], '..')
from server.helpers import algolia_transform, character_id, clean_string
from server.process import DATA_DIR, get_appearances, get_episodes, get_filepath, load_file, \
    save_file, sleep_from, \
    verify_episode

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('cli')
logger.setLevel(logging.DEBUG)
manager = enlighten.get_manager()


@click.group()
def cli():
    pass


@cli.group()
def misc():
    pass


@misc.command('characters')
@click.option('-s', '--season', type=int, help='Season to be processed for character names')
@click.option('-e', '--episode', type=int, help='Episode to be processed. Requires --season to be specified.')
@click.option('-a', '--all', is_flag=True, help='Process all episodes, regardless of previous specifications.')
@click.option('-i', '--individual', is_flag=True,
              help='List characters from individual episodes instead of just compiling a masterlist')
def characters(season: int, episode: int, all: bool, individual: bool):
    """
    Retrieves all characters from all available quotes.

    Used to compile a list of characters for the character page, to scan speakers for anomalies
    and mistakes, and to compile a list of verified 'main' characters.
    """
    if all:
        episodes = list(get_episodes())
    elif season:
        if episode:
            if verify_episode(season, episode):
                episodes = [(season, episode)]
            else:
                logger.error(f'Season {season}, Episode {episode} is not a valid combination.')
                return
        else:
            episodes = list(get_episodes(season=season))
            logger.info(f'Fetching Season {season}...')
    else:
        if episode:
            logger.info('You must specify more than just an episode.')
        else:
            logger.info('You must specify which episodes to process.')
        logger.info('Check --help for more information on this command.')
        return

    master = dict()
    for _season, _episode in episodes:
        appearances = get_appearances(_season, _episode)

        if not appearances:
            continue

        if individual:
            logger.info(' '.join(item['name'] for item in appearances))

        for item in appearances:
            if item['id'] in master:
                master[item['id']]['appearances'] += item['appearances']
            else:
                master[item['id']] = item

    logger.info(
        ', '.join(item['name'] for item in sorted(master.values(), reverse=True, key=lambda item: item['appearances'])))


@cli.command('fetch')
@click.option('-s', '--season', type=int,
              help='Season to be fetched. Without --episode, will download all episodes in a season.')
@click.option('-e', '--episode', type=int, help='Specific episode to be fetched. Requires --season to be specified.')
@click.option('-d', '--delay', type=float, default=0.5, help='Delay between each request')
@click.option('-a', '--all', is_flag=True, help='Fetch all episodes, regardless of previous specifications.')
@click.option('-o', '--overwrite', is_flag=True, help='Overwrite if a file already exists.')
@click.option('-ss', '--silent-skip', is_flag=True, help='Skip existing files silently')
def fetch(season: int, episode: int, delay: float, all: bool, overwrite: bool, silent_skip: bool):
    """
    Downloads raw quote pages from 'officequotes.net'.

    Fetches quote pages, placing them in the 'html' folder as unmodified UTF-8 HTML files.
    """
    episodes: List[Tuple[int, int]]

    if all:
        episodes = list(get_episodes())
    elif season:
        if episode:
            if verify_episode(season, episode):
                episodes = [(season, episode)]
            else:
                logger.error(f'Season {season}, Episode {episode} is not a valid combination.')
                return
        else:
            episodes = list(get_episodes(season=season))
            logger.info(f'Fetching Season {season}...')
    else:
        if episode:
            logger.info('You must specify more than just an episode.')
        else:
            logger.info('You must specify which episodes to fetch.')
        logger.info('Check --help for more information on this command.')
        return

    logger.debug(f'Ready to start fetching {len(episodes)} quote page{"s" if len(episodes) > 1 else ""}')
    session = requests.Session()
    last_request = time.time() - delay

    with enlighten.Manager() as manager:
        with manager.counter(total=len(episodes), desc='Fetching...', unit='episodes') as pbar:
            for _season, _episode in episodes:
                filepath = get_filepath(_season, _episode, 'html')

                # Check if the HTML file already exists
                if not overwrite and os.path.exists(filepath):
                    if not silent_skip:
                        logger.debug(f'Skipping Season {_season}, Episode {_episode}: File already exists.')
                else:
                    logger.info(f'Fetching Season {_season}, Episode {_episode}...')

                    # Generate link, make request
                    link = f"http://officequotes.net/no{_season}-{str(_episode).zfill(2)}.php"

                    sleep_from(delay, last_request, manager)  # Sleep at least `delay` seconds since the last request.

                    resp = session.get(link)
                    last_request = time.time()
                    if resp.ok:
                        # Write data to file
                        save_file(filepath, resp.text, False)
                        logger.debug('Successfully fetched & saved.')
                    else:
                        logger.error(f'Fetching failed. Erroneous response code {resp.status_code}.')
                pbar.update()
    logger.info('Fetching complete.')


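# Illustrative invocation (not part of the original module): fetch all of
# season 3, waiting at least one second between requests and skipping
# already-downloaded files silently:
#
#   python server/cli.py fetch --season 3 --delay 1.0 --silent-skip

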
@cli.command('preprocess')
@click.option('-s', '--season', type=int,
              help='Season to be pre-processed. Without --episode, will pre-process all episodes in a season.')
@click.option('-e', '--episode', type=int, help='Specific episode to be pre-processed. Requires --season to be specified.')
@click.option('-a', '--all', is_flag=True, help='Pre-process all episodes, regardless of previous specifications.')
@click.option('-o', '--overwrite', is_flag=True, help='Overwrite if a file already exists.')
@click.option('-ss', '--silent-skip', is_flag=True, help='Skip missing/existing files silently')
@click.option('-ssm', '--silent-skip-missing', is_flag=True, help='Skip missing files silently')
@click.option('-sse', '--silent-skip-existing', is_flag=True, help='Skip overwrite skips silently')
def preprocess(season: int, episode: int, all: bool, overwrite: bool, silent_skip: bool, silent_skip_missing: bool,
               silent_skip_existing: bool):
    """
    Pre-processes raw HTML files into mangled custom quote data.

    Custom quote data requires manual inspection and formatting, making this a dangerous operation that
    may overwrite precious quote data.
    """
    episodes: List[Tuple[int, int]]

    if all:
        episodes = list(get_episodes())
    elif season:
        if episode:
            if verify_episode(season, episode):
                episodes = [(season, episode)]
            else:
                logger.error(f'Season {season}, Episode {episode} is not a valid combination.')
                return
        else:
            episodes = list(get_episodes(season=season))
            logger.info(f'Preprocessing Season {season}...')
    else:
        if episode:
            logger.info('You must specify more than just an episode.')
        else:
            logger.info('You must specify which episodes to pre-process.')
        logger.info('Check --help for more information on this command.')
        return

    for season, episode in episodes:
        # Overwrite protection
        save_path = get_filepath(season, episode, 'raw')
        if os.path.exists(save_path) and not overwrite:
            # Either silencing flag suppresses the message (the original or-of-negations
            # only went quiet when both flags were set).
            if not (silent_skip or silent_skip_existing):
                logger.info(f'Skipping Season {season}, Episode {episode}: file already exists.')
            continue

        try:
            page_data = load_file(get_filepath(season, episode, 'html'), False)
        except FileNotFoundError:
            if not (silent_skip or silent_skip_missing):
                logger.warning(f'No data for Season {season}, Episode {episode} available. Skipping processing.')
        else:
            soup = BeautifulSoup(page_data, "html.parser")
            data = []

            sections = soup.find_all(attrs={"class": "quote"})
            for section in sections:
                # Convert <br> tags into newlines so get_text() preserves line breaks.
                for br in section.find_all('br'):
                    br.replace_with("\n" + br.text)

                for line in section.get_text().split('\n'):
                    data.append(line.strip())

                data.append('-')
            data.pop(-1)  # Drop the trailing section separator.

            data = '\n'.join(data)
            save_file(save_path, data, False)


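# Illustrative 'raw' file shape consumed by the 'process' command below (the
# quote lines are made up, not part of the original module): sections are
# separated by lines beginning with '-', an optional '!N' header carries a
# deleted-scene number, and each line holds one 'Speaker|text' pair:
#
#   !3
#   Michael|Wait, wait, wait...
#   Pam|Dunder Mifflin, this is Pam.
#   -
#   Jim|Fact: bears eat beets.

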
@cli.command('process')
@click.option('-s', '--season', type=int,
              help='Season to be processed. Without --episode, will process all episodes in a season.')
@click.option('-e', '--episode', type=int, help='Specific episode to be processed. Requires --season to be specified.')
@click.option('-a', '--all', 'all_', is_flag=True, help='Process all episodes, regardless of previous specifications.')
@click.option('-r', '--report', is_flag=True, help='Report quote statistics once processing has completed.')
def process(season: Optional[int], episode: Optional[int], all_: bool, report: bool):
    """
    Processes manually-reviewed raw quote data into JSON.
    """
    episodes: List[Tuple[int, int]]

    if all_:
        episodes = list(get_episodes())
    elif season:
        if episode:
            if verify_episode(season, episode):
                episodes = [(season, episode)]
            else:
                logger.error(f'Season {season}, Episode {episode} is not a valid combination.')
                return
        else:
            episodes = list(get_episodes(season=season))
            logger.info(f'Processing Season {season}...')
    else:
        if episode:
            logger.info('You must specify more than just an episode.')
        else:
            logger.info('You must specify which episodes to process.')
        logger.info('Check --help for more information on this command.')
        return

    # Speaker corrections: keep only genuine renames (original mapped to a different string).
    speakers: Dict = load_file(os.path.join(DATA_DIR, 'speakers.json'), True)
    speakers = {original: new for original, new in speakers.items() if original != new and isinstance(new, str)}

    quote: Optional[Union[str, List[str]]]
    section_num: int
    for _season, _episode in episodes:
        sections = []
        quote, section_num = None, 0  # Initialized so the except branch below can reference them safely.
        try:
            preprocessed_data = load_file(get_filepath(_season, _episode, 'raw'))
            for section_num, raw_section in enumerate(re.split('^-', preprocessed_data, flags=re.MULTILINE), start=1):
                section = {
                    'quotes': []
                }

                section_data = list(raw_section.strip().split('\n'))
                if section_data[0].startswith('!'):
                    section['deleted'] = int(re.search(r'!(\d+)', section_data.pop(0)).group(1))

                for quote in section_data:
                    quote = quote.split('|', 1)

                    section['quotes'].append(
                        {
                            'speaker': clean_string(speakers.get(quote[0], quote[0])),
                            'text': clean_string(quote[1])
                        }
                    )
                sections.append(section)
        except FileNotFoundError:
            logger.info(f'Skipped Season {_season}, Episode {_episode}, no file found.')
            continue
        except Exception:
            logger.exception(f'Skipped Season {_season}, Episode {_episode}: Malformed data.')
            if quote:
                logger.info(
                    f'Last quote seen "{quote if type(quote) is str else "|".join(quote)}" in section {section_num}')
        else:
            # Save processed data
            save_file(get_filepath(_season, _episode, 'processed'), sections, True)

        if report:
            deleted_count = [0, set()]
            quote_count = 0
            episode_speakers = set()  # Renamed from 'speakers' to avoid clobbering the corrections dict above.

            for section in sections:
                quote_count += len(section['quotes'])

                if 'deleted' in section:
                    deleted_count[0] += 1
                    deleted_count[1].add(section['deleted'])

                for quote in section['quotes']:
                    episode_speakers.add(quote['speaker'])

            logger.debug(f'{quote_count} quotes.')
            logger.debug(f'{deleted_count[0]} deleted sections, {len(deleted_count[1])} unique.')
            logger.info(f'{len(episode_speakers)} Speakers:')
            logger.info(', '.join(episode_speakers))


@cli.command('xml')
@click.option('-s', '--season', type=int,
              help='Season to be converted. Without --episode, will convert all episodes in a season.')
@click.option('-e', '--episode', type=int, help='Specific episode to be converted. Requires --season to be specified.')
@click.option('-a', '--all', 'all_', is_flag=True, help='Convert all episodes, regardless of previous specifications.')
@click.option('-r', '--report', is_flag=True, help='Report quote statistics once processing has completed.')
def xml(season: Optional[int], episode: Optional[int], all_: bool, report: bool):
    """
    Converts processed JSON quote data into XML.
    """
    episodes: List[Tuple[int, int]]

    if all_:
        episodes = list(get_episodes())
    elif season:
        if episode:
            if verify_episode(season, episode):
                episodes = [(season, episode)]
            else:
                logger.error(f'Season {season}, Episode {episode} is not a valid combination.')
                return
        else:
            episodes = list(get_episodes(season=season))
            logger.info(f'Processing Season {season}...')
    else:
        if episode:
            logger.info('You must specify more than just an episode.')
        else:
            logger.info('You must specify which episodes to process.')
        logger.info('Check --help for more information on this command.')
        return

    for _season, _episode in episodes:
        try:
            processed_data = load_file(get_filepath(_season, _episode, 'processed'), True)
            # NOTE: this only builds a skeleton of Scene/Characters elements; quote text is never emitted.
            rootElement = etree.Element('SceneList')
            for scene in processed_data:
                sceneElement = etree.Element('Scene')
                for quote in scene['quotes']:
                    charactersElement = etree.Element('Characters')
                    sceneElement.append(charactersElement)

                rootElement.append(sceneElement)

            # The original call was missing its arguments; save_file() requires the
            # serialized document and a JSON-encode flag.
            save_file(get_filepath(_season, _episode, 'xml'),
                      etree.tostring(rootElement, pretty_print=True).decode('utf-8'), False)
        except FileNotFoundError:
            logger.info(f'Skipped Season {_season}, Episode {_episode}, no file found.')
            continue


@cli.command('truth')
def truth():
    """Modify"""
    # Unfinished stub: the command body was never implemented.


@cli.command('characters')
def characters():
    """Collects all characters from every single processed JSON file."""
    episodes = list(get_episodes())
    speakersList = OrderedDict()

    for _season, _episode in episodes:
        try:
            processed_data = load_file(get_filepath(_season, _episode, 'processed'), True)
            for scene in processed_data:
                for quote in scene['quotes']:
                    speakersList[quote['speaker']] = None
        except FileNotFoundError:
            logger.warning(f"Skipped {_season}-{_episode}, no file found.")

    speaker_data = OrderedDict([(item, item) for item in sorted(speakersList.keys())])
    print(f'{len(speaker_data)} speakers identified.')

    pprint(list(speaker_data.keys()))
    save_file(os.path.join(DATA_DIR, 'speakers.json'), speaker_data, True)


@cli.group('build')
def build():
    """Build final data files used by Algolia and the backend API."""
    pass


@build.command('algolia')
@click.option('-ss', '--silent-skip', is_flag=True, help='Skip missing files silently')
@click.option('--process', 'process_', is_flag=True, help='Run processing before building final data.')
def algolia(silent_skip: bool, process_: bool):
    """
    Generates algolia.json, an all-encompassing file for Algolia's search index.
    """
    if process_:
        logger.info('Processing before building algolia.json')
        try:
            # Invoke the 'process' command in-process. The original passed a nonexistent
            # '--silent' flag; standalone_mode=False keeps Click from calling sys.exit().
            process(["--all"], standalone_mode=False)
        except Exception:
            logger.exception('Processing failed; continuing with existing data.')

    data = []
    episode_num_abs, section_num_abs, quote_num_abs = 0, 0, 0
    for season, episode in get_episodes():
        episode_num_abs += 1
        try:
            episode_data = load_file(get_filepath(season, episode, 'processed'), True)
        except FileNotFoundError:
            if not silent_skip:
                logger.warning(f'Skipping Season {season}, Episode {episode}. No episode data file found.')
        else:
            for section_num_rel, section in enumerate(episode_data, start=1):
                section_num_abs += 1
                for quote_num_rel, quote in enumerate(section['quotes'], start=1):
                    quote_num_abs += 1

                    # Relative position
                    quote['quote_rel'] = quote_num_rel
                    quote['section_rel'] = section_num_rel
                    quote['episode_rel'] = episode
                    # Absolute position
                    quote['quote_abs'] = quote_num_abs
                    quote['section_abs'] = section_num_abs
                    quote['episode_abs'] = episode_num_abs

                    quote['season'] = season

                    quote['is_deleted'] = 'deleted' in section
                    quote['deleted_section'] = section.get('deleted')

                    data.append(quote)

    logger.info(f'Saving {len(data):,} quotes to algolia.json')
    save_file(os.path.join(DATA_DIR, 'algolia.json'), data, True)


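# Illustrative shape of a single algolia.json record built above (values made
# up, not part of the original module):
#
#   {"speaker": "Dwight", "text": "Identity theft is not a joke, Jim!",
#    "quote_rel": 3, "section_rel": 2, "episode_rel": 21,
#    "quote_abs": 18250, "section_abs": 2904, "episode_abs": 49,
#    "season": 3, "is_deleted": false, "deleted_section": null}

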
@build.command('character')
def character():
    """
    Uses algolia.json to build characters.json, a masterlist of quotes separated by speaker.

    Speakers not considered 'main characters' are excluded from the list. This file also pulls
    information used to build character descriptions and other relevant details.
    """
    data = load_file(os.path.join(DATA_DIR, 'algolia.json'), True)
    descriptions = load_file(os.path.join(DATA_DIR, 'character_descriptions.json'), True)

    key_list = [('speaker',), ('text',), ('season',), ('episode_rel', 'episode'), ('section_rel', 'scene'),
                ('quote_rel', 'quote')]
    # The original piped `data` through a no-op filter; mapping directly is equivalent.
    master = map(lambda item: algolia_transform(item, key_list), data)

    # Separate the quotes based on speaker
    char_data = defaultdict(list)
    for quote in master:
        char_data[character_id(quote['speaker'])].append(quote)

    final_data = {}
    for character, quotes in char_data.items():
        final_data[character] = {'quotes': quotes, 'summary': None, 'name': None}
        if character in descriptions:
            for key in ['name', 'summary', 'actor']:
                final_data[character][key] = descriptions[character].get(key)

    # Filter for main characters.
    main_characters = list(map(character_id, load_file(os.path.join(DATA_DIR, 'main_characters.json'), True)))
    for character in list(final_data.keys()):
        if character not in main_characters:
            del final_data[character]

    # Save to characters.json
    save_file(os.path.join(DATA_DIR, 'characters.json'), final_data, True)


@build.command('final')
@click.option('-ss', '--silent-skip', is_flag=True, help='Skip missing files silently')
@click.option('--process', 'process_', is_flag=True, help='Run processing before building final data.')
def final(silent_skip: bool, process_: bool):
    """Generates the latest static data.json file, used by the backend API."""
    descriptions = load_file(os.path.join(DATA_DIR, 'descriptions.json'), True)
    seasons = [{'season_id': season, 'episodes': []} for season in range(1, 10)]

    if process_:
        logger.info('Processing before building data.json')
        try:
            process(["--all"], standalone_mode=False)
        except Exception:
            logger.exception('Processing failed; continuing with existing data.')

    for season_id, episode_id in get_episodes():
        # Load data file
        try:
            episode_data = load_file(get_filepath(season_id, episode_id, 'processed'), True)
        except FileNotFoundError:
            if not silent_skip:
                logger.warning(
                    f'No data for Season {season_id}, Episode {episode_id} available. Null data inserted.')
            episode_data = None

        description = descriptions[season_id - 1][episode_id - 1]
        seasons[season_id - 1]['episodes'].append(
            {
                'title': description['title'].strip(),
                'description': description['description'].strip(),
                'episode_id': episode_id,
                'characters': get_appearances(season_id, episode_id),
                'scenes': episode_data
            }
        )

    logger.info('Saving to data.json')
    save_file(os.path.join(DATA_DIR, 'data.json'), seasons, True)


if __name__ == "__main__":
    cli()
server/config.py
@@ -1,42 +0,0 @@
"""
config.py

Stores all configuration used by the application, from database URLs to secret keys to extension settings.
"""

import os

configs = {
    'development': 'server.config.DevelopmentConfig',
    'testing': 'server.config.TestingConfig',
    'production': 'server.config.ProductionConfig'
}


class Config:
    """
    Base configuration.
    """
    pass


class DevelopmentConfig(Config):
    """
    Insecure configuration intended for use during development only.
    """
    SECRET_KEY = 'INSECURE'


class TestingConfig(DevelopmentConfig):
    """
    Configuration used for testing the application.
    """
    TESTING = True
    WTF_CSRF_ENABLED = False


class ProductionConfig(Config):
    """
    Configuration used for running in a secure production environment.
    """
    SECRET_KEY = os.getenv('SECRET_KEY')


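# Illustrative (not part of the original module): create_app() below resolves
# the active class through this mapping:
#
#   >>> app.config.from_object(configs['development'])
#   >>> app.config['SECRET_KEY']
#   'INSECURE'

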
server/create_app.py
@@ -1,50 +0,0 @@
"""
create_app.py

The create_app function used to create and initialize the app with all of its extensions and settings.
"""

from flask import Flask, render_template
from flask_cors import CORS
from flask_wtf.csrf import CSRFProtect

from server.config import configs

csrf = CSRFProtect()
cors = CORS(resources={r'/api/*': {'origins': '*'}})


def create_app(env=None):
    """
    Creates and initializes the app with all of its extensions and settings.
    """
    app = Flask(__name__,
                static_folder="./../dist/static",
                template_folder="./../dist"
                )

    # Load configuration values
    if not env:
        env = app.config['ENV']
    app.config.from_object(configs[env])

    # Initialize Flask extensions
    csrf.init_app(app)
    cors.init_app(app)

    # CLI commands setup
    @app.shell_context_processor
    def shell_context():
        """Provides specific Flask components to the shell."""
        return {'app': app}

    with app.app_context():
        # noinspection PyUnresolvedReferences
        from server import api

    # Serve the SPA entrypoint for any unmatched route.
    @app.route('/', defaults={'path': ''})
    @app.route('/<path:path>')
    def catch_all(path):
        return render_template("index.html")

    return app


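# Illustrative usage (not part of the original module): the .flaskenv entries
# above set FLASK_APP=server.create_app, so `flask run` invokes this factory;
# it can also be driven directly:
#
#   >>> from server.create_app import create_app
#   >>> app = create_app('development')
#   >>> app.config['SECRET_KEY']
#   'INSECURE'

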
server/data.py
@@ -1,15 +0,0 @@
"""
data.py

Manages API quote/character data, caching static responses and reloading from disk.
"""
import json
import os

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

with open(os.path.join(BASE_DIR, 'data', 'data.json'), 'r', encoding='utf-8') as file:
    data = json.load(file)

with open(os.path.join(BASE_DIR, 'data', 'characters.json'), 'r', encoding='utf-8') as file:
    character_data = json.load(file)
server/helpers.py
@@ -1,74 +0,0 @@
"""
helpers.py

Assorted helper functions for string handling, identifiers, and quote data transforms.
"""
import random
import re
import string
import unicodedata
from collections import OrderedDict
from difflib import SequenceMatcher
from heapq import nlargest as _nlargest
from typing import List, Optional, Tuple

import unidecode

episode_counts = [6, 22, 23, 14, 26, 24, 24, 24, 23]


def check_validity(season: int, episode: int):
    """Shorthand function for checking if a specific episode is valid."""
    # episode_counts is zero-indexed by season; the original indexed with `season`
    # directly, which was off by one.
    return (1 <= season <= 9) and (1 <= episode <= episode_counts[season - 1])


def default(value, other):
    """Value default, similar to dict.get: returns `value` unless it is None."""
    return value if value is not None else other


def get_neighbors(array: List, index: int, distance: int = 2) -> Tuple[List, List]:
    """Get neighbors above and below a specific index in an array. Returns as many items as possible."""
    top, below = [], []
    for i in range(1, distance + 1):
        top_index = index - i
        below_index = index + i
        if top_index >= 0:
            top.append(array[top_index])
        if below_index < len(array):
            below.append(array[below_index])
    return top[::-1], below


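# Illustrative (not part of the original module): up to `distance` neighbors
# on each side, clipped at the array bounds:
#
#   get_neighbors(list(range(7)), 3)  # -> ([1, 2], [4, 5])
#   get_neighbors(list(range(7)), 0)  # -> ([], [1, 2])

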
def algolia_transform(old_dictionary: dict, key_list: List[Tuple[str, Optional[str]]]) -> dict:
    """
    Transforms a dictionary object of a quote (from algolia.json) into an API-ready quote.
    Used by cli.character (i.e. for characters.json).

    :param old_dictionary: The original Algolia dictionary.
    :param key_list: The keys to keep, each as a tuple. A one-item tuple keeps the key's
        original name; a second item renames the key in the output.
    :return: The reformatted dictionary.
    """
    new_dictionary = {}
    for keyItem in key_list:
        if len(keyItem) > 1:
            new_dictionary[keyItem[1]] = old_dictionary[keyItem[0]]
        else:
            new_dictionary[keyItem[0]] = old_dictionary[keyItem[0]]

    return new_dictionary


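# Illustrative usage (not part of the original module), mirroring the
# key_list used by the 'build character' command in cli.py:
#
#   quote = {'speaker': 'Jim', 'text': 'Bears. Beets. Battlestar Galactica.',
#            'season': 3, 'episode_rel': 20}
#   algolia_transform(quote, [('speaker',), ('episode_rel', 'episode')])
#   # -> {'speaker': 'Jim', 'episode': 20}

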
def character_id(name: str) -> str:
    """Builds a URL-safe character identifier, e.g. 'Michael Scott' -> 'michael-scott'."""
    return '-'.join(name.split(' ')).lower()


alphabet: str = string.ascii_letters + string.digits


def random_id(length: int = 8) -> str:
    """Generate a random alphanumeric string of `length` characters."""
    return ''.join(random.choices(alphabet, k=length))
server/process.py
@@ -1,124 +0,0 @@
"""
process.py

Functions and shortcuts for loading/saving/extracting data for processing quote data.
"""

import json
import os
import time
from collections import defaultdict
from math import ceil
from typing import Dict, Iterable, List, Optional, Tuple, Union

import enlighten
import requests

from server.helpers import character_id

session = requests.Session()

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, 'data')

# Maps data folder names to file extensions; 'xml' added here so the 'xml'
# command does not fall back to the '.json' default.
folder_exts = {'html': 'html', 'processed': 'json', 'raw': 'txt', 'xml': 'xml'}
episode_counts = [6, 22, 23, 14, 26, 24, 24, 24, 23]


def get_filename(season: int, episode: int, extension: str) -> str:
    """Get the filename for any given episode in a standardized format."""
    return f'{season}-{str(episode).zfill(2)}.{extension}'


def get_filepath(season: int, episode: int, folder: str) -> str:
    """Get the full filepath of an episode's datafile for a given folder."""
    if folder:
        return os.path.join(DATA_DIR, folder, get_filename(season, episode, folder_exts.get(folder, 'json')))
    return os.path.join(DATA_DIR, get_filename(season, episode, 'json'))


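# Illustrative results (not part of the original module):
#
#   get_filename(3, 4, 'html')  # -> '3-04.html'
#   get_filepath(3, 4, 'raw')   # -> '<DATA_DIR>/raw/3-04.txt'

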
def load_file(filepath: str, json_decode: bool = False):
    """Shortcut for loading a file from a filepath, with an optional JSON-parsing flag."""
    with open(filepath, 'r', encoding='utf-8') as file:
        if json_decode:
            return json.load(file)
        return file.read()


def save_file(filepath: str, data, json_encode: bool):
    """Shortcut for saving data to a file, with a JSON-encoding flag."""
    with open(filepath, 'w', encoding='utf-8') as file:
        if json_encode:
            json.dump(data, file, ensure_ascii=False, indent=4)
        else:
            file.write(data)


def get_episodes(season: int = None) -> Iterable[Tuple[int, int]]:
    """
    Yields (season, episode) tuples for every episode.

    If a season is specified, only that season's episodes are yielded.
    """
    if season:
        if 1 <= season <= 9:
            # The original range stopped one episode short; + 1 includes the
            # season finale, matching the all-seasons branch below.
            for episode in range(1, episode_counts[season - 1] + 1):
                yield season, episode
    else:
        for season, ep_count in enumerate(episode_counts, start=1):
            for episode in range(1, ep_count + 1):
                yield season, episode


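# Illustrative (not part of the original module): with the off-by-one fix,
# season 1 yields all six of its episodes:
#
#   list(get_episodes(season=1))
#   # -> [(1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6)]

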
def verify_episode(season: int, episode: int = None) -> bool:
    """
    Verifies that a specific Season and/or Episode is valid.
    """
    return 1 <= season <= 9 and (episode is None or 1 <= episode <= episode_counts[season - 1])


def sleep_from(wait_time: float, moment: float, manager: enlighten.Manager = None) -> float:
    """
    Sleeps until at least `wait_time` seconds have passed since a previous moment.

    :param wait_time: The minimum amount of time that must have passed since the specified moment.
    :param moment: Epoch time of the previous moment.
    :param manager: Progress bar manager.
    """
    passed = time.time() - moment
    time_slept = wait_time - passed
    if time_slept > 0.01:
        if manager:
            # Sleep in 100 small steps so the progress bar can tick.
            time_slept = round(time_slept, 2)
            total, delay = ceil(time_slept * 100), time_slept / 100
            bar = manager.counter(total=total, desc='Sleeping...', leave=False)
            for _ in range(total):
                time.sleep(delay)
                bar.update()
            bar.close()
        else:
            time.sleep(time_slept)
        return time_slept
    else:
        return 0


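# Illustrative throttling loop (not part of the original module), matching
# how the 'fetch' command in cli.py uses sleep_from:
#
#   last_request = time.time() - delay
#   for link in links:
#       sleep_from(delay, last_request)  # wait out the remainder of `delay`
#       resp = session.get(link)
#       last_request = time.time()

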
def get_appearances(season, episode) -> Optional[List[Dict[str, Union[int, str]]]]:
    """
    Extracts all characters and their number of appearances from a specific episode.

    Returned as a list of dictionaries, a convenient shape for storage and iteration.
    """
    filepath = get_filepath(season, episode, 'processed')
    if not os.path.exists(filepath):
        return None
    scenes = load_file(filepath, True)

    characters = defaultdict(int)
    for scene in scenes:
        for quote in scene.get('quotes', []):
            characters[quote.get('speaker')] += 1
    characters = [{'name': character, 'appearances': appearances, 'id': character_id(character)}
                  for character, appearances in characters.items()]
    return sorted(characters, key=lambda item: item['appearances'], reverse=True)
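# Illustrative return value (not part of the original module; counts made up):
#
#   get_appearances(2, 1)
#   # -> [{'name': 'Michael', 'appearances': 41, 'id': 'michael'},
#   #     {'name': 'Dwight', 'appearances': 17, 'id': 'dwight'}, ...]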