"""Helpers for transcribing, chunking, summarising and analysing earnings-call audio/text."""

import whisper
import os
from pytube import YouTube
import pandas as pd
import plotly_express as px
import nltk
import plotly.graph_objects as go
from optimum.onnxruntime import ORTModelForSequenceClassification
from transformers import (
    pipeline,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    WhisperProcessor,
    WhisperForConditionalGeneration,
)
from sentence_transformers import SentenceTransformer, CrossEncoder, util
import streamlit as st
import en_core_web_lg
import validators
import re
import itertools
import numpy as np
from bs4 import BeautifulSoup
import base64
import time
from annotated_text import annotated_text

nltk.download('punkt')
from nltk import sent_tokenize

# Timestamp taken once at import time; used to name downloadable files.
time_str = time.strftime("%d%m%Y-%H%M%S")

HTML_WRAPPER = """
{}
"""


@st.experimental_singleton(suppress_st_warning=True)
def load_models():
    """Load every model/pipeline used by this module.

    Returns:
        A 6-tuple ``(asr_pipe, sent_pipe, sum_pipe, ner_pipe, sbert,
        cross_encoder)``: speech-recognition pipeline, text-classification
        pipeline, summarization pipeline, NER pipeline, sentence encoder and
        cross-encoder.
    """
    # The task identifier must be spelled exactly; a misspelled task name is
    # rejected by `transformers.pipeline`.
    asr_pipe = pipeline("automatic-speech-recognition", model="openai/whisper-large")

    q_model = ORTModelForSequenceClassification.from_pretrained(
        "nickmuchi/quantized-optimum-finbert-tone"
    )
    q_tokenizer = AutoTokenizer.from_pretrained("nickmuchi/quantized-optimum-finbert-tone")
    sent_pipe = pipeline("text-classification", model=q_model, tokenizer=q_tokenizer)

    ner_model = AutoModelForTokenClassification.from_pretrained(
        "xlm-roberta-large-finetuned-conll03-english"
    )
    ner_tokenizer = AutoTokenizer.from_pretrained(
        "xlm-roberta-large-finetuned-conll03-english"
    )
    ner_pipe = pipeline("ner", model=ner_model, tokenizer=ner_tokenizer, grouped_entities=True)

    sum_pipe = pipeline(
        "summarization",
        model="facebook/bart-large-cnn",
        tokenizer="facebook/bart-large-cnn",
        clean_up_tokenization_spaces=True,
    )

    sbert = SentenceTransformer("all-mpnet-base-v2")
    cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')

    return asr_pipe, sent_pipe, sum_pipe, ner_pipe, sbert, cross_encoder


@st.experimental_singleton(suppress_st_warning=True)
def get_spacy():
    """Load and return the ``en_core_web_lg`` spaCy pipeline."""
    nlp = en_core_web_lg.load()
    return nlp


@st.experimental_memo(suppress_st_warning=True)
def inference(link, upload):
    """Convert a YouTube video or an audio upload to text.

    Args:
        link: Candidate YouTube URL; used when it validates as a URL.
        upload: Audio input passed straight to the ASR pipeline when ``link``
            is not a valid URL.

    Returns:
        ``(transcript, title)``. Falls through and returns ``None`` when
        ``link`` is not a valid URL and ``upload`` is falsy.
    """
    if validators.url(link):
        yt = YouTube(link)
        title = yt.title
        path = yt.streams.filter(only_audio=True)[0].download(filename="audio.mp4")
        results = asr_pipe(path)
        return results['text'], title

    if upload:
        results = asr_pipe(upload)
        return results['text'], "Transcribed Earnings Audio"

    return None


@st.experimental_memo(suppress_st_warning=True)
def chunk_long_text(text, threshold):
    """Split text into sentences, breaking over-long sentences into word chunks.

    Args:
        text: Text to split.
        threshold: A sentence with fewer than ``threshold`` words is kept
            whole; otherwise it is cut into consecutive pieces of at most
            ``threshold`` words.

    Returns:
        List of sentence/chunk strings, in order.
    """
    out = []
    for sentence in sent_tokenize(text):
        words = sentence.split()
        if len(words) < threshold:
            out.append(sentence)
            continue
        # Step over the actual word count so that a sentence whose length is
        # an exact multiple of `threshold` does not yield a trailing empty chunk.
        for start in range(0, len(words), threshold):
            out.append(' '.join(words[start:start + threshold]))
    return out


@st.experimental_memo(suppress_st_warning=True)
def sentiment_pipe(earnings_text):
    """Determine the sentiment of the text.

    Returns:
        ``(earnings_sentiment, earnings_sentences)``: the classifier output for
        each chunk and the chunks themselves (at most 400 words each).
    """
    earnings_sentences = chunk_long_text(earnings_text, 400)
    earnings_sentiment = sent_pipe(earnings_sentences)
    return earnings_sentiment, earnings_sentences


