From 45a2b415207b39c13cd7ae7319be2edd26e107ae Mon Sep 17 00:00:00 2001 From: Xevion Date: Fri, 28 Aug 2020 16:35:44 -0500 Subject: [PATCH] implement MasterFileProcessor in app.py, implement MasterFileProcessor thread loading and joining (tested), fix FileProcessor size property not using correct path --- phototag/app.py | 39 +++++++++++------------ phototag/process.py | 77 +++++++++++++++++++++++++++------------------ 2 files changed, 65 insertions(+), 51 deletions(-) diff --git a/phototag/app.py b/phototag/app.py index 2384cff..cba6150 100644 --- a/phototag/app.py +++ b/phototag/app.py @@ -6,15 +6,18 @@ Main app function file for running the program, delegating the tagging operation import logging import os -from threading import Thread +import colored_traceback from google.cloud import vision from . import INPUT_PATH, TEMP_PATH -from .helpers import valid_extension, get_extension -from .process import FileProcessor +from .helpers import valid_extension, get_extension, convert_to_bytes +from .process import MasterFileProcessor -log = logging.getLogger("app") +logger = logging.getLogger("app") +logger.setLevel(logging.DEBUG) + +colored_traceback.add_hook() def run(): @@ -25,33 +28,27 @@ def run(): select = list(filter(lambda file: valid_extension(get_extension(file)), files)) if len(select) == 0: - log.fatal("No valid files located.") + logger.fatal("No valid files located.") return else: - log.info(f"Found {len(select)} valid files") + logger.info(f"Found {len(select)} valid files") # Create the 'temp' directory if not os.path.exists(TEMP_PATH): - log.info("Creating temporary processing directory") + logger.info("Creating temporary processing directory") os.makedirs(TEMP_PATH) try: - # Process files with threading - processors = [FileProcessor(file) for file in select] - threads = [Thread(target=process.run, args=(client,)) for process in processors] - - # Start threads for each file - for i, thread in enumerate(threads): - log.info(f"Processing file '{processors[i].file_name}'...") - thread.start() - - # Wait for each thread to complete before stopping - for thread in threads: - thread.join() + mp = MasterFileProcessor(select, 8, convert_to_bytes("128 MB"), True, client=client) + logger.info('MasterFileProcessor created.') + mp.load() + logger.info('Finished loading/starting initial threads.') + mp.join() + logger.info('Finished joining threads, now quitting.') except Exception as error: - log.error(str(error)) + logger.error(str(error)) raise finally: os.rmdir(TEMP_PATH) - log.info("Temporary directory removed.") + logger.info("Temporary directory removed.") diff --git a/phototag/process.py b/phototag/process.py index 39ce3bb..2b65973 100644 --- a/phototag/process.py +++ b/phototag/process.py @@ -1,13 +1,15 @@ """ process.py -Holds the FileProcessor object, used for working with images in order to label and edit/tag their metadata. +Holds the majority of the file processing logic, including processing for individual files, as well as +logic for queued threading for dozens of files in parallel. """ import io import logging import os import shutil +from pprint import pprint from threading import Thread from typing import Tuple, AnyStr, Optional, List, Dict, Callable @@ -20,7 +22,8 @@ from google.cloud import vision from . import TEMP_PATH, INPUT_PATH, RAW_EXTS from .xmp import XMPParser -log = logging.getLogger("process") +logger = logging.getLogger("process") +logger.setLevel(logging.DEBUG) class MasterFileProcessor(object): @@ -72,25 +75,6 @@ class MasterFileProcessor(object): raise Exception("Invalid Configuration - the image_count is too low. Please set it to a positive " "non-zero integer or enable single_override.") - def load(self) -> None: - """ - Starts FileProcessor threads, loading zero or more threads simultaneously based on configuration options. - """ - available: List[int] = list(self.waiting.keys()) - # At least 1 FP in queue and # of - while len(available) > 0 and len(self.running) < self.image_count: - # check that the smallest FP can fit in the specified buffer - if self.total_size + self.waiting[available[0]].size <= self.buffer_size: - self._start(available[0]) - else: - # Could not fit under limit, thus all the subsequent items in the queue will not either. - # Subsequent items will be added through the _finished() callback. - break - - # Ensure that at least 1 is in queue with single_override enabled - if self.single_override and len(self.running) == 0: - self._start(available[0]) - def _start(self, key: int) -> None: """ Starts a new FileProcessor Thread, moving the FP and creating it's thread in the running dict. @@ -98,9 +82,8 @@ class MasterFileProcessor(object): :param key: The integer key representing the FileProcessor being added. """ fp = self.waiting.pop(key) - thread = Thread( - target=fp.run, args=(self.client,), kwargs={'callback': lambda: self._finished(key)} - ) + thread = Thread(target=fp.run, args=(self.client,), kwargs={'callback': lambda: self._finished(key)}) + self.running[key] = (fp, thread) thread.start() def _finished(self, key: int) -> None: @@ -133,6 +116,40 @@ class MasterFileProcessor(object): """ return sum(processor.size for processor, thread in self.running.values()) + def load(self) -> None: + """ + Starts FileProcessor threads, loading zero or more threads simultaneously based on configuration options. + """ + available: List[int] = list(self.waiting.keys()) + if len(available) == 0: + return + + # At least 1 FP in queue and # of + while len(available) > 0 and len(self.running) < self.image_count: + # check that the smallest FP can fit in the specified buffer + if self.total_size + self.waiting[available[0]].size <= self.buffer_size: + self._start(available.pop(0)) + else: + # Could not fit under limit, thus all the subsequent items in the queue will not either. + # Subsequent items will be added through the _finished() callback. + break + + # Ensure that at least 1 is in queue with single_override enabled + if self.single_override and len(available) != 0 and len(self.running) == 0: + self._start(available.pop(0)) + + def join(self) -> None: + """ + Joins running threads continuously until none are left. + """ + while True: + threads = [thread for fp, thread in self.running.values()] + for thread in threads: + thread.join() + + if len(self.running) == 0 and len(self.waiting) == 0: + break + class FileProcessor(object): """ @@ -218,11 +235,11 @@ class FileProcessor(object): # Performs label detection on the image file response = client.label_detection(image=image) labels = [label.description for label in response.label_annotations] - log.info("Keywords Identified: {}".format(", ".join(labels))) + logger.info("Keywords Identified: {}".format(", ".join(labels))) # XMP sidecar file specified, write to it using XML module if self.xmp: - log.info(f"Writing {len(labels)} tags to output XMP.") + logger.info(f"Writing {len(labels)} tags to output XMP.") parser = XMPParser(self.input_xmp) parser.add_keywords(labels) @@ -236,11 +253,11 @@ class FileProcessor(object): parser.save(self.input_xmp) # save the new file shutil.copystat(temp_name, self.input_xmp) # copy file metadata over os.remove(temp_name) # remove the renamed original file - log.debug("New XMP file saved with original file metadata. Old XMP file removed.") + logger.debug("New XMP file saved with original file metadata. Old XMP file removed.") # No XMP file is specified, using IPTC tagging else: - log.info("Writing {} tags to image IPTC".format(len(labels))) + logger.info("Writing {} tags to image IPTC".format(len(labels))) info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name)) info["keywords"].extend(labels) info.save() @@ -267,8 +284,8 @@ class FileProcessor(object): @property def size(self) -> int: """ - Returns the size of the image in bytes that this FileProcessor objet shadows. + Returns the size of the image in bytes that this FileProcessor object shadows. :return: the number of bytes the shadowed image takes up on the disk """ - return os.path.getsize(self.file_name) + return os.path.getsize(os.path.join(INPUT_PATH, self.file_name))