fix threading issues with a proper locking mechanism, add thread names to logging messages, minor logging message changes

This commit is contained in:
Xevion
2020-08-29 03:00:25 -05:00
parent c6821b96d3
commit fd1c5b7698
3 changed files with 46 additions and 28 deletions

View File

@@ -14,7 +14,7 @@ from . import config
# noinspection PyArgumentList
logging.basicConfig(
- format='%(message)s',
+ format='[bold deep_pink2]%(threadName)s[/bold deep_pink2] %(message)s',
level=logging.ERROR,
handlers=[RichHandler(markup=True, rich_tracebacks=True)]
)
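
For context on the format change above: %(threadName)s resolves to whatever name each worker thread was given, and RichHandler(markup=True) renders the surrounding markup tags. A minimal, self-contained sketch (not the project's actual entry point) of how a named thread shows up in the log output:

import logging
import threading

from rich.logging import RichHandler

# Same handler and format string as the diff above; level lowered so the demo prints.
logging.basicConfig(
    format='[bold deep_pink2]%(threadName)s[/bold deep_pink2] %(message)s',
    level=logging.DEBUG,
    handlers=[RichHandler(markup=True, rich_tracebacks=True)]
)
logger = logging.getLogger(__name__)

def work() -> None:
    logger.info('processing...')  # rendered with the thread name in bold pink, e.g. "FP-0 processing..."

# Naming the thread (as _start() now does with name=f'FP-{key}') is what makes
# %(threadName)s meaningful; unnamed threads would log as "Thread-1", "Thread-2", ...
worker = threading.Thread(name='FP-0', target=work)
worker.start()
worker.join()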

View File

@@ -43,7 +43,7 @@ def run():
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.0f}%",
) as progress:
mp = MasterFileProcessor(select, 3, convert_to_bytes("128 MB"), True, client=client, progress=progress)
mp = MasterFileProcessor(select, 3, convert_to_bytes("1280 KB"), True, client=client, progress=progress)
logger.info('MasterFileProcessor created.')
mp.load()
logger.info('Finished loading/starting initial threads.')
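
The buffer limit above is produced by convert_to_bytes, whose implementation isn't part of this diff. Purely as an illustration of the size strings being compared ("128 MB" vs. "1280 KB"), a hypothetical helper along these lines (the unit table and parsing are assumptions, not the project's code):

# Hypothetical stand-in for the project's convert_to_bytes helper; the real one may differ.
UNITS = {'B': 1, 'KB': 1024, 'MB': 1024 ** 2, 'GB': 1024 ** 3}

def convert_to_bytes(size: str) -> int:
    value, unit = size.split()
    return int(float(value) * UNITS[unit.upper()])

assert convert_to_bytes("1280 KB") == 1_310_720      # the new, much smaller buffer
assert convert_to_bytes("128 MB") == 134_217_728     # the old limit, roughly 100x larger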

View File

@@ -11,7 +11,7 @@ import os
import random
import shutil
import time
- from threading import Thread
+ from threading import Thread, Lock
from typing import Tuple, AnyStr, Optional, List, Dict, Callable
import imageio
@@ -52,6 +52,8 @@ class MasterFileProcessor(object):
self.running: Dict[int, Tuple[FileProcessor, Thread]] = {} # FPs that are currently being processed in threads.
self.finished: Dict[int, FileProcessor] = {} # FileProcessors that have finished processing.
+ self.lock = Lock()
self.progress = progress
self.tasks = [
progress.add_task("[blue]Waiting", total=len(self.files), completed=len(self.files)),
@@ -59,14 +61,18 @@ class MasterFileProcessor(object):
progress.add_task("[green]Finished", total=len(self.files))
] if self.progress else None
self.previous_state = [len(self.files), 0, 0]
+ if self.progress:
+ logger.debug(f'Progress tasks created. Task IDs: {self.tasks}')
processors = [FileProcessor(file) for file in files]
- processors.sort(key=lambda fp: fp.size)
+ processors.sort(key=lambda processor: processor.size)
for index, fp in enumerate(processors):
self.waiting[index] = fp
+ logger.debug('FileProcessors created & sorted, index keys assigned.')
self._precheck()
+ logger.debug('Precheck passed.')
def _precheck(self) -> None:
"""
@@ -94,9 +100,12 @@ class MasterFileProcessor(object):
:param key: The integer key representing the FileProcessor being added.
"""
fp = self.waiting.pop(key)
- thread = Thread(target=fp.run, args=(self.client,), kwargs={'callback': lambda: self._finished(key)})
+ logger.debug(f'Claimed FileProcessor {key} from queue.')
+ thread = Thread(name=f'FP-{key}', target=fp.run, args=(self.client,),
+ kwargs={'callback': lambda: self._finished(key)})
self.running[key] = (fp, thread)
thread.start()
+ logger.info(f'FileProcessor {key}\'s Thread created and started.')
def _finished(self, key: int) -> None:
"""
@@ -107,6 +116,7 @@ class MasterFileProcessor(object):
# Remove the FileProcessor and the Thread from the running dict.
fp, thread = self.running.pop(key)
self.finished[key] = fp
+ logger.info(f'FileProcessor {key} ("{fp.file_name}") has finished.')
# Load FileProcessors if possible
self.load()
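
The callback that lands here is bound per-key when the thread is created in _start(). A stripped-down sketch of that completion-callback pattern with generic names (not the project's FileProcessor API), which also shows why shared state needs a lock: _finished() runs on the worker's own thread while the main thread may be inside load():

import threading
from typing import Callable, Dict

class Pool:
    def __init__(self) -> None:
        self.running: Dict[int, threading.Thread] = {}

    def start(self, key: int) -> None:
        # Mirrors _start(): the callback is a closure over this item's key.
        thread = threading.Thread(name=f'FP-{key}', target=self._work,
                                  kwargs={'callback': lambda: self._finished(key)})
        self.running[key] = thread
        thread.start()

    def _work(self, callback: Callable[[], None]) -> None:
        # ... do the actual processing here ...
        callback()  # report completion from inside the worker thread

    def _finished(self, key: int) -> None:
        # Executes on the worker thread, concurrently with the main thread,
        # which is why the real load() now guards itself with a Lock.
        self.running.pop(key)

pool = Pool()
pool.start(0)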
@@ -134,27 +144,35 @@ class MasterFileProcessor(object):
"""
self._update_tasks()
- available: List[int] = list(self.waiting.keys())
- if len(available) == 0:
- return
+ if not self.lock.acquire(False):
+ pass
+ else:
+ try:
+ available: List[int] = list(self.waiting.keys())
+ if len(available) == 0:
+ return
- # At least 1 FP in queue and # of
- while len(available) > 0 and len(self.running) < self.image_count:
- # check that the smallest FP can fit in the specified buffer
- if self.total_size + self.waiting[available[0]].size <= self.buffer_size:
+ logger.debug(f'Looking for FileProcessors to load ({len(available)} available)...')
+ # At least 1 FP in queue and # of
+ while len(available) > 0 and len(self.running) < self.image_count:
+ logger.debug(f'Trying to claim FileProcessor {available[0]}')
+ # check that the smallest FP can fit in the specified buffer
+ if self.total_size + self.waiting[available[0]].size <= self.buffer_size:
+ self._start(available.pop(0))
+ else:
+ # Could not fit under limit, thus all the subsequent items in the queue will not either.
+ # Subsequent items will be added through the _finished() callback.
+ break
+ finally:
+ self.lock.release()
+ # Ensure that at least 1 is in queue with single_override enabled
+ if self.single_override and len(available) != 0 and len(self.running) == 0:
+ self._start(available.pop(0))
- else:
- # Could not fit under limit, thus all the subsequent items in the queue will not either.
- # Subsequent items will be added through the _finished() callback.
- break
- # Ensure that at least 1 is in queue with single_override enabled
- if self.single_override and len(available) != 0 and len(self.running) == 0:
- self._start(available.pop(0))
self._update_tasks()
def join(self) -> None:
"""
Joins running threads continuously until none are left.
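
Stepping back from the hunk above, the heart of the threading fix is the non-blocking acquire wrapped around load(): a thread that finds the lock taken backs off immediately instead of queuing up, and the finally clause guarantees release even on the early return. Condensed to its essentials (a sketch of the idiom, not the full method):

from threading import Lock

lock = Lock()

def load_once() -> None:
    # acquire(False) never blocks: it returns True if the lock was free and
    # we now hold it, False if another thread is already loading.
    if not lock.acquire(False):
        return  # another thread is handling the queue; skip this call
    try:
        pass  # claim waiting FileProcessors and start their threads here
    finally:
        lock.release()  # runs even if the body returns early or raises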
@@ -169,7 +187,7 @@ class MasterFileProcessor(object):
def _update_tasks(self) -> None:
"""
- If a rich.Progress[bar] was provided, the tasks in it shall be updated accordingly.
+ If a rich.Progress[bar] was provided in __init__, the tasks in it will be updated accordingly.
"""
# Return immediately if progressbar was not supplied
if not self.progress:
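
For reference, keeping those task bars in sync comes down to rich's Progress.update(); a small sketch with made-up counts (the middle task's label is a guess, since only the "Waiting" and "Finished" tasks appear in this diff):

from rich.progress import Progress, BarColumn

with Progress(
    "[progress.description]{task.description}",
    BarColumn(bar_width=None),
    "[progress.percentage]{task.percentage:>3.0f}%",
) as progress:
    total = 10
    waiting = progress.add_task("[blue]Waiting", total=total, completed=total)
    running = progress.add_task("[yellow]Running", total=total)   # label assumed
    finished = progress.add_task("[green]Finished", total=total)

    # As items move between the waiting/running/finished dicts, each bar is
    # set to the current size of its dict rather than advanced incrementally.
    progress.update(waiting, completed=7)
    progress.update(running, completed=2)
    progress.update(finished, completed=1)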
@@ -268,15 +286,15 @@ class FileProcessor(object):
# Performs label detection on the image file
# response = client.label_detection(image=image)
# labels = [label.description for label in response.label_annotations]
- # time.sleep(random.random())
+ time.sleep(random.random() * 3)
labels = [random_characters(8) for _ in range(random.randint(4, 20))]
logger.info("Keywords Identified: {}".format(", ".join(
[f'[cyan]{label}[/cyan]' for label in labels]
)))
logger.info("{} Keywords Identified: {}".format(
len(labels), ", ".join([f'[cyan]{label}[/cyan]' for label in labels])
))
# XMP sidecar file specified, write to it using XML module
if self.xmp:
logger.info(f"Writing {len(labels)} tags to output XMP.")
logger.debug(f"Writing {len(labels)} tags to output XMP.")
parser = XMPParser(self.input_xmp)
parser.add_keywords(labels)
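
The keyword generation in this hunk is still the testing stub (random strings plus a randomized sleep); the commented-out lines are the real Google Cloud Vision call. Roughly, re-enabling it would look like the sketch below, assuming client is a vision.ImageAnnotatorClient as run()'s client argument suggests (on older library versions the Image type lives under vision.types):

from typing import List

from google.cloud import vision

def detect_labels(client: vision.ImageAnnotatorClient, path: str) -> List[str]:
    # Send the raw file bytes to the Vision API for label detection.
    with open(path, 'rb') as handle:
        image = vision.Image(content=handle.read())
    response = client.label_detection(image=image)
    # Same extraction as the commented-out line in FileProcessor.run().
    return [label.description for label in response.label_annotations]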
@@ -294,7 +312,7 @@ class FileProcessor(object):
# No XMP file is specified, using IPTC tagging
else:
logger.info("Writing {} tags to image IPTC".format(len(labels)))
logger.debug(f"Writing {len(labels)} tags to image IPTC")
info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name))
info["keywords"].extend(labels)
info.save()
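
The IPTC branch uses iptcinfo3's dict-style field access; as a standalone reference, the same three calls look like this (note that for a file with no existing IPTC block, IPTCInfo(path, force=True) may be needed):

from typing import List

import iptcinfo3

def write_keywords(path: str, labels: List[str]) -> None:
    info = iptcinfo3.IPTCInfo(path)
    # 'keywords' behaves like a list field; extend() mirrors FileProcessor.run().
    info['keywords'].extend(labels)
    info.save()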