# -*- coding: utf-8 -*-
# This code is a part of BELA package: https://github.com/letuananh/bela
# :developer: Le Tuan Anh <tuananh.ke@gmail.com>
# :license: MIT, see LICENSE for more details.
"""
BELA version 2.x convention
"""
import logging
import re
import types
from collections import deque
from chirptext import DataObject
from speach import elan
from .common import TIER_PARSER
from .common import tokenize
from .common import LanguageMix
from .common import _find_invalid_characters
def getLogger():
    """Return the logger registered under this module's name."""
    module_logger = logging.getLogger(__name__)
    return module_logger
# IDs of tiers that are not parsed as "<person> <tier class>" names;
# they are assigned to the special transcriber speaker instead.
SPECIAL_TIERS = ['ActivityMarkers']

# Display name and participant code of the special (non-participant) speaker.
SPECIAL_SPEAKER_NAME = 'Transcriber'
SPECIAL_SPEAKER = ':transcriber:'

# Default value of the ``threshold`` argument of Bela2.find_turns()
DEFAULT_TURN_THRESHOLD = 1500

# Accepts text made only of ASCII letters, BELA's special characters and
# ":v:l:isoNNN_N_xxx" tags (see _enforce_ansi_alphabets).
PTN_ANSI_CHUNK = re.compile(r"^\s*(([A-Za-z\'\:\?#\=_\.\-~\+@]+|\:v\:l\:iso\d{3}_\d_[a-z]{3})\s*)+\s*$")

# Language tags whose chunks are checked against PTN_ANSI_CHUNK by default.
ANSI_CHECK_LANGUAGES = (
    'English',
    'Vocal Sounds',
    'Malay',
    'Red Dot',
    ':v:airstream',
    ':v:crying',
    ':v:vocalizations',
)
def parse_tier_name(name):
    ''' Split a BELA tier name into its person and tier parts

    :param name: The tier name (tier ID) to parse
    :returns: A 2-tuple taken from the ``person`` and ``tier`` groups of
        ``TIER_PARSER``, or ``(None, None)`` when the name does not match
    '''
    matched = TIER_PARSER.match(name)
    if not matched:
        return None, None
    return matched.group('person'), matched.group('tier')
def time_map(tier):
    ''' Index the annotations of a tier by their time span

    Every annotation is stored under the key ``(from_ts, to_ts)`` (the
    timestamp objects themselves). When both timestamps exist and carry a
    value, the annotation is additionally stored under
    ``(from_ts.value, to_ts.value)``.

    :param tier: An iterable of annotations with ``from_ts`` and ``to_ts``
    :returns: A dict mapping time-span keys to annotations; later
        annotations overwrite earlier ones that share a key
    '''
    lookup = {}
    for ann in tier:
        start, end = ann.from_ts, ann.to_ts
        lookup[(start, end)] = ann
        # the value-based key is only usable when both ends are known
        if start is None or start.value is None:
            continue
        if end is None or end.value is None:
            continue
        lookup[(start.value, end.value)] = ann
    return lookup
def _enforce_ansi_alphabets(cu, languages=ANSI_CHECK_LANGUAGES):
    ''' [Internal] Ensure that chunks of certain languages (e.g. English, Malay, etc.)
    only contain English alphabets & BELA's special characters

    A single warning is appended to ``cu.warnings`` as soon as one
    whitespace-separated part of the chunk text fails ``PTN_ANSI_CHUNK``.

    End users should NOT use this function.
    It is not a part of BELA's standard APIs and may be removed in the future.

    :param cu: The chunk annotation to check (reads ``language`` and ``text``)
    :param languages: Language tags for which the check is performed
    '''
    if cu.language not in languages or not cu.text:
        return
    for piece in cu.text.split():
        if PTN_ANSI_CHUNK.match(piece) is None:
            cu.warnings.append(f"Unusual character was found in chunk '{cu.text}' ('{cu.language}')")
            # NOTE(review): source indentation was lost; assumed the check
            # stops at the first offending part (one warning per chunk).
            return
