diff --git a/phototag/__init__.py b/phototag/__init__.py index 91b3c70..9bcdcd7 100644 --- a/phototag/__init__.py +++ b/phototag/__init__.py @@ -14,7 +14,7 @@ from . import config # noinspection PyArgumentList logging.basicConfig( - format='%(message)s', + format='[bold deep_pink2]%(threadName)s[/bold deep_pink2] %(message)s', level=logging.ERROR, handlers=[RichHandler(markup=True, rich_tracebacks=True)] ) diff --git a/phototag/app.py b/phototag/app.py index bb9a62c..b3ca572 100644 --- a/phototag/app.py +++ b/phototag/app.py @@ -43,7 +43,7 @@ def run(): BarColumn(bar_width=None), "[progress.percentage]{task.percentage:>3.0f}%", ) as progress: - mp = MasterFileProcessor(select, 3, convert_to_bytes("128 MB"), True, client=client, progress=progress) + mp = MasterFileProcessor(select, 3, convert_to_bytes("1280 KB"), True, client=client, progress=progress) logger.info('MasterFileProcessor created.') mp.load() logger.info('Finished loading/starting initial threads.') diff --git a/phototag/process.py b/phototag/process.py index d9f16c3..c9a11d0 100644 --- a/phototag/process.py +++ b/phototag/process.py @@ -11,7 +11,7 @@ import os import random import shutil import time -from threading import Thread +from threading import Thread, Lock from typing import Tuple, AnyStr, Optional, List, Dict, Callable import imageio @@ -52,6 +52,8 @@ class MasterFileProcessor(object): self.running: Dict[int, Tuple[FileProcessor, Thread]] = {} # FPs that are currently being processed in threads. self.finished: Dict[int, FileProcessor] = {} # FileProcessors that have finished processing. + self.lock = Lock() + self.progress = progress self.tasks = [ progress.add_task("[blue]Waiting", total=len(self.files), completed=len(self.files)), @@ -59,14 +61,18 @@ class MasterFileProcessor(object): progress.add_task("[green]Finished", total=len(self.files)) ] if self.progress else None self.previous_state = [len(self.files), 0, 0] + if self.progress: + logger.debug(f'Progress tasks created. Task IDs: {self.tasks}') processors = [FileProcessor(file) for file in files] - processors.sort(key=lambda fp: fp.size) + processors.sort(key=lambda processor: processor.size) for index, fp in enumerate(processors): self.waiting[index] = fp + logger.debug('FileProcessors created & sorted, index keys assigned.') self._precheck() + logger.debug('Precheck passed.') def _precheck(self) -> None: """ @@ -94,9 +100,12 @@ class MasterFileProcessor(object): :param key: The integer key representing the FileProcessor being added. """ fp = self.waiting.pop(key) - thread = Thread(target=fp.run, args=(self.client,), kwargs={'callback': lambda: self._finished(key)}) + logger.debug(f'Claimed FileProcessor {key} from queue.') + thread = Thread(name=f'FP-{key}', target=fp.run, args=(self.client,), + kwargs={'callback': lambda: self._finished(key)}) self.running[key] = (fp, thread) thread.start() + logger.info(f'FileProcessor {key}\'s Thread created and started.') def _finished(self, key: int) -> None: """ @@ -107,6 +116,7 @@ class MasterFileProcessor(object): # Remove the FileProcessor and the Thread fom the running dict. fp, thread = self.running.pop(key) self.finished[key] = fp + logger.info(f'FileProcessor {key} ("{fp.file_name}") has finished.') # Load FileProcessors if possible self.load() @@ -134,27 +144,35 @@ class MasterFileProcessor(object): """ self._update_tasks() - available: List[int] = list(self.waiting.keys()) - if len(available) == 0: - return + if not self.lock.acquire(False): + pass + else: + try: + available: List[int] = list(self.waiting.keys()) + if len(available) == 0: + return - # At least 1 FP in queue and # of - while len(available) > 0 and len(self.running) < self.image_count: - # check that the smallest FP can fit in the specified buffer - if self.total_size + self.waiting[available[0]].size <= self.buffer_size: + logger.debug(f'Looking for FileProcessors to load ({len(available)} available)...') + + # At least 1 FP in queue and # of + while len(available) > 0 and len(self.running) < self.image_count: + logger.debug(f'Trying to claim FileProcessor {available[0]}') + # check that the smallest FP can fit in the specified buffer + if self.total_size + self.waiting[available[0]].size <= self.buffer_size: + self._start(available.pop(0)) + else: + # Could not fit under limit, thus all the subsequent items in the queue will not either. + # Subsequent items will be added through the _finished() callback. + break + finally: + self.lock.release() + + # Ensure that at least 1 is in queue with single_override enabled + if self.single_override and len(available) != 0 and len(self.running) == 0: self._start(available.pop(0)) - else: - # Could not fit under limit, thus all the subsequent items in the queue will not either. - # Subsequent items will be added through the _finished() callback. - break - - # Ensure that at least 1 is in queue with single_override enabled - if self.single_override and len(available) != 0 and len(self.running) == 0: - self._start(available.pop(0)) self._update_tasks() - def join(self) -> None: """ Joins running threads continuously until none are left. @@ -169,7 +187,7 @@ class MasterFileProcessor(object): def _update_tasks(self) -> None: """ - If a rich.Progress[bar] was provided, the tasks in it shall be updated accordingly. + If a rich.Progress[bar] was provided in __init__, the tasks in it will be updated accordingly. """ # Return immediately if progressbar was not supplied if not self.progress: @@ -268,15 +286,15 @@ class FileProcessor(object): # Performs label detection on the image file # response = client.label_detection(image=image) # labels = [label.description for label in response.label_annotations] - # time.sleep(random.random()) + time.sleep(random.random() * 3) labels = [random_characters(8) for _ in range(random.randint(4, 20))] - logger.info("Keywords Identified: {}".format(", ".join( - [f'[cyan]{label}[/cyan]' for label in labels] - ))) + logger.info("{} Keywords Identified: {}".format( + len(labels), ", ".join([f'[cyan]{label}[/cyan]' for label in labels]) + )) # XMP sidecar file specified, write to it using XML module if self.xmp: - logger.info(f"Writing {len(labels)} tags to output XMP.") + logger.debug(f"Writing {len(labels)} tags to output XMP.") parser = XMPParser(self.input_xmp) parser.add_keywords(labels) @@ -294,7 +312,7 @@ class FileProcessor(object): # No XMP file is specified, using IPTC tagging else: - logger.info("Writing {} tags to image IPTC".format(len(labels))) + logger.debug(f"Writing {len(labels)} tags to image IPTC") info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name)) info["keywords"].extend(labels) info.save()