import spacy
import time
import re
import plac
import operator
import datetime
from pathlib import Path
import xml.etree.ElementTree as ET

from spacy.cli.ud import conll17_ud_eval
from spacy.cli.ud.ud_train import write_conllu
from spacy.lang.lex_attrs import word_shape
from spacy.util import get_lang_class

# All languages in spaCy - in UD format (note that Norwegian is 'no' instead of 'nb')
ALL_LANGUAGES = "ar, ca, da, de, el, en, es, fa, fi, fr, ga, he, hi, hr, hu, id, " \
                "it, ja, no, nl, pl, pt, ro, ru, sv, tr, ur, vi, zh"

# Non-parsing tasks that will be evaluated (works for default models)
EVAL_NO_PARSE = ['Tokens', 'Words', 'Lemmas', 'Sentences', 'Feats']

# Tasks that will be evaluated if check_parse=True (does not work for default models)
EVAL_PARSE = ['Tokens', 'Words', 'Lemmas', 'Sentences', 'Feats', 'UPOS', 'XPOS', 'AllTags', 'UAS', 'LAS']

# Minimum frequency an error should have to be printed
PRINT_FREQ = 20

# Maximum number of errors printed per category
PRINT_TOTAL = 10

space_re = re.compile(r"\s+")


def load_model(modelname, add_sentencizer=False):
    """ Load a specific spaCy model """
    loading_start = time.time()
    nlp = spacy.load(modelname)
    if add_sentencizer:
        nlp.add_pipe(nlp.create_pipe('sentencizer'))
    loading_end = time.time()
    loading_time = loading_end - loading_start
    if add_sentencizer:
        return nlp, loading_time, modelname + '_sentencizer'
    return nlp, loading_time, modelname


def load_default_model_sentencizer(lang):
    """ Load a generic spaCy model and add the sentencizer for sentence tokenization """
    loading_start = time.time()
    lang_class = get_lang_class(lang)
    nlp = lang_class()
    nlp.add_pipe(nlp.create_pipe('sentencizer'))
    loading_end = time.time()
    loading_time = loading_end - loading_start
    return nlp, loading_time, lang + "_default_" + 'sentencizer'


def split_text(text):
    """ Split a text into paragraphs on empty lines, normalizing internal whitespace """
    return [space_re.sub(" ", par.strip()) for par in text.split("\n\n")]


def get_freq_tuples(my_list, print_total_threshold):
    """ Turn a list of errors into frequency-sorted tuples, thresholded by a certain total number """
    d = {}
    for token in my_list:
        d.setdefault(token, 0)
        d[token] += 1
    return sorted(d.items(), key=operator.itemgetter(1), reverse=True)[:print_total_threshold]


def _contains_blinded_text(stats_xml):
    """ Heuristic to determine whether the treebank has blinded texts or not """
    tree = ET.parse(stats_xml)
    root = tree.getroot()
    total_tokens = int(root.find('size/total/tokens').text)
    unique_lemmas = int(root.find('lemmas').get('unique'))

    # assume the corpus is largely blinded when unique lemmas make up less than 1% of the tokens
    return (unique_lemmas / total_tokens) < 0.01


def fetch_all_treebanks(ud_dir, languages, corpus, best_per_language):
    """ Fetch the txt files for all treebanks for a given set of languages """
    all_treebanks = dict()
    treebank_size = dict()
    for l in languages:
        all_treebanks[l] = []
        treebank_size[l] = 0

    for treebank_dir in ud_dir.iterdir():
        if treebank_dir.is_dir():
            for txt_path in treebank_dir.iterdir():
                if txt_path.name.endswith('-ud-' + corpus + '.txt'):
                    file_lang = txt_path.name.split('_')[0]
                    if file_lang in languages:
                        gold_path = treebank_dir / txt_path.name.replace('.txt', '.conllu')
                        stats_xml = treebank_dir / "stats.xml"
                        # ignore treebanks where the texts are not publicly available
                        if not _contains_blinded_text(stats_xml):
                            if not best_per_language:
                                all_treebanks[file_lang].append(txt_path)
                            # check the tokens in the gold annotation to keep only the biggest treebank per language
                            else:
                                with gold_path.open(mode='r', encoding='utf-8') as gold_file:
                                    gold_ud = conll17_ud_eval.load_conllu(gold_file)
                                    gold_tokens = len(gold_ud.tokens)
                                if treebank_size[file_lang] < gold_tokens:
                                    all_treebanks[file_lang] = [txt_path]
                                    treebank_size[file_lang] = gold_tokens

    return all_treebanks
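# A minimal usage sketch for the treebank helpers above (hypothetical local paths;
# assumes an unpacked UD release such as ud-treebanks-v2.3 on disk):
#
#   ud_dir = Path('ud-treebanks-v2.3')
#   # keep only the largest non-blinded training treebank per language
#   treebanks = fetch_all_treebanks(ud_dir, ['en', 'fr'], 'train', best_per_language=True)
#   # e.g. {'en': [Path('.../en_ewt-ud-train.txt')], 'fr': [Path('.../fr_gsd-ud-train.txt')]}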
def run_single_eval(nlp, loading_time, print_name, text_path, gold_ud, tmp_output_path, out_file, print_header,
                    check_parse, print_freq_tasks):
    """ Run an evaluation of a model nlp on a certain specified treebank """
    with text_path.open(mode='r', encoding='utf-8') as f:
        flat_text = f.read()

    # STEP 1: tokenize text
    tokenization_start = time.time()
    texts = split_text(flat_text)
    docs = list(nlp.pipe(texts))
    tokenization_end = time.time()
    tokenization_time = tokenization_end - tokenization_start

    # STEP 2: record stats and timings
    tokens_per_s = int(len(gold_ud.tokens) / tokenization_time)

    print_header_1 = ['date', 'text_path', 'gold_tokens', 'model', 'loading_time', 'tokenization_time',
                      'tokens_per_s']
    print_string_1 = [str(datetime.date.today()), text_path.name, len(gold_ud.tokens), print_name,
                      "%.2f" % loading_time, "%.2f" % tokenization_time, tokens_per_s]

    # STEP 3: evaluate predicted tokens and features
    with tmp_output_path.open(mode="w", encoding="utf8") as tmp_out_file:
        write_conllu(docs, tmp_out_file)
    with tmp_output_path.open(mode="r", encoding="utf8") as sys_file:
        sys_ud = conll17_ud_eval.load_conllu(sys_file, check_parse=check_parse)
    tmp_output_path.unlink()
    scores = conll17_ud_eval.evaluate(gold_ud, sys_ud, check_parse=check_parse)

    # STEP 4: format the scoring results
    eval_headers = EVAL_PARSE
    if not check_parse:
        eval_headers = EVAL_NO_PARSE

    for score_name in eval_headers:
        score = scores[score_name]
        print_string_1.extend(["%.2f" % score.precision,
                               "%.2f" % score.recall,
                               "%.2f" % score.f1])
        print_string_1.append("-" if score.aligned_accuracy is None else "%.2f" % score.aligned_accuracy)
        print_string_1.append("-" if score.undersegmented is None else "%.4f" % score.under_perc)
        print_string_1.append("-" if score.oversegmented is None else "%.4f" % score.over_perc)

        print_header_1.extend([score_name + '_p', score_name + '_r', score_name + '_F',
                               score_name + '_acc', score_name + '_under', score_name + '_over'])

        if score_name in print_freq_tasks:
            print_header_1.extend([score_name + '_word_under_ex', score_name + '_shape_under_ex',
                                   score_name + '_word_over_ex', score_name + '_shape_over_ex'])

            d_under_words = get_freq_tuples(score.undersegmented, PRINT_TOTAL)
            d_under_shapes = get_freq_tuples([word_shape(x) for x in score.undersegmented], PRINT_TOTAL)
            d_over_words = get_freq_tuples(score.oversegmented, PRINT_TOTAL)
            d_over_shapes = get_freq_tuples([word_shape(x) for x in score.oversegmented], PRINT_TOTAL)

            # the CSV uses ; as separator, so mask any ; occurring in the example output
            print_string_1.append(
                str({k: v for k, v in d_under_words if v > PRINT_FREQ}).replace(";", "*SEMICOLON*"))
            print_string_1.append(
                str({k: v for k, v in d_under_shapes if v > PRINT_FREQ}).replace(";", "*SEMICOLON*"))
            print_string_1.append(
                str({k: v for k, v in d_over_words if v > PRINT_FREQ}).replace(";", "*SEMICOLON*"))
            print_string_1.append(
                str({k: v for k, v in d_over_shapes if v > PRINT_FREQ}).replace(";", "*SEMICOLON*"))

    # STEP 5: print the formatted results to CSV
    if print_header:
        out_file.write(';'.join(map(str, print_header_1)) + '\n')
    out_file.write(';'.join(map(str, print_string_1)) + '\n')
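# A standalone call of run_single_eval (a sketch with hypothetical paths; assumes the
# model and the treebank files are available locally):
#
#   nlp, loading_time, name = load_model('en_core_web_sm')
#   text_path = Path('ud-treebanks-v2.3/UD_English-EWT/en_ewt-ud-train.txt')
#   gold_path = text_path.parent / (text_path.stem + '.conllu')
#   with gold_path.open(mode='r', encoding='utf-8') as gold_file:
#       gold_ud = conll17_ud_eval.load_conllu(gold_file)
#   with Path('results.csv').open(mode='w', encoding='utf-8') as out_file:
#       run_single_eval(nlp, loading_time, name, text_path, gold_ud, Path('tmp.conllu'),
#                       out_file, print_header=True, check_parse=False, print_freq_tasks=['Tokens'])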
def run_all_evals(models, treebanks, out_file, check_parse, print_freq_tasks):
    """ Run an evaluation for each language with its specified models and treebanks """
    print_header = True

    for tb_lang, treebank_list in treebanks.items():
        print()
        print("Language", tb_lang)
        for text_path in treebank_list:
            print(" Evaluating on", text_path)

            gold_path = text_path.parent / (text_path.stem + '.conllu')
            print("  Gold data from ", gold_path)

            # nested try blocks to ensure the code can continue with the next iteration after a failure
            try:
                with gold_path.open(mode='r', encoding='utf-8') as gold_file:
                    gold_ud = conll17_ud_eval.load_conllu(gold_file)

                for nlp, nlp_loading_time, nlp_name in models[tb_lang]:
                    try:
                        print("   Benchmarking", nlp_name)
                        tmp_output_path = text_path.parent / str('tmp_' + nlp_name + '.conllu')
                        run_single_eval(nlp, nlp_loading_time, nlp_name, text_path, gold_ud, tmp_output_path,
                                        out_file, print_header, check_parse, print_freq_tasks)
                        print_header = False
                    except Exception as e:
                        print("    Ran into trouble: ", str(e))
            except Exception as e:
                print("    Ran into trouble: ", str(e))
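# run_all_evals expects `models` keyed by UD language code, each value being a list of
# (nlp, loading_time, name) tuples as returned by the loaders above, e.g. (sketch):
#
#   models = {'en': [load_model('en_core_web_sm'),
#                    load_default_model_sentencizer('en')]}
#   with Path('results.csv').open(mode='w', encoding='utf-8') as out_file:
#       run_all_evals(models, treebanks, out_file, check_parse=False, print_freq_tasks=['Tokens'])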
@plac.annotations(
    out_path=("Path to output CSV file", "positional", None, Path),
    ud_dir=("Path to Universal Dependencies corpus", "positional", None, Path),
    check_parse=("Set flag to evaluate parsing performance", "flag", "p", bool),
    langs=("Enumeration of languages to evaluate (default: all)", "option", "l", str),
    exclude_trained_models=("Set flag to exclude trained models", "flag", "t", bool),
    exclude_multi=("Set flag to exclude the multi-language model as default baseline", "flag", "m", bool),
    hide_freq=("Set flag to avoid printing out more detailed high-freq tokenization errors", "flag", "f", bool),
    corpus=("Whether to run on train, dev or test", "option", "c", str),
    best_per_language=("Set flag to only keep the largest treebank for each language", "flag", "b", bool)
)
def main(out_path, ud_dir, check_parse=False, langs=ALL_LANGUAGES, exclude_trained_models=False,
         exclude_multi=False, hide_freq=False, corpus='train', best_per_language=False):
    """ Assemble all treebanks and models to run evaluations with.
    When setting check_parse to True, the default models will not be evaluated as they don't have
    parsing functionality """
    languages = [lang.strip() for lang in langs.split(",")]

    print_freq_tasks = []
    if not hide_freq:
        print_freq_tasks = ['Tokens']

    # fetch all relevant treebanks from the directory
    treebanks = fetch_all_treebanks(ud_dir, languages, corpus, best_per_language)

    print()
    print("Loading all relevant models for", languages)
    models = dict()

    # multi-lang model
    multi = None
    if not exclude_multi and not check_parse:
        multi = load_model('xx_ent_wiki_sm', add_sentencizer=True)

    # initialize all models with the multi-lang model
    for lang in languages:
        models[lang] = [multi] if multi else []
        # add default models if we don't want to evaluate parsing info
        if not check_parse:
            # Norwegian is 'nb' in spaCy but 'no' in the UD corpora
            if lang == 'no':
                models['no'].append(load_default_model_sentencizer('nb'))
            else:
                models[lang].append(load_default_model_sentencizer(lang))

    # language-specific trained models
    if not exclude_trained_models:
        if 'de' in models:
            models['de'].append(load_model('de_core_news_sm'))
        if 'es' in models:
            models['es'].append(load_model('es_core_news_sm'))
            models['es'].append(load_model('es_core_news_md'))
        if 'pt' in models:
            models['pt'].append(load_model('pt_core_news_sm'))
        if 'it' in models:
            models['it'].append(load_model('it_core_news_sm'))
        if 'nl' in models:
            models['nl'].append(load_model('nl_core_news_sm'))
        if 'en' in models:
            models['en'].append(load_model('en_core_web_sm'))
            models['en'].append(load_model('en_core_web_md'))
            models['en'].append(load_model('en_core_web_lg'))
        if 'fr' in models:
            models['fr'].append(load_model('fr_core_news_sm'))
            models['fr'].append(load_model('fr_core_news_md'))

    with out_path.open(mode='w', encoding='utf-8') as out_file:
        run_all_evals(models, treebanks, out_file, check_parse, print_freq_tasks)


if __name__ == "__main__":
    plac.call(main)
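# Example invocation (hypothetical paths; assuming this file is saved as run_eval.py),
# evaluating English and French on the dev portion of the largest treebank per language:
#
#   python run_eval.py results.csv /path/to/ud-treebanks-v2.3 -l "en, fr" -c dev -b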