mirror of
https://github.com/Xevion/phototag.git
synced 2026-01-31 12:25:07 -06:00
Merge branch 'refactor'
This commit is contained in:
@@ -1,7 +0,0 @@
|
||||
import sys, os
|
||||
from package import app
|
||||
|
||||
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]=os.path.join(sys.path[0], 'package', 'key', 'photo_tagging_service.json')
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(app.run())
|
||||
@@ -0,0 +1,31 @@
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
import progressbar
|
||||
|
||||
# Logging and Progressbar work
|
||||
progressbar.streams.wrap_stderr()
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
log = logging.getLogger('init')
|
||||
log.info('Progressbar/Logging ready.')
|
||||
|
||||
|
||||
# Path Constants
|
||||
ROOT = os.path.dirname(os.path.realpath(__file__))
|
||||
INPUT_PATH = ROOT
|
||||
TEMP_PATH = os.path.join(ROOT, 'temp')
|
||||
OUTPUT_PATH = os.path.join(ROOT, 'output')
|
||||
log.info('Path Constants Built.')
|
||||
|
||||
# Extension Constants
|
||||
RAW_EXTS = [
|
||||
"3fr", "ari", "arw", "bay", "braw", "crw",
|
||||
"cr2", "cr3", "cap", "data", "dcs", "dcr",
|
||||
"dng", "drf", "eip", "erf", "fff", "gpr",
|
||||
"iiq", "k25", "kdc", "mdc", "mef", "mos",
|
||||
"mrw", "nef", "nrw", "obm", "orf", "pef",
|
||||
"ptx", "pxn", "r3d", "raf", "raw", "rwl",
|
||||
"rw2", "rwz", "sr2", "srf", "srw", "tif",
|
||||
"x3f",
|
||||
]
|
||||
LOSSY_EXTS = ["jpeg", "jpg", "png"]
|
||||
@@ -0,0 +1,13 @@
|
||||
import os
|
||||
import logging
|
||||
|
||||
from . import INPUT_PATH, OUTPUT_PATH
|
||||
|
||||
# Ensure that 'input' and 'output' directories are created
|
||||
if not os.path.exists(INPUT_PATH):
|
||||
logging.fatal('Input directory did not exist, creating and quitting.')
|
||||
os.makedirs(INPUT_PATH)
|
||||
|
||||
if not os.path.exists(OUTPUT_PATH):
|
||||
logging.info('Output directory did not exist. Creating...')
|
||||
os.makedirs(OUTPUT_PATH)
|
||||
+39
-155
@@ -1,171 +1,55 @@
|
||||
import io, sys, os, time, rawpy, imageio, progressbar, shutil, iptcinfo3
|
||||
from google.cloud.vision import types
|
||||
import io
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import rawpy
|
||||
import imageio
|
||||
import progressbar
|
||||
import shutil
|
||||
import logging
|
||||
|
||||
from google.cloud import vision
|
||||
from package import xmp
|
||||
from PIL import Image
|
||||
|
||||
# The name of the image file to annotate
|
||||
input_path = os.path.join(sys.path[0], 'package', 'processing', 'input')
|
||||
temp_path = os.path.join(sys.path[0], 'package', 'processing', 'temp')
|
||||
output_path = os.path.join(sys.path[0], 'package', 'processing', 'output')
|
||||
from .xmp import XMPParser
|
||||
from .process import FileProcessor
|
||||
from . import INPUT_PATH, TEMP_PATH, OUTPUT_PATH
|
||||
from . import RAW_EXTS, LOSSY_EXTS
|
||||
|
||||
# Process a single file in these steps:
|
||||
# 1) Create a temporary file
|
||||
# 2) Send it to GoogleAPI
|
||||
# 3) Read XMP, then write new tags to it
|
||||
# 4) Delete temporary file, move NEF/JPEG and XMP
|
||||
log = logging.getLogger('app')
|
||||
|
||||
def process_file(file_name, xmp_name=None):
|
||||
global client
|
||||
|
||||
# Remove the temporary file
|
||||
def _cleanup():
|
||||
if os.path.exists(temp_file_path):
|
||||
# Deletes the temporary file
|
||||
os.remove(temp_file_path)
|
||||
|
||||
# Get the size of the file. Is concerned with filesize type. 1024KiB -> 1MiB
|
||||
def _size(file_path):
|
||||
size, type = os.path.getsize(file_path) / 1024, 'KiB'
|
||||
if size >= 1024: size /= 1024; type = 'MiB'
|
||||
return round(size, 2), type
|
||||
|
||||
# Optimizes a file using JPEG thumbnailing and compression.
|
||||
def _optimize(file_path, size=(512, 512), quality=85, copy=None):
|
||||
image = Image.open(file_path)
|
||||
image.thumbnail(size, resample=Image.ANTIALIAS)
|
||||
if copy:
|
||||
image.save(copy, format='jpeg', optimize=True, quality=quality)
|
||||
else:
|
||||
image.save(file_path, format='jpeg', optimize=True, quality=quality)
|
||||
|
||||
base, ext = os.path.splitext(file_name)
|
||||
temp_file_path = os.path.join(temp_path, base + '.jpeg')
|
||||
|
||||
try:
|
||||
if xmp_name:
|
||||
# Process the file into a JPEG
|
||||
rgb = rawpy.imread(os.path.join(input_path, file_name))
|
||||
imageio.imsave(temp_file_path, rgb.postprocess())
|
||||
rgb.close()
|
||||
|
||||
# Information on file sizes
|
||||
print("Raw Size: {} {}".format(*_size(os.path.join(input_path, file_name))), end=' | ')
|
||||
print("Resave Size: {} {}".format(*_size(temp_file_path)), end=' | ')
|
||||
pre = os.path.getsize(temp_file_path)
|
||||
_optimize(temp_file_path)
|
||||
post = os.path.getsize(temp_file_path)
|
||||
print("Optimized Size: {} {} ({}% savings)".format(*_size(temp_file_path), round((1.0 - (post / pre)) * 100), 2) )
|
||||
else:
|
||||
pre = os.path.getsize(os.path.join(input_path, file_name))
|
||||
_optimize(os.path.join(input_path, file_name), copy=temp_file_path)
|
||||
post = os.path.getsize(temp_file_path)
|
||||
print("Optimized Size: {} {} ({}% savings)".format(*_size(temp_file_path), round((1.0 - (post / pre)) * 100), 2) )
|
||||
|
||||
# Open the image, read as bytes, convert to types Image
|
||||
image = Image.open(temp_file_path)
|
||||
bytesIO = io.BytesIO()
|
||||
image.save(bytesIO, format='jpeg')
|
||||
image.close()
|
||||
image = vision.types.Image(content=bytesIO.getvalue())
|
||||
|
||||
# Performs label detection on the image file
|
||||
response = client.label_detection(image=image)
|
||||
labels = [label.description for label in response.label_annotations]
|
||||
print('\tLabels: {}'.format(', '.join(labels)))
|
||||
|
||||
# XMP sidecar file specified, write to it using XML module
|
||||
if xmp_name:
|
||||
print('\tWriting {} tags to output XMP...'.format(len(labels)))
|
||||
parser = xmp.XMPParser(os.path.join(input_path, xmp_name))
|
||||
parser.add_keywords(labels)
|
||||
# Save the new XMP file
|
||||
parser.save(os.path.join(output_path, xmp_name))
|
||||
# Remove the old XMP file
|
||||
os.remove(os.path.join(input_path, xmp_name))
|
||||
# No XMP file is specified, using IPTC tagging
|
||||
else:
|
||||
print('\tWriting {} tags to output {}'.format(len(labels), ext[1:].upper()))
|
||||
info = iptcinfo3.IPTCInfo(os.path.join(input_path, file_name))
|
||||
info['keywords'].extend(labels)
|
||||
info.save()
|
||||
# Remove the weird ghsot file created by this iptc read/writer.
|
||||
os.remove(os.path.join(input_path, file_name + '~'))
|
||||
|
||||
# Copy dry-run
|
||||
# shutil.copy2(os.path.join(input_path, file_name), os.path.join(output_path, file_name))
|
||||
os.rename(os.path.join(input_path, file_name), os.path.join(output_path, file_name))
|
||||
except:
|
||||
_cleanup()
|
||||
raise
|
||||
_cleanup()
|
||||
|
||||
# Driver code for the package
|
||||
def run():
|
||||
global client
|
||||
|
||||
# Ensure that 'input' and 'output' directories are created
|
||||
if not os.path.exists(input_path):
|
||||
print('Input directory did not exist, creating and quitting.')
|
||||
os.makedirs(input_path)
|
||||
return
|
||||
|
||||
if not os.path.exists(output_path):
|
||||
print('Output directory did not exist. Creating...')
|
||||
os.makedirs(output_path)
|
||||
|
||||
# Clients
|
||||
client = vision.ImageAnnotatorClient()
|
||||
|
||||
# Find files we want to process based on if they have a corresponding .XMP
|
||||
files = os.listdir(input_path)
|
||||
select = [file for file in files if os.path.splitext(file)[1] != '.xmp']
|
||||
log.info('Locating processable files...')
|
||||
files = os.listdir(INPUT_PATH)
|
||||
select = [file for file in files if os.path.splitext(file)[1][1:].lower() in (RAW_EXTS + LOSSY_EXTS)]
|
||||
log.info(f'Found {len(select)} valid files')
|
||||
|
||||
# Create the 'temp' directory
|
||||
print(f'Initializing file processing for {len(select)} files...')
|
||||
os.makedirs(temp_path)
|
||||
|
||||
if not os.path.exists(TEMP_PATH):
|
||||
log.info('Creating temporary processing directory')
|
||||
os.makedirs(TEMP_PATH)
|
||||
if not os.path.exists(OUTPUT_PATH):
|
||||
log.info('Creating output processing directory')
|
||||
os.makedirs(OUTPUT_PATH)
|
||||
|
||||
try:
|
||||
# Process files
|
||||
for index, file in progressbar.progressbar(list(enumerate(select)), redirect_stdout=True, term_width=110):
|
||||
name, ext = os.path.splitext(file)
|
||||
ext = ext.upper()
|
||||
# Raw files contain their metadata in an XMP file usually
|
||||
if ext in ['.NEF', '.CR2']:
|
||||
# Get all possible files
|
||||
identicals = [possible for possible in files
|
||||
if possible.startswith(os.path.splitext(file)[0])
|
||||
and not possible.endswith(os.path.splitext(file)[1])
|
||||
and not possible.upper().endswith('.XMP')]
|
||||
|
||||
# Alert the user that there are duplicates in the directory and ask whether or not to continue
|
||||
if len(identicals) > 0:
|
||||
print('Identical files were found in the directory, continue?')
|
||||
print(',\n\t'.join(identicals))
|
||||
|
||||
xmps = [possible for possible in files
|
||||
if possible.startswith(os.path.splitext(file)[0])
|
||||
and possible.upper().endswith('.XMP')]
|
||||
|
||||
# Skip and warn if more than 1 possible files, user error
|
||||
if len(xmps) > 1:
|
||||
print('More than 1 possible XMP metadata file for \'{}\'...'.format(file))
|
||||
print(',\n'.join(['\t{}'.format(possible) for possible in xmps]))
|
||||
# Zero possible files, user error, likely
|
||||
elif len(xmps) <= 0:
|
||||
print('No matching XMP metadata file for \'{}\'. skipping...'.format(file))
|
||||
# Process individual file
|
||||
else:
|
||||
print('Processing file {}, \'{}\''.format(index + 1, xmps[0]), end=' | ')
|
||||
process_file(file_name=file, xmp_name=xmps[0])
|
||||
elif ext in ['.JPEG', '.JPG', '.PNG']:
|
||||
print('Processing file {}, \'{}\''.format(index + 1, file), end=' | ')
|
||||
process_file(file_name=file)
|
||||
|
||||
except:
|
||||
os.rmdir(temp_path)
|
||||
_, ext = os.path.splitext(file)
|
||||
ext = ext[1:].lower()
|
||||
if ext in LOSSY_EXTS or ext in RAW_EXTS:
|
||||
process = FileProcessor(file)
|
||||
log.info(f"Processing file '{file}'...")
|
||||
process.run(client)
|
||||
except Exception as error:
|
||||
log.error(str(error))
|
||||
log.warning(
|
||||
'Removing temporary directory before raising exception.')
|
||||
os.rmdir(TEMP_PATH)
|
||||
raise
|
||||
|
||||
# Remove the directory, we are done here
|
||||
print('Cleaning up temporary directory...')
|
||||
os.rmdir(temp_path)
|
||||
log.info('Removing temporary directory.')
|
||||
os.rmdir(TEMP_PATH)
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
import os
|
||||
import sys
|
||||
import rawpy
|
||||
import imageio
|
||||
import io
|
||||
import iptcinfo3
|
||||
import logging
|
||||
from PIL import Image
|
||||
from google.cloud.vision import types
|
||||
from google.cloud import vision
|
||||
|
||||
from . import TEMP_PATH, INPUT_PATH, OUTPUT_PATH, RAW_EXTS, LOSSY_EXTS
|
||||
from .xmp import XMPParser
|
||||
|
||||
log = logging.getLogger('process')
|
||||
|
||||
class FileProcessor(object):
|
||||
def __init__(self, file_name: str):
|
||||
self.file_name = file_name
|
||||
self.base, self.ext = os.path.splitext(self.file_name)
|
||||
self.ext = self.ext[1:]
|
||||
# Path to temporary file that will be optimized for upload to Google
|
||||
self.temp_file_path = os.path.join(TEMP_PATH, self.base + '.jpeg')
|
||||
# Decide whether a XMP file is available
|
||||
self.xmp = None
|
||||
if self.ext.lower() in RAW_EXTS:
|
||||
self.xmp = self.base + '.xmp'
|
||||
self.input_xmp = os.path.join(INPUT_PATH, self.xmp)
|
||||
self.output_xmp = os.path.join(OUTPUT_PATH, self.xmp)
|
||||
if not os.path.exists(self.input_xmp):
|
||||
raise Exception('Sidecar file for \'{}\' does not exist.'.format(self.xmp))
|
||||
|
||||
# Optimizes a file using JPEG thumbnailing and compression.
|
||||
def _optimize(self, file: str, size: tuple = (512, 512), quality : int = 85, copy : str = None):
|
||||
image = Image.open(file)
|
||||
image.thumbnail(size, resample=Image.ANTIALIAS)
|
||||
if copy:
|
||||
image.save(copy, format='jpeg', optimize=True, quality=quality)
|
||||
else:
|
||||
image.save(file, format='jpeg', optimize=True, quality=quality)
|
||||
|
||||
def optimize(self):
|
||||
if self.xmp:
|
||||
# Long runn
|
||||
rgb = rawpy.imread(os.path.join(INPUT_PATH, self.file_name))
|
||||
imageio.imsave(self.temp_file_path, rgb.postprocess())
|
||||
rgb.close()
|
||||
self._optimize(self.temp_file_path)
|
||||
else:
|
||||
self._optimize(os.path.join(
|
||||
INPUT_PATH, self.file_name), copy=self.temp_file_path)
|
||||
|
||||
def run(self, client: vision.ImageAnnotatorClient):
|
||||
try:
|
||||
self.optimize()
|
||||
|
||||
# Open the image, read as bytes, convert to types Image
|
||||
image = Image.open(self.temp_file_path)
|
||||
bytesIO = io.BytesIO()
|
||||
image.save(bytesIO, format='jpeg')
|
||||
image.close()
|
||||
image = vision.types.Image(content=bytesIO.getvalue())
|
||||
|
||||
# Performs label detection on the image file
|
||||
response = client.label_detection(image=image)
|
||||
labels = [label.description for label in response.label_annotations]
|
||||
log.info('Keywords Identified: {}'.format(', '.join(labels)))
|
||||
|
||||
# XMP sidecar file specified, write to it using XML module
|
||||
if self.xmp:
|
||||
log.info('Writing {} tags to output XMP.'.format(len(labels)))
|
||||
parser = XMPParser(self.input_xmp)
|
||||
parser.add_keywords(labels)
|
||||
# Save the new XMP file
|
||||
log.debug('Saving to new XMP file.')
|
||||
parser.save(self.output_xmp)
|
||||
log.debug('Removing old XMP file.')
|
||||
os.remove(self.input_xmp)
|
||||
# No XMP file is specified, using IPTC tagging
|
||||
else:
|
||||
log.info('Writing {} tags to image IPTC'.format(len(labels)))
|
||||
info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name))
|
||||
info['keywords'].extend(labels)
|
||||
info.save()
|
||||
# Remove the weird ghsot file created by this iptc read/writer.
|
||||
os.remove(os.path.join(INPUT_PATH, self.file_name + '~'))
|
||||
|
||||
# Copy dry-run
|
||||
# shutil.copy2(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name))
|
||||
os.rename(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name))
|
||||
except:
|
||||
self._cleanup()
|
||||
raise
|
||||
self._cleanup()
|
||||
|
||||
# Remove the temporary file (if it exists)
|
||||
def _cleanup(self):
|
||||
if os.path.exists(self.temp_file_path):
|
||||
os.remove(self.temp_file_path)
|
||||
+20
@@ -0,0 +1,20 @@
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
import click
|
||||
from package import app
|
||||
|
||||
log = logging.getLogger('main')
|
||||
|
||||
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.join(
|
||||
sys.path[0], 'package', 'key', 'photo_tagging_service.json')
|
||||
|
||||
|
||||
@click.command()
|
||||
def cli():
|
||||
log.info('Executing package...')
|
||||
sys.exit(app.run())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,40 @@
|
||||
import sys
|
||||
import os
|
||||
import io
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
DEPENDENCIES = ['Click']
|
||||
EXCLUDE_FROM_PACKAGES = []
|
||||
CURDIR = sys.path[0]
|
||||
|
||||
with open(os.path.join(CURDIR, 'README.md')) as file:
|
||||
README = file.read()
|
||||
|
||||
setup(
|
||||
name="phototag",
|
||||
version="1.0.0",
|
||||
author="Xevion",
|
||||
author_email="xevion@xevion.dev",
|
||||
description="",
|
||||
long_description=README,
|
||||
long_description_content_type="text/markdown",
|
||||
url="https://github.com/xevion/photo-tagging",
|
||||
packages=find_packages(exclude=EXCLUDE_FROM_PACKAGES),
|
||||
include_package_data=True,
|
||||
keywords=[],
|
||||
scripts=[],
|
||||
entry_points='''
|
||||
[console_scripts]
|
||||
phototag=phototag.phototag:cli
|
||||
''',
|
||||
zip_safe=False,
|
||||
install_requires=DEPENDENCIES,
|
||||
python_requires=">=3.6",
|
||||
# license and classifier list:
|
||||
# https://pypi.org/pypi?%3Aaction=list_classifiers
|
||||
license="License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
|
||||
classifiers=[
|
||||
"Programming Language :: Python :: 3",
|
||||
"Operating System :: OS Independent",
|
||||
],
|
||||
)
|
||||
Reference in New Issue
Block a user