def _clean_text(text):
    """Drop non-ASCII characters, URLs, @mentions and #hashtags; collapse whitespace runs."""
    text = text.encode("ascii", "ignore").decode()  # unicode
    text = re.sub(r"https*\S+", " ", text)  # url
    text = re.sub(r"@\S+", " ", text)  # mentions
    text = re.sub(r"#\S+", " ", text)  # hashtags
    text = re.sub(r"\s{2,}", " ", text)  # over spaces
    return text


@st.experimental_memo(suppress_st_warning=True)
def preprocess_plain_text(text, window_size=3):
    """Preprocess text for semantic search.

    Args:
        text: Raw text.
        window_size: Number of consecutive sentences combined into one passage.
            Smaller values may lose context from neighbouring sentences; larger
            values keep more context but produce longer results.

    Returns:
        List of passages, each the space-joined text of up to ``window_size``
        sentences.
    """
    text = _clean_text(text)

    # Break into lines and remove leading and trailing space on each.
    lines = [line.strip() for line in text.splitlines()]
    # Break each line into phrases and drop the blank ones.
    chunks = [phrase.strip() for line in lines for phrase in line.split(" ")]
    text = '\n'.join(chunk for chunk in chunks if chunk)

    # Split into paragraphs and every paragraph into sentences. Newlines are
    # replaced with spaces before the "\n\n" split, so this produces at most
    # one paragraph.
    paragraphs = []
    for paragraph in text.replace('\n', ' ').split("\n\n"):
        if len(paragraph.strip()) > 0:
            paragraphs.append(sent_tokenize(paragraph.strip()))

    passages = []
    for paragraph in paragraphs:
        for start_idx in range(0, len(paragraph), window_size):
            end_idx = min(start_idx + window_size, len(paragraph))
            passages.append(" ".join(paragraph[start_idx:end_idx]))

    print(f"Sentences: {sum([len(p) for p in paragraphs])}")
    print(f"Passages: {len(passages)}")

    return passages


@st.experimental_memo(suppress_st_warning=True)
def chunk_and_preprocess_text(text):
    """Clean text and group its sentences into chunks of at most 500 words.

    Sentences come from the spaCy pipeline; words are counted by splitting on
    single spaces. A sentence is never split, so a single sentence longer than
    500 words becomes its own (over-long) chunk.

    Returns:
        List of chunk strings.
    """
    article = nlp(_clean_text(text))

    chunks = []
    for sentence in article.sents:
        words = sentence.text.split(" ")
        # Extend the current chunk while it stays within the limit; otherwise
        # start a new one.
        if chunks and len(chunks[-1]) + len(words) <= 500:
            chunks[-1].extend(words)
        else:
            chunks.append(words)

    return [" ".join(chunk) for chunk in chunks]


def summary_downloader(raw_text):
    """Render a Streamlit link that downloads ``raw_text`` as a text file."""
    b64 = base64.b64encode(raw_text.encode()).decode()
    new_filename = "new_text_file_{}_.txt".format(time_str)
    st.markdown("#### Download Summary as a File ###")
    # Embed the payload as a base64 data URI so the link itself carries the file.
    href = f'<a href="data:file/txt;base64,{b64}" download="{new_filename}">Click to Download!!</a>'
    st.markdown(href, unsafe_allow_html=True)


@st.experimental_memo(suppress_st_warning=True)
def get_all_entities_per_sentence(text):
    """Return, for each spaCy sentence of ``text``, the entities found in it.

    Each inner list holds the spaCy entities of the sentence followed by the
    ``"word"`` values returned by the NER pipeline for that sentence.
    """
    doc = nlp(''.join(text))

    entities_all_sentences = []
    for sentence in doc.sents:
        # spaCy entities
        entities_this_sentence = [str(entity) for entity in sentence.ents]
        # XLM entities
        entities_this_sentence.extend(
            str(entity["word"]) for entity in ner_pipe(str(sentence))
        )
        entities_all_sentences.append(entities_this_sentence)

    return entities_all_sentences


@st.experimental_memo(suppress_st_warning=True)
def get_all_entities(text):
    """Return a flat list of all entities found in ``text``."""
    all_entities_per_sentence = get_all_entities_per_sentence(text)
    return list(itertools.chain.from_iterable(all_entities_per_sentence))


def _drop_contained_entities(entities):
    """Return ``entities`` without those that are a case-insensitive substring of another entry."""
    lowered = [entity.lower() for entity in entities]
    return [
        entity
        for entity, low in zip(entities, lowered)
        if not any(
            other != entity and low in other_low
            for other, other_low in zip(entities, lowered)
        )
    ]


