Files
the-office/server/process.py

248 lines
8.2 KiB
Python

import json
import os
import re
import time
from collections import defaultdict
from math import ceil
from typing import Iterable, List, Tuple
import enlighten
import requests
from bs4 import BeautifulSoup
session = requests.Session()
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, 'data')
folder_exts = {'html': 'html', 'processed': 'json', 'raw': 'txt'}
episode_counts = [6, 22, 23, 14, 26, 24, 24, 24, 23]
def get_filename(season: int, episode: int, extension: str) -> str:
"""Get filename for any given episode in standardized format"""
return f'{season}-{str(episode).zfill(2)}.{extension}'
def get_filepath(season: int, episode: int, folder: str) -> str:
"""Get full filepath for a episode's datafile for a given folder."""
if folder:
return os.path.join(DATA_DIR, folder, get_filename(season, episode, folder_exts.get(folder, 'json')))
return os.path.join(DATA_DIR, get_filename(season, episode, 'json'))
def load_file(filepath: str, json_decode: bool):
"""Shortcut function for loading file from filepath, with JSON parsing flag."""
if json_decode:
with open(filepath, 'r', encoding='utf-8') as file:
return json.load(file)
else:
with open(filepath, 'r', encoding='utf-8') as file:
return file.read()
def save_file(filepath: str, data, json_encode: bool):
"""Shortcut function for saving data to a file, JSON encoding flag."""
if json_encode:
with open(filepath, 'w', encoding='utf-8') as file:
json.dump(data, file, ensure_ascii=False, indent=4)
else:
with open(filepath, 'w', encoding='utf-8') as file:
file.write(data)
def get_episodes(season: int = None) -> Iterable[Tuple[int, int]]:
"""
Yields a list of Episode & Season tuples.
If Season is specified, it yields
"""
if season:
if 1 <= season <= 9:
for episode in range(1, episode_counts[season - 1]):
yield season, episode
else:
for season, ep_count in enumerate(episode_counts, start=1):
for episode in range(1, ep_count + 1):
yield season, episode
def verify_episode(season: int, episode: int = None) -> bool:
"""
Verifies that a Season or Season + Episode is valid.
"""
return 1 <= season <= 9 and (episode is None or 1 <= episode <= episode_counts[season])
def sleep_from(wait_time: float, moment: float, manager: enlighten.Manager = None) -> float:
"""
Sleeps for a specific amount of time, accordingly to a previous moment.
:param wait_time: The minimum amount of time that must be waited since the specified moment.
:param moment: Epoch time.
:param manager: Progressbar Manager
"""
passed = time.time() - moment
time_slept = wait_time - passed
if time_slept > 0.01:
if manager:
time_slept = round(time_slept, 2)
total, delay = ceil(time_slept * 100), time_slept / 100
bar = manager.counter(total=total, desc='Sleeping...', leave=False)
for _ in range(total):
time.sleep(delay)
bar.update()
bar.close()
else:
time.sleep(time_slept)
return time_slept
else:
return 0
def preprocess(page_data: str) -> List[str]:
soup = BeautifulSoup(page_data, "html.parser")
data = []
sections = soup.find_all(attrs={"class": "quote"})
for section in sections:
for br in section.find_all('br'):
br.replace_with("\n" + br.text)
for line in section.get_text().split('\n'):
data.append(line.strip())
data.append('-')
data.pop(-1)
return data
def process(season, episode):
with open(os.path.join(DATA_DIR, 'raw', f'{season}-{str(episode).zfill(2)}.txt'), 'r',
encoding='utf-8') as file:
sections = []
for s in re.split('^-', file.read(), flags=re.MULTILINE):
section = {
'quotes': []
}
section_data = list(s.strip().split('\n'))
if section_data[0].startswith('!'):
section['deleted'] = int(re.search('!(\d+)', section_data.pop(0)).group(1))
for q in section_data:
quote = q.split('|', 1)
print(quote)
section['quotes'].append(
{
'speaker': quote[0],
'text': quote[1]
}
)
sections.append(section)
with open(os.path.join(DATA_DIR, 'processed', f'{season}-{str(episode).zfill(2)}.json'), 'w',
encoding='utf-8') as file:
json.dump(sections, file, indent=4, ensure_ascii=False)
deleted_count = [0, set()]
quote_count = 0
speakers = set()
for section in sections:
quote_count += len(section['quotes'])
if 'deleted' in section.keys():
deleted_count[0] += 1
deleted_count[1].add(section['deleted'])
for quote in section['quotes']:
speakers.add(quote['speaker'])
print(f'{quote_count} quotes.')
print(f'{deleted_count[0]} different deleted sections, {len(deleted_count[1])} unique.')
print(f'{len(speakers)} Speakers:')
print(', '.join(speakers))
def generate_algolia():
data = []
quote_num = 0
for season, episode in episodes():
try:
with open(os.path.join(DATA_DIR, 'processed', f'{season}-{str(episode).zfill(2)}.json'), 'r',
encoding='utf-8') as file:
episode_data = json.load(file)
except FileNotFoundError:
print(f'No JSON data for Season {season} Episode {episode}')
else:
for section_num, section in enumerate(episode_data, start=1):
for quote in section['quotes']:
quote_num += 1
quote['quote'] = quote_num
quote['section'] = section_num
quote['episode'] = episode
quote['season'] = season
quote['is_deleted'] = 'deleted' in section.keys()
quote['deleted_section'] = section.get('deleted')
data.append(quote)
with open(os.path.join(DATA_DIR, 'algolia.json'), 'w', encoding='utf-8') as file:
json.dump(data, file, ensure_ascii=False, indent=4)
def get_episode_scenes(season, episode):
filepath = os.path.join(DATA_DIR, 'processed', f'{season}-{str(episode).zfill(2)}.json')
if os.path.exists(filepath):
with open(filepath, 'r', encoding='utf-8') as file:
return json.load(file)
else:
return None
def get_characters(season, episode):
scenes = get_episode_scenes(season, episode)
if scenes is None:
return None
characters = defaultdict(int)
for scene in scenes:
for quote in scene['quotes']:
characters[quote['speaker']] += 1
characters = [{'name': character, 'appearances': appearances, 'id': '-'.join(character.split(' ')).lower()}
for character, appearances in characters.items()]
return list(sorted(characters, key=lambda item: item['appearances'], reverse=True))
def generate_final():
"""Merge episode descriptions/titles and quotes into final JSON file."""
with open(os.path.join(DATA_DIR, 'descriptions.json'), 'r', encoding='utf-8') as file:
data = json.load(file)
output = []
for season_id, season in enumerate(data, start=1):
output.append({
'season_id': season_id,
'episodes': [
{
'title': episode['title'].strip(),
'description': episode['description'].strip(),
'episode_id': episode_id,
'characters': get_characters(season_id, episode_id),
'scenes': get_episode_scenes(season_id, episode_id)
}
for episode_id, episode in enumerate(season, start=1)
]
})
with open(os.path.join(DATA_DIR, 'data.json'), 'w', encoding='utf-8') as file:
json.dump(output, file, ensure_ascii=False, indent=4)
# generate_algolia()
# process(3, 10)
generate_final()