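"""Export FrameNet annotations from NLTK to JSON.

Loads the full-text annotations (split into train/dev/test) and the
exemplar sentences for FrameNet v1.5 or v1.7, converts character spans
to token spans, and dumps one JSON file per subset.
"""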
import gzip
import json
import logging
import os
from argparse import ArgumentParser
from itertools import accumulate

import nltk
import numpy as np
from tqdm import tqdm

from tools.framenet.fn_util import framenet_split, Sentence
from tools.framenet.nltk_framenet import framenet, framenet15

logger = logging.getLogger('fn')


def _load_raw(version):
    if version == '1.5':
        nltk.download('framenet_v15')
        return framenet15
    else:
        nltk.download('framenet_v17')
        return framenet


def one_frame(sentence, ann):
    frame_info = {'label': ann.frame.name}
    # Collect the token indices covered by the (possibly discontinuous)
    # target, then keep its overall [first, last] token span.
    target_list = list()
    for start, end in ann.Target:
        start, end = sentence.span(start, end)
        target_list.extend(range(start, end + 1))
    assert len(target_list) > 0
    target_list.sort()
    frame_info['span'] = [target_list[0], target_list[-1]]
    frame_info['lu'] = ann.LU.name
    frame_info['children'] = fes = list()
    for start, end, fe_name in ann.FE[0]:
        start, end = sentence.span(start, end)
        fes.append({'span': [start, end], 'label': fe_name})
    return frame_info
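

# Illustrative only (values are hypothetical): one_frame returns a dict of
# the shape below, assuming Sentence.span maps character offsets to an
# inclusive token-index range:
#   {'label': 'Motion', 'span': [2, 2], 'lu': 'move.v',
#    'children': [{'span': [0, 1], 'label': 'Theme'}]}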


def load_nltk_exemplars(version, exclude_ann_ids=None):
    # Use a set so membership tests below stay O(1).
    exclude_ann_ids = set(exclude_ann_ids or [])
    fn = _load_raw(version)
    egs = list()
    bar = tqdm()
    skipped = 0
    try:
        for eg in fn.annotations(full_text=False):
            if 'Target' not in eg.keys():
                # A bug in nltk: some exemplars come back without a target.
                continue
            if eg.ID in exclude_ann_ids:
                skipped += 1
                continue
            try:
                sentence = Sentence(eg.text)
                egs.append({
                    'tokens': list(map(str, sentence.tokens)),
                    'annotations': [one_frame(sentence, eg)],
                    'meta': {
                        'fully_annotated': False,
                        'source': f'framenet_v{version}',
                        'with_fe': True,
                        'type': 'exemplar',
                        'ann_ids': [eg.ID],
                    },
                })
                bar.update()
            except Exception:
                # Skip exemplars whose spans cannot be aligned to tokens.
                pass
    except Exception:
        # The NLTK exemplar iterator itself can fail partway through.
        pass
    bar.close()
    logger.info(f'Loaded {len(egs)} sentences for framenet v{version} from exemplars. '
                f'(skipped {skipped} sentences)')
    return egs
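

# Usage sketch (illustrative): load exemplars on their own, or pass the
# annotation IDs already covered by the full-text data so they are skipped
# (as the __main__ block below does):
#   egs = load_nltk_exemplars('1.7')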


def load_nltk_fully_annotated(version):
    fn = _load_raw(version)
    splits = list(framenet_split.keys())
    all_containers = {split: [] for split in splits}
    for doc in tqdm(fn.docs()):
        # Documents absent from framenet_split fall back to the train split.
        container = all_containers['train']
        for sp in splits:
            if doc.filename in framenet_split[sp]:
                container = all_containers[sp]
        for sent in doc.sentence:
            sentence = Sentence(sent.text)
            all_frames = list()
            ann_ids = []
            for ann in sent.annotationSet:
                # Skip POS-only annotation sets; everything else must be a
                # full-text annotation set.
                if ann._type == 'posannotationset':
                    continue
                assert ann._type == 'fulltext_annotationset'
                if 'Target' not in ann.keys():
                    logger.warning('Target not found.')
                    continue
                if 'ID' in ann:
                    ann_ids.append(ann['ID'])
                all_frames.append(one_frame(sentence, ann))
            eg_dict = {
                'tokens': list(map(str, sentence.tokens)),
                'annotations': all_frames,
                'meta': {
                    'source': f'framenet_v{version}',
                    'fully_annotated': True,
                    'with_fe': True,
                    'type': 'full text',
                    'sentence ID': sent.ID,
                    'doc': doc.filename,
                    'ann_ids': ann_ids,
                },
            }
            container.append(eg_dict)
    for sp in splits:
        logger.info(f'Loaded {len(all_containers[sp])} sentences for {sp}.')
    return all_containers
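

# Illustrative only: the return value maps each split name defined in
# framenet_split to a list of sentence records, e.g.
#   {'train': [...], 'dev': [...], 'test': [...]}
# (the __main__ block below assumes exactly these three splits).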


def load_expanded_fn(path):
    # Disabled: raises immediately; the body below is kept for reference.
    raise NotImplementedError
    with gzip.open(path, 'rb') as compressed:
        lines = compressed.read().decode()
    instances = list()
    for line in tqdm(lines.split('\n')):
        if len(line) != 0:
            instances.append(json.loads(line))
    logger.info(f'{len(instances)} lines loaded.')
    dataset = list()
    for instance in tqdm(instances, desc='Processing expanded framenet...'):
        for output in instance['outputs']:
            ins_dict = dict()
            ins_dict['meta'] = {
                'source': 'expanded framenet',
                'type': 'paraphrase',
                'exemplar_id': instance['exemplar_id'],
                'annoset_id': instance['annoset_id'],
            }
            words = output['output_string']
            text = ' '.join(words)
            # Cumulative token lengths; adding the token index accounts for
            # the single space joined between tokens (see the worked example
            # after this function).
            length_offsets = [0] + list(accumulate(map(len, words)))
            start_idx, end_idx = output['output_trigger_offset']
            start_idx = length_offsets[start_idx] + start_idx
            end_idx = length_offsets[end_idx] + end_idx - 2
            sentence = Sentence(text)
            ins_dict['text'] = sentence.tokens
            ins_dict['pos'] = sentence.pos
            ins_dict['tag'] = sentence.tag
            target_span = sentence.span(start_idx, end_idx)
            ins_dict['frame'] = [{
                'name': instance['frame_name'],
                'target': list(range(target_span[0], target_span[1] + 1)),
                'lu': output['output_trigger'],
                'fe': [],
            }]
            ins_dict['score'] = {
                'pbr': np.exp(-output['pbr_score']),
                'aligner': output['aligner_score'],
            }
            ins_dict['with_fe'] = False
            ins_dict['fully_annotated'] = False
            dataset.append(ins_dict)
    logger.info(f'{len(dataset)} sentences loaded.')
    return dataset
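

# Worked example for the trigger-offset arithmetic above (hypothetical
# values; assumes output_trigger_offset is a token span with an exclusive
# end). For words = ['He', 'ran', 'home'] and offset (1, 2):
#   length_offsets = [0, 2, 5, 9]
#   start_idx = length_offsets[1] + 1 = 3      # 'r' of 'ran' in 'He ran home'
#   end_idx   = length_offsets[2] + 2 - 2 = 5  # 'n' of 'ran', inclusive end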


if __name__ == '__main__':
    logging.basicConfig(level='INFO')
    arg_parser = ArgumentParser()
    arg_parser.add_argument('output', type=str)
    arg_parser.add_argument('-v', type=str, default='1.7')
    cmd_args = arg_parser.parse_args()

    full = load_nltk_fully_annotated(cmd_args.v)
    # Collect the annotation IDs already covered by the full-text data so
    # the exemplar loader can skip duplicates.
    full_ann_ids = list()
    for split in ['train', 'dev', 'test']:
        for sent in full[split]:
            full_ann_ids.extend(sent['meta']['ann_ids'])
    exe = load_nltk_exemplars(cmd_args.v, full_ann_ids)

    os.makedirs(cmd_args.output, exist_ok=True)
    version_tag = cmd_args.v.replace('.', '')  # e.g. '1.7' -> '17'
    with open(os.path.join(cmd_args.output, f'full.{version_tag}.json'), 'w') as fp:
        json.dump(full, fp)
    with open(os.path.join(cmd_args.output, f'exe.{version_tag}.json'), 'w') as fp:
        json.dump(exe, fp)
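
# Example invocation (script name and output directory are placeholders):
#   python prepare_framenet.py data/framenet -v 1.7
# which writes data/framenet/full.17.json and data/framenet/exe.17.json.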