# Spaces: Runtime error
import csv
import functools
import multiprocessing.pool
import urllib.request

import IPython.display as ipd
import nest_asyncio
import numpy as np
import pandas as pd
import streamlit as st
import torch
import twint
from scipy.special import softmax
from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer
from transformers import TFAutoModelForSequenceClassification
# Preprocess text (username and link placeholders)
def preprocess(text):
    """Replace user mentions and links in ``text`` with fixed placeholders.

    The text is split on single spaces. A token that starts with ``@`` and is
    longer than one character becomes ``'@user'``; a token that starts with
    ``http`` becomes ``'http'``. All other tokens are kept as they are, and
    the tokens are joined back with single spaces, so the original spacing
    (including runs of spaces) is preserved.

    Args:
        text: Raw tweet text.

    Returns:
        The text with mentions and links replaced by placeholders.
    """
    new_text = []
    for token in text.split(" "):
        if token.startswith('@') and len(token) > 1:
            # A lone "@" is not a mention and is left untouched.
            token = '@user'
        elif token.startswith('http'):
            token = 'http'
        new_text.append(token)
    return " ".join(new_text)
# Loading pretrained model
# Identifier handed to `from_pretrained`; the same string is reused below as
# the directory passed to `save_pretrained`.
MODEL = 'cardiffnlp/twitter-roberta-base-sentiment'
tokenizer = AutoTokenizer.from_pretrained(MODEL)
model = AutoModelForSequenceClassification.from_pretrained(MODEL)
# Write the model and tokenizer files to a relative directory named after
# MODEL -- presumably so that later `from_pretrained(MODEL)` calls can read
# them from disk (TODO confirm this is intended in the deployed app).
model.save_pretrained(MODEL)
tokenizer.save_pretrained(MODEL)
# Func to get a score using the above model
def combined_score(text):
    """Return a single signed sentiment score for ``text``.

    The text is run through ``preprocess``, tokenized, and passed to the
    module-level ``model``. The first row of the model's first output is
    turned into probabilities with ``softmax``; the result is the third
    probability minus the first.

    Args:
        text: Raw tweet text.

    Returns:
        ``positive - negative`` probability, a value between -1 and 1.
    """
    text = preprocess(text)
    encoded_input = tokenizer(text, return_tensors='pt')
    # Inference only: skip autograd bookkeeping for the forward pass.
    with torch.no_grad():
        output = model(**encoded_input)
    scores = softmax(output[0][0].numpy())
    return scores[2] - scores[0]  # scores = [negative, neutral, positive]
# https://stackoverflow.com/questions/492519/timeout-on-a-function-call
def timeout(max_timeout):
    """Timeout decorator, parameter in seconds.

    The decorated function is executed in a one-thread pool and the caller
    waits at most ``max_timeout`` seconds for its result.

    Args:
        max_timeout: Maximum number of seconds to wait for the result.

    Returns:
        A decorator. Calling the decorated function returns the wrapped
        function's result, re-raises any exception it raised, or raises
        ``multiprocessing.TimeoutError`` when no result is available in time.

    Note:
        Only the wait is abandoned on timeout; the wrapped call is not
        interrupted and keeps running in its worker thread until it finishes.
    """
    def timeout_decorator(item):
        """Wrap the original function."""
        # Keep the wrapped function's name and docstring on the wrapper.
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            # A fresh pool is created per call; leaving the ``with`` block
            # terminates it so its helper threads are not leaked.
            with multiprocessing.pool.ThreadPool(processes=1) as pool:
                async_result = pool.apply_async(item, args, kwargs)
                # raises a TimeoutError if execution exceeds max_timeout
                return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator
# Getting tweets from a user
def get_tweets(username, limit=500, save_name=None):
    """Fetch a user's tweets with twint and return them as a dataframe.

    Args:
        username: User of interest.
        limit: Max number of tweets to fetch (increments of 20).
        save_name: Optional path; when given, the dataframe is also written
            there as CSV.

    Returns:
        ``twint.storage.panda.Tweets_df`` as it stands after the search. This
        is twint's own module-level object, returned without copying.
    """
    # NOTE: if the search fails with "RuntimeError: This event loop is already
    # running", calling nest_asyncio.apply() at this point helps avoid it.

    # Setup config
    c = twint.Config()      # Create a config object to store our settings
    c.Limit = limit         # Max number of tweets to fetch (increments of 20)
    c.Username = username   # User of interest
    c.Pandas = True         # Store tweets in a dataframe
    c.Hide_output = True    # Avoid printing out tweets

    # Run the search
    twint.run.Search(c)

    # Get the results and optionally save to a file as well
    df = twint.storage.panda.Tweets_df
    if save_name is not None:
        df.to_csv(save_name)
    return df
st.title('Test')


def _report_sentiment(user, n_tweets):
    """Fetch tweets for ``user``, score each one and render the results."""
    if not user.strip():
        st.warning("Please enter a Twitter username.")
        return

    st.write("Fetching user", user, "n_tweets", n_tweets)
    tweets = get_tweets(user, limit=n_tweets)
    if tweets is None or tweets.empty:
        # Nothing to score or plot; indexing the 'tweet' column would fail.
        st.warning("No tweets were returned for this user.")
        return
    st.write("Resulting dataframe shape:", tweets.shape)

    st.write("Calculating sentiments...")
    tweets['sentiment'] = tweets['tweet'].map(combined_score)
    st.write("Average sentiment:", tweets.sentiment.mean())

    # Nothing else in this file creates 'tweet_length', so derive it from the
    # tweet text when the dataframe does not already provide it.
    if 'tweet_length' not in tweets.columns:
        tweets['tweet_length'] = tweets['tweet'].str.len()

    # st.pyplot is a function that renders a figure; it has no plotting
    # methods of its own, so draw with pandas and hand over the figure.
    ax = tweets.plot.hexbin(x='tweet_length', y='sentiment',
                            gridsize=20, bins=12, cmap='inferno')
    st.pyplot(ax.figure)


with st.form("my_form"):
    st.write("Inside the form")
    user = st.text_input("Twitter Username")
    # Tweets are fetched in increments of 20, so the slider moves in 20s too.
    n_tweets = st.slider('How Many Tweets', 20, 2000, 20, step=20)

    # Every form must have a submit button.
    submitted = st.form_submit_button("Submit")
    if submitted:
        _report_sentiment(user, n_tweets)