mirror of
https://github.com/Xevion/phototag.git
synced 2025-12-15 10:12:37 -06:00
finish reformatting and improving original code with helpers, better logging messages, add jpe to lossy extensions
This commit is contained in:
@@ -29,4 +29,4 @@ os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.join(SCRIPT_ROOT, "config
|
||||
RAW_EXTS = ["3fr", "ari", "arw", "bay", "braw", "crw", "cr2", "cr3", "cap", "data", "dcs", "dcr", "dng", "drf", "eip",
|
||||
"erf", "fff", "gpr", "iiq", "k25", "kdc", "mdc", "mef", "mos", "mrw", "nef", "nrw", "obm", "orf", "pef",
|
||||
"ptx", "pxn", "r3d", "raf", "raw", "rwl", "rw2", "rwz", "sr2", "srf", "srw", "tif", "x3f", ]
|
||||
LOSSY_EXTS = ["jpeg", "jpg", "png"]
|
||||
LOSSY_EXTS = ["jpeg", "jpg", "jpe", "png"]
|
||||
|
||||
@@ -5,7 +5,7 @@ from threading import Thread
|
||||
from google.cloud import vision
|
||||
|
||||
from . import INPUT_PATH, TEMP_PATH
|
||||
from . import RAW_EXTS, LOSSY_EXTS
|
||||
from .helpers import valid_extension, get_extension
|
||||
from .process import FileProcessor
|
||||
|
||||
log = logging.getLogger("app")
|
||||
@@ -14,18 +14,15 @@ log = logging.getLogger("app")
|
||||
def run():
|
||||
client = vision.ImageAnnotatorClient()
|
||||
|
||||
# Find files we want to process based on if they have a corresponding .XMP
|
||||
log.info("Locating processable files...")
|
||||
# Locate valid files
|
||||
files = os.listdir(INPUT_PATH)
|
||||
select = [
|
||||
file
|
||||
for file in files
|
||||
if os.path.splitext(file)[1][1:].lower() in (RAW_EXTS + LOSSY_EXTS)
|
||||
]
|
||||
log.info(f"Found {len(select)} valid files")
|
||||
if len(select) <= 0:
|
||||
log.fatal("No valid files found, exiting early")
|
||||
select = list(filter(lambda file: valid_extension(get_extension(file)), files))
|
||||
|
||||
if len(select) == 0:
|
||||
log.fatal("No valid files located.")
|
||||
return
|
||||
else:
|
||||
log.info(f"Found {len(select)} valid files")
|
||||
|
||||
# Create the 'temp' directory
|
||||
if not os.path.exists(TEMP_PATH):
|
||||
@@ -33,24 +30,22 @@ def run():
|
||||
os.makedirs(TEMP_PATH)
|
||||
|
||||
try:
|
||||
# Process files via Threading
|
||||
# Process files with threading
|
||||
processors = [FileProcessor(file) for file in select]
|
||||
threads = [Thread(target=process.run, args=(client,)) for process in processors]
|
||||
# Start
|
||||
|
||||
# Start threads for each file
|
||||
for i, thread in enumerate(threads):
|
||||
log.info(f"Processing file '{processors[i].file_name}'...")
|
||||
thread.start()
|
||||
# Wait
|
||||
|
||||
# Wait for each thread to complete before stopping
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
# for process in progressbar.progressbar(processors, redirect_stdout=True, term_width=110):
|
||||
|
||||
except Exception as error:
|
||||
log.error(str(error))
|
||||
log.warning("Removing temporary directory before raising exception.")
|
||||
os.rmdir(TEMP_PATH)
|
||||
raise
|
||||
|
||||
# Remove the directory, we are done here
|
||||
log.info("Removing temporary directory.")
|
||||
os.rmdir(TEMP_PATH)
|
||||
finally:
|
||||
os.rmdir(TEMP_PATH)
|
||||
log.info("Temporary directory removed.")
|
||||
|
||||
@@ -17,7 +17,10 @@ log = logging.getLogger("process")
|
||||
|
||||
|
||||
class FileProcessor(object):
|
||||
"""A FileProcessor object is for tagging """
|
||||
"""
|
||||
A FileProcessor object shadows a given file, providing methods for optimizing, labeling
|
||||
and tagging Raw (.NEF, .CR2) and Lossy (.JPEG, .PNG) format pictures.
|
||||
"""
|
||||
|
||||
def __init__(self, file_name: str):
|
||||
"""
|
||||
@@ -95,24 +98,22 @@ class FileProcessor(object):
|
||||
|
||||
# XMP sidecar file specified, write to it using XML module
|
||||
if self.xmp:
|
||||
log.info("Writing {} tags to output XMP.".format(len(labels)))
|
||||
log.info(f"Writing {len(labels)} tags to output XMP.")
|
||||
parser = XMPParser(self.input_xmp)
|
||||
parser.add_keywords(labels)
|
||||
# Save the new XMP file
|
||||
log.debug("Moving old XMP to temp XMP")
|
||||
|
||||
# Generate a temporary XMP file name
|
||||
head, tail = os.path.split(self.input_xmp)
|
||||
name, ext = os.path.splitext(tail)
|
||||
name += " temp"
|
||||
temp_name = os.path.join(head, name + ext)
|
||||
# Begin the process of copying stats (happens in an instant)
|
||||
os.rename(self.input_xmp, temp_name)
|
||||
log.debug("Saving new XMP")
|
||||
parser.save(self.input_xmp)
|
||||
log.debug("Copying old stats to new XMP")
|
||||
shutil.copystat(temp_name, self.input_xmp)
|
||||
log.debug("Removing temp file")
|
||||
os.remove(temp_name)
|
||||
temp_name = os.path.join(head, f'{name} temp{ext}')
|
||||
|
||||
# Finish up processing XMP file
|
||||
os.rename(self.input_xmp, temp_name) # rename the original file
|
||||
parser.save(self.input_xmp) # save the new file
|
||||
shutil.copystat(temp_name, self.input_xmp) # copy file metadata over
|
||||
os.remove(temp_name) # remove the renamed original file
|
||||
log.debug("New XMP file saved with original file metadata. Old XMP file removed.")
|
||||
|
||||
# No XMP file is specified, using IPTC tagging
|
||||
else:
|
||||
log.info("Writing {} tags to image IPTC".format(len(labels)))
|
||||
@@ -130,7 +131,9 @@ class FileProcessor(object):
|
||||
raise
|
||||
self._cleanup()
|
||||
|
||||
# Remove the temporary file (if it exists)
|
||||
def _cleanup(self):
|
||||
def _cleanup(self) -> None:
|
||||
"""
|
||||
Cleanup function. Removes the temporary file generated.
|
||||
"""
|
||||
if os.path.exists(self.temp_file_path):
|
||||
os.remove(self.temp_file_path)
|
||||
|
||||
Reference in New Issue
Block a user