properly change package internal name

This commit is contained in:
Xevion
2019-11-03 19:54:19 -06:00
parent fd1c617af5
commit dbf4ccfeb5
10 changed files with 292 additions and 2 deletions

2
.gitignore vendored
View File

@@ -1,5 +1,5 @@
# Custom .gitignore stuff
package/key/
phototag/config/
*.jpg
*.png
*.jpeg

30
phototag/__init__.py Normal file
View File

@@ -0,0 +1,30 @@
import os
import sys
import logging
import progressbar
# Logging and Progressbar work
progressbar.streams.wrap_stderr()
logging.basicConfig(level=logging.INFO)
log = logging.getLogger('init')
log.info('Progressbar/Logging ready.')
# Path Constants
ROOT = os.getcwd()
INPUT_PATH = ROOT
TEMP_PATH = os.path.join(ROOT, 'temp')
OUTPUT_PATH = os.path.join(ROOT, 'output')
log.info('Path Constants Built.')
# Extension Constants
RAW_EXTS = [
"3fr", "ari", "arw", "bay", "braw", "crw",
"cr2", "cr3", "cap", "data", "dcs", "dcr",
"dng", "drf", "eip", "erf", "fff", "gpr",
"iiq", "k25", "kdc", "mdc", "mef", "mos",
"mrw", "nef", "nrw", "obm", "orf", "pef",
"ptx", "pxn", "r3d", "raf", "raw", "rwl",
"rw2", "rwz", "sr2", "srf", "srw", "tif",
"x3f",
]
LOSSY_EXTS = ["jpeg", "jpg", "png"]

20
phototag/__main__.py Normal file
View File

@@ -0,0 +1,20 @@
import sys
import os
import logging
from .app import main
from . import INPUT_PATH, OUTPUT_PATH
# Ensure that 'input' and 'output' directories are created
# if not os.path.exists(INPUT_PATH):
# logging.fatal('Input directory did not exist, creating and quitting.')
# os.makedirs(INPUT_PATH)
# if not os.path.exists(OUTPUT_PATH):
# logging.info('Output directory did not exist. Creating...')
# os.makedirs(OUTPUT_PATH)
log = logging.getLogger('main')
if __name__ == "__main__":
main()

57
phototag/app.py Normal file
View File

@@ -0,0 +1,57 @@
import io
import sys
import os
import time
import rawpy
import imageio
import progressbar
import shutil
import logging
from google.cloud import vision
from package import xmp
from PIL import Image
from .xmp import XMPParser
from .process import FileProcessor
from . import INPUT_PATH, TEMP_PATH, OUTPUT_PATH
from . import RAW_EXTS, LOSSY_EXTS
log = logging.getLogger('app')
def run():
client = vision.ImageAnnotatorClient()
# Find files we want to process based on if they have a corresponding .XMP
log.info('Locating processable files...')
files = os.listdir(INPUT_PATH)
select = [file for file in files if os.path.splitext(file)[1][1:].lower() in (RAW_EXTS + LOSSY_EXTS)]
log.info(f'Found {len(select)} valid files')
# Create the 'temp' directory
if not os.path.exists(TEMP_PATH):
log.info('Creating temporary processing directory')
os.makedirs(TEMP_PATH)
if not os.path.exists(OUTPUT_PATH):
log.info('Creating output processing directory')
os.makedirs(OUTPUT_PATH)
try:
# Process files
for index, file in progressbar.progressbar(list(enumerate(select)), redirect_stdout=True, term_width=110):
_, ext = os.path.splitext(file)
ext = ext[1:].lower()
if ext in LOSSY_EXTS or ext in RAW_EXTS:
process = FileProcessor(file)
log.info(f"Processing file '{file}'...")
process.run(client)
except Exception as error:
log.error(str(error))
log.warning(
'Removing temporary directory before raising exception.')
os.rmdir(TEMP_PATH)
raise
# Remove the directory, we are done here
log.info('Removing temporary directory.')
os.rmdir(TEMP_PATH)

23
phototag/cli.py Normal file
View File

@@ -0,0 +1,23 @@
import logging
import click
import os
log = logging.getLogger('cli')
@click.group()
def cli():
pass
@cli.command()
def run():
log.info(f'CLI started tagging at {os.getcwd()}')
from .app import run
run()
@cli.command()
@click.argument('path')
def auth(path):
isrelative = click.confirm('Is this path relative to the current directory?')
if isrelative:
path = os.path.abspath(path)
log.info(f'Key file location changed to "{path}"')

0
phototag/config.py Normal file
View File

View File

99
phototag/process.py Normal file
View File

