From f267a43a26aebc291a2df65aa9831390edf9d510 Mon Sep 17 00:00:00 2001 From: Xevion Date: Fri, 1 Nov 2019 23:03:31 -0500 Subject: [PATCH] exception handling -> printing, xmp identification and handling --- package/app.py | 18 +++++++++++------- package/process.py | 43 ++++++++++++++++++++++++------------------- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/package/app.py b/package/app.py index 26ebbaf..043194a 100644 --- a/package/app.py +++ b/package/app.py @@ -21,33 +21,37 @@ from . import RAW_EXTS, LOSSY_EXTS # 3) Read XMP, then write new tags to it # 4) Delete temporary file, move NEF/JPEG and XMP -# Driver code for the package + + def run(): - # Client client = vision.ImageAnnotatorClient() # Find files we want to process based on if they have a corresponding .XMP logging.info('Locating processable files...') files = os.listdir(INPUT_PATH) select = [file for file in files if os.path.splitext(file)[1] != '.xmp'] + logging.info(f'Found {len(files)} valid files') # Create the 'temp' directory - logging.info(f'Found {len(files)} valid files, beginning processing...') + logging.info('Creating temporary processing directory') os.makedirs(TEMP_PATH) try: # Process files for index, file in progressbar.progressbar(list(enumerate(select)), redirect_stdout=True, term_width=110): _, ext = os.path.splitext(file) + ext = ext[1:] if ext in LOSSY_EXTS or ext in RAW_EXTS: - logging.info(f"Processing file '{file}'...") file = FileProcessor(file) + logging.info(f"Processing file '{file}'...") file.run(client) - except: - logging.warning('Removing temporary directory before raising exception.') + except Exception as error: + logging.error(str(error)) + logging.warning( + 'Removing temporary directory before raising exception.') os.rmdir(TEMP_PATH) raise # Remove the directory, we are done here logging.info('Removing temporary directory.') - os.rmdir(TEMP_PATH) \ No newline at end of file + os.rmdir(TEMP_PATH) diff --git a/package/process.py b/package/process.py index fd96af4..599d164 100644 --- a/package/process.py +++ b/package/process.py @@ -12,19 +12,20 @@ from google.cloud import vision from . import TEMP_PATH, INPUT_PATH, OUTPUT_PATH from .xmp import XMPParser + class FileProcessor(object): - def __init__(self, file_name : str): + def __init__(self, file_name: str): self.file_name = file_name - self.base, self.ext = os.path.splitext(self.file_name) # fileNAME and fileEXTENSIOn + self.base, self.ext = os.path.splitext( + self.file_name) # fileNAME and fileEXTENSIOn # Path to temporary file that will be optimized for upload to Google self.temp_file_path = os.path.join(TEMP_PATH, self.base + '.jpeg') - - @property - def hasXMP(self): - return self.ext.lower() == 'xmp' - + # Decide whether a XMP file is available + self.xmp = os.path.join(INPUT_PATH, base + '.xmp') + self.xmp = self.xmp if os.path.exists(self.xmp) else None + # Optimizes a file using JPEG thumbnailing and compression. - def _optimize(self, file : str, size : tuple =(512, 512), quality=85, copy=None): + def _optimize(self, file: str, size: tuple = (512, 512), quality : int = 85, copy : str = None): image = Image.open(file) image.thumbnail(size, resample=Image.ANTIALIAS) if copy: @@ -40,12 +41,13 @@ class FileProcessor(object): rgb.close() self._optimize(self.temp_file_path) else: - self._optimize(os.path.join(INPUT_PATH, self.file_name), copy=self.temp_file_path) + self._optimize(os.path.join( + INPUT_PATH, self.file_name), copy=self.temp_file_path) - def run(self, client : vision.ImageAnnotatorClient): + def run(self, client: vision.ImageAnnotatorClient): try: - if self.hasXMP: - self.optimize() + + self.optimize() # Open the image, read as bytes, convert to types Image image = Image.open(self.temp_file_path) @@ -53,15 +55,16 @@ class FileProcessor(object): image.save(bytesIO, format='jpeg') image.close() image = vision.types.Image(content=bytesIO.getvalue()) - + # Performs label detection on the image file response = client.label_detection(image=image) labels = [label.description for label in response.label_annotations] print('\tLabels: {}'.format(', '.join(labels))) - + # XMP sidecar file specified, write to it using XML module - if self.xmp_name: - logging.info('\tWriting {} tags to output XMP.'.format(len(labels))) + if self.hasXMP: + logging.info( + '\tWriting {} tags to output XMP.'.format(len(labels))) parser = XMPParser(os.path.join(INPUT_PATH, self.xmp_name)) parser.add_keywords(labels) # Save the new XMP file @@ -71,8 +74,10 @@ class FileProcessor(object): os.remove(os.path.join(INPUT_PATH, self.xmp_name)) # No XMP file is specified, using IPTC tagging else: - logging.info('\tWriting {} tags to image IPTC'.format(len(labels))) - info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name)) + logging.info( + '\tWriting {} tags to image IPTC'.format(len(labels))) + info = iptcinfo3.IPTCInfo( + os.path.join(INPUT_PATH, self.file_name)) info['keywords'].extend(labels) info.save() # Remove the weird ghsot file created by this iptc read/writer. @@ -88,4 +93,4 @@ class FileProcessor(object): # Remove the temporary file (if it exists) def _cleanup(self): if os.path.exists(self.temp_file_path): - os.remove(self.temp_file_path) \ No newline at end of file + os.remove(self.temp_file_path)