#!/usr/bin/env python3
"""Tokenizes, verbalizes, and phonemizes text and SSML"""
import itertools
import logging
import re
import typing
import xml.etree.ElementTree as etree
from decimal import Decimal
from pathlib import Path

import babel
import babel.dates
import babel.numbers
import dateparser
import networkx as nx
from gruut_ipa import IPA
from num2words import num2words

from gruut.const import (
    DATA_PROP,
    PHONEMES_TYPE,
    REGEX_PATTERN,
    BreakNode,
    BreakType,
    BreakWordNode,
    EndElement,
    GraphType,
    IgnoreNode,
    InlineLexicon,
    InterpretAs,
    InterpretAsFormat,
    Lexeme,
    MarkNode,
    Node,
    ParagraphNode,
    PunctuationWordNode,
    Sentence,
    SentenceNode,
    SpeakNode,
    SSMLParsingState,
    TextProcessorSettings,
    Word,
    WordNode,
    WordRole,
)
from gruut.lang import get_settings
from gruut.utils import (
    attrib_no_namespace,
    leaves,
    load_lexicon,
    maybe_split_ipa,
    pipeline_split,
    pipeline_transform,
    resolve_lang,
    tag_no_namespace,
    text_and_elements,
)

# -----------------------------------------------------------------------------

_LOGGER = logging.getLogger("gruut.text_processor")

DEFAULT_LEXICON_ID = ""

# -----------------------------------------------------------------------------


class TextProcessor:
    """Tokenizes, verbalizes, and phonemizes text and SSML"""

    def __init__(
        self,
        default_lang: str = "en_US",
        model_prefix: str = "",
        lang_dirs: typing.Optional[typing.Dict[str, typing.Union[str, Path]]] = None,
        search_dirs: typing.Optional[typing.Iterable[typing.Union[str, Path]]] = None,
        settings: typing.Optional[
            typing.MutableMapping[str, TextProcessorSettings]
        ] = None,
        **kwargs,
    ):
        self.default_lang = default_lang
        self.default_settings_kwargs = kwargs
        self.model_prefix = model_prefix
        self.search_dirs = search_dirs

        if lang_dirs is None:
            lang_dirs = {}

        # Convert to Paths
        self.lang_dirs = {
            dir_lang: Path(dir_path) for dir_lang, dir_path in lang_dirs.items()
        }

        if settings is None:
            settings = {}

        self.settings = settings

    def sentences(
        self,
        graph: GraphType,
        root: Node,
        major_breaks: bool = True,
        minor_breaks: bool = True,
        punctuations: bool = True,
        explicit_lang: bool = True,
        phonemes: bool = True,
        break_phonemes: bool = True,
        pos: bool = True,
    ) -> typing.Iterable[Sentence]:
        """Processes text and returns each sentence"""

        def get_lang(lang: str) -> str:
            if explicit_lang or (lang != self.default_lang):
                return lang

            # Implicit default language
            return ""

        sentence: typing.Optional[Sentence] = None

        par_idx: int = -1
        sent_idx: int = 0

        sent_pause_before_ms: int = 0
        sent_marks_before: typing.List[str] = []

        word_pause_before_ms: int = 0
        word_marks_before: typing.List[str] = []

        sentences: typing.List[Sentence] = []

        for dfs_node in nx.dfs_preorder_nodes(graph, root.node):
            node = graph.nodes[dfs_node][DATA_PROP]
            if isinstance(node, ParagraphNode):
                par_idx += 1
                sent_idx = 0
            elif isinstance(node, SentenceNode):
                # New sentence
                sentences.append(
                    Sentence(
                        idx=sent_idx,
                        par_idx=par_idx,
                        text="",
                        text_with_ws="",
                        text_spoken="",
                        voice=node.voice,
                        lang=get_lang(node.lang),
                        pause_before_ms=sent_pause_before_ms,
                        marks_before=(sent_marks_before if sent_marks_before else None),
                    )
                )
                sent_pause_before_ms = 0
                sent_marks_before = []
                sent_idx += 1
            elif graph.out_degree(dfs_node) == 0:
                if isinstance(node, WordNode):
                    assert sentences, "No sentence"
                    sentence = sentences[-1]

                    word_node = typing.cast(WordNode, node)
                    sentence.words.append(
                        Word(
                            idx=len(sentence.words),
                            sent_idx=sentence.idx,
                            par_idx=sentence.par_idx,
                            text=word_node.text,
                            text_with_ws=word_node.text_with_ws,
                            phonemes=word_node.phonemes if phonemes else None,
                            pos=word_node.pos if pos else None,
                            lang=get_lang(node.lang),
                            voice=node.voice,
                            pause_before_ms=word_pause_before_ms,
                            marks_before=(
                                word_marks_before if word_marks_before else None
                            ),
                        )
                    )

                    word_pause_before_ms = 0
                    word_marks_before = []
                elif isinstance(node, BreakWordNode):
                    assert sentences, "No sentence"
                    sentence = sentences[-1]

                    break_word_node = typing.cast(BreakWordNode, node)
                    is_minor_break = break_word_node.break_type == BreakType.MINOR
                    is_major_break = break_word_node.break_type == BreakType.MAJOR

                    if (minor_breaks and is_minor_break) or (
                        major_breaks and is_major_break
                    ):
                        sentence.words.append(
                            Word(
                                idx=len(sentence.words),
                                sent_idx=sentence.idx,
                                par_idx=sentence.par_idx,
                                text=break_word_node.text,
                                text_with_ws=break_word_node.text_with_ws,
                                phonemes=self._phonemes_for_break(
                                    break_word_node.break_type,
                                    lang=break_word_node.lang,
                                )
                                if phonemes and break_phonemes
                                else None,
                                is_minor_break=is_minor_break,
                                is_major_break=is_major_break,
                                lang=get_lang(node.lang),
                                voice=node.voice,
                                pause_before_ms=word_pause_before_ms,
                                marks_before=(
                                    word_marks_before if word_marks_before else None
                                ),
                            )
                        )

                        word_pause_before_ms = 0
                        word_marks_before = []
                elif punctuations and isinstance(node, PunctuationWordNode):
                    assert sentences, "No sentence"
                    sentence = sentences[-1]

                    punct_word_node = typing.cast(PunctuationWordNode, node)
                    sentence.words.append(
                        Word(
                            idx=len(sentence.words),
                            sent_idx=sentence.idx,
                            par_idx=sentence.par_idx,
                            text=punct_word_node.text,
                            text_with_ws=punct_word_node.text_with_ws,
                            is_punctuation=True,
                            lang=get_lang(punct_word_node.lang),
                            pause_before_ms=word_pause_before_ms,
                            marks_before=(
                                word_marks_before if word_marks_before else None
                            ),
                        )
                    )

                    word_pause_before_ms = 0
                    word_marks_before = []
                elif isinstance(node, BreakNode):
                    # Pause for some time
                    break_node = typing.cast(BreakNode, node)
                    break_parent = self._find_parent(
                        graph, node, (SentenceNode, ParagraphNode, SpeakNode)
                    )

                    if break_parent is not None:
                        break_ms = break_node.get_milliseconds()

                        break_parent_edges = list(graph.out_edges(break_parent.node))
                        break_edge_idx = break_parent_edges.index(
                            (break_parent.node, break_node.node)
                        )
                        is_last_edge = break_edge_idx == (len(break_parent_edges) - 1)

                        if isinstance(break_parent, SentenceNode):
                            assert sentences
                            sentence = sentences[-1]

                            if is_last_edge:
                                # End of sentence, add pause after
                                sentence.pause_after_ms += break_ms
                            elif sentence.words:
                                # Between words, add pause after previous word
                                sentence.words[-1].pause_after_ms += break_ms
                            else:
                                # Before first word, set pause for first word
                                word_pause_before_ms += break_ms
                        elif isinstance(break_parent, ParagraphNode):
                            if sentences and (sentences[-1].par_idx == par_idx):
                                # Between sentences in the same paragraph, add pause
                                # after previous sentence
                                sentences[-1].pause_after_ms += break_ms
                            else:
                                # Add pause to beginning of next sentence
                                sent_pause_before_ms += break_ms
                        elif isinstance(break_parent, SpeakNode):
                            if sentences:
                                # After paragraphs or sentences
                                sentences[-1].pause_after_ms += break_ms
                            else:
                                # Before any paragraphs or sentences
                                sent_pause_before_ms += break_ms
                elif isinstance(node, MarkNode):
                    # User-defined mark
                    mark_node = typing.cast(MarkNode, node)
                    mark_name = mark_node.name

                    mark_parent = self._find_parent(
                        graph, node, (SentenceNode, ParagraphNode, SpeakNode)
                    )

                    if mark_parent is not None:
                        mark_parent_edges = list(graph.out_edges(mark_parent.node))
                        mark_edge_idx = mark_parent_edges.index(
                            (mark_parent.node, mark_node.node)
                        )
                        is_last_edge = mark_edge_idx == (len(mark_parent_edges) - 1)

                        if isinstance(mark_parent, SentenceNode):
                            assert sentences
                            sentence = sentences[-1]

                            if is_last_edge:
                                # End of sentence, add mark after
                                if sentence.marks_after is None:
                                    sentence.marks_after = []

                                sentence.marks_after.append(mark_name)
                            elif sentence.words:
                                # Between words, add mark after previous word
                                last_word = sentence.words[-1]
                                if last_word.marks_after is None:
                                    last_word.marks_after = []

                                last_word.marks_after.append(mark_name)
                            else:
                                # Before first word, set mark for first word
                                word_marks_before.append(mark_name)
                        elif isinstance(mark_parent, ParagraphNode):
                            if sentences and (sentences[-1].par_idx == par_idx):
                                # Between sentences in the same paragraph, add mark
                                # after previous sentence
                                last_sentence = sentences[-1]
                                if last_sentence.marks_after is None:
                                    last_sentence.marks_after = []

                                last_sentence.marks_after.append(mark_name)
                            else:
                                # Add mark to beginning of next sentence
                                sent_marks_before.append(mark_name)
                        elif isinstance(mark_parent, SpeakNode):
                            if sentences:
                                # After paragraphs or sentences
                                last_sentence = sentences[-1]
                                if last_sentence.marks_after is None:
                                    last_sentence.marks_after = []

                                last_sentence.marks_after.append(mark_name)
                            else:
                                # Before any paragraphs or sentences
                                sent_marks_before.append(mark_name)

        # Post-process sentences to fix up text, voice, etc.
        for sentence in sentences:
            settings = self.get_settings(sentence.lang)

            if settings.keep_whitespace:
                # Whitespace is preserved
                sentence.text_with_ws = "".join(w.text_with_ws for w in sentence.words)
            else:
                # Make a best guess.
                # The join string is used before spoken words (except the first word).
                # This should have the effect of keeping punctuation next to words.
                word_texts: typing.List[str] = []

                for word in sentence.words:
                    if word.is_spoken:
                        if word_texts:
                            word_texts.append(f"{settings.join_str}{word.text}")
                        else:
                            word_texts.append(word.text)
                    else:
                        word_texts.append(word.text)

                sentence.text_with_ws = "".join(word_texts)

            sentence.text = settings.normalize_whitespace(sentence.text_with_ws)
            sentence.text_spoken = settings.join_str.join(
                w.text for w in sentence.words if w.is_spoken
            )

            # Normalize voice
            sent_voice = sentence.voice

            # Get voice used across all words
            for word in sentence.words:
                if word.voice:
                    if sent_voice and (sent_voice != word.voice):
                        # Multiple voices
                        sent_voice = ""
                        break

                    sent_voice = word.voice

            if sent_voice:
                sentence.voice = sent_voice

                # Set voice on all words
                for word in sentence.words:
                    word.voice = sent_voice

        return sentences

    def words(self, graph: GraphType, root: Node, **kwargs) -> typing.Iterable[Word]:
        """Processes text and returns each word"""
        for sent in self.sentences(graph, root, **kwargs):
            for word in sent:
                yield word

    def get_settings(self, lang: typing.Optional[str] = None) -> TextProcessorSettings:
        """Gets or creates settings for a language"""
        lang = lang or self.default_lang
        lang_settings = self.settings.get(lang)

        if lang_settings is not None:
            return lang_settings

        # Try again with resolved language
        resolved_lang = resolve_lang(lang)
        lang_settings = self.settings.get(resolved_lang)
        if lang_settings is not None:
            # Patch for the future
            self.settings[lang] = self.settings[resolved_lang]
            return lang_settings

        _LOGGER.debug(
            "No custom settings for language %s (%s). Creating default settings.",
Creating default settings.", lang, resolved_lang, ) # Create default settings for language lang_dir = self.lang_dirs.get(lang) lang_settings = get_settings( lang, lang_dir=lang_dir, model_prefix=self.model_prefix, search_dirs=self.search_dirs, **self.default_settings_kwargs, ) self.settings[lang] = lang_settings self.settings[resolved_lang] = lang_settings return lang_settings # ------------------------------------------------------------------------- # Processing # ------------------------------------------------------------------------- def __call__(self, *args, **kwargs): """Processes text or SSML""" return self.process(*args, **kwargs) def process( self, text: str, lang: typing.Optional[str] = None, ssml: bool = False, pos: bool = True, phonemize: bool = True, post_process: bool = True, add_speak_tag: bool = True, detect_numbers: bool = True, detect_currency: bool = True, detect_dates: bool = True, detect_times: bool = True, verbalize_numbers: bool = True, verbalize_currency: bool = True, verbalize_dates: bool = True, verbalize_times: bool = True, max_passes: int = 5, ) -> typing.Tuple[GraphType, Node]: """ Processes text or SSML Args: text: input text or SSML (ssml=True) lang: default language of input text ssml: True if input text is SSML pos: False if part of speech tagging should be disabled phonemize: False if phonemization should be disabled post_process: False if sentence/graph post-processing should be disabled add_speak_tag: True if should be automatically added to input text when ssml=True detect_numbers: True if numbers should be annotated in text (interpret_as="number") detect_currency: True if currency amounts should be annotated in text (interpret_as="currency") detect_dates: True if dates should be annotated in text (interpret_as="date") detect_times: True if clock times should be annotated in text (interpret_as="time") verbalize_numbers: True if annotated numbers should be expanded into words verbalize_currency: True if annotated currency amounts should be expanded into words verbalize_dates: True if annotated dates should be expanded into words verbalize_times: True if annotated clock times should be expanded into words Returns: graph, root: text graph and root node """ if ssml: try: root_element = etree.fromstring(text) except Exception as e: if add_speak_tag: # Try wrapping text in and parsing again root_element = etree.fromstring(f"{text}") else: # Log and re-raise exception _LOGGER.exception("TextProcessor.process") raise e def iter_elements(): yield from text_and_elements(root_element) else: # Not XML def iter_elements(): yield text graph = typing.cast(GraphType, nx.DiGraph()) # Parse XML last_paragraph: typing.Optional[ParagraphNode] = None last_sentence: typing.Optional[SentenceNode] = None last_speak: typing.Optional[SpeakNode] = None root: typing.Optional[SpeakNode] = None parsing_state = SSMLParsingState.DEFAULT # [voice] voice_stack: typing.List[str] = [] # [(interpret_as, format)] say_as_stack: typing.List[typing.Tuple[str, str]] = [] # [(tag, lang)] lang_stack: typing.List[typing.Tuple[str, str]] = [] current_lang: str = lang or self.default_lang # [lexicon.id] lookup_stack: typing.List[str] = [] lexicon_id: typing.Optional[str] = None lexeme: typing.Optional[Lexeme] = None # id -> lexicon inline_lexicons: typing.Dict[str, InlineLexicon] = {} # True if current word is the last one is_last_word: bool = False # Current word's role word_role: typing.Optional[str] = None # Alias from last_alias: typing.Optional[str] = None # Used to skip skip_elements: bool = 
        # Phonemes to use for next word(s)
        word_phonemes: typing.Optional[typing.List[typing.List[str]]] = None

        # Create __init__ args for new Node
        def scope_kwargs(target_class):
            scope = {}
            if voice_stack:
                scope["voice"] = voice_stack[-1]

            scope["lang"] = current_lang

            if target_class is WordNode:
                if say_as_stack:
                    scope["interpret_as"], scope["format"] = say_as_stack[-1]

                if word_role is not None:
                    scope["role"] = word_role

                if lookup_stack:
                    # Lexicon ids in order of look up
                    scope["lexicon_ids"] = list(reversed(lookup_stack))

            return scope

        def in_inline_lexicon(
            word_text: str, word_role: typing.Optional[str] = None
        ) -> bool:
            if inline_lexicons:
                for inline_lexicon_id in itertools.chain(
                    lookup_stack, [DEFAULT_LEXICON_ID]
                ):
                    maybe_lexicon = inline_lexicons.get(inline_lexicon_id)
                    if maybe_lexicon is None:
                        continue

                    maybe_role_phonemes = maybe_lexicon.words.get(word_text)
                    if maybe_role_phonemes is None:
                        continue

                    if (word_role is not None) and (word_role in maybe_role_phonemes):
                        # Role-specific pronunciation
                        return True

                    if WordRole.DEFAULT in maybe_role_phonemes:
                        # Default pronunciation
                        return True

            # No inline pronunciation
            return False

        # Process sub-elements and text chunks
        for elem_or_text in iter_elements():
            if isinstance(elem_or_text, str):
                if skip_elements:
                    # Inside <metadata>
                    continue

                # Text chunk
                text = typing.cast(str, elem_or_text)

                # <grapheme> inside <lexicon>
                if parsing_state == SSMLParsingState.IN_LEXICON_GRAPHEME:
                    assert lexeme is not None
                    lexeme.grapheme = text.strip()
                    continue

                # <phoneme> inside <lexicon>
                if parsing_state == SSMLParsingState.IN_LEXICON_PHONEME:
                    assert lexeme is not None
                    text = text.strip()

                    # Phonemes will be split on whitespace if at least one
                    # space is present, otherwise assume phonemes =
                    # graphemes.
                    lexeme.phonemes = maybe_split_ipa(text)
                    continue

                if last_alias is not None:
                    # Inside a <sub>
                    text = last_alias

                if last_speak is None:
                    # Implicit <speak>
                    last_speak = SpeakNode(node=len(graph), implicit=True)
                    graph.add_node(last_speak.node, data=last_speak)
                    if root is None:
                        root = last_speak

                assert last_speak is not None

                if last_paragraph is None:
                    # Implicit <p>
                    p_node = ParagraphNode(
                        node=len(graph), implicit=True, **scope_kwargs(ParagraphNode)
                    )
                    graph.add_node(p_node.node, data=p_node)
                    graph.add_edge(last_speak.node, p_node.node)
                    last_paragraph = p_node

                assert last_paragraph is not None

                if last_sentence is None:
                    # Implicit <s>
                    s_node = SentenceNode(
                        node=len(graph), implicit=True, **scope_kwargs(SentenceNode)
                    )
                    graph.add_node(s_node.node, data=s_node)
                    graph.add_edge(last_paragraph.node, s_node.node)
                    last_sentence = s_node

                assert last_sentence is not None

                if parsing_state == SSMLParsingState.IN_WORD:
                    # No splitting
                    word_text = text
                    settings = self.get_settings(current_lang)
                    if (
                        settings.keep_whitespace
                        and (not is_last_word)
                        and (not word_text.endswith(settings.join_str))
                    ):
                        word_text += settings.join_str

                    word_kwargs = scope_kwargs(WordNode)
                    if word_phonemes:
                        word_kwargs["phonemes"] = word_phonemes.pop()

                    word_text_norm = settings.normalize_whitespace(word_text)

                    word_node = WordNode(
                        node=len(graph),
                        text=word_text_norm,
                        text_with_ws=word_text,
                        in_lexicon=(
                            in_inline_lexicon(word_text_norm, word_role)
                            or self._is_word_in_lexicon(word_text_norm, settings)
                        ),
                        **word_kwargs,
                    )
                    graph.add_node(word_node.node, data=word_node)
                    graph.add_edge(last_sentence.node, word_node.node)
                else:
                    # Split by whitespace
                    self._pipeline_tokenize(
                        graph,
                        last_sentence,
                        text,
                        word_phonemes=word_phonemes,
                        scope_kwargs=scope_kwargs(WordNode),
                        in_inline_lexicon=in_inline_lexicon,
                    )

            elif isinstance(elem_or_text, EndElement):
                # End of an element (e.g., </w>)
                end_elem = typing.cast(EndElement, elem_or_text)
                end_tag = tag_no_namespace(end_elem.element.tag)

                if end_tag == "voice":
                    if voice_stack:
                        voice_stack.pop()
                elif end_tag == "say-as":
                    if say_as_stack:
                        say_as_stack.pop()
                elif end_tag == "lookup":
                    if lookup_stack:
                        lookup_stack.pop()
                elif end_tag == "lexicon":
                    # Done parsing <lexicon>
                    parsing_state = SSMLParsingState.DEFAULT
                    lexicon_id = None
                elif (end_tag == "grapheme") and (
                    parsing_state == SSMLParsingState.IN_LEXICON_GRAPHEME
                ):
                    # Done with lexicon grapheme
                    parsing_state = SSMLParsingState.IN_LEXICON
                elif (end_tag == "phoneme") and (
                    parsing_state == SSMLParsingState.IN_LEXICON_PHONEME
                ):
                    # Done with lexicon phoneme
                    parsing_state = SSMLParsingState.IN_LEXICON
                elif (end_tag == "lexeme") and (
                    parsing_state == SSMLParsingState.IN_LEXICON
                ):
                    # Done with lexicon entry
                    assert lexeme is not None, "No lexeme"
                    assert (
                        lexeme.phonemes is not None
                    ), f"No phoneme for lexeme: {lexeme}"
                    assert lexicon_id is not None, "No lexicon id"

                    lexicon = inline_lexicons.get(lexicon_id)
                    assert lexicon is not None, f"No lexicon for id {lexicon_id}"

                    # Get or create role -> phonemes map
                    role_phonemes: typing.Dict[str, PHONEMES_TYPE] = lexicon.words.get(
                        lexeme.grapheme, {}
                    )

                    if lexeme.roles:
                        # Add phonemes for each role
                        for role in lexeme.roles:
                            role_phonemes[role] = lexeme.phonemes
                    else:
                        # Default (empty) role only
                        role_phonemes[WordRole.DEFAULT] = lexeme.phonemes

                    lexicon.words[lexeme.grapheme] = role_phonemes

                    # Reset state
                    lexeme = None
                else:
                    if lang_stack and (lang_stack[-1][0] == end_tag):
                        lang_stack.pop()

                    if lang_stack:
                        current_lang = lang_stack[-1][1]  # tag, lang
                    else:
                        current_lang = self.default_lang

                    if end_tag in {"w", "token"}:
                        # End of word
                        parsing_state = SSMLParsingState.DEFAULT
                        is_last_word = False
                        word_role = None
                    elif end_tag == "s":
                        # End of sentence
                        last_sentence = None
                    elif end_tag == "p":
                        # End of paragraph
                        last_paragraph = None
                    elif end_tag == "speak":
                        # End of speak
                        last_speak = root
                    elif end_tag == "sub":
                        # End of sub
                        last_alias = None
                    elif end_tag in {"metadata", "meta"}:
                        # End of <metadata>
                        skip_elements = False
                    elif end_tag == "phoneme":
                        # End of phoneme
                        word_phonemes = None
            else:
                if skip_elements:
                    # Inside <metadata>
                    continue

                # Start of an element (e.g., <w>)
                elem, elem_metadata = elem_or_text
                elem = typing.cast(etree.Element, elem)

                # Optional metadata for the element
                elem_metadata = typing.cast(
                    typing.Optional[typing.Dict[str, typing.Any]], elem_metadata
                )

                elem_tag = tag_no_namespace(elem.tag)

                if elem_tag == "speak":
                    # Explicit <speak>
                    maybe_lang = attrib_no_namespace(elem, "lang")
                    if maybe_lang:
                        lang_stack.append((elem_tag, maybe_lang))
                        current_lang = maybe_lang

                    speak_node = SpeakNode(
                        node=len(graph), element=elem, **scope_kwargs(SpeakNode)
                    )
                    if root is None:
                        root = speak_node

                    graph.add_node(speak_node.node, data=root)
                    last_speak = root
                elif elem_tag == "voice":
                    # Set voice scope
                    voice_name = attrib_no_namespace(elem, "name")
                    voice_stack.append(voice_name)
                elif elem_tag == "p":
                    # Explicit paragraph
                    if last_speak is None:
                        # Implicit <speak>
                        last_speak = SpeakNode(node=len(graph), implicit=True)
                        graph.add_node(last_speak.node, data=last_speak)
                        if root is None:
                            root = last_speak

                    assert last_speak is not None

                    maybe_lang = attrib_no_namespace(elem, "lang")
                    if maybe_lang:
                        lang_stack.append((elem_tag, maybe_lang))
                        current_lang = maybe_lang

                    p_node = ParagraphNode(
                        node=len(graph), element=elem, **scope_kwargs(ParagraphNode)
                    )
                    graph.add_node(p_node.node, data=p_node)
                    graph.add_edge(last_speak.node, p_node.node)
                    last_paragraph = p_node

                    # Force a new sentence to begin
                    last_sentence = None
                elif elem_tag == "s":
                    # Explicit sentence
                    if last_speak is None:
                        # Implicit <speak>
                        last_speak = SpeakNode(node=len(graph), implicit=True)
                        graph.add_node(last_speak.node, data=last_speak)
                        if root is None:
                            root = last_speak

                    assert last_speak is not None

                    if last_paragraph is None:
                        # Implicit paragraph
                        p_node = ParagraphNode(
                            node=len(graph), **scope_kwargs(ParagraphNode)
                        )
                        graph.add_node(p_node.node, data=p_node)
                        graph.add_edge(last_speak.node, p_node.node)
                        last_paragraph = p_node

                    maybe_lang = attrib_no_namespace(elem, "lang")
                    if maybe_lang:
                        lang_stack.append((elem_tag, maybe_lang))
                        current_lang = maybe_lang

                    s_node = SentenceNode(
                        node=len(graph), element=elem, **scope_kwargs(SentenceNode)
                    )
                    graph.add_node(s_node.node, data=s_node)
                    graph.add_edge(last_paragraph.node, s_node.node)
                    last_sentence = s_node
                elif elem_tag in {"w", "token"}:
                    # Explicit word
                    parsing_state = SSMLParsingState.IN_WORD
                    is_last_word = (
                        elem_metadata.get("is_last", False) if elem_metadata else False
                    )

                    maybe_lang = attrib_no_namespace(elem, "lang")
                    if maybe_lang:
                        lang_stack.append((elem_tag, maybe_lang))
                        current_lang = maybe_lang

                    word_role = attrib_no_namespace(elem, "role")
                elif elem_tag == "break":
                    # Break
                    last_target = last_sentence or last_paragraph or last_speak
                    assert last_target is not None
                    break_node = BreakNode(
                        node=len(graph),
                        element=elem,
                        time=attrib_no_namespace(elem, "time", ""),
                    )
                    graph.add_node(break_node.node, data=break_node)
                    graph.add_edge(last_target.node, break_node.node)
                elif elem_tag == "mark":
                    # Mark
                    last_target = last_sentence or last_paragraph or last_speak
                    assert last_target is not None
                    mark_node = MarkNode(
                        node=len(graph),
                        element=elem,
                        name=attrib_no_namespace(elem, "name", ""),
                    )
                    graph.add_node(mark_node.node, data=mark_node)
                    graph.add_edge(last_target.node, mark_node.node)
                elif elem_tag == "say-as":
                    say_as_stack.append(
                        (
                            attrib_no_namespace(elem, "interpret-as", ""),
                            attrib_no_namespace(elem, "format", ""),
                        )
                    )
                elif elem_tag == "sub":
                    # Sub
                    last_alias = attrib_no_namespace(elem, "alias", "")
                elif elem_tag in {"metadata", "meta"}:
                    # Metadata
                    skip_elements = True
                elif (elem_tag == "phoneme") and (
                    parsing_state != SSMLParsingState.IN_LEXICON
                ):
                    # Phonemes
                    word_phonemes_strs = attrib_no_namespace(elem, "ph", "").split()
                    if word_phonemes_strs:
                        # Phonemes will be split on whitespace if at least one
                        # space is present, otherwise assume phonemes =
                        # graphemes.
                        word_phonemes = [
                            maybe_split_ipa(phoneme_str)
                            for phoneme_str in word_phonemes_strs
                        ]
                    else:
                        word_phonemes = None
                elif elem_tag == "lang":
                    # Set language
                    maybe_lang = attrib_no_namespace(elem, "lang", "")
                    if maybe_lang:
                        lang_stack.append((elem_tag, maybe_lang))
                        current_lang = maybe_lang
                elif elem_tag == "lookup":
                    lookup_id = attrib_no_namespace(elem, "ref")
                    assert lookup_id is not None, f"Lookup id required ({elem})"
                    lookup_stack.append(lookup_id)
                elif elem_tag == "lexicon":
                    # Inline pronunciation lexicon
                    # NOTE: Empty lexicon id means the "default" inline lexicon
                    # (<lookup> not required)
                    lexicon_id = attrib_no_namespace(elem, "id", DEFAULT_LEXICON_ID)
                    assert lexicon_id is not None

                    lexicon_alphabet = (
                        attrib_no_namespace(elem, "alphabet", "").strip().lower()
                    )
                    inline_lexicons[lexicon_id] = InlineLexicon(
                        lexicon_id=lexicon_id, alphabet=lexicon_alphabet
                    )

                    lexicon_uri = attrib_no_namespace(elem, "uri", "")
                    if lexicon_uri:
                        # Lexicon defined externally
                        _LOGGER.debug(
                            "Loading pronunciation lexicon from %s", lexicon_uri
                        )
                        load_lexicon(lexicon_uri, inline_lexicons[lexicon_id])
                    else:
                        # Lexicon defined within this document
                        parsing_state = SSMLParsingState.IN_LEXICON
                elif (elem_tag == "lexeme") and (
                    parsing_state == SSMLParsingState.IN_LEXICON
                ):
                    if lexeme is None:
                        lexeme = Lexeme()

                    role_str = attrib_no_namespace(elem, "role")
                    if role_str:
                        lexeme.roles = set(role_str.strip().split())
                elif (elem_tag == "grapheme") and (
                    parsing_state == SSMLParsingState.IN_LEXICON
                ):
                    # Inline pronunciation lexicon (grapheme)
                    parsing_state = SSMLParsingState.IN_LEXICON_GRAPHEME
                    if lexeme is None:
                        lexeme = Lexeme()
                elif (elem_tag == "phoneme") and (
                    parsing_state == SSMLParsingState.IN_LEXICON
                ):
                    # Inline pronunciation lexicon (phoneme)
                    parsing_state = SSMLParsingState.IN_LEXICON_PHONEME
                    if lexeme is None:
                        lexeme = Lexeme()

        assert root is not None

        # Do multiple passes over the graph
        num_passes_left = max_passes
        while num_passes_left > 0:
            was_changed = False

            # Do replacements before minor/major breaks
            if pipeline_split(self._split_replacements, graph, root):
                was_changed = True

            # Split punctuations (quotes, etc.) before breaks
            if pipeline_split(self._split_punctuations, graph, root):
                was_changed = True

            # Split on minor breaks (commas, etc.)
            if pipeline_split(self._split_minor_breaks, graph, root):
                was_changed = True

            # Expand abbreviations before major breaks
            if pipeline_split(self._split_abbreviations, graph, root):
                was_changed = True

            # Break apart initialisms (e.g., TTS or T.T.S.) before major breaks
            if pipeline_split(self._split_initialism, graph, root):
                was_changed = True

            # Split on major breaks (periods, etc.)
            if pipeline_split(self._split_major_breaks, graph, root):
                was_changed = True

            # Break apart sentences using BreakWordNodes
            if self._break_sentences(graph, root):
                was_changed = True

            # spell-out (e.g., abc -> a b c) before number expansion
            if pipeline_split(self._split_spell_out, graph, root):
                was_changed = True

            # Transform text into known classes.
            #
            # The order here is very important, since words with "interpret_as"
            # set will be skipped by later transformations.
            #
            # Dates are detected first so words like "1.1.2000" are not parsed
            # as numbers by Babel (the de_DE locale will parse this as 112000).
            if detect_dates:
                if pipeline_transform(self._transform_date, graph, root):
                    was_changed = True

            if detect_currency:
                if pipeline_transform(self._transform_currency, graph, root):
                    was_changed = True

            if detect_numbers:
                if pipeline_transform(self._transform_number, graph, root):
                    was_changed = True

            if detect_times:
                if pipeline_transform(self._transform_time, graph, root):
                    was_changed = True

            # Verbalize known classes
            if verbalize_dates:
                if pipeline_transform(self._verbalize_date, graph, root):
                    was_changed = True

            if verbalize_times:
                if pipeline_transform(self._verbalize_time, graph, root):
                    was_changed = True

            if verbalize_numbers:
                if pipeline_transform(self._verbalize_number, graph, root):
                    was_changed = True

            if verbalize_currency:
                if pipeline_transform(self._verbalize_currency, graph, root):
                    was_changed = True

            # Break apart words
            if pipeline_split(self._break_words, graph, root):
                was_changed = True

            # Ignore non-words
            if pipeline_split(self._split_ignore_non_words, graph, root):
                was_changed = True

            if not was_changed:
                # No changes, so we can stop
                break

            num_passes_left -= 1

        # Gather words from leaves of the tree, group by sentence
        def process_sentence(words: typing.List[WordNode]):
            if pos:
                pos_settings = self.get_settings(node.lang)
                if pos_settings.get_parts_of_speech is not None:
                    pos_tags = pos_settings.get_parts_of_speech(
                        [word.text for word in words]
                    )
                    for word, pos_tag in zip(words, pos_tags):
                        word.pos = pos_tag

                        if not word.role:
                            word.role = f"gruut:{pos_tag}"

            if phonemize:
                # Add phonemes to word
                for word in words:
                    if word.phonemes:
                        # Word already has phonemes
                        continue

                    lexicon_ids: typing.List[str] = []
                    if word.lexicon_ids:
                        lexicon_ids.extend(word.lexicon_ids)

                    lexicon_ids.append(DEFAULT_LEXICON_ID)

                    # Look up phonemes from inline <lexicon>
                    for lexicon_id in lexicon_ids:
                        lexicon = inline_lexicons.get(lexicon_id)
                        if lexicon is None:
                            continue

                        maybe_role_phonemes = lexicon.words.get(word.text)
                        if maybe_role_phonemes is None:
                            continue

                        maybe_phonemes = maybe_role_phonemes.get(word.role)

                        if (maybe_phonemes is None) and (
                            word.role != WordRole.DEFAULT
                        ):
                            # Try again with default role
                            maybe_phonemes = maybe_role_phonemes.get(WordRole.DEFAULT)

                        if maybe_phonemes is not None:
                            # Found inline pronunciation
                            word.phonemes = maybe_phonemes
                            break

                    if word.phonemes:
                        # Got phonemes from inline lexicon
                        continue

                    phonemize_settings = self.get_settings(word.lang)
                    if phonemize_settings.lookup_phonemes is not None:
                        word.phonemes = phonemize_settings.lookup_phonemes(
                            word.text, word.role
                        )

                    if (not word.phonemes) and (
                        phonemize_settings.guess_phonemes is not None
                    ):
                        word.phonemes = phonemize_settings.guess_phonemes(
                            word.text, word.role
                        )

        # Process tree leaves
        sentence_words: typing.List[WordNode] = []

        for dfs_node in nx.dfs_preorder_nodes(graph, root.node):
            node = graph.nodes[dfs_node][DATA_PROP]
            if isinstance(node, SentenceNode):
                if sentence_words:
                    process_sentence(sentence_words)
                    sentence_words = []
            elif graph.out_degree(dfs_node) == 0:
                if isinstance(node, WordNode):
                    word_node = typing.cast(WordNode, node)
                    sentence_words.append(word_node)

        if sentence_words:
            # Final sentence
            process_sentence(sentence_words)
            sentence_words = []

        if post_process:
            # Post-process sentences
            for dfs_node in nx.dfs_preorder_nodes(graph, root.node):
                node = graph.nodes[dfs_node][DATA_PROP]
                if isinstance(node, SentenceNode):
                    sent_node = typing.cast(SentenceNode, node)
                    sent_settings = self.get_settings(sent_node.lang)
                    if sent_settings.post_process_sentence is not None:
                        sent_settings.post_process_sentence(
                            graph, sent_node, sent_settings
                        )

            # Post-process the entire graph
            self.post_process_graph(graph, root)

        return graph, root

    def post_process_graph(self, graph: GraphType, root: Node):
        """User-defined post-processing of entire graph"""
        pass

    # -------------------------------------------------------------------------
    # Pipeline (custom)
    # -------------------------------------------------------------------------

    def _break_sentences(self, graph: GraphType, root: Node) -> bool:
        """Break sentences apart at BreakWordNode(break_type="major") nodes."""
        was_changed = False

        # This involves:
        # 1. Identifying where in the edge list of the sentence the break occurs
        # 2. Creating a new sentence next to the existing one in the parent paragraph
        # 3. Moving everything after the break into the new sentence
        for leaf_node in list(leaves(graph, root)):
            if not isinstance(leaf_node, BreakWordNode):
                # Not a break
                continue

            break_word_node = typing.cast(BreakWordNode, leaf_node)
            if break_word_node.break_type != BreakType.MAJOR:
                # Not a major break
                continue

            # Get the path from the break up to the nearest sentence
            parent_node: int = next(iter(graph.predecessors(break_word_node.node)))
            parent: Node = graph.nodes[parent_node][DATA_PROP]

            s_path: typing.List[Node] = [parent]

            while not isinstance(parent, SentenceNode):
                parent_node = next(iter(graph.predecessors(parent_node)))
                parent = graph.nodes[parent_node][DATA_PROP]
                s_path.append(parent)

            # Should at least be [WordNode, SentenceNode]
            assert len(s_path) >= 2
            s_node = s_path[-1]
            assert isinstance(s_node, SentenceNode)

            if not s_node.implicit:
                # Don't break apart explicit sentences
                continue

            # Probably a WordNode
            below_s_node = s_path[-2]

            # Edges after the break will need to be moved to the new sentence
            s_edges = list(graph.out_edges(s_node.node))
            break_edge_idx = s_edges.index((s_node.node, below_s_node.node))

            edges_to_move = s_edges[break_edge_idx + 1 :]
            if not edges_to_move:
                # Final sentence, nothing to move
                continue

            # Locate parent paragraph so we can create a new sentence
            p_node = self._find_parent(graph, s_node, ParagraphNode)
            assert p_node is not None

            # Find the index of the edge between the paragraph and the current sentence
            p_s_edge = (p_node.node, s_node.node)
            p_edges = list(graph.out_edges(p_node.node))
            s_edge_idx = p_edges.index(p_s_edge)

            # Remove existing edges from the paragraph
            graph.remove_edges_from(p_edges)

            # Create a sentence and add an edge to it right after the current sentence
            new_s_node = SentenceNode(node=len(graph), implicit=True)
            graph.add_node(new_s_node.node, data=new_s_node)
            p_edges.insert(s_edge_idx + 1, (p_node.node, new_s_node.node))

            # Insert paragraph edges with new sentence
            graph.add_edges_from(p_edges)

            # Move edges from current sentence to new sentence
            graph.remove_edges_from(edges_to_move)
            graph.add_edges_from([(new_s_node.node, v) for (u, v) in edges_to_move])

            was_changed = True

        return was_changed

    def _break_words(self, graph: GraphType, node: Node):
        """Break apart words according to the word breaks pattern"""
        if not isinstance(node, WordNode):
            return

        word = typing.cast(WordNode, node)
        if word.interpret_as or word.in_lexicon or (not word.implicit):
            # Don't interpret words that are spoken for or explicit words (<w>)
            return

        settings = self.get_settings(word.lang)
        if settings.word_breaks_pattern is None:
            # No pattern set for this language
            return

        parts = settings.word_breaks_pattern.split(word.text)
        if len(parts) < 2:
            # Didn't split
            return

        # Preserve whitespace
        first_ws, last_ws = settings.get_whitespace(word.text_with_ws)
        last_part_idx = len(parts) - 1

        for part_idx, part_text in enumerate(parts):
            part_text_norm = settings.normalize_whitespace(part_text)
            if not part_text_norm:
                continue

            if settings.keep_whitespace:
                if part_idx == 0:
                    part_text = first_ws + part_text

                if part_idx == last_part_idx:
                    part_text += last_ws
                else:
                    part_text += settings.join_str

            yield WordNode, {
                "text": part_text_norm,
                "text_with_ws": part_text,
                "implicit": True,
                "lang": word.lang,
                "voice": word.voice,
                "in_lexicon": self._is_word_in_lexicon(part_text_norm, settings),
                "is_from_broken_word": True,
            }

    def _split_punctuations(self, graph: GraphType, node: Node):
        """Split leading/trailing punctuation off of words"""
        if not isinstance(node, WordNode):
            return

        word = typing.cast(WordNode, node)
        if word.interpret_as or word.in_lexicon:
            # Don't interpret words that are spoken for
            return

        settings = self.get_settings(word.lang)
        if (settings.begin_punctuations_pattern is None) and (
            settings.end_punctuations_pattern is None
        ):
            # No punctuation patterns
            return

        word_text = word.text
        first_ws, last_ws = settings.get_whitespace(word.text_with_ws)
        has_punctuation = False

        # Punctuations at the beginning of the word
        if settings.begin_punctuations_pattern is not None:
            # Split into begin punctuation and rest of word
            parts = list(
                filter(
                    None,
                    settings.begin_punctuations_pattern.split(word_text, maxsplit=1),
                )
            )

            first_word = True
            while word_text and (len(parts) == 2):
                punct_text, word_text = parts
                if first_word:
                    # Preserve leading whitespace
                    punct_text = first_ws + punct_text
                    first_word = False

                punct_text_norm = settings.normalize_whitespace(punct_text)
                has_punctuation = True
                yield PunctuationWordNode, {
                    "text": punct_text_norm,
                    "text_with_ws": punct_text,
                    "implicit": True,
                    "lang": word.lang,
                    "voice": word.voice,
                }

                parts = list(
                    filter(
                        None,
                        settings.begin_punctuations_pattern.split(
                            word_text, maxsplit=1
                        ),
                    )
                )

        # Punctuations at the end of the word
        end_punctuations: typing.List[str] = []
        if settings.end_punctuations_pattern is not None:
            # Split into rest of word and end punctuation
            parts = list(
                filter(
                    None, settings.end_punctuations_pattern.split(word_text, maxsplit=1)
                )
            )

            while word_text and (len(parts) == 2):
                word_text, punct_text = parts
                has_punctuation = True
                end_punctuations.append(punct_text)
                parts = list(
                    filter(
                        None,
                        settings.end_punctuations_pattern.split(word_text, maxsplit=1),
                    )
                )

        if not has_punctuation:
            # Leave word as-is
            return

        if settings.keep_whitespace and (not end_punctuations):
            # Preserve trailing whitespace
            word_text = word_text + last_ws

        word_text_norm = settings.normalize_whitespace(word_text)

        if word_text:
            yield WordNode, {
                "text": word_text_norm,
                "text_with_ws": word_text,
                "implicit": True,
                "lang": word.lang,
                "voice": word.voice,
                "in_lexicon": self._is_word_in_lexicon(word_text_norm, settings),
            }

        last_punct_idx = len(end_punctuations) - 1
        for punct_idx, punct_text in enumerate(reversed(end_punctuations)):
            if settings.keep_whitespace and (punct_idx == last_punct_idx):
                # Preserve trailing whitespace
                punct_text += last_ws

            yield PunctuationWordNode, {
                "text": punct_text.strip(),
                "text_with_ws": punct_text,
                "implicit": True,
                "lang": word.lang,
                "voice": word.voice,
            }

    def _split_major_breaks(self, graph: GraphType, node: Node):
        """Split sentence-ending breaks (periods, etc.) off of words"""
        if not isinstance(node, WordNode):
            return

        word = typing.cast(WordNode, node)
        if word.interpret_as or word.in_lexicon:
            # Don't interpret words that are spoken for
            return

        settings = self.get_settings(word.lang)
        if settings.major_breaks_pattern is None:
            # No pattern set for this language
            return

        parts = settings.major_breaks_pattern.split(word.text_with_ws)
        if len(parts) < 2:
            return

        word_part = parts[0]
        break_part = parts[1]

        if word_part.strip():
            # Only yield word if there's anything but whitespace
            word_part_norm = settings.normalize_whitespace(word_part)

            yield WordNode, {
                "text": word_part_norm,
                "text_with_ws": word_part,
                "implicit": True,
                "lang": word.lang,
                "voice": word.voice,
                "in_lexicon": self._is_word_in_lexicon(word_part_norm, settings),
            }
        else:
            # Keep leading whitespace
            break_part = word_part + break_part

        yield BreakWordNode, {
            "break_type": BreakType.MAJOR,
            "text": settings.normalize_whitespace(break_part),
            "text_with_ws": break_part,
            "implicit": True,
            "lang": word.lang,
            "voice": word.voice,
        }

    def _split_minor_breaks(self, graph: GraphType, node: Node):
        """Split pause-inducing breaks (commas, etc.) off of words"""
        if not isinstance(node, WordNode):
            return

        word = typing.cast(WordNode, node)
        if word.interpret_as or word.in_lexicon:
            # Don't interpret words that are spoken for
            return

        settings = self.get_settings(word.lang)
        if settings.minor_breaks_pattern is None:
            # No pattern set for this language
            return

        parts = settings.minor_breaks_pattern.split(word.text_with_ws)
        if len(parts) < 2:
            return

        word_part = parts[0]
        if word_part.strip():
            # Only yield word if there's anything but whitespace
            word_part_norm = settings.normalize_whitespace(word_part)

            yield WordNode, {
                "text": word_part_norm,
                "text_with_ws": word_part,
                "implicit": True,
                "lang": word.lang,
                "voice": word.voice,
                "in_lexicon": self._is_word_in_lexicon(word_part_norm, settings),
            }

        break_part = parts[1]
        yield BreakWordNode, {
            "break_type": BreakType.MINOR,
            "text": settings.normalize_whitespace(break_part),
            "text_with_ws": break_part,
            "implicit": True,
            "lang": word.lang,
            "voice": word.voice,
        }

    def _find_parent(self, graph, node, *classes):
        """Tries to find a node whose type is in classes in the tree above node"""
        parents = []
        for parent_node in graph.predecessors(node.node):
            parent = graph.nodes[parent_node][DATA_PROP]
            if isinstance(parent, classes):
                return parent

            parents.append(parent)

        for parent in parents:
            match = self._find_parent(graph, parent, classes)
            if match is not None:
                return match

        return None

    # pylint: disable=no-self-use
    def _phonemes_for_break(
        self,
        break_type: typing.Union[str, BreakType],
        lang: typing.Optional[str] = None,
    ) -> typing.Optional[PHONEMES_TYPE]:
        if break_type == BreakType.MAJOR:
            return [IPA.BREAK_MAJOR.value]

        if break_type == BreakType.MINOR:
            return [IPA.BREAK_MINOR.value]

        return None

    # -------------------------------------------------------------------------

    def _pipeline_tokenize(
        self,
        graph,
        parent_node,
        text,
        word_phonemes: typing.Optional[typing.List[typing.List[str]]] = None,
        scope_kwargs=None,
        in_inline_lexicon: typing.Optional[
            typing.Callable[[str, typing.Optional[str]], bool]
        ] = None,
    ):
        """Splits text into word nodes"""
        if scope_kwargs is None:
            scope_kwargs = {}

        lang = self.default_lang
        if scope_kwargs is not None:
            lang = scope_kwargs.get("lang", lang)

        settings = self.get_settings(lang)
        assert settings is not None, f"No settings for {lang}"

        if settings.pre_process_text is not None:
            # Pre-process text
            text = settings.pre_process_text(text)

        # Split into separate words (preserving whitespace).
        for word_text in settings.split_words(text):
            word_text_norm = settings.normalize_whitespace(word_text)
            if not word_text_norm:
                continue

            if not settings.keep_whitespace:
                word_text = word_text_norm

            word_kwargs = scope_kwargs
            if word_phonemes:
                word_kwargs = {**scope_kwargs, "phonemes": word_phonemes.pop()}

            # Determine if word is in a lexicon.
            # If so, it will not be interpreted as an initialism, split apart, etc.
            in_lexicon: typing.Optional[bool] = None
            if in_inline_lexicon is not None:
                # Check inline <lexicon> first
                in_lexicon = in_inline_lexicon(
                    word_text_norm, scope_kwargs.get("word_role")
                )

            if not in_lexicon:
                # Check main language lexicon
                in_lexicon = self._is_word_in_lexicon(word_text_norm, settings)

            word_node = WordNode(
                node=len(graph),
                text=word_text_norm,
                text_with_ws=word_text,
                implicit=True,
                in_lexicon=in_lexicon,
                **word_kwargs,
            )
            graph.add_node(word_node.node, data=word_node)
            graph.add_edge(parent_node.node, word_node.node)

    # -------------------------------------------------------------------------
    # Pipeline Splits
    # -------------------------------------------------------------------------

    def _split_spell_out(self, graph: GraphType, node: Node):
        """Expand spell-out (a-1 -> a dash one)"""
        if not isinstance(node, WordNode):
            return

        word = typing.cast(WordNode, node)
        if word.interpret_as != InterpretAs.SPELL_OUT:
            return

        settings = self.get_settings(word.lang)

        # Preserve whitespace
        first_ws, last_ws = settings.get_whitespace(word.text_with_ws)
        last_char_idx = len(word.text) - 1

        for i, c in enumerate(word.text):
            # Look up in settings first ("." -> "dot")
            word_text = settings.spell_out_words.get(c)
            role = WordRole.DEFAULT

            if word_text is None:
                if c.isalpha():
                    # Assume this is a letter
                    word_text = c
                    role = WordRole.LETTER
                else:
                    # Leave as is (expand later in pipeline if digit, etc.)
                    word_text = c

            if not word_text:
                continue

            if settings.keep_whitespace:
                if i == 0:
                    word_text = first_ws + word_text

                if i == last_char_idx:
                    word_text += last_ws
                else:
                    word_text += settings.join_str

            yield WordNode, {
                "text": settings.normalize_whitespace(word_text),
                "text_with_ws": word_text,
                "implicit": True,
                "lang": word.lang,
                "role": role,
            }

    def _split_replacements(self, graph: GraphType, node: Node):
        """Do regex replacements on word text"""
        if not isinstance(node, WordNode):
            return

        word = typing.cast(WordNode, node)
        if word.interpret_as or word.in_lexicon:
            # Don't interpret words that are spoken for
            return

        settings = self.get_settings(word.lang)
        if not settings.replacements:
            # No replacements
            return

        matched = False
        new_text = word.text_with_ws

        for pattern, template in settings.replacements:
            assert isinstance(pattern, REGEX_PATTERN)
            new_text, num_subs = pattern.subn(template, new_text)
            if num_subs > 0:
                matched = True

        if matched:
            # Tokenize new text (whitespace is preserved by regex)
            for part_text in settings.split_words(new_text):
                part_text_norm = settings.normalize_whitespace(part_text)
                if not settings.keep_whitespace:
                    part_text = part_text_norm

                if not part_text_norm:
                    # Ignore empty words
                    continue

                yield WordNode, {
                    "text": part_text_norm,
                    "text_with_ws": part_text,
                    "implicit": True,
                    "lang": word.lang,
                    "in_lexicon": self._is_word_in_lexicon(part_text_norm, settings),
                }

    def _split_abbreviations(self, graph: GraphType, node: Node):
        """Expand abbreviations"""
        if not isinstance(node, WordNode):
            return

        word = typing.cast(WordNode, node)
        if word.interpret_as or word.in_lexicon:
            # Don't interpret words that are spoken for
            return

        settings = self.get_settings(word.lang)
        if not settings.abbreviations:
            # No abbreviations
            return

        new_text: typing.Optional[str] = None
        for pattern, template in settings.abbreviations.items():
            assert isinstance(pattern, REGEX_PATTERN), pattern
            match = pattern.match(word.text_with_ws)

            if match is not None:
                new_text = match.expand(template)
                break

        if new_text is not None:
            # Tokenize new text (whitespace should be preserved by regex)
            for part_text in settings.split_words(new_text):
                part_text_norm = settings.normalize_whitespace(part_text)
                if not part_text_norm:
                    continue

                if not settings.keep_whitespace:
                    part_text = part_text_norm

                yield WordNode, {
                    "text": part_text_norm,
                    "text_with_ws": part_text,
                    "implicit": True,
                    "lang": word.lang,
                    "in_lexicon": self._is_word_in_lexicon(part_text_norm, settings),
                }

    def _split_initialism(self, graph: GraphType, node: Node):
        """Split apart ABC or A.B.C."""
        if not isinstance(node, WordNode):
            return

        word = typing.cast(WordNode, node)
        if word.interpret_as or word.in_lexicon or (len(word.text) < 2):
            # Don't interpret words that are spoken for or are too short
            return

        settings = self.get_settings(word.lang)
        if (settings.is_initialism is None) or (settings.split_initialism is None):
            # Can't do anything without these functions
            return

        if not settings.is_initialism(word.text):
            # Not an initialism
            return

        first_ws, last_ws = settings.get_whitespace(word.text_with_ws)
        parts = settings.split_initialism(word.text)
        last_part_idx = len(parts) - 1

        # Split according to language-specific function
        for part_idx, part_text in enumerate(parts):
            part_text_norm = settings.normalize_whitespace(part_text)
            if not part_text_norm:
                continue

            if settings.keep_whitespace:
                if part_idx == 0:
                    part_text = first_ws + part_text

                if 0 <= part_idx < last_part_idx:
                    part_text += settings.join_str
                elif part_idx == last_part_idx:
                    part_text += last_ws

            yield WordNode, {
                "text": part_text_norm,
                "text_with_ws": part_text,
                "implicit": True,
                "lang": word.lang,
                "role": WordRole.LETTER,
            }

    def _split_ignore_non_words(self, graph: GraphType, node: Node):
        """Mark non-words as ignored"""
        if not isinstance(node, WordNode):
            return

        word = typing.cast(WordNode, node)
        if word.interpret_as or word.in_lexicon:
            # Don't interpret words that are spoken for
            return

        settings = self.get_settings(word.lang)
        if settings.is_non_word is None:
            # No function for this language
            return

        if settings.is_non_word(word.text):
            yield (IgnoreNode, {})

    # -------------------------------------------------------------------------
    # Pipeline Transformations
    # -------------------------------------------------------------------------

    def _transform_number(self, graph: GraphType, node: Node) -> bool:
        """Annotate a word as a number if it can be parsed as one"""
        if not isinstance(node, WordNode):
            return False

        word = typing.cast(WordNode, node)
        if (not word.is_maybe_number) or (
            word.interpret_as and (word.interpret_as != InterpretAs.NUMBER)
        ):
            return False

        settings = self.get_settings(word.lang)
        assert settings.babel_locale

        if settings.get_ordinal is not None:
            # Try to parse as an ordinal (e.g., 1st -> 1)
            ordinal_num = settings.get_ordinal(word.text)
            if ordinal_num is not None:
                word.interpret_as = InterpretAs.NUMBER
                word.format = InterpretAsFormat.NUMBER_ORDINAL
                word.number = Decimal(ordinal_num)
                return False

        try:
            # Try to parse as a number
            # This is important to handle thousand/decimal separators correctly.
            number = babel.numbers.parse_decimal(
                word.text, locale=settings.babel_locale
            )

            if not number.is_finite():
                raise ValueError("Not parsing nan or inf")

            word.interpret_as = InterpretAs.NUMBER
            if not word.format:
                # Retain ordinal, etc.
                word.format = InterpretAsFormat.NUMBER_CARDINAL

            word.number = number

            if (1000 < number < 3000) and (re.match(r"^\d+$", word.text) is not None):
                # Interpret numbers in this range as years by default, but only
                # if the text was entirely digits.
                #
                # So "2020" will become "twenty twenty", but "2,020" will become
                # "two thousand and twenty".
                word.format = InterpretAsFormat.NUMBER_YEAR
        except ValueError:
            # Probably not a number
            word.is_maybe_number = False

        return True

    def _transform_currency(self, graph: GraphType, node: Node) -> bool:
        """Annotate a word as a currency amount if it can be parsed as one"""
        if not isinstance(node, WordNode):
            return False

        word = typing.cast(WordNode, node)
        if (not word.is_maybe_currency) or (
            word.interpret_as and (word.interpret_as != InterpretAs.CURRENCY)
        ):
            return False

        settings = self.get_settings(word.lang)
        if (settings.is_maybe_currency is not None) and (
            not settings.is_maybe_currency(word.text)
        ):
            # Probably not currency
            word.is_maybe_currency = False
            return False

        assert settings.babel_locale

        # Try to parse with known currency symbols
        parsed = False
        for currency_symbol in settings.currency_symbols:
            if word.text.startswith(currency_symbol):
                num_str = word.text[len(currency_symbol) :]

                try:
                    # Try to parse as a number
                    # This is important to handle thousand/decimal separators correctly.
                    number = babel.numbers.parse_decimal(
                        num_str, locale=settings.babel_locale
                    )

                    word.interpret_as = InterpretAs.CURRENCY
                    word.currency_symbol = currency_symbol
                    word.number = number
                    parsed = True
                    break
                except ValueError:
                    pass

        # If this *must* be a currency value, use the default currency
        if (not parsed) and (word.interpret_as == InterpretAs.CURRENCY):
            default_currency = settings.default_currency
            if default_currency:
                # Forced interpretation using default currency
                try:
                    number = babel.numbers.parse_decimal(
                        word.text, locale=settings.babel_locale
                    )

                    word.interpret_as = InterpretAs.CURRENCY
                    word.currency_name = default_currency
                    word.number = number
                except ValueError:
                    pass

        return True

    def _transform_date(self, graph: GraphType, node: Node):
        """Annotate a word as a date if it can be parsed as one"""
        if not isinstance(node, WordNode):
            return False

        word = typing.cast(WordNode, node)
        if (not word.is_maybe_date) or (
            word.interpret_as and (word.interpret_as != InterpretAs.DATE)
        ):
            return False

        settings = self.get_settings(word.lang)

        try:
            if (settings.is_maybe_date is not None) and not settings.is_maybe_date(
                word.text
            ):
                # Probably not a date
                word.is_maybe_date = False
                return False

            assert settings.dateparser_lang

            dateparser_kwargs: typing.Dict[str, typing.Any] = {
                "settings": {"STRICT_PARSING": True},
                "languages": [settings.dateparser_lang],
            }

            date = dateparser.parse(word.text, **dateparser_kwargs)
            if date is not None:
                word.interpret_as = InterpretAs.DATE
                word.date = date
            elif word.interpret_as == InterpretAs.DATE:
                # Try again without strict parsing
                dateparser_kwargs["settings"]["STRICT_PARSING"] = False
                date = dateparser.parse(word.text, **dateparser_kwargs)
                if date is not None:
                    word.date = date
        except Exception:
            _LOGGER.exception("transform_date")

            # Not a date
            word.is_maybe_date = False
            return False

        return True

    def _transform_time(self, graph: GraphType, node: Node):
        """Annotate a word as a clock time if it can be parsed as one"""
        if not isinstance(node, WordNode):
            return False

        word = typing.cast(WordNode, node)
        if (not word.is_maybe_time) or (
            word.interpret_as and (word.interpret_as != InterpretAs.TIME)
        ):
            return False

        settings = self.get_settings(word.lang)
        if settings.parse_time is None:
            # Can't parse a time anyways
            return False

        try:
            if (settings.is_maybe_time is not None) and not settings.is_maybe_time(
                word.text
            ):
                # Probably not a time
                word.is_maybe_time = False
                return False

            time = settings.parse_time(word.text)
            if time is not None:
                word.interpret_as = InterpretAs.TIME
                word.time = time
        except Exception:
            _LOGGER.exception("transform_time")

            # Not a time
            word.is_maybe_time = False
            return False

        return True

    def _is_word_in_lexicon(
        self, word: str, settings: TextProcessorSettings
    ) -> typing.Optional[bool]:
"""True if word is in the lexicon""" if settings.lookup_phonemes is None: return None return bool(settings.lookup_phonemes(word, do_transforms=False)) # ------------------------------------------------------------------------- # Verbalization # ------------------------------------------------------------------------- def _verbalize_number(self, graph: GraphType, node: Node): """Split numbers into words""" if not isinstance(node, WordNode): return word = typing.cast(WordNode, node) if (word.interpret_as != InterpretAs.NUMBER) or (word.number is None): return settings = self.get_settings(word.lang) if (settings.is_maybe_number is not None) and not settings.is_maybe_number( word.text ): # Probably not a number return assert settings.num2words_lang num2words_kwargs = {"lang": settings.num2words_lang} decimal_nums = [word.number] if word.format == InterpretAsFormat.NUMBER_CARDINAL: num2words_kwargs["to"] = "cardinal" elif word.format == InterpretAsFormat.NUMBER_ORDINAL: num2words_kwargs["to"] = "ordinal" elif word.format == InterpretAsFormat.NUMBER_YEAR: num2words_kwargs["to"] = "year" elif word.format == InterpretAsFormat.NUMBER_DIGITS: num2words_kwargs["to"] = "cardinal" decimal_nums = [Decimal(d) for d in str(word.number.to_integral_value())] for decimal_num in decimal_nums: num_has_frac = (decimal_num % 1) != 0 # num2words uses the number as an index sometimes, so it *has* to be # an integer, unless we're doing currency. if num_has_frac: final_num = float(decimal_num) else: final_num = int(decimal_num) try: # Convert to words (e.g., 100 -> one hundred) num_str = num2words(final_num, **num2words_kwargs) except NotImplementedError: _LOGGER.exception( "Failed to convert number %s to words for language %s", word.text, word.lang, ) return # Add original whitespace back in first_ws, last_ws = settings.get_whitespace(word.text_with_ws) num_str = first_ws + num_str + last_ws # Split into separate words for number_word_text in settings.split_words(num_str): number_word_text_norm = settings.normalize_whitespace(number_word_text) if not number_word_text_norm: continue if not settings.keep_whitespace: number_word_text = number_word_text_norm number_word = WordNode( node=len(graph), implicit=True, lang=word.lang, text=number_word_text_norm, text_with_ws=number_word_text, ) graph.add_node(number_word.node, data=number_word) graph.add_edge(word.node, number_word.node) def _verbalize_date(self, graph: GraphType, node: Node): """Split dates into words""" if not isinstance(node, WordNode): return word = typing.cast(WordNode, node) if (word.interpret_as != InterpretAs.DATE) or (word.date is None): return settings = self.get_settings(word.lang) assert settings.babel_locale assert settings.num2words_lang date = word.date date_format = word.format or settings.default_date_format if "{" not in date_format: # Transform into Python format string date_format = date_format.strip().upper() # MDY -> {M} {D} {Y} date_format_str = settings.join_str.join(f"{{{c}}}" for c in date_format) else: # Assumed to be a Python format string already date_format_str = date_format day_card_str = "" day_ord_str = "" month_str = "" year_str = "" try: if ("{M}" in date_format_str) or ("{m}" in date_format_str): month_str = babel.dates.format_date( date, "MMMM", locale=settings.babel_locale ) num2words_kwargs = {"lang": settings.num2words_lang} if ("{D}" in date_format_str) or ("{d}" in date_format_str): # Cardinal day (1 -> one) num2words_kwargs["to"] = "cardinal" day_card_str = num2words(date.day, **num2words_kwargs) if ("{O}" in 
date_format_str) or ("{o}" in date_format_str): # Ordinal day (1 -> first) num2words_kwargs["to"] = "ordinal" day_ord_str = num2words(date.day, **num2words_kwargs) if ("{Y}" in date_format_str) or ("{y}" in date_format_str): try: num2words_kwargs["to"] = "year" year_str = num2words(date.year, **num2words_kwargs) except Exception: # Fall back to use cardinal number for year num2words_kwargs["to"] = "cardinal" year_str = num2words(date.year, **num2words_kwargs) except Exception: _LOGGER.exception( "Failed to format date %s for language %s", word.text, word.lang ) return date_str = date_format_str.format( **{ "M": month_str, "m": month_str, "D": day_card_str, "d": day_card_str, "O": day_ord_str, "o": day_ord_str, "Y": year_str, "y": year_str, } ) first_ws, last_ws = settings.get_whitespace(word.text_with_ws) date_str = first_ws + date_str + last_ws # Split into separate words for date_word_text in settings.split_words(date_str): date_word_text_norm = settings.normalize_whitespace(date_word_text) if not date_word_text_norm: continue if not settings.keep_whitespace: date_word_text = date_word_text_norm if not date_word_text: continue date_word = WordNode( node=len(graph), implicit=True, lang=word.lang, text=date_word_text_norm, text_with_ws=date_word_text, ) graph.add_node(date_word.node, data=date_word) graph.add_edge(word.node, date_word.node) def _verbalize_time(self, graph: GraphType, node: Node): """Split times into words""" if not isinstance(node, WordNode): return word = typing.cast(WordNode, node) if (word.interpret_as != InterpretAs.TIME) or (word.time is None): return settings = self.get_settings(word.lang) if settings.verbalize_time is None: # Can't verbalize return first_ws, last_ws = settings.get_whitespace(word.text_with_ws) time_words = list(settings.verbalize_time(word.time)) last_idx = len(time_words) - 1 # Split into words for word_idx, time_word_text in enumerate(time_words): if word_idx == 0: time_word_text = first_ws + time_word_text if word_idx == last_idx: time_word_text += last_ws else: time_word_text += settings.join_str time_word_text_norm = settings.normalize_whitespace(time_word_text) if not time_word_text_norm: continue if not settings.keep_whitespace: time_word_text = time_word_text_norm if not time_word_text: continue time_word = WordNode( node=len(graph), implicit=True, lang=word.lang, text=time_word_text_norm, text_with_ws=time_word_text, ) graph.add_node(time_word.node, data=time_word) graph.add_edge(word.node, time_word.node) # May contain numbers or initialisms self._transform_number(graph, time_word) for node_class, node_kwargs in self._split_initialism(graph, time_word): new_node = node_class(node=len(graph), **node_kwargs) graph.add_node(new_node.node, data=new_node) graph.add_edge(time_word.node, new_node.node) def _verbalize_currency( self, graph: GraphType, node: Node, ): """Split currency amounts into words""" if not isinstance(node, WordNode): return word = typing.cast(WordNode, node) if ( (word.interpret_as != InterpretAs.CURRENCY) or ((word.currency_symbol is None) and (word.currency_name is None)) or (word.number is None) ): return settings = self.get_settings(word.lang) assert settings.num2words_lang decimal_num = word.number # True if number has non-zero fractional part num_has_frac = (decimal_num % 1) != 0 num2words_kwargs = {"lang": settings.num2words_lang, "to": "currency"} # Name of currency (e.g., USD) if not word.currency_name: currency_name = settings.default_currency if settings.currencies: # Look up currency in locale currency_name = 
            word.currency_name = currency_name

        num2words_kwargs["currency"] = word.currency_name

        # Custom separator so we can remove 'zero cents'
        num2words_kwargs["separator"] = "|"

        try:
            num_str = num2words(float(decimal_num), **num2words_kwargs)
        except Exception:
            _LOGGER.exception(
                "Failed to verbalize currency %s for language %s", word, word.lang
            )
            return

        # Post-process currency words
        if num_has_frac:
            # Discard num2words separator
            num_str = num_str.replace("|", "")
        else:
            # Remove 'zero cents' part
            num_str = num_str.split("|", maxsplit=1)[0]

        # Add original whitespace back in
        first_ws, last_ws = settings.get_whitespace(word.text_with_ws)
        num_str = first_ws + num_str + last_ws

        # Split into separate words
        for currency_word_text in settings.split_words(num_str):
            currency_word_text_norm = settings.normalize_whitespace(currency_word_text)
            if not currency_word_text_norm:
                continue

            if not settings.keep_whitespace:
                currency_word_text = currency_word_text_norm

            currency_word = WordNode(
                node=len(graph),
                implicit=True,
                lang=word.lang,
                text=currency_word_text_norm,
                text_with_ws=currency_word_text,
            )

            graph.add_node(currency_word.node, data=currency_word)
            graph.add_edge(word.node, currency_word.node)
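
# -----------------------------------------------------------------------------

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module): run the processor
    # on plain text and print each spoken sentence with its words. Assumes the
    # en_US language data is installed; word.phonemes may be None when no
    # lexicon or phoneme guesser is available for the language.
    _processor = TextProcessor(default_lang="en_US")

    # process() (via __call__) returns the text graph and its root node;
    # numbers, dates, and currency amounts are detected and verbalized.
    _graph, _root = _processor("On December 25th I paid $10.50, believe it or not!")

    for _sentence in _processor.sentences(_graph, _root):
        print(_sentence.text_spoken)
        for _word in _sentence.words:
            if _word.is_spoken:
                print(" ", _word.text, _word.phonemes)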