diff --git a/server/normalization/main.py b/server/normalization/main.py index 87e70fa..2ab3aa4 100644 --- a/server/normalization/main.py +++ b/server/normalization/main.py @@ -5,7 +5,7 @@ import sys import enlighten from collections import Counter, OrderedDict from pprint import pprint -from typing import List, Optional +from typing import List, Optional, Union import click from lxml import etree @@ -19,6 +19,7 @@ logger.setLevel(logging.DEBUG) CUR_DIR = os.path.dirname(os.path.abspath(__file__)) TRUTH_DIR = os.path.join(CUR_DIR, 'truth') +CHARACTERS_DIR = os.path.join(CUR_DIR, 'characters') RAW_DIR = os.path.abspath(os.path.join(CUR_DIR, '..', 'data', 'raw')) RAW_FILES = os.listdir(RAW_DIR) @@ -158,14 +159,90 @@ def ids(): """Builds an XML file for identifying character id mappings""" logger.info("Building ID Character mapping file...") - with open(os.path.join(TRUTH_DIR, Constants.SPEAKER_MAPPING_XML), 'r') as mapping_file: - root: etree.ElementBase = etree.parse(mapping_file) + IDENTIFIER_FILE: str = os.path.join(CHARACTERS_DIR, 'identifiers.xml') - root = etree.Element("IdentifierList") - # mappings = - # for speaker in speakers: - # if speaker + with open(os.path.join(TRUTH_DIR, 'characters.xml'), 'r') as characters_file: + characters: List[str] = etree.parse(characters_file).xpath('//CharacterList/Character/text()') + logger.debug('Characters parsed.') + logger.debug(f'{len(characters)} characters parsed.') + + if not os.path.exists(CHARACTERS_DIR): + os.makedirs(CHARACTERS_DIR) + logger.info('`characters` directory created.') + + pre_existing: Dict[str, etree.Element] = None + if os.path.exists(IDENTIFIER_FILE): + logger.debug('Identifier file exists already. Pre-existing Speakers will be kept.') + + with open(IDENTIFIER_FILE, 'r') as identifier_file: + preidentifiers: etree.ElementBase = etree.parse(identifier_file) + + pre_existing = OrderedDict() + for speaker in preidentifiers.xpath('//SpeakerList/Speaker'): + speakerName = speaker.xpath('./RawText/text()')[0] + pre_existing[speakerName] = speaker + + root = etree.Element('SpeakerList') + splitPatterns: List[str] = [r'\s*,\s*', + r'\s*&\s*', + r'\s+and,?(?:\s+|$)', + r'\s*[\\/]\s*'] + splitPattern: str = '|'.join(splitPatterns) + + existing_characters_count: int = 0 + new_characters_count: int = 0 + + # Pre-existing character identifiers are kept at the top, in order. + for speakerName in characters: + if pre_existing is not None: + if speakerName in pre_existing.keys(): + root.append(pre_existing[speakerName]) + del pre_existing[speakerName] + existing_characters_count += 1 + continue + else: + logger.debug(f'New speaker: `{speakerName}`') + new_characters_count += 1 + + # New speaker to insert + speaker_element = etree.SubElement(root, 'Speaker', annotated="false") + raw_text_element = etree.SubElement(speaker_element, "RawText") + raw_text_element.text = speakerName + + split_text: List[str] = re.split(splitPattern, speakerName) + split_text = [split for split in split_text if re.match(r'\w{2,}', split) is not None] + + isCompound: bool = len(split_text) > 1 + isBackground: bool = re.search(r'#\d', speakerName) is not None # Not fool-proof, but filters some out. + + if isCompound: + speaker_element.attrib['annotated'] = "true" + annotated_text_element = etree.SubElement(speaker_element, 'AnnotatedText') + characters_element = etree.SubElement(speaker_element, 'Characters') + annotated_text_element.text = speakerName + for sub_character in split_text: + subcharacter_element = etree.SubElement(characters_element, 'Character') + subcharacter_element.text = valuify(sub_character) + subcharacter_element.attrib['type'] = 'null' + else: + character_element = etree.SubElement(speaker_element, 'Character') + character_element.attrib['type'] = 'background' if isBackground else 'null' + character_element.text = valuify(speakerName) + + logger.debug(f'{new_characters_count} new speaker elements added. {existing_characters_count} speaker elements preserved.') + + if pre_existing is not None: + unseen_chars = list(pre_existing.keys()) + if len(unseen_chars) > 0: + for unseen in unseen_chars: + root.append(pre_existing[unseen]) + logger.debug(f'Character preserved but not seen: {unseen}') + + logger.debug('Exporting identifiers file.') + with open(IDENTIFIER_FILE, 'w') as identifier_file: + etree.indent(root, space=" " * 4) + identifier_file.write(etree.tostring(root, encoding=str, pretty_print=True)) @cli.command('all') @@ -191,7 +268,8 @@ def similar(text: str, destination: Optional[bool], results: int, reversed: bool if destination: mappingType = "Destination" - counts: List[int] | List[str] = list(map(int, root.xpath('//SpeakerMappings/Mapping/@count'))) # Parse counts into integers for merge + counts: Union[List[int], List[str]] = list( + map(int, root.xpath('//SpeakerMappings/Mapping/@count'))) # Parse counts into integers for merge speakers = root.xpath(f"//SpeakerMappings/Mapping/{mappingType}/text()") if not no_merge: speakers, counts = marked_item_merge(speakers, counts) # Merge identical speakers together if results == -1: