From 53a64cc32b7ecfc976cbb8a7a208a78bfc721417 Mon Sep 17 00:00:00 2001 From: Xevion Date: Sun, 22 May 2022 20:28:17 -0500 Subject: [PATCH] Add thumbnail & character generation with episode media refactor --- data/normalization/main.py | 172 ++++++++++++++++++++++++++++--------- 1 file changed, 131 insertions(+), 41 deletions(-) diff --git a/data/normalization/main.py b/data/normalization/main.py index c7ff1a8..fb7f7fd 100644 --- a/data/normalization/main.py +++ b/data/normalization/main.py @@ -1,4 +1,5 @@ import copy +import imghdr import json import logging import os @@ -33,11 +34,16 @@ RAW_DIR = os.path.join(CUR_DIR, 'raw') BUILD_DIR = os.path.join(CUR_DIR, 'build') IMG_DIR = os.path.join(CUR_DIR, 'img') IMG_EPISODES_DIR = os.path.join(IMG_DIR, 'episodes') +IMG_CHARACTERS_DIR = os.path.join(IMG_DIR, 'characters') RAW_FILES = os.listdir(RAW_DIR) EPISODE_COUNTS = [6, 22, 23, 14, 26, 24, 24, 24, 23] +def abslistdir(path: str) -> List[str]: + return [os.path.join(path, item) for item in os.listdir(path)] + + @click.group() def cli(): pass @@ -803,7 +809,7 @@ def app(path: str, make_dir: bool) -> None: @click.option('--thumbnail/--no-thumbnail', default=True, help='Complete the thumbnailing stage.') @click.argument('path', type=click.Path(file_okay=False)) def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None: - def get_fullsize_args(input_path: str, output_path: str, geometry: str) -> List[str]: + def get_fullsize_args(input_path: str, output_path: str) -> List[str]: return ['magick', input_path, '-gravity', 'Center', @@ -815,7 +821,7 @@ def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None: '-strip', output_path] - def get_thumbnailing_args(input_path: str, output_path: str, geometry: str) -> List[str]: + def get_thumbnailing_args(input_path: str, output_path: str, geometry: str = '156') -> List[str]: return [ 'magick', input_path, @@ -844,8 +850,11 @@ def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None: logger.error('Both copy and thumbnail stages are disabled. Quitting early.') return - copy_operations: List[Tuple[str, str]] = [] - thumbnail_operations: List[Tuple[str, str]] = [] + with open(ConstantPaths.CHAR_DESC, 'r') as character_desc_file: + descriptions = json.load(character_desc_file) + + character_ids = list(descriptions.keys()) + operations: List[Tuple[str, str, List[str]]] = [] # /img/episode/03/04/full.jpeg all_episodes: List[Tuple[int, int]] = [(season + 1, episode + 1) for season in range(9) for episode in range(EPISODE_COUNTS[season])] @@ -854,7 +863,7 @@ def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None: TimeElapsedColumn()) with progress: - for season, episode in progress.track(all_episodes, description='Finding images'): + for season, episode in progress.track(all_episodes, description='Preparing epsiode image operations...'): # Find what images are available, select the one with the lowest integer episode_dir = os.path.join(IMG_EPISODES_DIR, f'{season:02}', f'{episode:02}') if not os.path.exists(episode_dir): @@ -864,55 +873,136 @@ def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None: input_path: str = os.path.join(episode_dir, images_available[0]) output_dir: str = os.path.abspath(os.path.join(path, f'{season:02}', f'{episode:02}')) + if not os.path.exists(output_dir): os.makedirs(output_dir) - output_copy_path: str = os.path.join(output_dir, 'full.jpeg') + + output_full_path: str = os.path.join(output_dir, 'full.jpeg') output_thumb_path: str = os.path.join(output_dir, 'thumbnail.jpeg') - if copy: copy_operations.append((input_path, output_copy_path)) - if thumbnail: thumbnail_operations.append((input_path, output_thumb_path)) + if copy: + args = get_fullsize_args(input_path, output_full_path) + operations.append((input_path, output_full_path, args)) + if thumbnail: + args = get_thumbnailing_args(input_path, output_thumb_path) + operations.append((input_path, output_thumb_path, args)) - logger.debug(f'Starting {len(copy_operations) + len(thumbnail_operations)} copy/thumbnailing operations.') + character_folders: List[str] = abslistdir(IMG_CHARACTERS_DIR) + filetype_preference: List[str] = ['jpeg', 'jpg', 'png', 'webp', 'gif', 'bmp'] + def select_by_preference(x: str) -> int: + """A simple function for sorting files by the preferred extensions.""" + extension = os.path.splitext(x)[1][1:] + try: + index: int = filetype_preference.index(extension) + return index + except ValueError: + return len(filetype_preference) + + with progress: + for character_folder in progress.track(character_folders, description='Preparing & selecting character images...'): + character_id = os.path.split(character_folder)[1] + if character_id not in character_ids: + logger.warning(f'"{character_id}" is not a valid character identifier. Please check the character list.') + continue + + character_files = os.listdir(character_folder) + character_files.sort(key=select_by_preference) + full_file: Optional[str] = None + face_file: Optional[str] = None + + # Find full file + for file in character_files: + if file.startswith('full'): + full_file = file + break + + if file.startswith('face'): + face_file = file + break + + # Check what was found, use the files found as best as possible. + if face_file is None: + if full_file is None: + if len(character_files) > 0: + for file in character_files: + file_path = os.path.join(character_folder, file) + filetype = imghdr.what(file_path) + logger.debug(f'[{character_id}] File: "{file}" -> Filetype: {filetype}') + if filetype in filetype_preference: + full_file = file + face_file = file + break + + if full_file is not None: + logger.warning(f'[{character_id}] No face nor full files could be located, but "{file}" has been selected automatically.') + else: + logger.warning(f'[{character_id}] Neither face nor full files could be located.') + else: + logger.warning(f'[{character_id}] No files available.') + else: + logger.debug(f'[{character_id}] Full file will be used as face file.') + face_file = full_file + elif full_file is None: + logger.debug(f'[{character_id}] Face file will be used as full file.') + full_file = face_file + + if full_file is None and face_file is None: + logger.warning(f'[{character_id}] Skipping due to no files found.') + continue + + if full_file is not None: + full_file = os.path.join(character_folder, full_file) + if face_file is not None: + face_file = os.path.join(character_folder, face_file) + + output_dir: str = os.path.abspath(os.path.join(path, character_id)) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + if copy: + output_full_path: str = os.path.join(output_dir, 'full.jpeg') + output_face_path: str = os.path.join(output_dir, 'face.jpeg') + + full_args = get_fullsize_args(full_file, output_full_path) + face_args = get_fullsize_args(face_file, output_face_path) + + operations.append((full_file, output_full_path, full_args)) + operations.append((face_file, output_face_path, face_args)) + + if thumbnail: + output_full_thumb_path: str = os.path.join(output_dir, 'full_thumb.jpeg') + output_face_thumb_path: str = os.path.join(output_dir, 'face_thumb.jpeg') + + full_thumb_args = get_thumbnailing_args(full_file, output_full_thumb_path) + face_thumb_args = get_thumbnailing_args(face_file, output_face_thumb_path) + + operations.append((full_file, output_full_thumb_path, full_thumb_args)) + operations.append((face_file, output_face_thumb_path, face_thumb_args)) + + logger.debug(f'Starting {len(operations)} operations.') sp_kwargs = {'capture_output': True, 'text': True} if suppress else {} - if copy: - with progress: - logger.debug('Beginning "smart copying"...') + with progress: + logger.debug('Beginning "smart copying"...') + task = progress.add_task(description='Wait...', total=len(operations)) - for input, output in progress.track(copy_operations, description='Smart copying...'): - sp_args = get_fullsize_args(input, output, '1440') - - try: - completed = subprocess.run(sp_args, **sp_kwargs, check=True) - except subprocess.CalledProcessError as e: - logger.error('Failed to process copy operation.', exc_info=e) - logger.error(f'Input: "{input}"') - logger.error(f'Output: "{output}"') - logger.error(f'Args: "{" ".join(sp_args)}"') - logger.error(f'Stdout: "{e.stdout.rstrip()}"') - logger.error(f'Stderr: "{e.stderr.rstrip()}"') - - if thumbnail: - with progress: - logger.debug('Beginning "thumbnailing"...') - - for input, output in progress.track(thumbnail_operations, description='Thumbnailing...'): - sp_args = get_thumbnailing_args(input, output, '156') - - try: - completed = subprocess.run(sp_args, **sp_kwargs, check=True) - except subprocess.CalledProcessError as e: - logger.error('Failed to process thumbnail operation.', exc_info=e) - logger.error(f'Input: "{input}"') - logger.error(f'Output: "{output}"') - logger.error(f'Args: "{" ".join(sp_args)}"') + for input, output, args in operations: + try: + rel_output = os.path.relpath(output, start=path) + progress.update(task, description=rel_output, advance=1) + completed = subprocess.run(args, **sp_kwargs, check=True) + except Exception as e: + logger.error('Failed to process operation.', exc_info=e) + logger.error(f'Input: "{input}"') + logger.error(f'Output: "{output}"') + logger.error(f'Args: "{" ".join(args)}"') + if type(e) is subprocess.CalledProcessError: logger.error(f'Stdout: "{e.stdout.rstrip()}"') logger.error(f'Stderr: "{e.stderr.rstrip()}"') with progress: - output_paths = [y for x, y in copy_operations] - output_paths.extend([y for x, y in thumbnail_operations]) + output_paths = [y for x, y, z in operations] file_sizes = [] for path in progress.track(output_paths, description='Acquiring sizes...'):