implement threading / queue management logic for MasterFileProcessor, FileProcessor.run() callback addition for threading, minor helper var name changes

This commit is contained in:
Xevion
2020-08-28 15:24:54 -05:00
parent bb88a218fd
commit 6e91f0f928
2 changed files with 43 additions and 11 deletions

View File

@@ -65,12 +65,12 @@ def get_temp_directory(directory: str, start: str = "temp", append_random: int =
return temp
def convert_to_bytes(size_string: str) -> int:
    """
    Converts the string representation of data into its integer byte equivalent.

    :param size_string: A string representation of data size, an integer followed by 1-2 letters indicating unit.
    :return: The number of bytes the given string is equivalent to.
    :raises ValueError: If the string does not begin with an integer followed by a unit.
    :raises KeyError: If the parsed unit is not a key of byte_magnitudes.
    """
    match = re.match(r"(\d+)\s*(\w{1,2})", size_string)
    if match is None:
        # Without this guard, malformed input surfaces as an opaque AttributeError on None.
        raise ValueError(f"Unrecognized data size string: {size_string!r}")
    amount, unit = match.groups()
    return int(amount) * byte_magnitudes[unit]

View File

@@ -9,7 +9,7 @@ import logging
import os
import shutil
from threading import Thread
from typing import Tuple, AnyStr, Optional, List, Dict
from typing import Tuple, AnyStr, Optional, List, Dict, Callable
import imageio
import iptcinfo3
@@ -18,7 +18,6 @@ from PIL import Image
from google.cloud import vision
from . import TEMP_PATH, INPUT_PATH, RAW_EXTS
from .config import config
from .xmp import XMPParser
log = logging.getLogger("process")
@@ -29,7 +28,7 @@ class MasterFileProcessor(object):
Controls FileProcessor objects in the context of threading according to configuration options.
"""
def __init__(self, files: List[str], image_count: int, buffer_size: int, single_override: bool):
def __init__(self, files: List[str], image_count: int, buffer_size: int, single_override: bool, client=None):
"""
Initializes a MasterFileProcessor object.
@@ -40,9 +39,10 @@ class MasterFileProcessor(object):
"""
self.files, self.image_count = files, image_count
self.buffer_size, self.single_override = buffer_size, single_override
self.client = client if client is not None else vision.ImageAnnotatorClient()
self.waiting: Dict[int, FileProcessor] = {} # FileProcessors that are ready to process, but are not.
self.running: Dict[int, Tuple[FileProcessor, Thread]] = {} # FileProcessors that are currently being processed in threads.
self.running: Dict[int, Tuple[FileProcessor, Thread]] = {} # FPs that are currently being processed in threads.
self.finished: Dict[int, FileProcessor] = {} # FileProcessors that have finished processing.
processors = [FileProcessor(file) for file in files]
@@ -76,14 +76,44 @@ class MasterFileProcessor(object):
"""
Starts FileProcessor threads, loading zero or more threads simultaneously based on configuration options.
"""
available: List[int] = list(self.waiting.keys())
# At least 1 FP in queue and # of running FPs is below the image_count limit
while len(available) > 0 and len(self.running) < self.image_count:
# check that the smallest FP can fit in the specified buffer
if self.total_size + self.waiting[available[0]].size <= self.buffer_size:
self._start(available[0])
else:
# Could not fit under limit, thus all the subsequent items in the queue will not either.
# Subsequent items will be added through the _finished() callback.
break
# Ensure that at least 1 is in queue with single_override enabled
if self.single_override and len(self.running) == 0:
self._start(available[0])
def _start(self, key: int) -> None:
"""
Starts a new FileProcessor Thread, moving the FP and creating it's thread in the running dict.
:param key: The integer key representing the FileProcessor being added.
"""
fp = self.waiting.pop(key)
thread = Thread(
target=fp.run, args=(self.client,), kwargs={'callback': lambda: self._finished(key)}
)
thread.start()
def _finished(self, key: int) -> None:
"""
Called when a FileProcessor's thread has finished.
:param int key: The FileProcessor's integer key in the running array.
:param int key: The FileProcessor's integer key in the running dict.
"""
pass
# Remove the FileProcessor and the Thread fom the running dict.
fp, thread = self.running.pop(key)
self.finished[key] = fp
# Load FileProcessors if possible
self.load()
@property
def total_active(self) -> int:
@@ -101,7 +131,7 @@ class MasterFileProcessor(object):
:return: the total number of bytes the images in the buffer take up on the disk.
"""
return sum(processor.size for processor in self.running.values())
return sum(processor.size for processor, thread in self.running.values())
class FileProcessor(object):
@@ -167,11 +197,12 @@ class FileProcessor(object):
else:
self._optimize(os.path.join(INPUT_PATH, self.file_name), copy=self.temp_file_path)
def run(self, client: vision.ImageAnnotatorClient) -> None:
def run(self, client: vision.ImageAnnotatorClient, callback: Callable = None) -> None:
"""
Optimize, find labels for and tag the file.
:param client: The ImageAnnotatorClient to be used for interacting with the Google Vision API.
:param callback: Utility kwarg used for threading purposes.
"""
try:
@@ -224,6 +255,7 @@ class FileProcessor(object):
raise
finally:
self._cleanup()
callback()
def _cleanup(self) -> None:
"""