def _validate_baby_language(cu, person_name='unknown_participant'):
''' [Internal] Validate baby language annotations
Show warnings when babies' chunks are tagged with adult languages
End users should NOT use this function.
It is not a part of BELA's standard APIs and may be removed in the future.
'''
if cu.text.strip() == '###':
_name = person_name.strip()
_lang = cu.language.strip()
if _name.startswith('Baby'):
if _lang not in (':v:vocalizations', ':v:airstream', ':v:crying', ':v:laughter'):
cu.errors.append(f"'###' is tagged as '{cu.language}' for {_name} (should be in (':v:vocalizations', ':v:airstream', ':v:crying', ':v:laughter'))")
elif _name.startswith('Sibling'):
if _lang not in (':v:vocalizations', ':v:airstream', ':v:crying', ':v:laughter'):
if cu.warnings is None:
cu.warnings = []
cu.warnings.append(f"'###' is tagged as '{cu.language}' for {_name} (should be in (':v:vocalizations', ':v:airstream', ':v:crying', ':v:laughter'))")
elif _lang != 'Vocal Sounds':
cu.errors.append(f"'###' is tagged as '{cu.language}' for {_name} (should be 'Vocal Sounds' for non-Baby participants)")
def _utterance_tokenize(self, *args, **kwargs):
    """ [Internal] Tokenize the value of an utterance or a chunk

    Meant to be bound to an annotation object; every extra argument is
    forwarded unchanged to ``tokenize``.

    End users should NOT use this function.
    It is not a part of BELA's standard APIs and may be removed in the future.
    """
    text = self.value
    return tokenize(text, *args, **kwargs)
def _map_children(parent_tier, child_tier, errors=None, tier_class='chunks', is_many=True):
''' map all children chunks into parent's utterances '''
_parents = sorted(parent_tier, key=lambda x: x.from_ts)
_children = deque(sorted(child_tier, key=lambda x: x.from_ts))
for ann in _parents:
while _children:
ann_child = _children[0]
if ann_child.from_ts >= ann.from_ts and ann_child.to_ts <= ann.to_ts:
if is_many:
if not getattr(ann, tier_class):
setattr(ann, tier_class, [])
getattr(ann, tier_class).append(ann_child)
else:
if getattr(ann, tier_class) is None:
setattr(ann, tier_class, ann_child)
else:
getLogger().warning("Conflicting child annotations were found")
if errors is not None:
errors.append("Conflicting child annotations were found")
_children.popleft()
getLogger().debug(f"linked: {repr(ann)} -- {repr(ann_child)}")
elif ann.from_ts > ann_child.to_ts:
getLogger().warning(f"DISCARDED -- {repr(ann_child)}")
if errors is not None:
errors.append(f"Orphaned annotation found -- (#{ann_child.ID}) {repr(ann_child)}")
_children.popleft()
else:
getLogger().debug(f"no relation: {repr(ann)} -- {repr(ann_child)}")
break
if _children:
for child in _children:
getLogger().debug(f"Orphaned annotation found: {child}")
class Person(DataObject):
    ''' A participant (speaker) together with the tiers that belong to them

    Tiers registered through :meth:`add_tier` can be looked up by tier class
    using ``person[tier_class]`` and ``tier_class in person``.
    '''

    def __init__(self, name, code=None, utterances=None, tiers=None, belav2=None, **kwargs):
        ''' Create a new person

        :param name: Name of the person
        :param code: Participant code of the person
        :param utterances: The utterance tier of this person; an empty
            ``elan.Tier`` is created when it is not provided
        :param tiers: Optional iterable of tiers used as the initial tier
            list (it is copied, and these tiers are not indexed by tier class)
        :param belav2: The owning Bela2 object; when available, problems
            found by :meth:`add_tier` are added to its ``errors`` list
        '''
        super().__init__(name=name, code=code, utterances=utterances, **kwargs)
        self.belav2 = belav2
        if utterances is None:
            self.utterances = elan.Tier()
        self.__tier_map = {}
        self.__tiers = list(tiers) if tiers else []

    @property
    def tiers(self):
        ''' List of all tiers of this person '''
        return self.__tiers

    @property
    def tier_classes(self):
        ''' List of all tier classes that have a registered tier '''
        return list(self.__tier_map)

    def __contains__(self, key):
        return key in self.__tier_map

    def __getitem__(self, key):
        ''' Get a tier by tier class, or None if no such tier is registered '''
        return self.__tier_map.get(key)

    def __iter__(self):
        return iter(self.tiers)

    def _report(self, message):
        ''' Record a problem in the owning Bela2 object, or log it when there is no owner '''
        if self.belav2 is not None:
            self.belav2.errors.append(message)
        else:
            getLogger().warning(message)

    def add_tier(self, tier):
        ''' Add a tier to this person and index it by its tier class

        A tier that is already in the tier list is ignored. A tier without
        a tier class, or whose tier class is already taken by another tier,
        is reported and not indexed.

        :param tier: The tier object to add
        '''
        if tier in self.tiers:
            return
        if not tier.tier_class:
            self._report(f"Tier [{tier.ID}] does not have a tier_class")
        elif tier.tier_class in self.__tier_map:
            self._report(f"User [{self.code}] has more than one [class={tier.tier_class}] tier")
        else:
            self.__tier_map[tier.tier_class] = tier
        # NOTE(review): source indentation was lost; assumed that the tier is
        # always recorded in the tier list, even when it could not be indexed.
        self.tiers.append(tier)

    def __str__(self):
        return f"Person(name={repr(self.name)}, code={repr(self.code)})"