@@ -0,0 +1,99 @@
import os
import sys
import rawpy
import imageio
import io
import iptcinfo3
import logging
from PIL import Image
from google.cloud.vision import types
from google.cloud import vision
from . import TEMP_PATH, INPUT_PATH, OUTPUT_PATH, RAW_EXTS, LOSSY_EXTS
from .xmp import XMPParser
log = logging.getLogger('process')
class FileProcessor(object):
def __init__(self, file_name: str):
self.file_name = file_name
self.base, self.ext = os.path.splitext(self.file_name)
self.ext = self.ext[1:]
# Path to temporary file that will be optimized for upload to Google
self.temp_file_path = os.path.join(TEMP_PATH, self.base + '.jpeg')
# Decide whether a XMP file is available
self.xmp = None
if self.ext.lower() in RAW_EXTS:
self.xmp = self.base + '.xmp'
self.input_xmp = os.path.join(INPUT_PATH, self.xmp)
self.output_xmp = os.path.join(OUTPUT_PATH, self.xmp)
if not os.path.exists(self.input_xmp):
raise Exception('Sidecar file for \'{}\' does not exist.'.format(self.xmp))
# Optimizes a file using JPEG thumbnailing and compression.
def _optimize(self, file: str, size: tuple = (512, 512), quality : int = 85, copy : str = None):
image = Image.open(file)
image.thumbnail(size, resample=Image.ANTIALIAS)
if copy:
image.save(copy, format='jpeg', optimize=True, quality=quality)
else:
image.save(file, format='jpeg', optimize=True, quality=quality)
def optimize(self):
if self.xmp:
# Long runn
rgb = rawpy.imread(os.path.join(INPUT_PATH, self.file_name))
imageio.imsave(self.temp_file_path, rgb.postprocess())
rgb.close()
self._optimize(self.temp_file_path)
else:
self._optimize(os.path.join(
INPUT_PATH, self.file_name), copy=self.temp_file_path)
def run(self, client: vision.ImageAnnotatorClient):
try:
self.optimize()
# Open the image, read as bytes, convert to types Image
image = Image.open(self.temp_file_path)
bytesIO = io.BytesIO()
image.save(bytesIO, format='jpeg')
image.close()
image = vision.types.Image(content=bytesIO.getvalue())
# Performs label detection on the image file
response = client.label_detection(image=image)
labels = [label.description for label in response.label_annotations]
log.info('Keywords Identified: {}'.format(', '.join(labels)))
# XMP sidecar file specified, write to it using XML module
if self.xmp:
log.info('Writing {} tags to output XMP.'.format(len(labels)))
parser = XMPParser(self.input_xmp)
parser.add_keywords(labels)
# Save the new XMP file
log.debug('Saving to new XMP file.')
parser.save(self.output_xmp)
log.debug('Removing old XMP file.')
os.remove(self.input_xmp)
# No XMP file is specified, using IPTC tagging
else:
log.info('Writing {} tags to image IPTC'.format(len(labels)))
info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name))
info['keywords'].extend(labels)
info.save()
# Remove the weird ghsot file created by this iptc read/writer.
os.remove(os.path.join(INPUT_PATH, self.file_name + '~'))
# Copy dry-run
# shutil.copy2(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name))
os.rename(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name))
except:
self._cleanup()
raise
self._cleanup()
# Remove the temporary file (if it exists)
def _cleanup(self):
if os.path.exists(self.temp_file_path):
os.remove(self.temp_file_path)

61
phototag/xmp.py Normal file
View File

@@ -0,0 +1,61 @@
import xml.etree.ElementTree as ET
import pprint as pp
import random, string
rnd = lambda length=10 : ''.join(random.choices(list(string.ascii_letters), k=length))
toText = lambda items : list(map(lambda item : item.text, items))
# Constant Namespace Types
RDF = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF'
SUBJECT = '{http://purl.org/dc/elements/1.1/}subject'
DESCRIPTION = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Description'
DESCRIPTION_LOWER = '{http://purl.org/dc/elements/1.1/}description'
ALT = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Alt'
LI = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}li'
BAG = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Bag'
class XMPParser(object):
def __init__(self, path):
# Root tag area
self.path = path
self.xmp = ET.parse(path)
self.root = self.xmp.getroot()
self.root = self.root.find(RDF)
self.root = self.root.find(DESCRIPTION)
# Description Tag
# self._ready_descrition()
# self.description = self.root.find(DESCRIPTION_LOWER)
# if self.description:
# self.description = self.description.find(ALT)
# self.description = self.description.find(LI)
# Keyword Tag
self._ready_keywords()
self.keywords = self.root.find(SUBJECT)
self.keywords = self.keywords.find(BAG)
def _ready_keywords(self):
subject = self.root.find(SUBJECT)
if subject:
bag = subject.find(BAG)
if bag:
self.keywords = bag
else:
subject.append(ET.Element(BAG))
else:
subject = ET.Element(SUBJECT)
subject.append(ET.Element(BAG))
self.root.append(subject)
def save(self, outpath=None):
self.xmp.write(outpath or self.path)
def add_keywords(self, keywords):
elements = [ET.Element(LI) for key in keywords]
for i, key in enumerate(elements):
key.text = keywords[i]
self.keywords.extend(elements)
def add_keyword(self, keyword):
self.add_keywords([keyword])

View File

@@ -28,7 +28,7 @@ setup(
description="",
long_description=README,
long_description_content_type="text/markdown",
url="https://github.com/xevion/photo-tagging",
url="https://github.com/xevion/phototag",
packages=find_packages(exclude=EXCLUDE_FROM_PACKAGES),
include_package_data=True,
keywords=[],