diff --git a/phototag/helpers.py b/phototag/helpers.py index 84bbb15..41fb508 100644 --- a/phototag/helpers.py +++ b/phototag/helpers.py @@ -65,12 +65,12 @@ def get_temp_directory(directory: str, start: str = "temp", append_random: int = return temp -def convert_to_bytes(display_size: str) -> int: +def convert_to_bytes(size_string: str) -> int: """ Converts the string representation of data into it's integer byte equivalent. - :param display_size: A string representation of data size, a integer followed by 1-2 letters indicating unit. + :param size_string: A string representation of data size, an integer followed by 1-2 letters indicating unit. :return: The number of bytes the given string is equivalent to. """ - match = re.match(r"(\d+)\s*(\w{1,2})", display_size) + match = re.match(r"(\d+)\s*(\w{1,2})", size_string) return int(match.group(1)) * byte_magnitudes[match.group(2)] diff --git a/phototag/process.py b/phototag/process.py index 062f68a..39ce3bb 100644 --- a/phototag/process.py +++ b/phototag/process.py @@ -9,7 +9,7 @@ import logging import os import shutil from threading import Thread -from typing import Tuple, AnyStr, Optional, List, Dict +from typing import Tuple, AnyStr, Optional, List, Dict, Callable import imageio import iptcinfo3 @@ -18,7 +18,6 @@ from PIL import Image from google.cloud import vision from . import TEMP_PATH, INPUT_PATH, RAW_EXTS -from .config import config from .xmp import XMPParser log = logging.getLogger("process") @@ -29,7 +28,7 @@ class MasterFileProcessor(object): Controls FileProcessor objects in the context of threading according to configuration options. """ - def __init__(self, files: List[str], image_count: int, buffer_size: int, single_override: bool): + def __init__(self, files: List[str], image_count: int, buffer_size: int, single_override: bool, client=None): """ Initializes a MasterFileProcessor object. 
@@ -40,9 +39,10 @@ class MasterFileProcessor(object): """ self.files, self.image_count = files, image_count self.buffer_size, self.single_override = buffer_size, single_override + self.client = client if client is not None else vision.ImageAnnotatorClient() self.waiting: Dict[int, FileProcessor] = {} # FileProcessors that are ready to process, but are not. - self.running: Dict[int, Tuple[FileProcessor, Thread]] = {} # FileProcessors that are currently being processed in threads. + self.running: Dict[int, Tuple[FileProcessor, Thread]] = {} # FPs that are currently being processed in threads. self.finished: Dict[int, FileProcessor] = {} # FileProcessors that have finished processing. processors = [FileProcessor(file) for file in files] @@ -76,14 +76,44 @@ class MasterFileProcessor(object): """ Starts FileProcessor threads, loading zero or more threads simultaneously based on configuration options. """ + available: List[int] = list(self.waiting.keys()) + # At least 1 FP in queue and # of running FPs is below the image_count limit + while len(available) > 0 and len(self.running) < self.image_count: + # check that the smallest FP can fit in the specified buffer + if self.total_size + self.waiting[available[0]].size <= self.buffer_size: + self._start(available.pop(0)) + else: + # Could not fit under limit, thus all the subsequent items in the queue will not either. + # Subsequent items will be added through the _finished() callback. + break + + # Ensure that at least 1 is in queue with single_override enabled + if self.single_override and available and len(self.running) == 0: + self._start(available[0]) + + def _start(self, key: int) -> None: + """ + Starts a new FileProcessor Thread, moving the FP and creating its thread in the running dict. + + :param key: The integer key representing the FileProcessor being added. 
+ """ + fp = self.waiting.pop(key) + thread = Thread(target=fp.run, args=(self.client,), kwargs={'callback': lambda: self._finished(key)}) + # Register before starting so that _finished() can always pop this entry from the running dict. + self.running[key] = (fp, thread) + thread.start() def _finished(self, key: int) -> None: """ Called when a FileProcessor's thread has finished. - :param int key: The FileProcessor's integer key in the running array. + :param int key: The FileProcessor's integer key in the running dict. """ - pass + # Remove the FileProcessor and the Thread from the running dict. + fp, thread = self.running.pop(key) + self.finished[key] = fp + # Load FileProcessors if possible + self.load() @property def total_active(self) -> int: @@ -101,7 +131,7 @@ class MasterFileProcessor(object): :return: the total number of bytes the images in the buffer take up on the disk. """ - return sum(processor.size for processor in self.running.values()) + return sum(processor.size for processor, thread in self.running.values()) class FileProcessor(object): @@ -167,11 +197,12 @@ class FileProcessor(object): else: self._optimize(os.path.join(INPUT_PATH, self.file_name), copy=self.temp_file_path) - def run(self, client: vision.ImageAnnotatorClient) -> None: + def run(self, client: vision.ImageAnnotatorClient, callback: Optional[Callable] = None) -> None: """ Optimize, find labels for and tag the file. :param client: The ImageAnnotatorClient to be used for interacting with the Google Vision API. + :param callback: Optional zero-argument callable invoked after cleanup, whether or not processing succeeded. """ try: @@ -224,6 +255,8 @@ class FileProcessor(object): raise finally: self._cleanup() + if callback is not None: + callback() def _cleanup(self) -> None: """