@st.experimental_memo(suppress_st_warning=True)
def get_and_compare_entities(article_content, summary_output):
    """Split the summary's entities into those supported by the article and those not.

    A summary entity counts as matched when it is a case-insensitive substring
    of some article entity, or when the inner product of its sentence embedding
    with that of some article entity exceeds 0.9.

    Returns:
        ``(matched_entities, unmatched_entities)``, each de-duplicated in
        first-seen order and with entries contained in a longer entry of the
        same list removed.
    """
    entities_article = list(
        itertools.chain.from_iterable(get_all_entities_per_sentence(article_content))
    )
    entities_summary = list(
        itertools.chain.from_iterable(get_all_entities_per_sentence(summary_output))
    )

    # Encode each distinct entity at most once; the pairwise comparison below
    # would otherwise re-encode the same strings on every iteration.
    embedding_cache = {}

    def _embed(entity):
        if entity not in embedding_cache:
            embedding_cache[entity] = sbert.encode(entity, show_progress_bar=False)
        return embedding_cache[entity]

    article_lowered = [entity.lower() for entity in entities_article]

    matched_entities = []
    unmatched_entities = []
    for entity in entities_summary:
        needle = entity.lower()
        if any(needle in candidate for candidate in article_lowered):
            matched_entities.append(entity)
        elif any(
            np.inner(_embed(entity), _embed(art_entity)) > 0.9
            for art_entity in entities_article
        ):
            matched_entities.append(entity)
        else:
            unmatched_entities.append(entity)

    # De-duplicate while preserving order, then drop partial names.
    matched_entities = _drop_contained_entities(list(dict.fromkeys(matched_entities)))
    unmatched_entities = _drop_contained_entities(list(dict.fromkeys(unmatched_entities)))

    return matched_entities, unmatched_entities


def _wrap_entities(text, entities, start, end):
    """Wrap each occurrence of every entity in ``text`` with ``start``/``end`` markup."""
    for entity in entities:
        if not entity:
            continue
        # Escape the entity so characters such as "(", "." or "+" are matched
        # literally. The lookahead skips matches that sit inside an already
        # inserted "rgb(...)" style value.
        pattern = f'({re.escape(entity)})' + r'(?![^rgb\(]*\))'
        # A callable replacement avoids backslash processing of the entity text.
        text = re.sub(pattern, lambda match: start + match.group(0) + end, text)
    return text


@st.experimental_memo(suppress_st_warning=True)
def highlight_entities(article_content, summary_output):
    """Return the summary as HTML with matched entities in green and unmatched ones in red.

    NOTE(review): the marker strings were empty, which made the highlighting a
    no-op; the <mark> tags and colours below are a reconstruction - confirm
    against the intended styling.
    """
    markdown_start_red = '<mark class="entity" style="background: rgb(238, 135, 135);">'
    markdown_start_green = '<mark class="entity" style="background: rgb(121, 236, 121);">'
    markdown_end = '</mark>'

    matched_entities, unmatched_entities = get_and_compare_entities(
        article_content, summary_output
    )

    print(summary_output)

    summary_output = _wrap_entities(
        summary_output, matched_entities, markdown_start_green, markdown_end
    )
    summary_output = _wrap_entities(
        summary_output, unmatched_entities, markdown_start_red, markdown_end
    )

    print("")
    print(summary_output)
    print("")
    print(summary_output)

    soup = BeautifulSoup(summary_output, features="html.parser")

    return HTML_WRAPPER.format(soup)


def display_df_as_table(model, top_k, score='score'):
    """Display the df with text and scores as a table.

    Args:
        model: Sequence of hit dicts, each with a ``'corpus_id'`` key and the
            key named by ``score``.
        top_k: Number of leading hits to include.
        score: Name of the hit key holding the score.

    Returns:
        DataFrame with columns ``Score`` (rounded to 2 decimals) and ``Text``.
    """
    # NOTE(review): `passages` is read as a module-level global that is not
    # defined in this file - confirm the caller provides it.
    df = pd.DataFrame(
        [(hit[score], passages[hit['corpus_id']]) for hit in model[0:top_k]],
        columns=['Score', 'Text'],
    )
    df['Score'] = round(df['Score'], 2)

    return df


def make_spans(text, results):
    """Pair each sentence of ``text`` with the ``'label'`` of the corresponding result."""
    results_list = [result['label'] for result in results]
    return list(zip(sent_tokenize(text), results_list))


## Fiscal Sentiment by Sentence
def fin_ext(text):
    """Classify each sentence of ``text`` and return ``(sentence, label)`` pairs."""
    # NOTE(review): the classifier originally called here was an undefined
    # name; `sent_pipe` is the text-classification pipeline available in this
    # module - confirm it is the intended model.
    results = sent_pipe(sent_tokenize(text))
    return make_spans(text, results)


nlp = get_spacy()
asr_pipe, sent_pipe, sum_pipe, ner_pipe, sbert, cross_encoder = load_models()
# Alias kept for callers that import the ASR pipeline under the name `asr_model`.
asr_model = asr_pipe