Add media processing command with ImageMagick thumbnailing & subprocess

This commit is contained in:
Xevion
2022-05-21 12:02:38 -05:00
parent 2c058f7840
commit 06d7880bab
91 changed files with 214 additions and 1 deletions

View File

@@ -4,6 +4,7 @@ import logging
import os
import re
import shutil
import subprocess
from collections import Counter, OrderedDict
from typing import Any, Dict, List, Optional, Tuple, Union
@@ -785,7 +786,7 @@ def app(path: str, make_dir: bool) -> None:
json.dump(character_data, character_file)
# Ensure character folder exists before writing files
if not os.path.lexists(character_folder):
if not os.path.exists(character_folder):
os.makedirs(character_folder)
for id, data in character_data.items():
@@ -794,5 +795,127 @@ def app(path: str, make_dir: bool) -> None:
json.dump(data, file)
@build.command('media')
@click.option('--suppress/--no-suppress', default=True, help='Disable stdout suppression for image magick commandline output.')
@click.option('--copy/--no-copy', default=True, help='Complete the copying stage.')
@click.option('--thumbnail/--no-thumbnail', default=True, help='Complete the thumbnailing stage.')
@click.argument('path', type=click.Path(file_okay=False))
def media(path: str, suppress: bool, copy: bool, thumbnail: bool) -> None:
def get_fullsize_args(input_path: str, output_path: str, geometry: str) -> List[str]:
return ['magick',
input_path,
'-gravity', 'Center',
'-crop', '1:1+0+0',
'+repage',
'-quality', '95',
'-interlace', 'none',
'-colorspace', 'sRGB',
'-strip',
output_path]
def get_thumbnailing_args(input_path: str, output_path: str, geometry: str) -> List[str]:
return [
'magick',
input_path,
'-gravity', 'Center',
'-crop', '1:1+0+0',
'+repage',
'-filter', 'Triangle',
'-define', 'filter:support=2',
'-thumbnail', geometry,
'-unsharp', '0.25x0.25+8+0.065',
'-dither', 'None',
'-posterize', '136',
'-quality', '82',
'-define', 'jpeg:fancy-upsampling=off',
'-define', 'png:compression-filter=5',
'-define', 'png:compression-level=9',
'-define', 'png:compression-strategy=1',
'-define', 'png:exclude-chunk=all',
'-interlace', 'none',
'-colorspace', 'sRGB',
'-strip',
output_path,
]
if not (copy or thumbnail):
logger.error('Both copy and thumbnail stages are disabled. Quitting early.')
return
copy_operations: List[Tuple[str, str]] = []
thumbnail_operations: List[Tuple[str, str]] = []
# /img/episode/03/04/full.jpeg
all_episodes: List[Tuple[int, int]] = [(season + 1, episode + 1) for season in range(9) for episode in range(EPISODE_COUNTS[season])]
progress = Progress(SpinnerColumn('dots10'), *Progress.get_default_columns(), MofNCompleteColumn(),
TimeElapsedColumn())
with progress:
for season, episode in progress.track(all_episodes, description='Finding images'):
# Find what images are available, select the one with the lowest integer
episode_dir = os.path.join(IMG_EPISODES_DIR, f'{season:02}', f'{episode:02}')
if not os.path.exists(episode_dir):
os.makedirs(episode_dir)
images_available = os.listdir(episode_dir)
images_available.sort(key=lambda x: int(x.split('.')[0]))
input_path: str = os.path.join(episode_dir, images_available[0])
output_dir: str = os.path.abspath(os.path.join(path, f'{season:02}', f'{episode:02}'))
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_copy_path: str = os.path.join(output_dir, 'full.jpeg')
output_thumb_path: str = os.path.join(output_dir, 'thumbnail.jpeg')
if copy: copy_operations.append((input_path, output_copy_path))
if thumbnail: thumbnail_operations.append((input_path, output_thumb_path))
logger.debug(f'Starting {len(copy_operations) + len(thumbnail_operations)} copy/thumbnailing operations.')
sp_kwargs = {'capture_output': True, 'text': True} if suppress else {}
if copy:
with progress:
logger.debug('Beginning "smart copying"...')
for input, output in progress.track(copy_operations, description='Smart copying...'):
sp_args = get_fullsize_args(input, output, '1440')
try:
completed = subprocess.run(sp_args, **sp_kwargs, check=True)
except subprocess.CalledProcessError as e:
logger.error('Failed to process copy operation.', exc_info=e)
logger.error(f'Input: "{input}"')
logger.error(f'Output: "{output}"')
logger.error(f'Args: "{" ".join(sp_args)}"')
logger.error(f'Stdout: "{e.stdout.rstrip()}"')
logger.error(f'Stderr: "{e.stderr.rstrip()}"')
if thumbnail:
with progress:
logger.debug('Beginning "thumbnailing"...')
for input, output in progress.track(thumbnail_operations, description='Thumbnailing...'):
sp_args = get_thumbnailing_args(input, output, '156')
try:
completed = subprocess.run(sp_args, **sp_kwargs, check=True)
except subprocess.CalledProcessError as e:
logger.error('Failed to process thumbnail operation.', exc_info=e)
logger.error(f'Input: "{input}"')
logger.error(f'Output: "{output}"')
logger.error(f'Args: "{" ".join(sp_args)}"')
logger.error(f'Stdout: "{e.stdout.rstrip()}"')
logger.error(f'Stderr: "{e.stderr.rstrip()}"')
with progress:
output_paths = [y for x, y in copy_operations]
output_paths.extend([y for x, y in thumbnail_operations])
file_sizes = []
for path in progress.track(output_paths, description='Acquiring sizes...'):
file_sizes.append(os.stat(path).st_size)
if __name__ == '__main__':
cli()