import whisper
import os
from pytube import YouTube
import pandas as pd
import plotly_express as px
import nltk
import plotly.graph_objects as go
from optimum.onnxruntime import ORTModelForSequenceClassification
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForTokenClassification,WhisperProcessor, WhisperForConditionalGeneration
from sentence_transformers import SentenceTransformer, CrossEncoder, util
import streamlit as st
import en_core_web_lg
import validators
import re
import itertools
import numpy as np
from bs4 import BeautifulSoup
import base64, time
from annotated_text import annotated_text
nltk.download('punkt')
from nltk import sent_tokenize
time_str = time.strftime("%d%m%Y-%H%M%S")
HTML_WRAPPER = """
{}
"""
@st.experimental_singleton(suppress_st_warning=True)
def load_models():
asr_model = whisper.load_model("small")
asr_pipe = pipeline("automatice-speech-recognition",model = "openai/whisper-large")
q_model = ORTModelForSequenceClassification.from_pretrained("nickmuchi/quantized-optimum-finbert-tone")
ner_model = AutoModelForTokenClassification.from_pretrained("xlm-roberta-large-finetuned-conll03-english")
q_tokenizer = AutoTokenizer.from_pretrained("nickmuchi/quantized-optimum-finbert-tone")
ner_tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-large-finetuned-conll03-english")
sent_pipe = pipeline("text-classification",model=q_model, tokenizer=q_tokenizer)
sum_pipe = pipeline("summarization",model="facebook/bart-large-cnn", tokenizer="facebook/bart-large-cnn",clean_up_tokenization_spaces=True)
ner_pipe = pipeline("ner", model=ner_model, tokenizer=ner_tokenizer, grouped_entities=True)
sbert = SentenceTransformer("all-mpnet-base-v2")
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')
return asr_pipe, sent_pipe, sum_pipe, ner_pipe, sbert, cross_encoder
@st.experimental_singleton(suppress_st_warning=True)
def get_spacy():
nlp = en_core_web_lg.load()
return nlp
@st.experimental_memo(suppress_st_warning=True)
def inference(link, upload):
'''Convert Youtube video or Audio upload to text'''
if validators.url(link):
yt = YouTube(link)
title = yt.title
path = yt.streams.filter(only_audio=True)[0].download(filename="audio.mp4")
results = asr_pipe(path)
return results['text'], yt.title
elif upload:
results = asr_pipe(upload)
return results['text'], "Transcribed Earnings Audio"
@st.experimental_memo(suppress_st_warning=True)
def chunk_long_text(text,threshold):
'''Chunk long text'''
sentences = sent_tokenize(text)
out = []
for chunk in sentences:
if len(chunk.split()) < threshold:
out.append(chunk)
else:
words = chunk.split()
num = int(len(words)/threshold)
for i in range(0,num*threshold+1,threshold):
out.append(' '.join(words[i:threshold+i]))
return out
@st.experimental_memo(suppress_st_warning=True)
def sentiment_pipe(earnings_text):
'''Determine the sentiment of the text'''
earnings_sentences = chunk_long_text(earnings_text,400)
earnings_sentiment = sent_pipe(earnings_sentences)
return earnings_sentiment, earnings_sentences
@st.experimental_memo(suppress_st_warning=True)
def preprocess_plain_text(text,window_size=3):
'''Preprocess text for semantic search'''
text = text.encode("ascii", "ignore").decode() # unicode
text = re.sub(r"https*\S+", " ", text) # url
text = re.sub(r"@\S+", " ", text) # mentions
text = re.sub(r"#\S+", " ", text) # hastags
text = re.sub(r"\s{2,}", " ", text) # over spaces
#text = re.sub("[^.,!?%$A-Za-z0-9]+", " ", text) # special characters except .,!?
#break into lines and remove leading and trailing space on each
lines = [line.strip() for line in text.splitlines()]
# #break multi-headlines into a line each
chunks = [phrase.strip() for line in lines for phrase in line.split(" ")]
# # drop blank lines
text = '\n'.join(chunk for chunk in chunks if chunk)
## We split this article into paragraphs and then every paragraph into sentences
paragraphs = []
for paragraph in text.replace('\n',' ').split("\n\n"):
if len(paragraph.strip()) > 0:
paragraphs.append(sent_tokenize(paragraph.strip()))
#We combine up to 3 sentences into a passage. You can choose smaller or larger values for window_size
#Smaller value: Context from other sentences might get lost
#Lager values: More context from the paragraph remains, but results are longer
window_size = window_size
passages = []
for paragraph in paragraphs:
for start_idx in range(0, len(paragraph), window_size):
end_idx = min(start_idx+window_size, len(paragraph))
passages.append(" ".join(paragraph[start_idx:end_idx]))
print(f"Sentences: {sum([len(p) for p in paragraphs])}")
print(f"Passages: {len(passages)}")
return passages
@st.experimental_memo(suppress_st_warning=True)
def chunk_and_preprocess_text(text):
"""Chunk text longer than 500 tokens"""
text = text.encode("ascii", "ignore").decode() # unicode
text = re.sub(r"https*\S+", " ", text) # url
text = re.sub(r"@\S+", " ", text) # mentions
text = re.sub(r"#\S+", " ", text) # hastags
text = re.sub(r"\s{2,}", " ", text) # over spaces
article = nlp(text)
sentences = [i.text for i in list(article.sents)]
current_chunk = 0
chunks = []
for sentence in sentences:
if len(chunks) == current_chunk + 1:
if len(chunks[current_chunk]) + len(sentence.split(" ")) <= 500:
chunks[current_chunk].extend(sentence.split(" "))
else:
current_chunk += 1
chunks.append(sentence.split(" "))
else:
chunks.append(sentence.split(" "))
for chunk_id in range(len(chunks)):
chunks[chunk_id] = " ".join(chunks[chunk_id])
return chunks
def summary_downloader(raw_text):
b64 = base64.b64encode(raw_text.encode()).decode()
new_filename = "new_text_file_{}_.txt".format(time_str)
st.markdown("#### Download Summary as a File ###")
href = f'Click to Download!!'
st.markdown(href,unsafe_allow_html=True)
@st.experimental_memo(suppress_st_warning=True)
def get_all_entities_per_sentence(text):
doc = nlp(''.join(text))
sentences = list(doc.sents)
entities_all_sentences = []
for sentence in sentences:
entities_this_sentence = []
# SPACY ENTITIES
for entity in sentence.ents:
entities_this_sentence.append(str(entity))
# FLAIR ENTITIES (CURRENTLY NOT USED)
# sentence_entities = Sentence(str(sentence))
# tagger.predict(sentence_entities)
# for entity in sentence_entities.get_spans('ner'):
# entities_this_sentence.append(entity.text)
# XLM ENTITIES
entities_xlm = [entity["word"] for entity in ner_pipe(str(sentence))]
for entity in entities_xlm:
entities_this_sentence.append(str(entity))
entities_all_sentences.append(entities_this_sentence)
return entities_all_sentences
@st.experimental_memo(suppress_st_warning=True)
def get_all_entities(text):
all_entities_per_sentence = get_all_entities_per_sentence(text)
return list(itertools.chain.from_iterable(all_entities_per_sentence))
@st.experimental_memo(suppress_st_warning=True)
def get_and_compare_entities(article_content,summary_output):
all_entities_per_sentence = get_all_entities_per_sentence(article_content)
entities_article = list(itertools.chain.from_iterable(all_entities_per_sentence))
all_entities_per_sentence = get_all_entities_per_sentence(summary_output)
entities_summary = list(itertools.chain.from_iterable(all_entities_per_sentence))
matched_entities = []
unmatched_entities = []
for entity in entities_summary:
if any(entity.lower() in substring_entity.lower() for substring_entity in entities_article):
matched_entities.append(entity)
elif any(
np.inner(sbert.encode(entity, show_progress_bar=False),
sbert.encode(art_entity, show_progress_bar=False)) > 0.9 for
art_entity in entities_article):
matched_entities.append(entity)
else:
unmatched_entities.append(entity)
matched_entities = list(dict.fromkeys(matched_entities))
unmatched_entities = list(dict.fromkeys(unmatched_entities))
matched_entities_to_remove = []
unmatched_entities_to_remove = []
for entity in matched_entities:
for substring_entity in matched_entities:
if entity != substring_entity and entity.lower() in substring_entity.lower():
matched_entities_to_remove.append(entity)
for entity in unmatched_entities:
for substring_entity in unmatched_entities:
if entity != substring_entity and entity.lower() in substring_entity.lower():
unmatched_entities_to_remove.append(entity)
matched_entities_to_remove = list(dict.fromkeys(matched_entities_to_remove))
unmatched_entities_to_remove = list(dict.fromkeys(unmatched_entities_to_remove))
for entity in matched_entities_to_remove:
matched_entities.remove(entity)
for entity in unmatched_entities_to_remove:
unmatched_entities.remove(entity)
return matched_entities, unmatched_entities
@st.experimental_memo(suppress_st_warning=True)
def highlight_entities(article_content,summary_output):
markdown_start_red = ""
markdown_start_green = ""
markdown_end = ""
matched_entities, unmatched_entities = get_and_compare_entities(article_content,summary_output)
print(summary_output)
for entity in matched_entities:
summary_output = re.sub(f'({entity})(?![^rgb\(]*\))',markdown_start_green + entity + markdown_end,summary_output)
for entity in unmatched_entities:
summary_output = re.sub(f'({entity})(?![^rgb\(]*\))',markdown_start_red + entity + markdown_end,summary_output)
print("")
print(summary_output)
print("")
print(summary_output)
soup = BeautifulSoup(summary_output, features="html.parser")
return HTML_WRAPPER.format(soup)
def display_df_as_table(model,top_k,score='score'):
'''Display the df with text and scores as a table'''
df = pd.DataFrame([(hit[score],passages[hit['corpus_id']]) for hit in model[0:top_k]],columns=['Score','Text'])
df['Score'] = round(df['Score'],2)
return df
def make_spans(text,results):
results_list = []
for i in range(len(results)):
results_list.append(results[i]['label'])
facts_spans = []
facts_spans = list(zip(sent_tokenizer(text),results_list))
return facts_spans
##Fiscal Sentiment by Sentence
def fin_ext(text):
results = remote_clx(sent_tokenizer(text))
return make_spans(text,results)
nlp = get_spacy()
asr_model, sent_pipe, sum_pipe, ner_pipe, sbert, cross_encoder = load_models()