"""
Convert PubTator-format NER annotations into CoNLL token/label format.

Created on Wed Sep 7 08:58:22 2022

@author: luol2
"""

import io
import re

import stanza
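
# Input is assumed to be standard PubTator format: per document, a title line
# "PMID|t|...", an abstract line "PMID|a|...", and one tab-separated annotation
# line per entity (PMID, start offset, end offset, mention, type, identifier),
# with documents separated by a blank line. The output is CoNLL-style
# "token<TAB>label" lines with BIO tags and a blank line between sentences.
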
def pubtator_entitysort(infile):
    """Sort each document's annotation lines by entity start offset."""
    fin = open(infile, 'r', encoding='utf-8')
    fout = io.StringIO()
    all_in = fin.read().strip().split('\n\n')
    fin.close()
    error_dict = {}  # pmid -> duplicated annotation lines (for inspection)
    for doc in all_in:
        entity_dict = {}  # annotation line -> start offset
        lines = doc.split('\n')
        # The first two lines are the title and abstract; copy them through.
        fout.write(lines[0] + '\n' + lines[1] + '\n')
        for i in range(2, len(lines)):
            segs = lines[i].split('\t')
            if len(segs) >= 5:
                if lines[i] not in entity_dict:
                    entity_dict[lines[i]] = int(segs[1])
                else:
                    # Duplicate annotation line; record it and keep one copy.
                    print('duplicate entity:', lines[i])
                    if segs[0] not in error_dict:
                        error_dict[segs[0]] = [lines[i]]
                    elif lines[i] not in error_dict[segs[0]]:
                        error_dict[segs[0]].append(lines[i])
        # Write the annotations back in ascending order of start offset.
        entity_sort = sorted(entity_dict.items(), key=lambda kv: kv[1])
        for ele in entity_sort:
            fout.write(ele[0] + '\n')
        fout.write('\n')
    return fout
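
# Illustrative sketch with hypothetical annotation lines: if a document lists
#   123<TAB>24<TAB>30<TAB>kinase<TAB>Gene<TAB>ID2
# before
#   123<TAB>0<TAB>5<TAB>BRCA1<TAB>Gene<TAB>ID1
# pubtator_entitysort() emits the BRCA1 line first (start offset 0 < 24).
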
def filter_overlap(infile):
    """Resolve nested/overlapping entities by keeping only the longest span."""
    fin = io.StringIO(infile.getvalue())
    fout = io.StringIO()
    documents = fin.read().strip().split('\n\n')
    fin.close()
    total_entity = 0  # statistics only; not used downstream
    nest_entity = 0
    for doc in documents:
        lines = doc.split('\n')
        entity_list = []
        if len(lines) > 2:
            first_entity = lines[2].split('\t')
            # nest_list holds the current run of mutually overlapping entities;
            # max_eid is the largest end offset seen in that run.
            nest_list = [first_entity]
            max_eid = int(first_entity[2])
            total_entity += len(lines) - 2
            for i in range(3, len(lines)):
                segs = lines[i].split('\t')
                if int(segs[1]) > max_eid:
                    # This entity starts after the current run ends: flush the run.
                    if len(nest_list) == 1:
                        entity_list.append(nest_list[0])
                    else:
                        nest_entity += len(nest_list) - 1
                        entity_list.extend(find_max_entity(nest_list))
                    nest_list = [segs]
                else:
                    # Overlaps the current run; keep collecting.
                    nest_list.append(segs)
                if int(segs[2]) > max_eid:
                    max_eid = int(segs[2])
            # Flush the final run.
            if nest_list != []:
                if len(nest_list) == 1:
                    entity_list.append(nest_list[0])
                else:
                    entity_list.extend(find_max_entity(nest_list))
        fout.write(lines[0] + '\n' + lines[1] + '\n')
        for ele in entity_list:
            fout.write('\t'.join(ele) + '\n')
        fout.write('\n')
    return fout
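
# Illustrative sketch (hypothetical spans): annotations at offsets (0, 5),
# (3, 12) and (20, 26) form two runs. (0, 5) and (3, 12) overlap, so only the
# longer (3, 12) survives; (20, 26) starts past offset 12 and is kept as is.
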
def find_max_entity(nest_list):
    """Return the longest entity in a run of overlapping entities."""
    max_len = 0
    max_index = 0
    for i in range(0, len(nest_list)):
        cur_len = int(nest_list[i][2]) - int(nest_list[i][1])
        # Strict comparison: on equal lengths the earlier entity wins.
        if cur_len > max_len:
            max_len = cur_len
            max_index = i
    return [nest_list[max_index]]
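
# For example, given hypothetical pre-split annotation rows
#   ['123', '5', '9', 'IL-2', 'Gene', 'ID1']
#   ['123', '5', '18', 'IL-2 receptor', 'Gene', 'ID2']
# find_max_entity() returns only the second row (span length 13 vs 4).
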
def pubtator_to_labeltext(infile):
    """Insert ssss<type>/eeee<type> marker tokens around each entity mention."""
    fin = io.StringIO(infile.getvalue())
    all_context = fin.read().strip().split('\n\n')
    fin.close()
    fout = io.StringIO()
    label_dic = {}  # lowercased entity type -> original entity type

    for doc in all_context:
        lines = doc.split('\n')
        ori_text = lines[0].split('|t|')[1] + ' ' + lines[1].split('|a|')[1]
        pmid = lines[0].split('|t|')[0]
        s_index = 0
        new_text = ''
        for i in range(2, len(lines)):
            segs = lines[i].split('\t')
            label_dic[segs[4].lower()] = segs[4]
            if len(segs) == 6:  # pmid, start, end, mention, type, identifier
                e_index = int(segs[1])
                new_text += (ori_text[s_index:e_index]
                             + ' ssss' + segs[4].lower() + ' '
                             + ori_text[int(segs[1]):int(segs[2])]
                             + ' eeee' + segs[4].lower() + ' ')
                s_index = int(segs[2])
        new_text += ori_text[s_index:]
        fout.write(pmid + '\t' + ' '.join(new_text.strip().split()) + '\n')
    return fout, label_dic
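
# Illustrative sketch (hypothetical document): with title "BRCA1 is a gene."
# and a Gene annotation covering offsets 0-5, the emitted label text is
#   123<TAB>ssssgene BRCA1 eeeegene is a gene. <abstract...>
# i.e. the mention is wrapped in type-specific start/end marker tokens.
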
def pre_token(sentence):
    """Add spaces around common symbols so the tokenizer splits on them."""
    sentence = re.sub(r"([=/()<>+\-_])", r" \1 ", sentence)
    sentence = re.sub(r"[ ]+", " ", sentence)
    return sentence
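
# For example, pre_token("IL-2(high)") yields "IL - 2 ( high ) " (note the
# trailing space), which the tokenizer then splits into separate tokens.
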
def labeltext_to_conll_fasttoken(infile, label_dic):
    """Tokenize the label text and emit token<TAB>BIO-label lines."""
    fin = io.StringIO(infile.getvalue())
    all_context = fin.read().strip().split('\n')
    fin.close()
    fout = io.StringIO()

    # Fast tokenization: plug the spaCy tokenizer into the stanza pipeline.
    nlp = stanza.Pipeline(lang='en', processors={'tokenize': 'spacy'}, package='None')

    for doc in all_context:
        doc_text = doc.split('\t')[1]
        doc_text = pre_token(doc_text)
        doc_stanza = nlp(doc_text)

        inentity_flag = 0  # 1 while between a ssss.../eeee... marker pair
        last_label = 'O'
        for sent in doc_stanza.sentences:
            for word in sent.words:
                if word.text.strip() == '':
                    continue
                if word.text.startswith('ssss'):
                    # Opening marker: remember the entity type, emit nothing.
                    last_label = word.text
                    inentity_flag = 1
                elif word.text.startswith('eeee'):
                    # Closing marker: emit nothing.
                    last_label = word.text
                    inentity_flag = 0
                else:
                    if last_label == 'O':
                        now_label = 'O'
                    elif last_label.startswith('ssss'):
                        # First token after an opening marker begins the entity.
                        now_label = 'B-' + label_dic[last_label[4:]]
                    elif last_label.startswith('B-') or last_label.startswith('I-'):
                        now_label = 'I-' + last_label[2:]
                    elif last_label.startswith('eeee'):
                        now_label = 'O'
                    fout.write(word.text + '\t' + now_label + '\n')
                    last_label = now_label
            # End the sentence only when outside an entity, so that an entity
            # split across sentence boundaries stays in one CoNLL sentence.
            if inentity_flag == 0:
                fout.write('\n')
    return fout
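
# Continuing the sketch above, the label text "ssssgene BRCA1 eeeegene is a
# gene ." comes out as CoNLL lines such as:
#   BRCA1<TAB>B-Gene
#   is<TAB>O
#   a<TAB>O
#   gene<TAB>O
#   .<TAB>O
# followed by a blank line at the sentence boundary.
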
def pubtator_to_conll(infile):
    """Run the full pipeline: PubTator file path in, CoNLL StringIO out."""
    # 1. Sort each document's annotations by start offset.
    input_sort = pubtator_entitysort(infile)
    # 2. Resolve nested/overlapping entities, keeping the longest span.
    input_nonest = filter_overlap(input_sort)
    # 3. Wrap each mention in entity-type marker tokens.
    input_labtext, label_dic = pubtator_to_labeltext(input_nonest)
    # 4. Tokenize and convert the markers into BIO labels.
    output = labeltext_to_conll_fasttoken(input_labtext, label_dic)
    return output

if __name__ == '__main__':

    infile = '../../TrainingSet/No100/NER.Train.txt'
    output = pubtator_to_conll(infile)
    fout = open('../../TrainingSet/No100/NER.Train.conll', 'w', encoding='utf-8')
    fout.write(output.getvalue())
    fout.close()
    output.close()