module wide black reformatting (probably a mistake)

This commit is contained in:
Xevion
2020-02-07 01:58:28 -06:00
parent 33dc480bb4
commit c77c6eb00b
8 changed files with 157 additions and 100 deletions
+51 -14
View File
@@ -7,30 +7,67 @@ from . import config
# Logging and Progressbar work
progressbar.streams.wrap_stderr()
log = logging.getLogger('init')
log = logging.getLogger("init")
log.setLevel(logging.INFO)
log.info('Progressbar/Logging ready.')
log.info("Progressbar/Logging ready.")
# Path Constants
ROOT = os.getcwd()
INPUT_PATH = ROOT
SCRIPT_ROOT = os.path.dirname(os.path.realpath(__file__))
TEMP_PATH = os.path.join(ROOT, 'temp')
OUTPUT_PATH = os.path.join(ROOT, 'output')
log.info('Path Constants Built.')
TEMP_PATH = os.path.join(ROOT, "temp")
OUTPUT_PATH = os.path.join(ROOT, "output")
log.info("Path Constants Built.")
# Enviroment Variables
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.path.join(SCRIPT_ROOT, 'config', config.config['google']['credentials'])
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.join(
SCRIPT_ROOT, "config", config.config["google"]["credentials"]
)
# Extension Constants
RAW_EXTS = [
"3fr", "ari", "arw", "bay", "braw", "crw",
"cr2", "cr3", "cap", "data", "dcs", "dcr",
"dng", "drf", "eip", "erf", "fff", "gpr",
"iiq", "k25", "kdc", "mdc", "mef", "mos",
"mrw", "nef", "nrw", "obm", "orf", "pef",
"ptx", "pxn", "r3d", "raf", "raw", "rwl",
"rw2", "rwz", "sr2", "srf", "srw", "tif",
"3fr",
"ari",
"arw",
"bay",
"braw",
"crw",
"cr2",
"cr3",
"cap",
"data",
"dcs",
"dcr",
"dng",
"drf",
"eip",
"erf",
"fff",
"gpr",
"iiq",
"k25",
"kdc",
"mdc",
"mef",
"mos",
"mrw",
"nef",
"nrw",
"obm",
"orf",
"pef",
"ptx",
"pxn",
"r3d",
"raf",
"raw",
"rwl",
"rw2",
"rwz",
"sr2",
"srf",
"srw",
"tif",
"x3f",
]
LOSSY_EXTS = ["jpeg", "jpg", "png"]
LOSSY_EXTS = ["jpeg", "jpg", "png"]
+2 -2
View File
@@ -14,7 +14,7 @@ from . import INPUT_PATH, OUTPUT_PATH
# logging.info('Output directory did not exist. Creating...')
# os.makedirs(OUTPUT_PATH)
log = logging.getLogger('main')
log = logging.getLogger("main")
if __name__ == "__main__":
main()
main()
+16 -14
View File
@@ -8,7 +8,7 @@ import progressbar
import shutil
import logging
from threading import Thread
from threading import Thread
from google.cloud import vision
from package import xmp
from PIL import Image
@@ -18,31 +18,34 @@ from .process import FileProcessor
from . import INPUT_PATH, TEMP_PATH, OUTPUT_PATH
from . import RAW_EXTS, LOSSY_EXTS
log = logging.getLogger('app')
log = logging.getLogger("app")
def run():
client = vision.ImageAnnotatorClient()
# Find files we want to process based on if they have a corresponding .XMP
log.info('Locating processable files...')
log.info("Locating processable files...")
files = os.listdir(INPUT_PATH)
select = [file for file in files if os.path.splitext(file)[1][1:].lower() in (RAW_EXTS + LOSSY_EXTS)]
log.info(f'Found {len(select)} valid files')
select = [
file
for file in files
if os.path.splitext(file)[1][1:].lower() in (RAW_EXTS + LOSSY_EXTS)
]
log.info(f"Found {len(select)} valid files")
if len(select) <= 0:
log.fatal('No vald files found, exiting early')
log.fatal("No vald files found, exiting early")
return
# Create the 'temp' directory
if not os.path.exists(TEMP_PATH):
log.info('Creating temporary processing directory')
log.info("Creating temporary processing directory")
os.makedirs(TEMP_PATH)
try:
# Process files via Threading
processors = [FileProcessor(file) for file in select]
threads = [
Thread(target=process.run, args=(client,)) for process in processors
]
threads = [Thread(target=process.run, args=(client,)) for process in processors]
# Start
for i, thread in enumerate(threads):
log.info(f"Processing file '{processors[i].file_name}'...")
@@ -51,14 +54,13 @@ def run():
for thread in threads:
thread.join()
# for process in progressbar.progressbar(processors, redirect_stdout=True, term_width=110):
except Exception as error:
log.error(str(error))
log.warning(
'Removing temporary directory before raising exception.')
log.warning("Removing temporary directory before raising exception.")
os.rmdir(TEMP_PATH)
raise
# Remove the directory, we are done here
log.info('Removing temporary directory.')
log.info("Removing temporary directory.")
os.rmdir(TEMP_PATH)
+28 -17
View File
@@ -6,46 +6,57 @@ import os
from . import config
log = logging.getLogger('cli')
log = logging.getLogger("cli")
@click.group()
def cli():
pass
@cli.command()
def run():
log.info(f'CLI started tagging at {os.getcwd()}')
from .app import run
run()
@cli.command()
@click.argument('path')
@click.option('-m', '--move', default=False, show_default=True, prompt=True, help='Move instead of copying the credentials file')
def run():
log.info(f"CLI started tagging at {os.getcwd()}")
from .app import run
run()
@cli.command()
@click.argument("path")
@click.option(
"-m",
"--move",
default=False,
show_default=True,
prompt=True,
help="Move instead of copying the credentials file",
)
def auth(path, move):
if not os.path.isabs(path):
path = os.path.abspath(path)
# Verify that the file eixsts
if os.path.isfile(path):
log.info('Specifed path is file and exists')
log.info("Specifed path is file and exists")
else:
if os.path.isdir(path):
log.warning('Specified path is directory, not file!')
log.warning("Specified path is directory, not file!")
else:
log.warning('Specified path doesn\'t exist!')
log.warning('Please correct the path before trying again.')
log.warning("Specified path doesn't exist!")
log.warning("Please correct the path before trying again.")
click.exit()
# Identify the final location of the file in the config directory
_, head = os.path.split(path)
new_path = os.path.join(config.SCRIPT_ROOT, 'config', head)
new_path = os.path.join(config.SCRIPT_ROOT, "config", head)
# MOVE the file
if move:
shutil.move(path, new_path)
log.info('Successfully moved file to configuration file.')
log.info("Successfully moved file to configuration file.")
# COPY the file
elif not move:
# May be something to think about - should we copy metadata, permissions, etc? Probably not.
shutil.copy(path, new_path)
log.info('Successfully copied file to configuration folder.')
config.config['google']['credentials'] = head
log.info("Successfully copied file to configuration folder.")
config.config["google"]["credentials"] = head
config.quicksave()
log.info(f'Key file configuration updated.')
log.info(f"Key file configuration updated.")
+8 -8
View File
@@ -3,21 +3,21 @@ import sys
import configparser
SCRIPT_ROOT = os.path.dirname(os.path.realpath(__file__))
CONFIG_DIR = os.path.join(SCRIPT_ROOT, 'config')
CONFIG_PATH = os.path.join(CONFIG_DIR, 'config.ini')
CONFIG_DIR = os.path.join(SCRIPT_ROOT, "config")
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.ini")
config = configparser.ConfigParser()
def quicksave():
with open(CONFIG_PATH, 'w+') as file:
with open(CONFIG_PATH, "w+") as file:
config.write(file)
if not os.path.exists(CONFIG_PATH):
if not os.path.exists(CONFIG_DIR):
os.makedirs(CONFIG_DIR)
config['google'] = {
'credentials' : ''
}
config["google"] = {"credentials": ""}
quicksave()
else:
with open(CONFIG_PATH, 'r') as file:
config.read_file(file)
with open(CONFIG_PATH, "r") as file:
config.read_file(file)
+27 -21
View File
@@ -15,7 +15,8 @@ from google.cloud import vision
from . import TEMP_PATH, INPUT_PATH, OUTPUT_PATH, RAW_EXTS, LOSSY_EXTS
from .xmp import XMPParser
log = logging.getLogger('process')
log = logging.getLogger("process")
class FileProcessor(object):
def __init__(self, file_name: str):
@@ -23,23 +24,27 @@ class FileProcessor(object):
self.base, self.ext = os.path.splitext(self.file_name)
self.ext = self.ext[1:]
# Path to temporary file that will be optimized for upload to Google
self.temp_file_path = os.path.join(TEMP_PATH, self.base + '.jpeg')
self.temp_file_path = os.path.join(TEMP_PATH, self.base + ".jpeg")
# Decide whether a XMP file is available
self.xmp = None
if self.ext.lower() in RAW_EXTS:
self.xmp = self.base + '.xmp'
self.xmp = self.base + ".xmp"
self.input_xmp = os.path.join(INPUT_PATH, self.xmp)
if not os.path.exists(self.input_xmp):
raise Exception('Sidecar file for \'{}\' does not exist.'.format(self.xmp))
raise Exception(
"Sidecar file for '{}' does not exist.".format(self.xmp)
)
# Optimizes a file using JPEG thumbnailing and compression.
def _optimize(self, file: str, size: tuple = (512, 512), quality : int = 85, copy : str = None):
def _optimize(
self, file: str, size: tuple = (512, 512), quality: int = 85, copy: str = None
):
image = Image.open(file)
image.thumbnail(size, resample=Image.ANTIALIAS)
if copy:
image.save(copy, format='jpeg', optimize=True, quality=quality)
image.save(copy, format="jpeg", optimize=True, quality=quality)
else:
image.save(file, format='jpeg', optimize=True, quality=quality)
image.save(file, format="jpeg", optimize=True, quality=quality)
def optimize(self):
if self.xmp:
@@ -49,8 +54,9 @@ class FileProcessor(object):
rgb.close()
self._optimize(self.temp_file_path)
else:
self._optimize(os.path.join(
INPUT_PATH, self.file_name), copy=self.temp_file_path)
self._optimize(
os.path.join(INPUT_PATH, self.file_name), copy=self.temp_file_path
)
def run(self, client: vision.ImageAnnotatorClient):
try:
@@ -59,44 +65,44 @@ class FileProcessor(object):
# Open the image, read as bytes, convert to types Image
image = Image.open(self.temp_file_path)
bytesIO = io.BytesIO()
image.save(bytesIO, format='jpeg')
image.save(bytesIO, format="jpeg")
image.close()
image = vision.types.Image(content=bytesIO.getvalue())
# Performs label detection on the image file
response = client.label_detection(image=image)
labels = [label.description for label in response.label_annotations]
log.info('Keywords Identified: {}'.format(', '.join(labels)))
log.info("Keywords Identified: {}".format(", ".join(labels)))
# XMP sidecar file specified, write to it using XML module
if self.xmp:
log.info('Writing {} tags to output XMP.'.format(len(labels)))
log.info("Writing {} tags to output XMP.".format(len(labels)))
parser = XMPParser(self.input_xmp)
parser.add_keywords(labels)
# Save the new XMP file
log.debug('Moving old XMP to temp XMP')
log.debug("Moving old XMP to temp XMP")
# Generate a temporary XMP file name
head, tail = os.path.split(self.input_xmp)
name, ext = os.path.splitext(tail)
name += ' temp'
name += " temp"
temp_name = os.path.join(head, name + ext)
# Begin the process of copying stats (happens in an instant)
os.rename(self.input_xmp, temp_name)
log.debug('Saving new XMP')
log.debug("Saving new XMP")
parser.save(self.input_xmp)
log.debug('Copying old stats to new XMP')
log.debug("Copying old stats to new XMP")
shutil.copystat(temp_name, self.input_xmp)
log.debug('Removing temp file')
log.debug("Removing temp file")
os.remove(temp_name)
# No XMP file is specified, using IPTC tagging
else:
log.info('Writing {} tags to image IPTC'.format(len(labels)))
log.info("Writing {} tags to image IPTC".format(len(labels)))
info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name))
info['keywords'].extend(labels)
info["keywords"].extend(labels)
info.save()
# Remove the weird ghsot file created by this iptc read/writer.
os.remove(os.path.join(INPUT_PATH, self.file_name + '~'))
os.remove(os.path.join(INPUT_PATH, self.file_name + "~"))
# Copy dry-run
# shutil.copy2(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name))
# os.rename(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name))
+13 -12
View File
@@ -2,17 +2,18 @@ import xml.etree.ElementTree as ET
import pprint as pp
import random, string
rnd = lambda length=10 : ''.join(random.choices(list(string.ascii_letters), k=length))
toText = lambda items : list(map(lambda item : item.text, items))
rnd = lambda length=10: "".join(random.choices(list(string.ascii_letters), k=length))
toText = lambda items: list(map(lambda item: item.text, items))
# Constant Namespace Types
RDF = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF'
SUBJECT = '{http://purl.org/dc/elements/1.1/}subject'
DESCRIPTION = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Description'
DESCRIPTION_LOWER = '{http://purl.org/dc/elements/1.1/}description'
ALT = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Alt'
LI = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}li'
BAG = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Bag'
RDF = "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF"
SUBJECT = "{http://purl.org/dc/elements/1.1/}subject"
DESCRIPTION = "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Description"
DESCRIPTION_LOWER = "{http://purl.org/dc/elements/1.1/}description"
ALT = "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Alt"
LI = "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}li"
BAG = "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Bag"
class XMPParser(object):
def __init__(self, path):
@@ -29,7 +30,7 @@ class XMPParser(object):
# if self.description:
# self.description = self.description.find(ALT)
# self.description = self.description.find(LI)
# Keyword Tag
self._ready_keywords()
self.keywords = self.root.find(SUBJECT)
@@ -47,7 +48,7 @@ class XMPParser(object):
subject = ET.Element(SUBJECT)
subject.append(ET.Element(BAG))
self.root.append(subject)
def save(self, outpath=None):
self.xmp.write(outpath or self.path)
@@ -58,4 +59,4 @@ class XMPParser(object):
self.keywords.extend(elements)
def add_keyword(self, keyword):
self.add_keywords([keyword])
self.add_keywords([keyword])
+12 -12
View File
@@ -4,20 +4,20 @@ import io
from setuptools import find_packages, setup
DEPENDENCIES = [
'Click',
'rawpy',
'imageio',
'progressbar2',
'iptcinfo3',
'google-api-python-client',
'google-cloud',
'google-cloud-vision',
'Pillow'
"Click",
"rawpy",
"imageio",
"progressbar2",
"iptcinfo3",
"google-api-python-client",
"google-cloud",
"google-cloud-vision",
"Pillow",
]
EXCLUDE_FROM_PACKAGES = []
CURDIR = sys.path[0]
with open(os.path.join(CURDIR, 'README.md')) as file:
with open(os.path.join(CURDIR, "README.md")) as file:
README = file.read()
setup(
@@ -33,10 +33,10 @@ setup(
include_package_data=True,
keywords=[],
scripts=[],
entry_points='''
entry_points="""
[console_scripts]
phototag=phototag.cli:cli
''',
""",
zip_safe=False,
install_requires=DEPENDENCIES,
python_requires=">=3.6",