XMP restructuring, mostly ready for production usage

This commit is contained in:
Xevion
2019-08-08 16:01:52 -05:00
parent ec3e7c0fd4
commit 62e5f08deb
4 changed files with 30 additions and 37 deletions

View File

Binary file not shown.

View File

Binary file not shown.

View File

@@ -2,6 +2,7 @@ import io, sys, os, time, rawpy, imageio, progressbar
from google.cloud import vision
from google.cloud.vision import types
from PIL import Image
from package.xmp import writeTags
# The name of the image file to annotate
input_path = os.path.join(sys.path[0], 'package', 'processing', 'input')
@@ -14,7 +15,7 @@ output_path = os.path.join(sys.path[0], 'package', 'processing', 'output')
# 3) Read XMP, then write new tags to it
# 4) Delete temporary file, move NEF/JPEG and XMP
def process_file(file_name):
def process_file(file_name, xmp):
global client
# Remove the temporary file
@@ -35,7 +36,7 @@ def process_file(file_name):
image.thumbnail(size, resample=Image.ANTIALIAS)
image.save(file_path, format='jpeg', optimize=True, quality=quality)
base, ext = os.path.splitext(file_name)
base = os.path.splitext(file_name)[0]
temp_file_path = os.path.join(temp_path, base + '.jpeg')
try:
@@ -59,8 +60,14 @@ def process_file(file_name):
# Performs label detection on the image file
response = client.label_detection(image=image)
labels = response.label_annotations
print('\tLabels: {}'.format(', '.join([label.description for label in labels])))
labels = [label.description for label in response.label_annotations]
print('\tLabels: {}'.format(', '.join(labels)))
print('\tWriting {} tags to output folder...')
writeTags(os.path.join(input_path, xmp), os.path.join(output_path, xmp), labels)
print('\tMoving associated original image file...')
os.rename(os.path.join(input_path, file_name), os.path.join(output_path, file_name))
except:
_cleanup()
raise
@@ -70,6 +77,16 @@ def process_file(file_name):
def run():
global client
# Ensure that 'input' and 'output' directories are created
if not os.path.exists(input_path):
print('Input directory did not exist, creating and quitting.')
os.makedirs(input_path)
return
if not os.path.exists(output_path):
print('Output directory did not exist. Creating...')
os.makedirs(output_path)
# Clients
client = vision.ImageAnnotatorClient()
@@ -80,7 +97,7 @@ def run():
# Create the 'temp' directory
print(f'Initializing file processing for {len(select)} files...')
os.makedirs(temp_path)
try:
# Process files
for index, file in progressbar.progressbar(list(enumerate(select)), redirect_stdout=True, term_width=110):
@@ -99,7 +116,7 @@ def run():
# Process individual file
else:
print('Processing file {}, \'{}\''.format(index + 1, possibles[0]), end=' | ')
process_file(possibles[0])
process_file(file_name=possibles[0], xmp=file)
time.sleep(0.3)
except:
os.rmdir(temp_path)

View File

@@ -1,57 +1,33 @@
# from libxmp import file_to_dict
import os, sys, re
basepath = "E:\\Photography\\Colorado 2019\\"
ext_filter = lambda file : file.endswith('.NEF')
mainPattern = r'<dc:subject>\n\s+<rdf:Bag>\n(?: <rdf:li>.+<\/rdf:li>\n)*\s+<\/rdf:Bag>\n\s+<\/dc:subject>\n'
subPattern = r'<rdf:li>(.*)</rdf:li>\n'
subFormatPattern = ' <rdf:li>{}</rdf:li>\n'
def getXMP(file, basepath=None):
xmpFile = file.rfind('.')
xmpFile = file[:xmpFile] + '.xmp'
if basepath:
xmpFile = os.path.join(basepath, xmpFile)
return xmpFile
def writeTags(tags, filename):
fullpath = os.path.join(basepath, getXMP(filename))
if not os.path.exists(fullpath):
raise FileNotFoundError(f'No XMP file found for {filename}.')
data = open(fullpath, 'r').read()
def writeTags(inputPath, outputPath, tags):
data = open(inputPath, 'r').read()
# Detect and find the <dc:Subject: part
match = re.search(mainPattern, data)
# If it's not found, we just add the tags area right below the creator tag
if not match:
addition = """\n <dc:subject>\n <rdf:Bag>\n </rdf:Bag>\n </dc:subject>"""
look = data.find('</dc:creator>') + 13
data = data[:look] + addition + data[look:]
match = re.search(mainPattern, data)
print(os.path.join(basepath, filename))
print(fullpath)
print('. . .')
# Get last matching tag
submatch = None
for submatch in re.finditer(subPattern, match.group(0)):
pass
# Calculate very end
spanend = match.span()[0] + (submatch.span()[1] if submatch else 0)
# add tags to end
data = data[:spanend] + ''.join(subFormatPattern.format(tag) for tag in tags) + data[spanend:]
# Write file to disk
open(fullpath, 'w').write( data)
files = os.listdir(basepath)
print(f'Total Files: {len(files)}')
files = list(filter(ext_filter, files))
print(f'Total Files after .NEF filter: {len(files)}')
xmps = [file for file in files if os.path.exists(getXMP(file, basepath=basepath))]
print(f'Total .NEF files with .XMP pair: {len(xmps)}')
print(f'{len(xmps)} x 2 == {len(xmps) * 2}')
select = files[43]
writeTags(['helpgod', 'ohfuck'], select)
print(select)
open(outputPath, 'w+').write( data)