diff --git a/main.py b/main.py deleted file mode 100644 index 9fd911a..0000000 --- a/main.py +++ /dev/null @@ -1,7 +0,0 @@ -import sys, os -from package import app - -os.environ["GOOGLE_APPLICATION_CREDENTIALS"]=os.path.join(sys.path[0], 'package', 'key', 'photo_tagging_service.json') - -if __name__ == "__main__": - sys.exit(app.run()) \ No newline at end of file diff --git a/package/__init__.py b/package/__init__.py new file mode 100644 index 0000000..37cb1e2 --- /dev/null +++ b/package/__init__.py @@ -0,0 +1,31 @@ +import os +import sys +import logging +import progressbar + +# Logging and Progressbar work +progressbar.streams.wrap_stderr() +logging.basicConfig(level=logging.INFO) +log = logging.getLogger('init') +log.info('Progressbar/Logging ready.') + + +# Path Constants +ROOT = os.path.dirname(os.path.realpath(__file__)) +INPUT_PATH = ROOT +TEMP_PATH = os.path.join(ROOT, 'temp') +OUTPUT_PATH = os.path.join(ROOT, 'output') +log.info('Path Constants Built.') + +# Extension Constants +RAW_EXTS = [ + "3fr", "ari", "arw", "bay", "braw", "crw", + "cr2", "cr3", "cap", "data", "dcs", "dcr", + "dng", "drf", "eip", "erf", "fff", "gpr", + "iiq", "k25", "kdc", "mdc", "mef", "mos", + "mrw", "nef", "nrw", "obm", "orf", "pef", + "ptx", "pxn", "r3d", "raf", "raw", "rwl", + "rw2", "rwz", "sr2", "srf", "srw", "tif", + "x3f", +] +LOSSY_EXTS = ["jpeg", "jpg", "png"] \ No newline at end of file diff --git a/package/__main__.py b/package/__main__.py new file mode 100644 index 0000000..0cf84d7 --- /dev/null +++ b/package/__main__.py @@ -0,0 +1,13 @@ +import os +import logging + +from . import INPUT_PATH, OUTPUT_PATH + +# Ensure that 'input' and 'output' directories are created +if not os.path.exists(INPUT_PATH): + logging.fatal('Input directory did not exist, creating and quitting.') + os.makedirs(INPUT_PATH) + +if not os.path.exists(OUTPUT_PATH): + logging.info('Output directory did not exist. Creating...') + os.makedirs(OUTPUT_PATH) \ No newline at end of file diff --git a/package/app.py b/package/app.py index d3e3381..cb0216e 100644 --- a/package/app.py +++ b/package/app.py @@ -1,171 +1,55 @@ -import io, sys, os, time, rawpy, imageio, progressbar, shutil, iptcinfo3 -from google.cloud.vision import types +import io +import sys +import os +import time +import rawpy +import imageio +import progressbar +import shutil +import logging + from google.cloud import vision -from package import xmp -from PIL import Image -# The name of the image file to annotate -input_path = os.path.join(sys.path[0], 'package', 'processing', 'input') -temp_path = os.path.join(sys.path[0], 'package', 'processing', 'temp') -output_path = os.path.join(sys.path[0], 'package', 'processing', 'output') +from .xmp import XMPParser +from .process import FileProcessor +from . import INPUT_PATH, TEMP_PATH, OUTPUT_PATH +from . import RAW_EXTS, LOSSY_EXTS -# Process a single file in these steps: -# 1) Create a temporary file -# 2) Send it to GoogleAPI -# 3) Read XMP, then write new tags to it -# 4) Delete temporary file, move NEF/JPEG and XMP +log = logging.getLogger('app') -def process_file(file_name, xmp_name=None): - global client - - # Remove the temporary file - def _cleanup(): - if os.path.exists(temp_file_path): - # Deletes the temporary file - os.remove(temp_file_path) - - # Get the size of the file. Is concerned with filesize type. 1024KiB -> 1MiB - def _size(file_path): - size, type = os.path.getsize(file_path) / 1024, 'KiB' - if size >= 1024: size /= 1024; type = 'MiB' - return round(size, 2), type - - # Optimizes a file using JPEG thumbnailing and compression. - def _optimize(file_path, size=(512, 512), quality=85, copy=None): - image = Image.open(file_path) - image.thumbnail(size, resample=Image.ANTIALIAS) - if copy: - image.save(copy, format='jpeg', optimize=True, quality=quality) - else: - image.save(file_path, format='jpeg', optimize=True, quality=quality) - - base, ext = os.path.splitext(file_name) - temp_file_path = os.path.join(temp_path, base + '.jpeg') - - try: - if xmp_name: - # Process the file into a JPEG - rgb = rawpy.imread(os.path.join(input_path, file_name)) - imageio.imsave(temp_file_path, rgb.postprocess()) - rgb.close() - - # Information on file sizes - print("Raw Size: {} {}".format(*_size(os.path.join(input_path, file_name))), end=' | ') - print("Resave Size: {} {}".format(*_size(temp_file_path)), end=' | ') - pre = os.path.getsize(temp_file_path) - _optimize(temp_file_path) - post = os.path.getsize(temp_file_path) - print("Optimized Size: {} {} ({}% savings)".format(*_size(temp_file_path), round((1.0 - (post / pre)) * 100), 2) ) - else: - pre = os.path.getsize(os.path.join(input_path, file_name)) - _optimize(os.path.join(input_path, file_name), copy=temp_file_path) - post = os.path.getsize(temp_file_path) - print("Optimized Size: {} {} ({}% savings)".format(*_size(temp_file_path), round((1.0 - (post / pre)) * 100), 2) ) - - # Open the image, read as bytes, convert to types Image - image = Image.open(temp_file_path) - bytesIO = io.BytesIO() - image.save(bytesIO, format='jpeg') - image.close() - image = vision.types.Image(content=bytesIO.getvalue()) - - # Performs label detection on the image file - response = client.label_detection(image=image) - labels = [label.description for label in response.label_annotations] - print('\tLabels: {}'.format(', '.join(labels))) - - # XMP sidecar file specified, write to it using XML module - if xmp_name: - print('\tWriting {} tags to output XMP...'.format(len(labels))) - parser = xmp.XMPParser(os.path.join(input_path, xmp_name)) - parser.add_keywords(labels) - # Save the new XMP file - parser.save(os.path.join(output_path, xmp_name)) - # Remove the old XMP file - os.remove(os.path.join(input_path, xmp_name)) - # No XMP file is specified, using IPTC tagging - else: - print('\tWriting {} tags to output {}'.format(len(labels), ext[1:].upper())) - info = iptcinfo3.IPTCInfo(os.path.join(input_path, file_name)) - info['keywords'].extend(labels) - info.save() - # Remove the weird ghsot file created by this iptc read/writer. - os.remove(os.path.join(input_path, file_name + '~')) - - # Copy dry-run - # shutil.copy2(os.path.join(input_path, file_name), os.path.join(output_path, file_name)) - os.rename(os.path.join(input_path, file_name), os.path.join(output_path, file_name)) - except: - _cleanup() - raise - _cleanup() - -# Driver code for the package def run(): - global client - - # Ensure that 'input' and 'output' directories are created - if not os.path.exists(input_path): - print('Input directory did not exist, creating and quitting.') - os.makedirs(input_path) - return - - if not os.path.exists(output_path): - print('Output directory did not exist. Creating...') - os.makedirs(output_path) - - # Clients client = vision.ImageAnnotatorClient() # Find files we want to process based on if they have a corresponding .XMP - files = os.listdir(input_path) - select = [file for file in files if os.path.splitext(file)[1] != '.xmp'] + log.info('Locating processable files...') + files = os.listdir(INPUT_PATH) + select = [file for file in files if os.path.splitext(file)[1][1:].lower() in (RAW_EXTS + LOSSY_EXTS)] + log.info(f'Found {len(select)} valid files') # Create the 'temp' directory - print(f'Initializing file processing for {len(select)} files...') - os.makedirs(temp_path) - + if not os.path.exists(TEMP_PATH): + log.info('Creating temporary processing directory') + os.makedirs(TEMP_PATH) + if not os.path.exists(OUTPUT_PATH): + log.info('Creating output processing directory') + os.makedirs(OUTPUT_PATH) + try: # Process files for index, file in progressbar.progressbar(list(enumerate(select)), redirect_stdout=True, term_width=110): - name, ext = os.path.splitext(file) - ext = ext.upper() - # Raw files contain their metadata in an XMP file usually - if ext in ['.NEF', '.CR2']: - # Get all possible files - identicals = [possible for possible in files - if possible.startswith(os.path.splitext(file)[0]) - and not possible.endswith(os.path.splitext(file)[1]) - and not possible.upper().endswith('.XMP')] - - # Alert the user that there are duplicates in the directory and ask whether or not to continue - if len(identicals) > 0: - print('Identical files were found in the directory, continue?') - print(',\n\t'.join(identicals)) - - xmps = [possible for possible in files - if possible.startswith(os.path.splitext(file)[0]) - and possible.upper().endswith('.XMP')] - - # Skip and warn if more than 1 possible files, user error - if len(xmps) > 1: - print('More than 1 possible XMP metadata file for \'{}\'...'.format(file)) - print(',\n'.join(['\t{}'.format(possible) for possible in xmps])) - # Zero possible files, user error, likely - elif len(xmps) <= 0: - print('No matching XMP metadata file for \'{}\'. skipping...'.format(file)) - # Process individual file - else: - print('Processing file {}, \'{}\''.format(index + 1, xmps[0]), end=' | ') - process_file(file_name=file, xmp_name=xmps[0]) - elif ext in ['.JPEG', '.JPG', '.PNG']: - print('Processing file {}, \'{}\''.format(index + 1, file), end=' | ') - process_file(file_name=file) - - except: - os.rmdir(temp_path) + _, ext = os.path.splitext(file) + ext = ext[1:].lower() + if ext in LOSSY_EXTS or ext in RAW_EXTS: + process = FileProcessor(file) + log.info(f"Processing file '{file}'...") + process.run(client) + except Exception as error: + log.error(str(error)) + log.warning( + 'Removing temporary directory before raising exception.') + os.rmdir(TEMP_PATH) raise # Remove the directory, we are done here - print('Cleaning up temporary directory...') - os.rmdir(temp_path) \ No newline at end of file + log.info('Removing temporary directory.') + os.rmdir(TEMP_PATH) diff --git a/package/process.py b/package/process.py new file mode 100644 index 0000000..b1a0bc2 --- /dev/null +++ b/package/process.py @@ -0,0 +1,99 @@ +import os +import sys +import rawpy +import imageio +import io +import iptcinfo3 +import logging +from PIL import Image +from google.cloud.vision import types +from google.cloud import vision + +from . import TEMP_PATH, INPUT_PATH, OUTPUT_PATH, RAW_EXTS, LOSSY_EXTS +from .xmp import XMPParser + +log = logging.getLogger('process') + +class FileProcessor(object): + def __init__(self, file_name: str): + self.file_name = file_name + self.base, self.ext = os.path.splitext(self.file_name) + self.ext = self.ext[1:] + # Path to temporary file that will be optimized for upload to Google + self.temp_file_path = os.path.join(TEMP_PATH, self.base + '.jpeg') + # Decide whether a XMP file is available + self.xmp = None + if self.ext.lower() in RAW_EXTS: + self.xmp = self.base + '.xmp' + self.input_xmp = os.path.join(INPUT_PATH, self.xmp) + self.output_xmp = os.path.join(OUTPUT_PATH, self.xmp) + if not os.path.exists(self.input_xmp): + raise Exception('Sidecar file for \'{}\' does not exist.'.format(self.xmp)) + + # Optimizes a file using JPEG thumbnailing and compression. + def _optimize(self, file: str, size: tuple = (512, 512), quality : int = 85, copy : str = None): + image = Image.open(file) + image.thumbnail(size, resample=Image.ANTIALIAS) + if copy: + image.save(copy, format='jpeg', optimize=True, quality=quality) + else: + image.save(file, format='jpeg', optimize=True, quality=quality) + + def optimize(self): + if self.xmp: + # Long runn + rgb = rawpy.imread(os.path.join(INPUT_PATH, self.file_name)) + imageio.imsave(self.temp_file_path, rgb.postprocess()) + rgb.close() + self._optimize(self.temp_file_path) + else: + self._optimize(os.path.join( + INPUT_PATH, self.file_name), copy=self.temp_file_path) + + def run(self, client: vision.ImageAnnotatorClient): + try: + self.optimize() + + # Open the image, read as bytes, convert to types Image + image = Image.open(self.temp_file_path) + bytesIO = io.BytesIO() + image.save(bytesIO, format='jpeg') + image.close() + image = vision.types.Image(content=bytesIO.getvalue()) + + # Performs label detection on the image file + response = client.label_detection(image=image) + labels = [label.description for label in response.label_annotations] + log.info('Keywords Identified: {}'.format(', '.join(labels))) + + # XMP sidecar file specified, write to it using XML module + if self.xmp: + log.info('Writing {} tags to output XMP.'.format(len(labels))) + parser = XMPParser(self.input_xmp) + parser.add_keywords(labels) + # Save the new XMP file + log.debug('Saving to new XMP file.') + parser.save(self.output_xmp) + log.debug('Removing old XMP file.') + os.remove(self.input_xmp) + # No XMP file is specified, using IPTC tagging + else: + log.info('Writing {} tags to image IPTC'.format(len(labels))) + info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name)) + info['keywords'].extend(labels) + info.save() + # Remove the weird ghsot file created by this iptc read/writer. + os.remove(os.path.join(INPUT_PATH, self.file_name + '~')) + + # Copy dry-run + # shutil.copy2(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name)) + os.rename(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name)) + except: + self._cleanup() + raise + self._cleanup() + + # Remove the temporary file (if it exists) + def _cleanup(self): + if os.path.exists(self.temp_file_path): + os.remove(self.temp_file_path) diff --git a/phototag.py b/phototag.py new file mode 100644 index 0000000..4812820 --- /dev/null +++ b/phototag.py @@ -0,0 +1,20 @@ +import sys +import os +import logging +import click +from package import app + +log = logging.getLogger('main') + +os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.join( + sys.path[0], 'package', 'key', 'photo_tagging_service.json') + + +@click.command() +def cli(): + log.info('Executing package...') + sys.exit(app.run()) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..66d763b --- /dev/null +++ b/setup.py @@ -0,0 +1,40 @@ +import sys +import os +import io +from setuptools import find_packages, setup + +DEPENDENCIES = ['Click'] +EXCLUDE_FROM_PACKAGES = [] +CURDIR = sys.path[0] + +with open(os.path.join(CURDIR, 'README.md')) as file: + README = file.read() + +setup( + name="phototag", + version="1.0.0", + author="Xevion", + author_email="xevion@xevion.dev", + description="", + long_description=README, + long_description_content_type="text/markdown", + url="https://github.com/xevion/photo-tagging", + packages=find_packages(exclude=EXCLUDE_FROM_PACKAGES), + include_package_data=True, + keywords=[], + scripts=[], + entry_points=''' + [console_scripts] + phototag=phototag.phototag:cli + ''', + zip_safe=False, + install_requires=DEPENDENCIES, + python_requires=">=3.6", + # license and classifier list: + # https://pypi.org/pypi?%3Aaction=list_classifiers + license="License :: OSI Approved :: GNU General Public License v3 (GPLv3)", + classifiers=[ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", + ], +) \ No newline at end of file