class Bela2(DataObject):
    ''' BELA-convention version 2

    Wraps an ELAN document, groups its tiers by participant, links chunks,
    languages and translations to utterances, and collects the problems
    found along the way in ``errors`` / ``warnings`` (on this object and on
    the individual annotations).
    '''

    def __init__(self, elan, path=":memory:", allow_empty=False,
                 nlp_tokenizer=False, word_only=True, ellipsis=True,
                 validate_baby_languages=False,
                 ansi_languages=ANSI_CHECK_LANGUAGES,
                 auto_tokenize=True,
                 split_punc=True, remove_punc=True, **kwargs):
        ''' Create a new Bela2 object from an :class:`speach.elan.ELANDoc` object

        :param elan: An ELANDoc object
        :type elan: speach.elan.ELANDoc
        :param path: Path of the source file, stored on this object as-is
        :param allow_empty: Report empty annotations as warnings instead of errors
        :param validate_baby_languages: Run the ``###`` language validation on every chunk
        :param ansi_languages: Language tags whose chunks are checked for
            unusual characters; a falsy value disables the check
        :param auto_tokenize: Tokenize all utterances and chunks right away
        :param nlp_tokenizer: Forwarded to ``tokenize`` by :meth:`tokenize`
        :param word_only: Forwarded to ``tokenize`` by :meth:`tokenize`
        :param ellipsis: Forwarded to ``tokenize`` by :meth:`tokenize`
        :param split_punc: Forwarded to ``tokenize`` by :meth:`tokenize`
        :param remove_punc: Forwarded to ``tokenize`` by :meth:`tokenize`
        :returns: a Bela2 object
        :rtype: bela.Bela2
        '''
        super().__init__(elan=elan, path=path, errors=[], warnings=[],
                         validate_baby_languages=validate_baby_languages,
                         ansi_languages=ansi_languages,
                         auto_tokenize=auto_tokenize,
                         allow_empty=allow_empty, **kwargs)
        self.__person_map = {}
        # create special speaker map (i.e. Transcriber)
        self.__person_map[SPECIAL_SPEAKER] = Person(name=SPECIAL_SPEAKER_NAME, code=SPECIAL_SPEAKER, belav2=self)
        self.__persons = None
        self.__participant_codes = None
        self.__word_only = word_only
        self.__nlp_tokenizer = nlp_tokenizer
        self.__ellipsis = ellipsis
        self.__split_punc = split_punc
        self.__remove_punc = remove_punc
        if elan is not None:
            self.parse_names()
            self._init_tier_map()
            if self.auto_tokenize:
                self.tokenize()

    def tiers(self):
        ''' Return the result of ``tiers()`` of the underlying ELAN object '''
        return self.elan.tiers()

    @property
    def roots(self):
        ''' Direct access to all underlying ELAN root tiers '''
        return self.elan.roots

    @property
    def media_file(self):
        return self.elan.media_file

    @property
    def media_url(self):
        return self.elan.media_url

    @property
    def relative_media_url(self):
        return self.elan.relative_media_url

    def annotation(self, annID):
        ''' Get an annotation object by ID

        This is a regular method: a property cannot receive ``annID``.
        '''
        return self.elan.annotation(annID)

    @property
    def person_map(self):
        ''' Map participant (i.e. person code) to person object '''
        return self.__person_map

    @property
    def persons(self):
        ''' All Person objects in this BELA object

        The tuple is built on first access and cached afterwards.
        '''
        if self.__persons is None:
            self.__persons = tuple(self.__person_map.values())
        return self.__persons

    @property
    def participant_codes(self):
        ''' Immutable list of participant codes

        The tuple is built on first access and cached afterwards.
        '''
        if self.__participant_codes is None:
            # TODO: Make this thread safe?
            self.__participant_codes = tuple(self.person_map.keys())
        return self.__participant_codes

    @property
    def word_only(self):
        return self.__word_only

    def count_sents(self):
        ''' Count the annotations of all tiers whose tier class is ``Utterance`` '''
        return sum(len(tier) for tier in self.elan if tier.tier_class == "Utterance")

    def count_chunks(self):
        ''' Count the annotations of all tiers whose tier class is ``Chunk`` '''
        return sum(len(tier) for tier in self.elan if tier.tier_class == "Chunk")

    def get_language_set(self):
        ''' Collect the values of all annotations in ``Language`` tiers

        :returns: A set of language annotation values
        '''
        languages = set()
        for tier in self.elan:
            if tier.tier_class == "Language":
                languages.update(ann.value for ann in tier)
        return languages

    def parse_name(self, tier):
        ''' (Internal) Parse participant name and tier type from a tier object and then update the tier object

        Special tiers use their own ID as tier class and the special
        speaker code as speaker name. A tier whose name cannot be parsed
        is reported in ``self.errors`` and left untouched.

        This function is internal and should not be used outside of this class.

        :param tier: The tier object to parse
        :type tier: speach.elan.ELANTier
        '''
        if tier.ID in SPECIAL_TIERS:
            tier.tier_class = tier.ID
            tier.speaker_name = SPECIAL_SPEAKER
            return
        speaker_name, tier_class = parse_tier_name(tier.ID)
        if speaker_name and tier_class:
            tier.tier_class = tier_class
            tier.speaker_name = speaker_name
        else:
            self.errors.append(f"Invalid tier name: {tier.ID}")

    def _init_tier_map(self):
        ''' Construct BELA info structure from EAF

        Steps: create one Person per root utterance tier; check timestamps
        and hand every tier to its person; then, for each person, link
        chunks, translations and languages to the utterances and validate
        the texts. Problems are added to ``self.errors`` and to the
        ``errors`` / ``warnings`` lists of the affected annotations.
        '''
        # init roots first
        for tier in self.roots:
            if tier.tier_class == 'Utterance':
                if not tier.participant:
                    self.errors.append(f"Tier [{tier.ID}] does not have participant code")
                elif tier.participant in self.__person_map:
                    self.errors.append(f"Person [{tier.participant}] has more than one utterance tier")
                else:
                    person = Person(tier.speaker_name, code=tier.participant, utterances=tier, belav2=self)
                    self.__person_map[tier.participant] = person
            elif tier.ID not in SPECIAL_TIERS:
                self.errors.append(f"Unknown root tier: {tier.ID}")
        # init other tiers
        for tier in self.elan:
            # verify timestamps
            if tier.time_alignable:
                for ann in tier:
                    if ann.from_ts is None or ann.from_ts.value is None or ann.to_ts is None or ann.to_ts.value is None:
                        self.errors.append(f"Annotation with corrupted timestamp: {ann.value} (Timestamp: {ann.from_ts} -- {ann.to_ts}) | Tier: {tier.ID}")
                        if ann.errors is None:
                            ann.errors = []
                        ann.errors.append(f"Corrupted timestamp: {ann.value} | (Timestamp: {ann.from_ts} -- {ann.to_ts})")
                        # borrow the missing end of the time span from the other
                        # end, but only when the slot exists and has no value
                        if ann.from_ts is not None and ann.from_ts.value is None and ann.to_ts is not None:
                            ann.from_ts.value = ann.to_ts.value
                            ann.errors.append("Assumed from_ts value from to_ts")
                        if ann.to_ts is not None and ann.to_ts.value is None and ann.from_ts is not None:
                            ann.to_ts.value = ann.from_ts.value
                            ann.errors.append("Assumed to_ts value from from_ts")
            if tier.ID in SPECIAL_TIERS:
                self.__person_map[SPECIAL_SPEAKER].tiers.append(tier)
            elif tier.participant not in self.__person_map:
                self.errors.append(f"Unknown person code [{tier.participant}] used in tier [{tier.ID}]")
            else:
                self.__person_map[tier.participant].add_tier(tier)
        # link languages if available
        # NOTE(review): source indentation was lost; the checks below are
        # assumed to run for every person, not only for those with utterances.
        for person in self.persons:
            if person.utterances:
                for u in person.utterances:
                    u.person = person
            if person['Chunk']:
                _map_children(person.utterances, person['Chunk'], errors=self.errors, tier_class='chunks')
            else:
                self.errors.append(f"Person {person.name} ({person.code}) does not have a chunk tier")
            if not person['Language']:
                self.errors.append(f"Person {person.name} ({person.code}) does not have a language tier")
            if 'Translation' in person:
                translations = person['Translation']
                _translation_map = {u.ref: u.value for u in translations}
                for u in person.utterances:
                    if u in _translation_map:
                        if not u.translation:
                            u.translation = _translation_map[u]
                        else:
                            self.errors.append(f"Conflicted translation for [{person}] Time: [{u.from_ts} -- {u.to_ts}]")
            else:
                self.errors.append(f"Person {person.name} ({person.code}) does not have a translation tier")
            if person['Chunk'] and person['Language']:
                lang_tier_time_map = time_map(person['Language'])
                # language annotations that have not been linked to a chunk yet
                unlinked_language_annotations = set(person['Language'])
                for cu in person['Chunk']:
                    key = (cu.from_ts, cu.to_ts)
                    if key in lang_tier_time_map:
                        cu.language = lang_tier_time_map[key].value
                        # discard: the same annotation may be matched more than once
                        unlinked_language_annotations.discard(lang_tier_time_map[key])
                    elif cu.from_ts is not None and cu.from_ts.value is not None \
                            and cu.to_ts is not None and cu.to_ts.value is not None:
                        # [2021-09-07 Tue 14:32]
                        # [TA] try to map using timestamp values if possible
                        key = (cu.from_ts.value, cu.to_ts.value)
                        if key in lang_tier_time_map:
                            cu.language = lang_tier_time_map[key].value
                            unlinked_language_annotations.discard(lang_tier_time_map[key])
                for ann in unlinked_language_annotations:
                    self.errors.append(f"Orphaned language annotation could not be linked: {ann} [{ann.from_ts} -- {ann.to_ts}]")
            # validate text from utterance tier and chunk tier
            if person.utterances:
                for u in person.utterances:
                    # bind tokenize() function to utterances
                    u.tokenize = types.MethodType(_utterance_tokenize, u)
                    if u.errors is None:
                        u.errors = []
                    if u.warnings is None:
                        u.warnings = []
                    if not u.text.strip():
                        if not self.allow_empty:
                            u.errors.append(f"Empty annotation '' found at [{u.from_ts} :: {u.to_ts}]")
                        else:
                            u.warnings.append(f"Empty annotation '' found at [{u.from_ts} :: {u.to_ts}]")
                    if u.chunks:
                        for cu in u.chunks:
                            # bind tokenize() function to chunks
                            cu.tokenize = types.MethodType(_utterance_tokenize, cu)
                            if cu.errors is None:
                                cu.errors = []
                            if cu.warnings is None:
                                cu.warnings = []
                            if self.ansi_languages:
                                _enforce_ansi_alphabets(cu, languages=self.ansi_languages)
                            if self.validate_baby_languages:
                                _validate_baby_language(cu, person.name)
                            if not cu.text.strip():
                                if u.text or not self.allow_empty:
                                    u.errors.append(f"Empty chunk annotation '' found at [{cu.from_ts} :: {cu.to_ts}]")
                                else:
                                    u.warnings.append(f"Empty chunk annotation '' found at [{cu.from_ts} :: {cu.to_ts}]")
                            if cu.language is None or not cu.language.strip():
                                if not self.allow_empty or cu.text.strip() or u.text.strip():
                                    u.errors.append(f"Language tag not found in the chunk `{cu.text.strip()}` [{cu.from_ts} :: {cu.to_ts}]")
                                else:
                                    u.warnings.append(f"Language tag not found in the chunk `{cu.text.strip()}` [{cu.from_ts} :: {cu.to_ts}]")
                            elif "#!" in cu.language:
                                u.errors.append(f"Unsure language tag ({cu.language}) was used for chunk `{cu.text.strip()}` [{cu.from_ts} :: {cu.to_ts}]")
                    else:
                        u.chunks = []  # [2022-03-16 Wed 11:38][TA] Make sure that chunks is not None
                    # the utterance text must equal its chunks joined together (spaces ignored)
                    u_value = u.text.replace(' ', '')
                    _chunks = u.chunks if u.chunks else []
                    c_value = ''.join(x.text for x in _chunks)
                    c_value = c_value.replace(' ', '')
                    if u_value != c_value:
                        _chunk_texts = ' '.join(x.text.strip() for x in _chunks)
                        u.errors.append(f"Utterance text and chunks are mismatched ({repr(u.text)} != {repr(_chunk_texts)})")

    def parse_names(self):
        ''' Parse the name of every tier in the underlying ELAN object '''
        for tier in self.elan:
            self.parse_name(tier)

    def tokenize(self):
        ''' tokenize all utterances

        Every annotation in ``Utterance`` and ``Chunk`` tiers receives a
        ``words`` attribute. Invalid characters reported by
        ``_find_invalid_characters`` are added to the annotation's errors.
        '''
        for tier in self.elan:
            if tier.tier_class not in ('Utterance', 'Chunk'):
                continue
            for ann in tier:
                ann_errors = [] if ann.errors is None else ann.errors
                ann.words = tokenize(ann.value, language=ann.language,
                                     errors=ann_errors,
                                     ellipsis=self.__ellipsis,
                                     nlp_tokenizer=self.__nlp_tokenizer,
                                     split_punc=self.__split_punc,
                                     remove_punc=self.__remove_punc,
                                     word_only=self.__word_only)
                _invalid_chars = _find_invalid_characters(ann.value, language=ann.language)
                if _invalid_chars:
                    if ann.language:
                        ann_errors.append(f"Invalid characters, new line, or tab found ({repr(_invalid_chars)}) (language: {ann.language})")
                    else:
                        ann_errors.append(f"Invalid characters, new line, or tab found ({repr(_invalid_chars)})")
                ann.errors = ann_errors

    def find_turns(self, threshold=DEFAULT_TURN_THRESHOLD):
        ''' Find potential turn-takings

        Two utterances that are adjacent in start-time order form a turn
        when they belong to different persons and the gap between the end
        of the first and the start of the second is within the threshold.
        Pairs with a missing timestamp are skipped.

        :param threshold: Delay between utterances in milliseconds
        :type threshold: float
        :return: List of utterance pairs (2-tuple) (from utterance, to utterance object)
        '''
        _utterances = []
        for person in self.person_map.values():
            if person.utterances:
                for u in person.utterances:
                    u.person = person
                    _utterances.append(u)
        _utterances.sort(key=lambda x: x.from_ts)
        _turns = []
        for u, next_u in zip(_utterances, _utterances[1:]):
            if u.to_ts is None or u.to_ts.value is None or \
               next_u.from_ts is None or next_u.from_ts.value is None:
                continue
            delta_t = u.to_ts - next_u.from_ts
            if next_u.person != u.person and abs(delta_t) <= threshold:
                _turns.append((u, next_u))
        return _turns

    def to_language_mix(self, to_ts=None, auto_compute=True):
        ''' Collapse utterances to generate a language mix timeline

        :param to_ts: When given, chunks that end after this timestamp are left out
        :param auto_compute: Return ``langmix.compute()`` instead of the
            LanguageMix object itself
        '''
        langmix = LanguageMix()
        for person in self.persons:
            if not person.utterances:
                continue
            getLogger().debug(f"{person.name} -- {person.code}")
            for u in person.utterances:
                if u.chunks:
                    for c in u.chunks:
                        if c.from_ts and c.to_ts:
                            if to_ts is not None and c.to_ts > to_ts:
                                continue
                            else:
                                langmix.add(c)
        return langmix.compute() if auto_compute else langmix

    def save(self, *args, **kwargs):
        ''' Save the underlying ELAN object; all arguments are forwarded to its ``save()`` '''
        return self.elan.save(*args, **kwargs)

    @staticmethod
    def read_eaf(eaf_path, **kwargs):
        ''' Read an EAF file as a Bela2 object

        :param eaf_path: Path to the EAF file
        :type eaf_path: str-like object or a Path object
        :returns: A Bela2 object
        :rtype: bela.Bela2
        '''
        return Bela2(elan.read_eaf(eaf_path), path=eaf_path, **kwargs)

    @staticmethod
    def from_elan(elan, eaf_path=":memory:", **kwargs):
        ''' Create a BELA-con version 2.x object from a :class:`speach.elan.ELANDoc` object '''
        return Bela2(elan, path=eaf_path, **kwargs)
# module-level shortcuts for the two Bela2 factory functions
read_eaf, from_elan = Bela2.read_eaf, Bela2.from_elan