implement MasterFileProcessor in app.py, implement MasterFileProcessor thread loading and joining (tested), fix FileProcessor size property not using correct path

Xevion
2020-08-28 16:35:44 -05:00
parent 68ded24c6f
commit 45a2b41520
2 changed files with 65 additions and 51 deletions
app.py  +18 -21
@@ -6,15 +6,18 @@ Main app function file for running the program, delegating the tagging operation
import logging
import os
-from threading import Thread
+import colored_traceback
from google.cloud import vision
from . import INPUT_PATH, TEMP_PATH
-from .helpers import valid_extension, get_extension
-from .process import FileProcessor
+from .helpers import valid_extension, get_extension, convert_to_bytes
+from .process import MasterFileProcessor
log = logging.getLogger("app")
logger = logging.getLogger("app")
logger.setLevel(logging.DEBUG)
colored_traceback.add_hook()
def run():
@@ -25,33 +28,27 @@ def run():
select = list(filter(lambda file: valid_extension(get_extension(file)), files))
if len(select) == 0:
-        log.fatal("No valid files located.")
+        logger.fatal("No valid files located.")
return
else:
log.info(f"Found {len(select)} valid files")
logger.info(f"Found {len(select)} valid files")
# Create the 'temp' directory
if not os.path.exists(TEMP_PATH):
log.info("Creating temporary processing directory")
logger.info("Creating temporary processing directory")
os.makedirs(TEMP_PATH)
try:
-        # Process files with threading
-        processors = [FileProcessor(file) for file in select]
-        threads = [Thread(target=process.run, args=(client,)) for process in processors]
-        # Start threads for each file
-        for i, thread in enumerate(threads):
-            log.info(f"Processing file '{processors[i].file_name}'...")
-            thread.start()
-        # Wait for each thread to complete before stopping
-        for thread in threads:
-            thread.join()
+        mp = MasterFileProcessor(select, 8, convert_to_bytes("128 MB"), True, client=client)
+        logger.info('MasterFileProcessor created.')
+        mp.load()
+        logger.info('Finished loading/starting initial threads.')
+        mp.join()
+        logger.info('Finished joining threads, now quitting.')
except Exception as error:
-        log.error(str(error))
+        logger.error(str(error))
raise
finally:
os.rmdir(TEMP_PATH)
log.info("Temporary directory removed.")
logger.info("Temporary directory removed.")
process.py  +47 -30
@@ -1,13 +1,15 @@
"""
process.py
-Holds the FileProcessor object, used for working with images in order to label and edit/tag their metadata.
+Holds the majority of the file processing logic, including processing for individual files, as well as
+logic for queued threading for dozens of files in parallel.
"""
import io
import logging
import os
import shutil
from pprint import pprint
from threading import Thread
from typing import Tuple, AnyStr, Optional, List, Dict, Callable
@@ -20,7 +22,8 @@ from google.cloud import vision
from . import TEMP_PATH, INPUT_PATH, RAW_EXTS
from .xmp import XMPParser
log = logging.getLogger("process")
logger = logging.getLogger("process")
logger.setLevel(logging.DEBUG)
class MasterFileProcessor(object):
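The constructor itself is outside the hunks shown here, but the call in app.py (MasterFileProcessor(select, 8, convert_to_bytes("128 MB"), True, client=client)) together with the attributes referenced in the hunks below (waiting, running, image_count, buffer_size, single_override, client) suggests state roughly like the following sketch. Parameter order and any name not visible in the diff are assumptions; FileProcessor is the class defined later in process.py.

from threading import Thread
from typing import Dict, List, Tuple

class MasterFileProcessor:
    # Sketch of the coordinator's state as inferred from this commit, not the committed code itself.
    def __init__(self, files: List[str], image_count: int, buffer_size: int,
                 single_override: bool, client=None) -> None:
        self.image_count = image_count          # cap on concurrently running FileProcessor threads
        self.buffer_size = buffer_size          # cap on the combined size, in bytes, of in-flight images
        self.single_override = single_override  # allow one oversized image through when nothing is running
        self.client = client                    # Vision client handed to each FileProcessor.run()
        # FileProcessors queued for execution, keyed by an integer id; the "smallest FP" comment
        # in load() suggests the real queue is ordered by image size, smallest first.
        queued = sorted((FileProcessor(name) for name in files), key=lambda fp: fp.size)
        self.waiting: Dict[int, "FileProcessor"] = dict(enumerate(queued))
        # (FileProcessor, Thread) pairs currently running, keyed by the same id
        self.running: Dict[int, Tuple["FileProcessor", Thread]] = {}

The raise at the top of the next hunk suggests __init__ also validates the configuration, rejecting a non-positive image_count unless single_override is set.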
@@ -72,25 +75,6 @@ class MasterFileProcessor(object):
raise Exception("Invalid Configuration - the image_count is too low. Please set it to a positive "
"non-zero integer or enable single_override.")
-    def load(self) -> None:
-        """
-        Starts FileProcessor threads, loading zero or more threads simultaneously based on configuration options.
-        """
-        available: List[int] = list(self.waiting.keys())
-        # At least 1 FP in queue and # of
-        while len(available) > 0 and len(self.running) < self.image_count:
-            # check that the smallest FP can fit in the specified buffer
-            if self.total_size + self.waiting[available[0]].size <= self.buffer_size:
-                self._start(available[0])
-            else:
-                # Could not fit under limit, thus all the subsequent items in the queue will not either.
-                # Subsequent items will be added through the _finished() callback.
-                break
-        # Ensure that at least 1 is in queue with single_override enabled
-        if self.single_override and len(self.running) == 0:
-            self._start(available[0])
def _start(self, key: int) -> None:
"""
Starts a new FileProcessor Thread, moving the FP and creating it's thread in the running dict.
@@ -98,9 +82,8 @@ class MasterFileProcessor(object):
:param key: The integer key representing the FileProcessor being added.
"""
fp = self.waiting.pop(key)
-        thread = Thread(
-            target=fp.run, args=(self.client,), kwargs={'callback': lambda: self._finished(key)}
-        )
+        thread = Thread(target=fp.run, args=(self.client,), kwargs={'callback': lambda: self._finished(key)})
self.running[key] = (fp, thread)
thread.start()
def _finished(self, key: int) -> None:
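The one-line Thread(...) call above keeps the earlier behaviour: each FileProcessor thread is given a callback, lambda: self._finished(key), so the master learns exactly which slot has freed up and can top the pool back up from the waiting queue. The pattern in isolation, with a hypothetical stand-in worker (none of these names come from the repository):

from threading import Thread
from typing import Callable, Dict

running: Dict[int, Thread] = {}

def worker(job_id: int, callback: Callable[[], None]) -> None:
    # Stand-in for FileProcessor.run(client, callback=...): do the work, then report back.
    print(f"processing job {job_id}")
    callback()  # tell the coordinator this slot is free again

def finished(key: int) -> None:
    # Stand-in for MasterFileProcessor._finished(): the real code would refill the pool here.
    print(f"job {key} finished")

def start(key: int) -> None:
    # Mirrors _start(): the lambda closes over this call's key, so each thread reports
    # completion with the right identifier.
    thread = Thread(target=worker, args=(key,), kwargs={"callback": lambda: finished(key)})
    running[key] = thread
    thread.start()

for key in range(3):
    start(key)
for thread in running.values():
    thread.join()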
@@ -133,6 +116,40 @@ class MasterFileProcessor(object):
"""
return sum(processor.size for processor, thread in self.running.values())
+    def load(self) -> None:
+        """
+        Starts FileProcessor threads, loading zero or more threads simultaneously based on configuration options.
+        """
+        available: List[int] = list(self.waiting.keys())
+        if len(available) == 0:
+            return
+        # At least 1 FP in queue and # of
+        while len(available) > 0 and len(self.running) < self.image_count:
+            # check that the smallest FP can fit in the specified buffer
+            if self.total_size + self.waiting[available[0]].size <= self.buffer_size:
+                self._start(available.pop(0))
+            else:
+                # Could not fit under limit, thus all the subsequent items in the queue will not either.
+                # Subsequent items will be added through the _finished() callback.
+                break
+        # Ensure that at least 1 is in queue with single_override enabled
+        if self.single_override and len(available) != 0 and len(self.running) == 0:
+            self._start(available.pop(0))
+    def join(self) -> None:
+        """
+        Joins running threads continuously until none are left.
+        """
+        while True:
+            threads = [thread for fp, thread in self.running.values()]
+            for thread in threads:
+                thread.join()
+            if len(self.running) == 0 and len(self.waiting) == 0:
+                break
class FileProcessor(object):
"""
@@ -218,11 +235,11 @@ class FileProcessor(object):
# Performs label detection on the image file
response = client.label_detection(image=image)
labels = [label.description for label in response.label_annotations]
log.info("Keywords Identified: {}".format(", ".join(labels)))
logger.info("Keywords Identified: {}".format(", ".join(labels)))
# XMP sidecar file specified, write to it using XML module
if self.xmp:
log.info(f"Writing {len(labels)} tags to output XMP.")
logger.info(f"Writing {len(labels)} tags to output XMP.")
parser = XMPParser(self.input_xmp)
parser.add_keywords(labels)
@@ -236,11 +253,11 @@ class FileProcessor(object):
parser.save(self.input_xmp) # save the new file
shutil.copystat(temp_name, self.input_xmp) # copy file metadata over
os.remove(temp_name) # remove the renamed original file
-            log.debug("New XMP file saved with original file metadata. Old XMP file removed.")
+            logger.debug("New XMP file saved with original file metadata. Old XMP file removed.")
# No XMP file is specified, using IPTC tagging
else:
log.info("Writing {} tags to image IPTC".format(len(labels)))
logger.info("Writing {} tags to image IPTC".format(len(labels)))
info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name))
info["keywords"].extend(labels)
info.save()
@@ -267,8 +284,8 @@ class FileProcessor(object):
@property
def size(self) -> int:
"""
-        Returns the size of the image in bytes that this FileProcessor objet shadows.
+        Returns the size of the image in bytes that this FileProcessor object shadows.
:return: the number of bytes the shadowed image takes up on the disk
"""
-        return os.path.getsize(self.file_name)
+        return os.path.getsize(os.path.join(INPUT_PATH, self.file_name))
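Taken together, the load()/join() pair added above implements a small admission policy: start queued FileProcessors in order while fewer than image_count threads are running and the combined size of in-flight images stays within buffer_size, stop at the first image that does not fit (the _finished() callback re-runs the policy as slots free up), and let single_override push one oversized image through when nothing at all is running. A self-contained toy version of just that decision logic, with plain byte counts standing in for FileProcessors (the admit() name and everything else here is illustrative, not from the repository):

from typing import Dict, List

def admit(waiting: Dict[int, int], running_sizes: Dict[int, int],
          image_count: int, buffer_size: int, single_override: bool) -> List[int]:
    # Return the queue keys that a load()-style policy would start right now.
    started: List[int] = []
    available = list(waiting.keys())
    total = sum(running_sizes.values())
    # Fill slots in queue order while both the thread cap and the byte budget allow it.
    while available and len(running_sizes) + len(started) < image_count:
        key = available[0]
        if total + waiting[key] <= buffer_size:
            started.append(available.pop(0))
            total += waiting[key]
        else:
            # The head of the queue does not fit, so stop here; later items get their
            # chance from the _finished() callback once running images complete.
            break
    # single_override: one image may blow the budget if nothing is running at all.
    if single_override and available and not running_sizes and not started:
        started.append(available.pop(0))
    return started

# Example: a 45 MB budget, three slots, and queued "images" of 10, 40 and 50 MB.
MB = 1024 ** 2
print(admit({0: 10 * MB, 1: 40 * MB, 2: 50 * MB}, {}, image_count=3,
            buffer_size=45 * MB, single_override=True))  # -> [0]: adding the 40 MB image would overflow 45 MB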