"""Utility functions for UCCA package."""
import os
import time
from collections import OrderedDict
from collections import deque
from enum import Enum
from itertools import groupby, islice
from operator import attrgetter, itemgetter
import numpy as np
from ucca import layer0, layer1
MODEL_ENV_VAR = "SPACY_MODEL" # Determines the default spaCy model to load
DEFAULT_MODEL = {"en": "en_core_web_md", "fr": "fr_core_news_md", "de": "de_core_news_md"}
N_THREADS = 4
BATCH_SIZE = 50
class Attr(Enum):
"""Wrapper for spaCy Attr, determining order for saving in layer0.extra per token when as_array=True"""
ORTH = 0
LEMMA = 1
TAG = 2
POS = 3
ENT_TYPE = 4
ENT_IOB = 5
DEP = 6
HEAD = 7
SHAPE = 8
PREFIX = 9
SUFFIX = 10
def __call__(self, value, vocab=None, as_array=False, lang=None):
"""Resolve numeric ID of attribute value to string (if as_array=False) or to int (if as_array=True)"""
if value is None:
return None
if self in (Attr.ENT_IOB, Attr.HEAD):
return int(np.int64(value))
if as_array:
is_str = isinstance(value, str)
if is_str or self in (Attr.ORTH, Attr.LEMMA):
try: # Will find the value even if it's a new string, but that's OK since the hash is deterministic
i = get_vocab(vocab, lang).strings[value]
if is_str: # Replace with numeric ID since as_array=True
value = i
except KeyError:
value = None
return value if value is None or isinstance(value, str) else int(value)
try:
return get_vocab(vocab, lang)[value].text
except KeyError:
return None
@property
def key(self):
"""String used in `extra' dict of Terminals to store this attribute when as_array=False"""
return self.name.lower()
def get_nlp(lang="en"):
"""Load spaCy model for a given language, determined by `models' dict or by MODEL_ENV_VAR"""
instance = nlp.get(lang)
if instance is None:
import spacy
model = models.get(lang)
if not model:
models[lang] = model = os.environ.get("_".join((MODEL_ENV_VAR, lang.upper()))) or \
os.environ.get(MODEL_ENV_VAR) or DEFAULT_MODEL.get(lang, "xx")
started = time.time()
with external_write_mode():
print("Loading spaCy model '%s'... " % model, end="", flush=True)
try:
nlp[lang] = instance = spacy.load(model)
except OSError:
spacy.cli.download(model)
try:
nlp[lang] = instance = spacy.load(model)
except OSError as e:
raise OSError("Failed to get spaCy model. Download it manually using "
"`python -m spacy download %s`." % model) from e
tokenizer[lang] = instance.tokenizer
instance.tokenizer = lambda words: spacy.tokens.Doc(instance.vocab, words=words)
print("Done (%.3fs)." % (time.time() - started))
return instance
models = {} # maps language two-letter code to name of spaCy model
nlp = {} # maps language two-letter code to actual loaded spaCy model
tokenizer = {} # maps language two-letter code to tokenizer of spaCy model
def get_tokenizer(tokenized=False, lang="en"):
instance = get_nlp(lang)
return instance.tokenizer if tokenized else tokenizer[lang]
def get_vocab(vocab=None, lang=None):
if vocab is not None:
return vocab
return (get_nlp(lang) if lang else get_nlp()).vocab
def get_word_vectors(dim=None, size=None, filename=None, vocab=None):
"""
Get word vectors from spaCy model or from text file
:param dim: dimension to trim vectors to (default: keep original)
:param size: maximum number of vectors to load (default: all)
:param filename: text file to load vectors from (default: from spaCy model)
:param vocab: instead of strings, look up keys of returned dict in vocab (use lang str, e.g. "en", for spaCy vocab)
:return: tuple of (dict of word [string or integer] -> vector [NumPy array], dimension)
"""
orig_keys = vocab is None
if isinstance(vocab, str) or not filename:
vocab = get_nlp(vocab if isinstance(vocab, str) else "en").vocab
def _lookup(word):
try:
return word.orth_ if orig_keys else word.orth
except AttributeError:
if orig_keys:
return word
lex = vocab[word]
return getattr(lex, "orth", lex)
if filename:
it = read_word_vectors(dim, size, filename)
nr_row, nr_dim = next(it)
vectors = OrderedDict(islice(((_lookup(w), v) for w, v in it if orig_keys or w in vocab), nr_row))
else: # return spaCy vectors
nr_row, nr_dim = vocab.vectors.shape
if dim is not None and dim < nr_dim:
nr_dim = int(dim)
vocab.vectors.resize(shape=(int(size or nr_row), nr_dim))
lexemes = sorted([l for l in vocab if l.has_vector], key=attrgetter("prob"), reverse=True)[:size]
vectors = OrderedDict((_lookup(l), l.vector) for l in lexemes)
return vectors, nr_dim
def read_word_vectors(dim, size, filename):
"""
Read word vectors from text file, with an optional first row indicating size and dimension
:param dim: dimension to trim vectors to
:param size: maximum number of vectors to load
:param filename: text file to load vectors from
:return: generator: first element is (#vectors, #dims); and all the rest are (word [string], vector [NumPy array])
"""
try:
first_line = True
nr_row = nr_dim = None
with open(filename, encoding="utf-8") as f:
for line in f:
fields = line.split()
if first_line:
first_line = False
try:
nr_row, nr_dim = map(int, fields)
is_header = True
except ValueError:
nr_dim = len(fields) - 1 # No header, just get vector length from first one
is_header = False
if dim and dim < nr_dim:
nr_dim = dim
yield size or nr_row, nr_dim
if is_header:
continue # Read next line
word, *vector = fields
if len(vector) >= nr_dim: # May not be equal if word is whitespace
yield word, np.asarray(vector[-nr_dim:], dtype="f")
except OSError as e:
raise IOError("Failed loading word vectors from '%s'" % filename) from e
def annotate(passage, *args, **kwargs):
"""
Run spaCy pipeline on the given passage, unless already annotated
:param passage: Passage object, whose layer 0 nodes will be added entries in the `extra' dict
"""
list(annotate_all([passage], *args, **kwargs))
def annotate_as_tuples(passages, replace=False, as_array=False, lang="en", vocab=None, verbose=False):
for passage_lang, passages_by_lang in groupby(passages, get_lang):
for need_annotation, stream in groupby(to_annotate(passages_by_lang, replace, as_array), lambda x: bool(x[0])):
annotated = get_nlp(passage_lang or lang).pipe(
stream, as_tuples=True, n_threads=N_THREADS, batch_size=BATCH_SIZE) if need_annotation else stream
annotated = set_docs(annotated, as_array, passage_lang or lang, vocab, replace, verbose)
for passage, passages in groupby(annotated, itemgetter(0)):
yield deque(passages, maxlen=1).pop() # Wait until all paragraphs have been annotated
def annotate_all(passages, replace=False, as_array=False, as_tuples=False, lang="en", vocab=None, verbose=False):
"""
Run spaCy pipeline on the given passages, unless already annotated
:param passages: iterable of Passage objects, whose layer 0 nodes will be added entries in the `extra' dict
:param replace: even if a given passage is already annotated, replace with new annotation
:param as_array: instead of adding `extra' entries to each terminal, set layer 0 extra["doc"] to array of ids
:param as_tuples: treat input as tuples of (passage text, context), and return context for each passage as-is
:param lang: optional two-letter language code, will be overridden if passage has "lang" attrib
:param vocab: optional dictionary of vocabulary IDs to string values, to avoid loading spaCy model
:param verbose: whether to print annotated text
:return: generator of annotated passages, which are actually modified in-place (same objects as input)
"""
if not as_tuples:
passages = ((p,) for p in passages)
for t in annotate_as_tuples(passages, replace=replace, as_array=as_array, lang=lang, vocab=vocab, verbose=verbose):
yield t if as_tuples else t[0]
def get_lang(passage_context):
return passage_context[0].attrib.get("lang")
def to_annotate(passage_contexts, replace, as_array):
"""Filter passages to get only those that require annotation; split to paragraphs and return generator of
(list of tokens, (paragraph index, list of Terminals, Passage) + original context appended) tuples"""
return (([t.text for t in terminals] if replace or not is_annotated(passage, as_array) else (),
(i, terminals, passage) + tuple(context)) for passage, *context in passage_contexts
for i, terminals in enumerate(break2paragraphs(passage, return_terminals=True)))
def is_annotated(passage, as_array):
"""Whether the passage is already annotated or only partially annotated"""
l0 = passage.layer(layer0.LAYER_ID)
if as_array:
docs = l0.extra.get("doc")
return not l0.all or docs is not None and len(docs) == max(t.paragraph for t in l0.all) and \
sum(map(len, docs)) == len(l0.all) and \
all(i is None or isinstance(i, int) for l in docs for t in l for i in t)
return all(a.key in t.extra for t in l0.all for a in Attr)
def set_docs(annotated, as_array, lang, vocab, replace, verbose):
"""Given spaCy annotations, set values in layer0.extra per paragraph if as_array=True, or else in Terminal.extra"""
for doc, (i, terminals, passage, *context) in annotated:
if doc: # Not empty, so copy values
from spacy import attrs
arr = doc.to_array([getattr(attrs, a.name) for a in Attr])
if as_array:
docs = passage.layer(layer0.LAYER_ID).docs(i + 1)
existing = docs[i] + (len(arr) - len(docs[i])) * [len(Attr) * [None]]
docs[i] = [[a(v if e is None or replace else e, get_vocab(vocab, lang), as_array=True)
for a, v, e in zip(Attr, values, es)] for values, es in zip(arr, existing)]
else:
for terminal, values in zip(terminals, arr):
for attr, value in zip(Attr, values):
if replace or not terminal.extra.get(attr.key):
terminal.extra[attr.key] = attr(value, get_vocab(vocab, lang))
if verbose:
data = [[a.key for a in Attr]] + \
[[str(a(t.tok[a.value], get_vocab(vocab, lang)) if as_array else t.extra[a.key])
for a in Attr] for j, t in enumerate(terminals)]
width = [max(len(f) for f in t) for t in data]
for j in range(len(Attr)):
try:
print(" ".join("%-*s" % (w, f[j]) for f, w in zip(data, width)))
except UnicodeEncodeError:
pass
print()
yield (passage,) + tuple(context)
SENTENCE_END_MARKS = ('.', '?', '!')
def break2sentences(passage, lang="en", *args, **kwargs):
"""
Breaks paragraphs into sentences according to the annotation.
A sentence is a list of terminals which ends with a mark from
SENTENCE_END_MARKS, and is also the end of a paragraph or parallel scene.
:param passage: the Passage object to operate on
:param lang: optional two-letter language code
:return: a list of positions in the Passage, each denotes a closing Terminal of a sentence.
"""
del args, kwargs
l1 = passage.layer(layer1.LAYER_ID)
terminals = extract_terminals(passage)
if not terminals:
return []
if any(n.outgoing for n in l1.all): # Passage is labeled
ps_ends = [ps.end_position for ps in l1.top_scenes]
ps_starts = [ps.start_position for ps in l1.top_scenes]
marks = [t.position for t in terminals if t.text in SENTENCE_END_MARKS]
# Annotations doesn't always include the ending period (or other mark)
# with the parallel scene it closes. Hence, if the terminal before the
# mark closed the parallel scene, and this mark doesn't open a scene
# in any way (hence it probably just "hangs" there), it's a sentence end
marks = [x for x in marks if x in ps_ends or ((x - 1) in ps_ends and x not in ps_starts)]
else: # Not labeled, split using spaCy
annotated = get_nlp(lang=lang)([t.text for t in terminals])
marks = [span.end for span in annotated.sents]
marks = sorted(set(marks + break2paragraphs(passage)))
# Avoid punctuation-only sentences
if len(marks) > 1:
marks = [x for x, y in zip(marks[:-1], marks[1:]) if not all(layer0.is_punct(t) for t in terminals[x:y])] + \
[marks[-1]]
return marks
def extract_terminals(p):
"""returns an iterator of the terminals of the passage p"""
return p.layer(layer0.LAYER_ID).all
def break2paragraphs(passage, return_terminals=False, *args, **kwargs):
"""
Breaks into paragraphs according to the annotation.
Uses the `paragraph' attribute of layer 0 to find paragraphs.
:param passage: the Passage object to operate on
:param return_terminals: whether to return actual Terminal objects of all terminals rather than just end positions
:return: a list of positions in the Passage, each denotes a closing Terminal of a paragraph.
"""
del args, kwargs
terminals = list(extract_terminals(passage))
if not terminals:
return []
return [list(p) for _, p in groupby(terminals, key=attrgetter("paragraph"))] if return_terminals else \
[t.position - 1 for t in terminals if t.position > 1 and t.para_pos == 1] + [terminals[-1].position]
def indent_xml(xml_as_string):
"""
Indents a string of XML-like objects.
This works only for units with no text or tail members, and only for
strings whose leaves are written as and not .
:param xml_as_string: XML string to indent
:return: indented XML string
"""
tabs = 0
lines = str(xml_as_string).replace('><', '>\n<').splitlines()
s = ''
for line in lines:
if line.startswith(''):
tabs -= 1
s += (" " * tabs) + line + '\n'
if not (line.endswith('/>') or line.startswith('')):
tabs += 1
return s