mirror of
https://github.com/Xevion/phototag.git
synced 2025-12-06 05:15:52 -06:00
Merge pull request #5 from Xevion/revert-4-refactor
Revert "Refactor job"
This commit is contained in:
@@ -1,22 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Path Constants
|
||||
ROOT = sys.path, [0]
|
||||
PROCESSING_PATH = os.path.join(ROOT, 'package', 'processing')
|
||||
INPUT_PATH = os.path.join(PROCESSING_PATH, 'input')
|
||||
TEMP_PATH = os.path.join(PROCESSING_PATH, 'temp')
|
||||
OUTPUT_PATH = os.path.join(PROCESSING_PATH, 'output')
|
||||
|
||||
# Extension Constants
|
||||
RAW_EXTS = [
|
||||
"3fr", "ari", "arw", "bay", "braw", "crw",
|
||||
"cr2", "cr3", "cap", "data", "dcs", "dcr",
|
||||
"dng", "drf", "eip", "erf", "fff", "gpr",
|
||||
"iiq", "k25", "kdc", "mdc", "mef", "mos",
|
||||
"mrw", "nef", "nrw", "obm", "orf", "pef",
|
||||
"ptx", "pxn", "r3d", "raf", "raw", "rwl",
|
||||
"rw2", "rwz", "sr2", "srf", "srw", "tif",
|
||||
"x3f",
|
||||
]
|
||||
LOSSY_EXTS = ["JPEG", "JPG", "PNG"]
|
||||
124
package/app.py
124
package/app.py
@@ -1,10 +1,13 @@
|
||||
import io, sys, os, time, rawpy, imageio, progressbar, shutil
|
||||
|
||||
from .xmp import XMPParser
|
||||
import io, sys, os, time, rawpy, imageio, progressbar, shutil, iptcinfo3
|
||||
from google.cloud.vision import types
|
||||
from google.cloud import vision
|
||||
from package import xmp
|
||||
from PIL import Image
|
||||
|
||||
from .process import FileProcessor
|
||||
from . import INPUT_PATH, TEMP_PATH, OUTPUT_PATH, PROCESSING_PATH
|
||||
# The name of the image file to annotate
|
||||
input_path = os.path.join(sys.path[0], 'package', 'processing', 'input')
|
||||
temp_path = os.path.join(sys.path[0], 'package', 'processing', 'temp')
|
||||
output_path = os.path.join(sys.path[0], 'package', 'processing', 'output')
|
||||
|
||||
# Process a single file in these steps:
|
||||
# 1) Create a temporary file
|
||||
@@ -12,28 +15,115 @@ from . import INPUT_PATH, TEMP_PATH, OUTPUT_PATH, PROCESSING_PATH
|
||||
# 3) Read XMP, then write new tags to it
|
||||
# 4) Delete temporary file, move NEF/JPEG and XMP
|
||||
|
||||
def process_file(file_name, xmp_name=None):
|
||||
global client
|
||||
|
||||
# Remove the temporary file
|
||||
def _cleanup():
|
||||
if os.path.exists(temp_file_path):
|
||||
# Deletes the temporary file
|
||||
os.remove(temp_file_path)
|
||||
|
||||
# Get the size of the file. Is concerned with filesize type. 1024KiB -> 1MiB
|
||||
def _size(file_path):
|
||||
size, type = os.path.getsize(file_path) / 1024, 'KiB'
|
||||
if size >= 1024: size /= 1024; type = 'MiB'
|
||||
return round(size, 2), type
|
||||
|
||||
# Optimizes a file using JPEG thumbnailing and compression.
|
||||
def _optimize(file_path, size=(512, 512), quality=85, copy=None):
|
||||
image = Image.open(file_path)
|
||||
image.thumbnail(size, resample=Image.ANTIALIAS)
|
||||
if copy:
|
||||
image.save(copy, format='jpeg', optimize=True, quality=quality)
|
||||
else:
|
||||
image.save(file_path, format='jpeg', optimize=True, quality=quality)
|
||||
|
||||
base, ext = os.path.splitext(file_name)
|
||||
temp_file_path = os.path.join(temp_path, base + '.jpeg')
|
||||
|
||||
try:
|
||||
if xmp_name:
|
||||
# Process the file into a JPEG
|
||||
rgb = rawpy.imread(os.path.join(input_path, file_name))
|
||||
imageio.imsave(temp_file_path, rgb.postprocess())
|
||||
rgb.close()
|
||||
|
||||
# Information on file sizes
|
||||
print("Raw Size: {} {}".format(*_size(os.path.join(input_path, file_name))), end=' | ')
|
||||
print("Resave Size: {} {}".format(*_size(temp_file_path)), end=' | ')
|
||||
pre = os.path.getsize(temp_file_path)
|
||||
_optimize(temp_file_path)
|
||||
post = os.path.getsize(temp_file_path)
|
||||
print("Optimized Size: {} {} ({}% savings)".format(*_size(temp_file_path), round((1.0 - (post / pre)) * 100), 2) )
|
||||
else:
|
||||
pre = os.path.getsize(os.path.join(input_path, file_name))
|
||||
_optimize(os.path.join(input_path, file_name), copy=temp_file_path)
|
||||
post = os.path.getsize(temp_file_path)
|
||||
print("Optimized Size: {} {} ({}% savings)".format(*_size(temp_file_path), round((1.0 - (post / pre)) * 100), 2) )
|
||||
|
||||
# Open the image, read as bytes, convert to types Image
|
||||
image = Image.open(temp_file_path)
|
||||
bytesIO = io.BytesIO()
|
||||
image.save(bytesIO, format='jpeg')
|
||||
image.close()
|
||||
image = vision.types.Image(content=bytesIO.getvalue())
|
||||
|
||||
# Performs label detection on the image file
|
||||
response = client.label_detection(image=image)
|
||||
labels = [label.description for label in response.label_annotations]
|
||||
print('\tLabels: {}'.format(', '.join(labels)))
|
||||
|
||||
# XMP sidecar file specified, write to it using XML module
|
||||
if xmp_name:
|
||||
print('\tWriting {} tags to output XMP...'.format(len(labels)))
|
||||
parser = xmp.XMPParser(os.path.join(input_path, xmp_name))
|
||||
parser.add_keywords(labels)
|
||||
# Save the new XMP file
|
||||
parser.save(os.path.join(output_path, xmp_name))
|
||||
# Remove the old XMP file
|
||||
os.remove(os.path.join(input_path, xmp_name))
|
||||
# No XMP file is specified, using IPTC tagging
|
||||
else:
|
||||
print('\tWriting {} tags to output {}'.format(len(labels), ext[1:].upper()))
|
||||
info = iptcinfo3.IPTCInfo(os.path.join(input_path, file_name))
|
||||
info['keywords'].extend(labels)
|
||||
info.save()
|
||||
# Remove the weird ghsot file created by this iptc read/writer.
|
||||
os.remove(os.path.join(input_path, file_name + '~'))
|
||||
|
||||
# Copy dry-run
|
||||
# shutil.copy2(os.path.join(input_path, file_name), os.path.join(output_path, file_name))
|
||||
os.rename(os.path.join(input_path, file_name), os.path.join(output_path, file_name))
|
||||
except:
|
||||
_cleanup()
|
||||
raise
|
||||
_cleanup()
|
||||
|
||||
# Driver code for the package
|
||||
def run(client):
|
||||
def run():
|
||||
global client
|
||||
|
||||
# Ensure that 'input' and 'output' directories are created
|
||||
if not os.path.exists(INPUT_PATH):
|
||||
if not os.path.exists(input_path):
|
||||
print('Input directory did not exist, creating and quitting.')
|
||||
os.makedirs(INPUT_PATH)
|
||||
os.makedirs(input_path)
|
||||
return
|
||||
|
||||
if not os.path.exists(OUTPUT_PATH):
|
||||
if not os.path.exists(output_path):
|
||||
print('Output directory did not exist. Creating...')
|
||||
os.makedirs(OUTPUT_PATH)
|
||||
os.makedirs(output_path)
|
||||
|
||||
# Clients
|
||||
client = vision.ImageAnnotatorClient()
|
||||
|
||||
# Find files we want to process based on if they have a corresponding .XMP
|
||||
files = os.listdir(INPUT_PATH)
|
||||
files = os.listdir(input_path)
|
||||
select = [file for file in files if os.path.splitext(file)[1] != '.xmp']
|
||||
|
||||
# Create the 'temp' directory
|
||||
print(f'Initializing file processing for {len(select)} files...')
|
||||
os.makedirs(TEMP_PATH)
|
||||
os.makedirs(temp_path)
|
||||
|
||||
try:
|
||||
# Process files
|
||||
@@ -67,15 +157,15 @@ def run(client):
|
||||
# Process individual file
|
||||
else:
|
||||
print('Processing file {}, \'{}\''.format(index + 1, xmps[0]), end=' | ')
|
||||
file = FileProcessor(file, xmps[0])
|
||||
elif ext in BASIC_EXTENSIONS:
|
||||
process_file(file_name=file, xmp_name=xmps[0])
|
||||
elif ext in ['.JPEG', '.JPG', '.PNG']:
|
||||
print('Processing file {}, \'{}\''.format(index + 1, file), end=' | ')
|
||||
file = FileProcessor(file, xmps[0])
|
||||
process_file(file_name=file)
|
||||
|
||||
except:
|
||||
os.rmdir(TEMP_PATH)
|
||||
os.rmdir(temp_path)
|
||||
raise
|
||||
|
||||
# Remove the directory, we are done here
|
||||
print('Cleaning up temporary directory...')
|
||||
os.rmdir(TEMP_PATH)
|
||||
os.rmdir(temp_path)
|
||||
@@ -1,105 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
import rawpy
|
||||
import imageio
|
||||
import io
|
||||
import iptcinfo3
|
||||
from PIL import Image
|
||||
from google.cloud.vision import types
|
||||
from google.cloud import vision
|
||||
|
||||
from . import TEMP_PATH, INPUT_PATH, OUTPUT_PATH
|
||||
from .xmp import XMPParser
|
||||
|
||||
class FileProcessor(object):
|
||||
def __init__(self, file_name, xmp_name=None):
|
||||
self.file_name, self.xmp_name = file_name, xmp_name
|
||||
self.base, self.ext = os.path.splitext(self.file_name)
|
||||
self.temp_file_path = os.path.join(TEMP_PATH, self.base + '.jpeg')
|
||||
|
||||
def rawOptimize(self):
|
||||
rgb = rawpy.imread(os.path.join(INPUT_PATH, self.file_name))
|
||||
imageio.imsave(temp_file_path, rgb.postprocess())
|
||||
rgb.close()
|
||||
|
||||
# Information on file sizes
|
||||
print("Raw Size: {} {}".format(*_size(os.path.join(INPUT_PATH, self.file_name))), end=' | ')
|
||||
print("Resave Size: {} {}".format(*_size(temp_file_path)), end=' | ')
|
||||
pre = os.path.getsize(temp_file_path)
|
||||
_optimize(temp_file_path)
|
||||
post = os.path.getsize(temp_file_path)
|
||||
print("Optimized Size: {} {} ({}% savings)".format(*_size(temp_file_path), round((1.0 - (post / pre)) * 100), 2) )
|
||||
|
||||
def basicOptimize(self):
|
||||
pre = os.path.getsize(os.path.join(INPUT_PATH, self.file_name))
|
||||
_optimize(os.path.join(INPUT_PATH, self.file_name), copy=temp_file_path)
|
||||
post = os.path.getsize(temp_file_path)
|
||||
print("Optimized Size: {} {} ({}% savings)".format(*_size(temp_file_path), round((1.0 - (post / pre)) * 100), 2) )
|
||||
|
||||
|
||||
def run(self, client):
|
||||
try:
|
||||
if self.xmp_name:
|
||||
# Process the file into a JPEG
|
||||
self.rawOptimize()
|
||||
else:
|
||||
self.basicOptimize()
|
||||
|
||||
# Open the image, read as bytes, convert to types Image
|
||||
image = Image.open(temp_file_path)
|
||||
bytesIO = io.BytesIO()
|
||||
image.save(bytesIO, format='jpeg')
|
||||
image.close()
|
||||
image = vision.types.Image(content=bytesIO.getvalue())
|
||||
|
||||
# Performs label detection on the image file
|
||||
response = client.label_detection(image=image)
|
||||
labels = [label.description for label in response.label_annotations]
|
||||
print('\tLabels: {}'.format(', '.join(labels)))
|
||||
|
||||
# XMP sidecar file specified, write to it using XML module
|
||||
if self.xmp_name:
|
||||
print('\tWriting {} tags to output XMP...'.format(len(labels)))
|
||||
parser = XMPParser(os.path.join(INPUT_PATH, self.xmp_name))
|
||||
parser.add_keywords(labels)
|
||||
# Save the new XMP file
|
||||
parser.save(os.path.join(OUTPUT_PATH, self.xmp_name))
|
||||
# Remove the old XMP file
|
||||
os.remove(os.path.join(INPUT_PATH, self.xmp_name))
|
||||
# No XMP file is specified, using IPTC tagging
|
||||
else:
|
||||
print('\tWriting {} tags to output {}'.format(len(labels), ext[1:].upper()))
|
||||
info = iptcinfo3.IPTCInfo(os.path.join(INPUT_PATH, self.file_name))
|
||||
info['keywords'].extend(labels)
|
||||
info.save()
|
||||
# Remove the weird ghsot file created by this iptc read/writer.
|
||||
os.remove(os.path.join(INPUT_PATH, self.file_name + '~'))
|
||||
|
||||
# Copy dry-run
|
||||
# shutil.copy2(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name))
|
||||
os.rename(os.path.join(INPUT_PATH, self.file_name), os.path.join(OUTPUT_PATH, self.file_name))
|
||||
except:
|
||||
self._cleanup()
|
||||
raise
|
||||
self._cleanup()
|
||||
|
||||
# Remove the temporary file
|
||||
def _cleanup(self):
|
||||
if os.path.exists(self.temp_file_path):
|
||||
# Deletes the temporary file
|
||||
os.remove(self.temp_file_path)
|
||||
|
||||
# Get the size of the file. Is concerned with filesize type. 1024KiB -> 1MiB
|
||||
def _size(self, file_path):
|
||||
size, type = os.path.getsize(file_path) / 1024, 'KiB'
|
||||
if size >= 1024: size /= 1024; type = 'MiB'
|
||||
return round(size, 2), type
|
||||
|
||||
# Optimizes a file using JPEG thumbnailing and compression.
|
||||
def _optimize(self, file_path, size=(512, 512), quality=85, copy=None):
|
||||
image = Image.open(file_path)
|
||||
image.thumbnail(size, resample=Image.ANTIALIAS)
|
||||
if copy:
|
||||
image.save(copy, format='jpeg', optimize=True, quality=quality)
|
||||
else:
|
||||
image.save(file_path, format='jpeg', optimize=True, quality=quality)
|
||||
Reference in New Issue
Block a user