# -*- coding: utf-8 -*-
# This code is a part of BELA package: https://github.com/letuananh/bela
# :developer: Le Tuan Anh <tuananh.ke@gmail.com>
# :license: MIT, see LICENSE for more details.
"""
Lexical Analyser
"""
import logging
from pathlib import Path
from collections import defaultdict as dd
from collections import Counter
from chirptext import chio
from chirptext import ttl
from .common import tokenize
from .common import NLTK_AVAILABLE
from .common import _process_token, InvalidTokenException
def read_lexicon(lex_name):
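    ''' Read a bundled lexicon file from the package's data directory.

    Each lexicon is assumed to be a plain-text (optionally gzipped) word list,
    one surface form per line, with lines starting with '#' treated as comments.
    '''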
    p = Path(__file__).parent / 'data' / lex_name
    forms = set()
    with chio.open(p) as _lex_stream:
        for line in _lex_stream:
            if line.startswith("#"):
                continue
            else:
                forms.add(line.strip())
    return forms


# TODO: add all BELA special keywords
_KEYWORDS = ['babyname', 'adultname', 'siblingname', 'strangername',
             '...', 'english', 'chinese', 'malay', 'tamil']

if NLTK_AVAILABLE:
    from nltk.corpus import words  # English words
    from nltk.corpus import stopwords
    _ENGLISH_WORDS = set(words.words())
    _ENGLISH_WORDS.update(stopwords.words('english'))
else:
    _ENGLISH_WORDS = set()
_ENGLISH_WORDS.update(("'s", "ok", "'m", "'ll", "n't", "okay", "'re", "'d", "'ve"))

_MANDARIN_WORDS = read_lexicon('cmn_lex.txt.gz')
_MALAY_WORDS = read_lexicon('msa_lex.txt.gz')
# [2021-03-11 木 11:48]
# Adapted from https://github.com/letuananh/lelesk/blob/master/lelesk/util.py
# LeLESK: MIT License
if NLTK_AVAILABLE:
    # from nltk.tokenize import word_tokenize
    from nltk import pos_tag
    from nltk.stem import WordNetLemmatizer
    wnl = WordNetLemmatizer()


def ptpos_to_wn(ptpos, default='x'):
    ''' Map a Penn Treebank POS tag to a WordNet POS tag '''
    if ptpos.startswith('JJ'):
        return 'a'
    elif ptpos.startswith('NN'):
        return 'n'
    elif ptpos.startswith('RB'):
        return 'r'
    elif ptpos.startswith('VB'):
        return 'v'
    else:
        return default
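
# For example, ptpos_to_wn('VBD') maps to 'v' and ptpos_to_wn('NNS') to 'n', while an
# unmapped tag such as 'DT' falls back to the supplied default.
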
def _tokenize(words):
    if NLTK_AVAILABLE:
        tags = pos_tag(words)
        tokens = [(w, t, wnl.lemmatize(w, pos=ptpos_to_wn(t, default='n')))
                  for w, t in tags]
        return tokens
    else:
        return words
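
# For example, _tokenize(['dogs', 'ran']) would return tuples such as
# ('dogs', 'NNS', 'dog') and ('ran', 'VBD', 'run') when NLTK is available,
# or the input list unchanged when it is not.

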
class LexicalAnalyser:
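    ''' Compute a simple lexical profile (types, tokens, unknown words) for a set of tagged utterances '''
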
    def __init__(self, lang_lex_map=None, word_only=False, ellipsis=False,
                 non_word='', lemmatizer=True, **kwargs):
        self.utterances = ttl.Document()
        self.word_sent_map = dd(set)
        self.lang_word_sent_map = dd(lambda: dd(set))
        self.lang_word_speaker_map = dd(lambda: dd(set))
        self.word_only = word_only
        self.ellipsis = ellipsis
        self.non_word = non_word
        self.lemmatizer = lemmatizer
        # set up the built-in language-lexicon map
        self.lang_lex_map = {
            'English': _ENGLISH_WORDS,
            'Mandarin': _MANDARIN_WORDS,
            'Malay': _MALAY_WORDS
        }
        # allow a custom language-lexicon map
        self.__custom_lang_lex_map = lang_lex_map if lang_lex_map else {}
        self.word_speaker_map = dd(set)
        self.word_map = dd(Counter)

    def analyse(self, external_tokenizer=True):
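        ''' (Re)build the word frequency counters and word-to-speaker/sentence maps from all utterances added so far '''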
        self.word_sent_map.clear()
        self.word_map.clear()
        for utterance in self.utterances:
            language = utterance.tag.language.value
            speaker = utterance.tag.speaker.value
            # source = utterance.tag.source.value
            tokens = [t.lower() for t in tokenize(
                utterance.text, language=language,
                ellipsis=self.ellipsis, non_word=self.non_word,
                word_only=self.word_only, nlp_tokenizer=external_tokenizer)]
            self.word_map[language].update(tokens)
            for token in tokens:
                self.word_speaker_map[token].add(speaker)
                self.word_sent_map[token].add(utterance.text)
                self.lang_word_speaker_map[language][token].add(speaker)
                self.lang_word_sent_map[language][token].add(utterance.text)

    def gen_type_token_map(self):
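        ''' Map each language to a (token count, type count, tokens-per-type ratio) tuple.

        For example, 100 English tokens over 40 distinct types would yield
        {'English': (100, 40, 2.5)}.
        '''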
        ratio_map = {}
        for lang, counter in self.word_map.items():
            count_token = len(list(counter.elements()))
            count_type = len(counter)
            ratio = count_token / count_type if count_type > 0 else 0
            ratio_map[lang] = (count_token, count_type, ratio)
        return ratio_map

    def gen_type_token_list(self):
        _list = [(lang, count_token, count_type, ratio)
                 for lang, (count_token, count_type, ratio) in self.gen_type_token_map().items()]
        _list.sort(key=lambda x: -x[3])
        return _list

    def add(self, text, language, **kwargs):
        sent = self.utterances.sents.new(text)
        sent.tag.language = language
        for k, v in kwargs.items():
            sent.tag[k] = v

    def is_special_token(self, word, language):
        ''' Determine if a given token is a special token (keywords, markup, etc.) '''
        return word == '###' or word.startswith(':')

    def is_unknown(self, word, language):
        ''' Check whether a word is unknown (i.e. not found in any available lexicon) '''
        if word in _KEYWORDS:
            return False
        elif language in self.lang_lex_map:
            if word in self.lang_lex_map[language]:
                return False
            elif language not in self.__custom_lang_lex_map:
                return True
            else:
                return word not in self.__custom_lang_lex_map[language]
        elif language in self.__custom_lang_lex_map:
            return word not in self.__custom_lang_lex_map[language]
        else:
            return False
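
    # Lookup order in is_unknown(): BELA keywords first, then the built-in lexicon for the
    # language, then any custom lexicon passed in via lang_lex_map; a language with no
    # lexicon at all never flags anything, so its words are always treated as known.
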
    def to_dict(self, ignore_empty=True):
        stats_dict = {'languages': [], 'lexicon': [], 'errors': []}
        __lemmatize_error = False
        for lang, count_token, count_type, ratio in self.gen_type_token_list():
            if ignore_empty and not count_type and not count_token and not ratio:
                continue
            stats_dict['languages'].append({
                'language': lang,
                'types': count_type,
                'tokens': count_token,
                'ratio': round(ratio, 2)
            })
        for lang, counter in self.word_map.items():
            lang_lexicon = {'language': lang, 'vocabs': []}
            for word, freq in counter.most_common():
                _is_special = self.is_special_token(word, lang)
                if _is_special:
                    try:
                        _process_token(word)
                        _is_unknown = False
                    except InvalidTokenException:
                        _is_unknown = True
                else:
                    lemma = word
                    # try to lemmatize if possible
                    if NLTK_AVAILABLE and self.lemmatizer and not __lemmatize_error and lang == 'English':
                        try:
                            __, tag = pos_tag([word])[0]
                            lemma = wnl.lemmatize(word, pos=ptpos_to_wn(tag, default='n'))
                        except Exception as e:
                            # logging.getLogger(__name__).exception("BELA.Lemmatizer crashed")
                            # do not try to lemmatize again if NLTK crashed
                            __lemmatize_error = True
                            if isinstance(e, LookupError):
                                if 'omw-1.4' in str(e):
                                    stats_dict['errors'].append('Lexicon was generated without the lemmatizer: OMW-1.4 data not found.')
                                else:
                                    stats_dict['errors'].append('Lexicon was generated without the lemmatizer: an NLTK resource is missing.')
                            else:
                                stats_dict['errors'].append('Lexicon was generated without the lemmatizer: an unexpected error was raised.')
                    _is_unknown = self.is_unknown(lemma, lang)
                _lex_entry = {
                    'word': word,
                    'freq': freq,
                    'sents': list(self.lang_word_sent_map[lang][word]),
                    'speakers': list(self.lang_word_speaker_map[lang][word]),
                    'special_code': _is_special,
                    'unknown_word': _is_unknown
                }
                lang_lexicon['vocabs'].append(_lex_entry)
            if not ignore_empty or lang_lexicon['vocabs']:
                stats_dict['lexicon'].append(lang_lexicon)
        return stats_dict
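

# A minimal usage sketch (speaker names and texts below are purely illustrative):
#
#     analyser = LexicalAnalyser()
#     analyser.add('we went to the zoo yesterday', 'English', speaker='MOT')
#     analyser.add('我 们 去 动物园', 'Mandarin', speaker='CHI')
#     analyser.analyse()
#     report = analyser.to_dict()   # per-language type/token stats and vocab lists

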
class CorpusLexicalAnalyser:
    ''' Analyse a corpus text '''

    def __init__(self, filepath=':memory:', lang_lex_map=None, word_only=False, lemmatizer=True, **kwargs):
        self.filepath = filepath
        self.word_only = word_only
        self.lemmatizer = lemmatizer
        self.__lang_lex_map = {} if lang_lex_map is None else lang_lex_map
        self.profiles = dd(self._create_lex_analyzer)

    def _create_lex_analyzer(self):
        return LexicalAnalyser(lang_lex_map=self.__lang_lex_map,
                               word_only=self.word_only,
                               lemmatizer=self.lemmatizer)

    def read(self, **kwargs):
        ''' Read the CSV file content specified by self.filepath '''
        for text, language, source, speaker in chio.read_csv_iter(self.filepath, **kwargs):
            self.add(text, language, source=source, speaker=speaker)
        return self

    def add(self, text, language, source='', speaker=''):
        if text is None:
            text = ''
        if language is None:
            language = ''
        if source is None:
            source = ''
        if speaker is None:
            speaker = ''
        self.profiles['ALL'].add(text, language, source=source, speaker=speaker)
        self.profiles[speaker].add(text, language, source=source, speaker=speaker)

    def analyse(self, external_tokenizer=True):
        ''' Analyse all available profiles (i.e. speakers) '''
        for profile in self.profiles.values():
            profile.analyse(external_tokenizer=external_tokenizer)
        return self

    def to_dict(self):
        ''' Export the analysed result as a JSON-ready object '''
        profile_list = []
        for pname in sorted(self.profiles.keys()):
            profile = self.profiles[pname]
            profile_list.append({
                'name': pname,
                'stats': profile.to_dict()
            })
        return profile_list
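

# A minimal end-to-end sketch, assuming the input CSV has four columns in the order
# (text, language, source, speaker); 'corpus.csv' is just a placeholder file name:
#
#     analyser = CorpusLexicalAnalyser('corpus.csv').read().analyse()
#     report = analyser.to_dict()   # one profile per speaker plus the combined 'ALL' profile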