mirror of
https://github.com/Xevion/phototag.git
synced 2025-12-09 18:07:51 -06:00
change package name to phototag, worked on cli command options to have globally accessible configuration change command
This commit is contained in:
@@ -1,30 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
import progressbar
|
||||
|
||||
# Logging and Progressbar work
|
||||
progressbar.streams.wrap_stderr()
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
log = logging.getLogger('init')
|
||||
log.info('Progressbar/Logging ready.')
|
||||
|
||||
# Path Constants
|
||||
ROOT = os.getcwd()
|
||||
INPUT_PATH = ROOT
|
||||
TEMP_PATH = os.path.join(ROOT, 'temp')
|
||||
OUTPUT_PATH = os.path.join(ROOT, 'output')
|
||||
log.info('Path Constants Built.')
|
||||
|
||||
# Extension Constants
|
||||
RAW_EXTS = [
|
||||
"3fr", "ari", "arw", "bay", "braw", "crw",
|
||||
"cr2", "cr3", "cap", "data", "dcs", "dcr",
|
||||
"dng", "drf", "eip", "erf", "fff", "gpr",
|
||||
"iiq", "k25", "kdc", "mdc", "mef", "mos",
|
||||
"mrw", "nef", "nrw", "obm", "orf", "pef",
|
||||
"ptx", "pxn", "r3d", "raf", "raw", "rwl",
|
||||
"rw2", "rwz", "sr2", "srf", "srw", "tif",
|
||||
"x3f",
|
||||
]
|
||||
LOSSY_EXTS = ["jpeg", "jpg", "png"]
|
||||
@@ -1,20 +0,0 @@
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
|
||||
from .app import main
|
||||
from . import INPUT_PATH, OUTPUT_PATH
|
||||
|
||||
# Ensure that 'input' and 'output' directories are created
|
||||
# if not os.path.exists(INPUT_PATH):
|
||||
# logging.fatal('Input directory did not exist, creating and quitting.')
|
||||
# os.makedirs(INPUT_PATH)
|
||||
|
||||
# if not os.path.exists(OUTPUT_PATH):
|
||||
# logging.info('Output directory did not exist. Creating...')
|
||||
# os.makedirs(OUTPUT_PATH)
|
||||
|
||||
log = logging.getLogger('main')
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,57 +0,0 @@
|
||||
import io
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import rawpy
|
||||
import imageio
|
||||
import progressbar
|
||||
import shutil
|
||||
import logging
|
||||
|
||||
from google.cloud import vision
|
||||
from package import xmp
|
||||
from PIL import Image
|
||||
|
||||
from .xmp import XMPParser
|
||||
from .process import FileProcessor
|
||||
from . import INPUT_PATH, TEMP_PATH, OUTPUT_PATH
|
||||
from . import RAW_EXTS, LOSSY_EXTS
|
||||
|
||||
log = logging.getLogger('app')
|
||||
|
||||
def run():
|
||||
client = vision.ImageAnnotatorClient()
|
||||
|
||||
# Find files we want to process based on if they have a corresponding .XMP
|
||||
log.info('Locating processable files...')
|
||||
files = os.listdir(INPUT_PATH)
|
||||
select = [file for file in files if os.path.splitext(file)[1][1:].lower() in (RAW_EXTS + LOSSY_EXTS)]
|
||||
log.info(f'Found {len(select)} valid files')
|
||||
|
||||
# Create the 'temp' directory
|
||||
if not os.path.exists(TEMP_PATH):
|
||||
log.info('Creating temporary processing directory')
|
||||
os.makedirs(TEMP_PATH)
|
||||
if not os.path.exists(OUTPUT_PATH):
|
||||
log.info('Creating output processing directory')
|
||||
os.makedirs(OUTPUT_PATH)
|
||||
|
||||
try:
|
||||
# Process files
|
||||
for index, file in progressbar.progressbar(list(enumerate(select)), redirect_stdout=True, term_width=110):
|
||||
_, ext = os.path.splitext(file)
|
||||
ext = ext[1:].lower()
|
||||
if ext in LOSSY_EXTS or ext in RAW_EXTS:
|
||||
process = FileProcessor(file)
|
||||
log.info(f"Processing file '{file}'...")
|
||||
process.run(client)
|
||||
except Exception as error:
|
||||
log.error(str(error))
|
||||
log.warning(
|
||||
'Removing temporary directory before raising exception.')
|
||||
os.rmdir(TEMP_PATH)
|
||||
raise
|
||||
|
||||
# Remove the directory, we are done here
|
||||
log.info('Removing temporary directory.')
|
||||
os.rmdir(TEMP_PATH)
|
||||
@@ -1,12 +0,0 @@
|
||||
import logging
|
||||
import click
|
||||
import os
|
||||
from .app import run
|
||||
from . import ROOT, INPUT_PATH, OUTPUT_PATH, TEMP_PATH
|
||||
|
||||
@click.command()
|
||||
def cli():
|
||||
print('\n'.join([os.getcwd(), ROOT, INPUT_PATH, OUTPUT_PATH, TEMP_PATH]))
|
||||
print('Executing phototag service')
|
||||
run()
|
||||
print('Phototag service executed')
|
||||
@@ -1,99 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
import rawpy
|
||||
import imageio
|
||||
import io
|
||||
import iptcinfo3
|
||||
import logging
|
||||
from PIL import Image
|
||||
from google.cloud.vision import types
|
||||
from google.cloud import vision
|
||||
|
||||
from . import TEMP_PATH, INPUT_PATH, OUTPUT_PATH, RAW_EXTS, LOSSY_EXTS
|
||||
from .xmp import XMPParser
|
||||
|
||||
log = logging.getLogger('process')
|
||||
|
||||
class FileProcessor(object):
|
||||
def __init__(self, file_name: str):
|
||||
self.file_name = file_name
|
||||
self.base, self.ext = os.path.splitext(self.file_name)
|
||||
self.ext = self.ext[1:]
|
||||
# Path to temporary file that will be optimized for upload to Google
|
||||
self.temp_file_path = os.path.join(TEMP_PATH, self.base + '.jpeg')
|
||||
# Decide whether a XMP file is available
|
||||
self.xmp = None
|
||||
if self.ext.lower() in RAW_EXTS:
|
||||
self.xmp = self.base + '.xmp'
|
||||
self.input_xmp = os.path.join(INPUT_PATH, self.xmp)
|
||||
self.output_xmp = os.path.join(OUTPUT_PATH, self.xmp)
|
||||
if not os.path.exists(self.input_xmp):
|
||||
raise Exception('Sidecar file for \'{}\' does not exist.'.format(self.xmp))
|
||||
|
||||
# Optimizes a file using JPEG thumbnailing and compression.
|
||||
def _optimize(self, file: str, size: tuple = (512, 512), quality : int = 85, copy : str = None):
|
||||
image = Image.open(file)
|
||||
image.thumbnail(size, resample=Image.ANTIALIAS)
|
||||
if copy:
|
||||
image.save(copy, format='jpeg', optimize=True, quality=quality)
|
||||
else:
|
||||
image.save(file, format='jpeg', optimize=True, quality=quality)
|
||||
|
||||
def optimize(self):
|
||||
if self.xmp:
|
||||
# Long runn
|
||||
rgb = rawpy.imread(os.path.join(INPUT_PATH, self.file_name))
|
||||
imageio.imsave(self.temp_file_path, rgb.postprocess())
|
||||
rgb.close()
|
||||
self._optimize(self.temp_file_path)
|
||||
else:
|
||||
self._optimize(os.path.join(
|
||||
INPUT_PATH, self.file_name), copy=self.temp_file_path)
|
||||
|
||||
def run(self, client: vision.ImageAnnotatorClient):
|
||||
try:
|
||||
self.optimize()
|
||||
|
||||
# Open the image, read as bytes, convert to types Image
|
||||
image = Image.open(self.temp_file_path)
|
||||
bytesIO = io.BytesIO()
|
||||
image.save(bytesIO, format='jpeg')
|
||||
image.close()
|
||||
image = vision.types.Image(content=bytesIO.getvalue())
|
||||
|
||||
# Performs label detection on the image file
|
||||
response = client.label_detection(image=image)
|
||||
labels = [label.description for label in response.label_annotations]
|
||||
log.info('Keywords Identified: {}'.format(', '.join(labels)))
|
||||
|
||||
# XMP sidecar file specified, write to it using XML module
|
||||
if self.xmp:
|
||||
log.info('Writing {} tags to output XMP.'.format(len(labels)))
|
||||
parser = XMPParser(self.input_xmp)
|
||||
parser.add_keywords(labels)
|
||||
# Save the new XMP file
|
||||
log.debug('Saving to new XMP file.')
|
||||
parser.save(self.output_xmp)
|
||||
log.debug('Removing old XMP file.')
|
||||
os.remove(self.input_xmp)
|
||||
# No XMP file is specified, using IPTC tagging
|
||||
else:
|
||||
log.info('Writing {} tags to image IPTC'.format(len(labels)))
|
||||
info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name))
|
||||
info['keywords'].extend(labels)
|
||||
info.save()
|
||||
# Remove the weird ghsot file created by this iptc read/writer.
|
||||
os.remove(os.path.join(INPUT_PATH, self.file_name + '~'))
|
||||
|
||||
# Copy dry-run
|
||||
# shutil.copy2(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name))
|
||||
os.rename(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name))
|
||||
except:
|
||||
self._cleanup()
|
||||
raise
|
||||
self._cleanup()
|
||||
|
||||
# Remove the temporary file (if it exists)
|
||||
def _cleanup(self):
|
||||
if os.path.exists(self.temp_file_path):
|
||||
os.remove(self.temp_file_path)
|
||||
@@ -1,61 +0,0 @@
|
||||
import xml.etree.ElementTree as ET
|
||||
import pprint as pp
|
||||
import random, string
|
||||
|
||||
rnd = lambda length=10 : ''.join(random.choices(list(string.ascii_letters), k=length))
|
||||
toText = lambda items : list(map(lambda item : item.text, items))
|
||||
|
||||
# Constant Namespace Types
|
||||
RDF = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF'
|
||||
SUBJECT = '{http://purl.org/dc/elements/1.1/}subject'
|
||||
DESCRIPTION = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Description'
|
||||
DESCRIPTION_LOWER = '{http://purl.org/dc/elements/1.1/}description'
|
||||
ALT = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Alt'
|
||||
LI = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}li'
|
||||
BAG = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Bag'
|
||||
|
||||
class XMPParser(object):
|
||||
def __init__(self, path):
|
||||
# Root tag area
|
||||
self.path = path
|
||||
self.xmp = ET.parse(path)
|
||||
self.root = self.xmp.getroot()
|
||||
self.root = self.root.find(RDF)
|
||||
self.root = self.root.find(DESCRIPTION)
|
||||
|
||||
# Description Tag
|
||||
# self._ready_descrition()
|
||||
# self.description = self.root.find(DESCRIPTION_LOWER)
|
||||
# if self.description:
|
||||
# self.description = self.description.find(ALT)
|
||||
# self.description = self.description.find(LI)
|
||||
|
||||
# Keyword Tag
|
||||
self._ready_keywords()
|
||||
self.keywords = self.root.find(SUBJECT)
|
||||
self.keywords = self.keywords.find(BAG)
|
||||
|
||||
def _ready_keywords(self):
|
||||
subject = self.root.find(SUBJECT)
|
||||
if subject:
|
||||
bag = subject.find(BAG)
|
||||
if bag:
|
||||
self.keywords = bag
|
||||
else:
|
||||
subject.append(ET.Element(BAG))
|
||||
else:
|
||||
subject = ET.Element(SUBJECT)
|
||||
subject.append(ET.Element(BAG))
|
||||
self.root.append(subject)
|
||||
|
||||
def save(self, outpath=None):
|
||||
self.xmp.write(outpath or self.path)
|
||||
|
||||
def add_keywords(self, keywords):
|
||||
elements = [ET.Element(LI) for key in keywords]
|
||||
for i, key in enumerate(elements):
|
||||
key.text = keywords[i]
|
||||
self.keywords.extend(elements)
|
||||
|
||||
def add_keyword(self, keyword):
|
||||
self.add_keywords([keyword])
|
||||
Reference in New Issue
Block a user