Add thumbnail & character generation with episode media refactor

This commit is contained in:
Xevion
2022-05-22 20:28:17 -05:00
parent 5c92c44bd6
commit 53a64cc32b

View File

@@ -1,4 +1,5 @@
import copy
import imghdr
import json
import logging
import os
@@ -33,11 +34,16 @@ RAW_DIR = os.path.join(CUR_DIR, 'raw')
BUILD_DIR = os.path.join(CUR_DIR, 'build')
IMG_DIR = os.path.join(CUR_DIR, 'img')
IMG_EPISODES_DIR = os.path.join(IMG_DIR, 'episodes')
IMG_CHARACTERS_DIR = os.path.join(IMG_DIR, 'characters')
RAW_FILES = os.listdir(RAW_DIR)
EPISODE_COUNTS = [6, 22, 23, 14, 26, 24, 24, 24, 23]
def abslistdir(path: str) -> List[str]:
return [os.path.join(path, item) for item in os.listdir(path)]
@click.group()
def cli():
pass
@@ -803,7 +809,7 @@ def app(path: str, make_dir: bool) -> None:
@click.option('--thumbnail/--no-thumbnail', default=True, help='Complete the thumbnailing stage.')
@click.argument('path', type=click.Path(file_okay=False))
def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None:
def get_fullsize_args(input_path: str, output_path: str, geometry: str) -> List[str]:
def get_fullsize_args(input_path: str, output_path: str) -> List[str]:
return ['magick',
input_path,
'-gravity', 'Center',
@@ -815,7 +821,7 @@ def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None:
'-strip',
output_path]
def get_thumbnailing_args(input_path: str, output_path: str, geometry: str) -> List[str]:
def get_thumbnailing_args(input_path: str, output_path: str, geometry: str = '156') -> List[str]:
return [
'magick',
input_path,
@@ -844,8 +850,11 @@ def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None:
logger.error('Both copy and thumbnail stages are disabled. Quitting early.')
return
copy_operations: List[Tuple[str, str]] = []
thumbnail_operations: List[Tuple[str, str]] = []
with open(ConstantPaths.CHAR_DESC, 'r') as character_desc_file:
descriptions = json.load(character_desc_file)
character_ids = list(descriptions.keys())
operations: List[Tuple[str, str, List[str]]] = []
# /img/episode/03/04/full.jpeg
all_episodes: List[Tuple[int, int]] = [(season + 1, episode + 1) for season in range(9) for episode in range(EPISODE_COUNTS[season])]
@@ -854,7 +863,7 @@ def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None:
TimeElapsedColumn())
with progress:
for season, episode in progress.track(all_episodes, description='Finding images'):
for season, episode in progress.track(all_episodes, description='Preparing epsiode image operations...'):
# Find what images are available, select the one with the lowest integer
episode_dir = os.path.join(IMG_EPISODES_DIR, f'{season:02}', f'{episode:02}')
if not os.path.exists(episode_dir):
@@ -864,55 +873,136 @@ def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None:
input_path: str = os.path.join(episode_dir, images_available[0])
output_dir: str = os.path.abspath(os.path.join(path, f'{season:02}', f'{episode:02}'))
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_copy_path: str = os.path.join(output_dir, 'full.jpeg')
output_full_path: str = os.path.join(output_dir, 'full.jpeg')
output_thumb_path: str = os.path.join(output_dir, 'thumbnail.jpeg')
if copy: copy_operations.append((input_path, output_copy_path))
if thumbnail: thumbnail_operations.append((input_path, output_thumb_path))
if copy:
args = get_fullsize_args(input_path, output_full_path)
operations.append((input_path, output_full_path, args))
if thumbnail:
args = get_thumbnailing_args(input_path, output_thumb_path)
operations.append((input_path, output_thumb_path, args))
logger.debug(f'Starting {len(copy_operations) + len(thumbnail_operations)} copy/thumbnailing operations.')
character_folders: List[str] = abslistdir(IMG_CHARACTERS_DIR)
filetype_preference: List[str] = ['jpeg', 'jpg', 'png', 'webp', 'gif', 'bmp']
def select_by_preference(x: str) -> int:
"""A simple function for sorting files by the preferred extensions."""
extension = os.path.splitext(x)[1][1:]
try:
index: int = filetype_preference.index(extension)
return index
except ValueError:
return len(filetype_preference)
with progress:
for character_folder in progress.track(character_folders, description='Preparing & selecting character images...'):
character_id = os.path.split(character_folder)[1]
if character_id not in character_ids:
logger.warning(f'"{character_id}" is not a valid character identifier. Please check the character list.')
continue
character_files = os.listdir(character_folder)
character_files.sort(key=select_by_preference)
full_file: Optional[str] = None
face_file: Optional[str] = None
# Find full file
for file in character_files:
if file.startswith('full'):
full_file = file
break
if file.startswith('face'):
face_file = file
break
# Check what was found, use the files found as best as possible.
if face_file is None:
if full_file is None:
if len(character_files) > 0:
for file in character_files:
file_path = os.path.join(character_folder, file)
filetype = imghdr.what(file_path)
logger.debug(f'[{character_id}] File: "{file}" -> Filetype: {filetype}')
if filetype in filetype_preference:
full_file = file
face_file = file
break
if full_file is not None:
logger.warning(f'[{character_id}] No face nor full files could be located, but "{file}" has been selected automatically.')
else:
logger.warning(f'[{character_id}] Neither face nor full files could be located.')
else:
logger.warning(f'[{character_id}] No files available.')
else:
logger.debug(f'[{character_id}] Full file will be used as face file.')
face_file = full_file
elif full_file is None:
logger.debug(f'[{character_id}] Face file will be used as full file.')
full_file = face_file
if full_file is None and face_file is None:
logger.warning(f'[{character_id}] Skipping due to no files found.')
continue
if full_file is not None:
full_file = os.path.join(character_folder, full_file)
if face_file is not None:
face_file = os.path.join(character_folder, face_file)
output_dir: str = os.path.abspath(os.path.join(path, character_id))
if not os.path.exists(output_dir):
os.makedirs(output_dir)
if copy:
output_full_path: str = os.path.join(output_dir, 'full.jpeg')
output_face_path: str = os.path.join(output_dir, 'face.jpeg')
full_args = get_fullsize_args(full_file, output_full_path)
face_args = get_fullsize_args(face_file, output_face_path)
operations.append((full_file, output_full_path, full_args))
operations.append((face_file, output_face_path, face_args))
if thumbnail:
output_full_thumb_path: str = os.path.join(output_dir, 'full_thumb.jpeg')
output_face_thumb_path: str = os.path.join(output_dir, 'face_thumb.jpeg')
full_thumb_args = get_thumbnailing_args(full_file, output_full_thumb_path)
face_thumb_args = get_thumbnailing_args(face_file, output_face_thumb_path)
operations.append((full_file, output_full_thumb_path, full_thumb_args))
operations.append((face_file, output_face_thumb_path, face_thumb_args))
logger.debug(f'Starting {len(operations)} operations.')
sp_kwargs = {'capture_output': True, 'text': True} if suppress else {}
if copy:
with progress:
logger.debug('Beginning "smart copying"...')
with progress:
logger.debug('Beginning "smart copying"...')
task = progress.add_task(description='Wait...', total=len(operations))
for input, output in progress.track(copy_operations, description='Smart copying...'):
sp_args = get_fullsize_args(input, output, '1440')
try:
completed = subprocess.run(sp_args, **sp_kwargs, check=True)
except subprocess.CalledProcessError as e:
logger.error('Failed to process copy operation.', exc_info=e)
logger.error(f'Input: "{input}"')
logger.error(f'Output: "{output}"')
logger.error(f'Args: "{" ".join(sp_args)}"')
logger.error(f'Stdout: "{e.stdout.rstrip()}"')
logger.error(f'Stderr: "{e.stderr.rstrip()}"')
if thumbnail:
with progress:
logger.debug('Beginning "thumbnailing"...')
for input, output in progress.track(thumbnail_operations, description='Thumbnailing...'):
sp_args = get_thumbnailing_args(input, output, '156')
try:
completed = subprocess.run(sp_args, **sp_kwargs, check=True)
except subprocess.CalledProcessError as e:
logger.error('Failed to process thumbnail operation.', exc_info=e)
logger.error(f'Input: "{input}"')
logger.error(f'Output: "{output}"')
logger.error(f'Args: "{" ".join(sp_args)}"')
for input, output, args in operations:
try:
rel_output = os.path.relpath(output, start=path)
progress.update(task, description=rel_output, advance=1)
completed = subprocess.run(args, **sp_kwargs, check=True)
except Exception as e:
logger.error('Failed to process operation.', exc_info=e)
logger.error(f'Input: "{input}"')
logger.error(f'Output: "{output}"')
logger.error(f'Args: "{" ".join(args)}"')
if type(e) is subprocess.CalledProcessError:
logger.error(f'Stdout: "{e.stdout.rstrip()}"')
logger.error(f'Stderr: "{e.stderr.rstrip()}"')
with progress:
output_paths = [y for x, y in copy_operations]
output_paths.extend([y for x, y in thumbnail_operations])
output_paths = [y for x, y, z in operations]
file_sizes = []
for path in progress.track(output_paths, description='Acquiring sizes...'):