Spaces:
Sleeping
Sleeping
import openai | |
import pandas as pd | |
import streamlit as st | |
import matplotlib.pyplot as plt | |
from wordcloud import WordCloud | |
from dotenv import load_dotenv | |
import os | |
import time | |
import glob | |
from audio_predictions import AudioTranslation | |
# The OpenAI key is read straight from the process environment. Loading it
# from a local .env file (load_dotenv(dotenv_path='file.env')) is currently
# disabled, so OPENAI_API_KEY must already be exported when the app starts.
API_KEY = os.getenv('OPENAI_API_KEY')

# Hand the key to the OpenAI client library.
openai.api_key = API_KEY

dataset_path = 'updated_company_tweets.csv'
def load_data(sentiment_path='main_sentiment_df.csv', kinya_path='kinya.csv'):
    """Load the main sentiment table joined with the companion ``kinya`` table.

    Args:
        sentiment_path: CSV file holding the main rows. Defaults to the file
            name the app has always read.
        kinya_path: CSV file merged onto the main rows by ``tweet_id``.
            Defaults to the file name the app has always read.

    Returns:
        A DataFrame containing every row of the main table (left join); the
        columns coming from the second file are filled in only where
        ``tweet_id`` matches.
    """
    main_sentiment_df = pd.read_csv(sentiment_path)
    kinya_df = pd.read_csv(kinya_path)
    return pd.merge(main_sentiment_df, kinya_df, on='tweet_id', how='left')
def list_audio_files(directory):
    """Return the paths of the ``.mp3`` files directly inside *directory*.

    Args:
        directory: Directory to search (not recursive).

    Returns:
        A list of matching paths in sorted order. ``glob`` itself makes no
        ordering promise, so sorting keeps the audio players and the result
        rows in the same order from one run to the next. The list is empty
        when the directory is missing or holds no ``.mp3`` files.
    """
    return sorted(glob.glob(os.path.join(directory, '*.mp3')))
def display_audio_players(audio_files, column):
    """Render one audio player, captioned with the file name, per audio file.

    Args:
        audio_files: Iterable of audio file paths.
        column: Streamlit container (used as a context manager) that
            receives the widgets.
    """
    for audio_path in audio_files:
        caption = os.path.basename(audio_path)
        with column:
            st.audio(audio_path)
            st.text(caption)
def process_audio_files(directories):
    """Transcribe and translate every ``.mp3`` file in the given directories.

    Args:
        directories: Iterable of directory names. Each name is also stored as
            the ``company`` value of the rows produced from that directory.

    Returns:
        A DataFrame with the columns ``filename``, ``company``,
        ``transcription`` and ``translation``, one row per audio file. The
        columns exist even when no audio file was found, so callers can
        filter on ``company`` without special-casing an empty result.
    """
    columns = ["filename", "company", "transcription", "translation"]
    audio_translator = AudioTranslation()
    results = []
    for directory in directories:
        for file_path in list_audio_files(f"{directory}/"):
            transc = audio_translator.transcribe_audio(file_path)
            print(file_path)
            print('transcription')
            print(transc)
            translation_result = audio_translator.translate_sentence("rw", "en", "MULTI-rw-en", "", transc)
            # A missing or malformed translation result must not abort the
            # whole batch; record the failure for this file and move on.
            # NOTE(review): the placeholder text is later fed to sentiment
            # analysis like any other translation - confirm that is acceptable.
            try:
                translation_text = translation_result["translation"]
            except (KeyError, TypeError):
                translation_text = "Translation Failed"
            results.append({
                "filename": os.path.basename(file_path),
                "company": directory,
                "transcription": transc,
                "translation": translation_text,
            })
    return pd.DataFrame(results, columns=columns)
def audio_analysis_page():
    """Render the audio page: per-company players plus an on-demand analysis.

    Three columns list the audio files of each company. Pressing "Process"
    transcribes and translates all files, shows the resulting table and then
    renders the sentiment visualizations company by company.
    """
    st.header("Audio Analysis")

    # (heading, audio directory) for each column, left to right.
    sections = (("MTN", "mtn/"), ("Liquid", "liquid/"), ("Irembo", "irembo/"))
    for column, (heading, folder) in zip(st.columns(3), sections):
        with column:
            st.subheader(heading)
            display_audio_players(list_audio_files(folder), column)

    # Nothing more to draw until the user asks for processing.
    if not st.button("Process"):
        return

    companies = ["mtn", "liquid", "irembo"]
    results_df = process_audio_files(companies)
    st.dataframe(results_df)

    # Score and visualize each company's rows separately.
    for company in companies:
        st.write(f"Company: {company.upper()}")
        company_data = process_dataset_for_audio(results_df, company)
        display_audio_visualizations(company_data)
def display_audio_visualizations(company_data):
    """Show the sentiment pie chart and the word cloud for audio results.

    Args:
        company_data: DataFrame handed unchanged to ``generate_audiopie`` and
            ``generate_audioword_cloud``.
    """
    col1, col2 = st.columns(2)
    with col1:
        st.write("Sentiment Distribution")
        pie_chart = generate_audiopie(company_data)
        st.pyplot(pie_chart)
        # Figures created through pyplot stay registered until closed; release
        # each one once rendered so they do not accumulate across reruns.
        plt.close(pie_chart)
    with col2:
        st.write("Word Cloud for Translations")
        word_cloud = generate_audioword_cloud(company_data)
        st.pyplot(word_cloud)
        plt.close(word_cloud)
def generate_audiopie(data):
    """Build a pie chart of the sentiment labels found in *data*.

    Args:
        data: DataFrame with a ``sentiment_score`` column of sentiment
            labels. Every row is used; no company filtering happens here.

    Returns:
        The matplotlib figure holding the pie chart.
    """
    start_time = time.time()
    sentiment_counts = data['sentiment_score'].value_counts()
    # Define colors for different sentiments
    colors = {'Positive sentiment': 'green', 'Negative sentiment': 'red', 'Neutral': 'blue'}
    # Labels outside the mapping (for example the 'Error' label produced when
    # an API call fails) get an explicit neutral colour instead of None.
    pie_colors = [colors.get(sentiment, 'gray') for sentiment in sentiment_counts.index]
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.pie(sentiment_counts, labels=sentiment_counts.index, autopct='%1.1f%%', startangle=90, colors=pie_colors)
    ax.axis('equal')  # Keeps the pie chart circular
    end_time = time.time()
    print(f'Pie chart execution time: {end_time - start_time} seconds')
    return fig
def generate_audioword_cloud(data):
    """Build a word cloud from the ``transcription`` column of *data*.

    Args:
        data: DataFrame with a ``transcription`` column. Every row is used;
            missing values are skipped.

    Returns:
        The matplotlib figure holding the word cloud, or a figure carrying a
        short placeholder message when there are no words to draw.
    """
    start_time = time.time()
    text_column = 'transcription'
    # Cast to str so a stray non-string cell cannot break the join.
    text_data = ' '.join(data[text_column].dropna().astype(str))
    fig, ax = plt.subplots()
    try:
        wordcloud = WordCloud(width=1000, height=600, background_color='white').generate(text_data)
    except ValueError:
        # WordCloud refuses to render when the text contains no usable words
        # (e.g. a company without audio files); show a placeholder instead of
        # crashing the page.
        ax.text(0.5, 0.5, 'No text available', ha='center', va='center')
    else:
        ax.imshow(wordcloud, interpolation='bilinear')
    ax.axis('off')
    end_time = time.time()
    print(f'Word cloud execution time: {end_time - start_time} seconds')
    return fig
def display_company_visualizations(company_data):
    """Show the sentiment pie chart and the word cloud for one company.

    Args:
        company_data: DataFrame whose first ``company`` value identifies the
            company to chart.
    """
    # Both charts must be filtered by the same company identifier. The word
    # cloud used to receive the first *translation text* instead, which can
    # never match a company.
    # NOTE(review): generate_pie_chart/generate_word_cloud filter on a
    # 'company_id' column while this function reads 'company' - confirm the
    # frame passed here carries both columns.
    company = company_data['company'].iloc[0]
    col1, col2 = st.columns(2)
    with col1:
        st.write("Sentiment Distribution")
        pie_chart = generate_pie_chart(company_data, company)
        st.pyplot(pie_chart)
        # Release the figure once rendered so figures do not accumulate in
        # pyplot across reruns.
        plt.close(pie_chart)
    with col2:
        st.write("Word Cloud for Translations")
        word_cloud = generate_word_cloud(company_data, company)
        st.pyplot(word_cloud)
        plt.close(word_cloud)
def analyze_sentiment(texts):
    """Label each text's sentiment using the OpenAI chat completion API.

    Args:
        texts: Sequence of texts; one API request is made per text.

    Returns:
        A list with one label per input, in input order. Each label is
        ``"Positive sentiment"``, ``"Negative sentiment"``, ``'Neutral'`` or
        ``"Error"`` when the request for that particular text failed.
    """
    responses = []
    for text in texts:
        # Guard each request on its own so a single failure marks only that
        # text as "Error" instead of discarding the labels already obtained.
        try:
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a sentiment analysis model."},
                    {"role": "user", "content": text}
                ]
            )
            sentiment_response = response.choices[0].message['content'].lower()
        except Exception as e:  # API boundary: report and keep going
            print(f"An error occurred: {e}")
            responses.append("Error")
            continue
        # Keyword match on the model's free-text reply; "positive" is checked
        # first, anything mentioning neither word counts as neutral.
        if "positive" in sentiment_response:
            responses.append("Positive sentiment")
        elif "negative" in sentiment_response:
            responses.append("Negative sentiment")
        else:
            responses.append('Neutral')
    return responses
def process_dataset(data):
    """Score the ``text`` column of *data* and persist the result.

    Args:
        data: DataFrame with a ``text`` column. A ``sentiment_score`` column
            is added to this frame in place.

    Returns:
        The same DataFrame, now carrying ``sentiment_score``. A copy is also
        written to ``predictions.csv``.
    """
    start_time = time.time()
    text_column = 'text'
    texts = data[text_column].tolist()
    data['sentiment_score'] = analyze_sentiment(texts)
    end_time = time.time()
    print(f'process dataset execution time : {end_time - start_time} seconds')
    # Write without the index, matching how run_batch_processing_mode saves
    # the same file; otherwise reloading it yields a stray 'Unnamed: 0' column.
    data.to_csv('predictions.csv', index=False)
    return data
def generate_pie_chart(data, selected_company):
    """Build a pie chart of sentiment labels for one company.

    Args:
        data: DataFrame with ``company_id`` and ``sentiment_score`` columns.
        selected_company: Value of ``company_id`` whose rows are charted.

    Returns:
        The matplotlib figure holding the pie chart.
    """
    start_time = time.time()
    # Filter data for the selected company
    company_data = data[data['company_id'] == selected_company]
    sentiment_counts = company_data['sentiment_score'].value_counts()
    colors = {'Positive sentiment': 'green', 'Negative sentiment': 'red', 'Neutral': 'blue'}
    # Labels outside the mapping (for example the 'Error' label produced when
    # an API call fails) get an explicit neutral colour instead of None.
    pie_colors = [colors.get(sentiment, 'gray') for sentiment in sentiment_counts.index]
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.pie(sentiment_counts, labels=sentiment_counts.index, autopct='%1.1f%%', startangle=90, colors=pie_colors)
    ax.axis('equal')  # Keeps the pie chart circular
    end_time = time.time()
    print(f'Pie chart execution time: {end_time - start_time} seconds')
    return fig
def generate_word_cloud(data, selected_company):
    """Build a word cloud from one company's text.

    Args:
        data: DataFrame with a ``company_id`` column and a ``text`` column,
            optionally also ``translated_kinyarwanda_manual``.
        selected_company: Value of ``company_id`` whose rows are used.

    Returns:
        The matplotlib figure holding the word cloud, or a figure carrying a
        short placeholder message when there are no words to draw.
    """
    start_time = time.time()
    # Filter data for the selected company
    company_data = data[data['company_id'] == selected_company]
    # Prefer the manual translation column when it has at least one value
    # for this company; otherwise fall back to the raw text.
    if 'translated_kinyarwanda_manual' in company_data.columns and company_data['translated_kinyarwanda_manual'].notna().any():
        text_column = 'translated_kinyarwanda_manual'
    else:
        text_column = 'text'
    # Cast to str so a stray non-string cell cannot break the join.
    text_data = ' '.join(company_data[text_column].dropna().astype(str))
    fig, ax = plt.subplots()
    try:
        wordcloud = WordCloud(width=1000, height=600, background_color='white').generate(text_data)
    except ValueError:
        # WordCloud refuses to render when the text contains no usable words;
        # show a placeholder instead of crashing the page.
        ax.text(0.5, 0.5, 'No text available', ha='center', va='center')
    else:
        ax.imshow(wordcloud, interpolation='bilinear')
    ax.axis('off')
    end_time = time.time()
    print(f'Word cloud execution time: {end_time - start_time} seconds')
    return fig
def generate_time_series_chart(data, selected_company):
    """Plot the daily count of each sentiment label for one company.

    Args:
        data: DataFrame with ``company_id``, ``date`` and ``sentiment_score``
            columns. It is not modified.
        selected_company: Value of ``company_id`` whose rows are charted.

    Returns:
        The matplotlib figure with one line per sentiment label.
    """
    start_time = time.time()
    # Work on an explicit copy: the filtered frame is a slice of `data`, and
    # assigning to / sorting a slice in place triggers pandas'
    # SettingWithCopyWarning.
    company_data = data[data['company_id'] == selected_company].copy()
    company_data['date'] = pd.to_datetime(company_data['date'])
    company_data = company_data.sort_values('date')
    # Rows: calendar day, columns: sentiment label, values: number of rows.
    grouped = company_data.groupby([company_data['date'].dt.date, 'sentiment_score']).size().unstack().fillna(0)
    # Define colors for different sentiments
    colors = {'Positive sentiment': 'green', 'Negative sentiment': 'red', 'Neutral': 'blue'}
    fig, ax = plt.subplots(figsize=(10, 6))
    # Plot each sentiment score with its corresponding color; labels outside
    # the mapping are drawn in black.
    for sentiment in grouped.columns:
        ax.plot(grouped.index, grouped[sentiment], label=sentiment, color=colors.get(sentiment, 'black'))
    ax.set_title('Sentiment Over Time')
    ax.set_xlabel('Date')
    ax.set_ylabel('Count')
    ax.legend()
    end_time = time.time()
    print(f'Time series chart execution time: {end_time - start_time} seconds')
    return fig
def process_dataset_for_company(company_data):
    """Add a ``sentiment_score`` column to one company's rows.

    Args:
        company_data: DataFrame for a single company. It is not modified.

    Returns:
        A copy of *company_data* with the ``sentiment_score`` column added.
    """
    start_time = time.time()
    # Callers pass a filtered slice of a larger frame; copy it so adding a
    # column does not trigger pandas' SettingWithCopyWarning.
    company_data = company_data.copy()
    # Analyze the 'english' column when manual translations are available
    # for this company, otherwise the raw 'text'.
    # NOTE(review): the check looks at 'translated_kinyarwanda_manual' but the
    # column read is 'english' - confirm both always come together.
    has_manual_translation = (
        'translated_kinyarwanda_manual' in company_data.columns
        and company_data['translated_kinyarwanda_manual'].notna().any()
    )
    analyze_column = 'english' if has_manual_translation else 'text'
    texts = company_data[analyze_column].tolist()
    company_data['sentiment_score'] = analyze_sentiment(texts)
    end_time = time.time()
    print(f'process_dataset_for_company execution time: {end_time - start_time} seconds')
    return company_data
def process_dataset_for_audio(company_data, company):
    """Score the translated audio rows that belong to one company.

    Args:
        company_data: DataFrame with ``company`` and ``translation`` columns.
            It is not modified.
        company: Value of ``company`` whose rows are selected.

    Returns:
        A new DataFrame with the selected rows plus a ``sentiment_score``
        column.
    """
    start_time = time.time()
    # Copy the filtered rows: adding a column to a slice of the caller's
    # frame triggers pandas' SettingWithCopyWarning.
    result = company_data[company_data['company'] == company].copy()
    # Sentiment is computed on the translated text.
    analyze_column = 'translation'
    texts = result[analyze_column].tolist()
    result['sentiment_score'] = analyze_sentiment(texts)
    end_time = time.time()
    # The message previously named process_dataset_for_company (copy-paste),
    # which made the timing logs ambiguous.
    print(f'process_dataset_for_audio execution time: {end_time - start_time} seconds')
    return result
def display_charts(data, selected_company):
    """Render the pie chart, word cloud and time-series chart for a company.

    Args:
        data: DataFrame passed on to the three chart builders.
        selected_company: Company identifier passed on to the chart builders.
    """
    col1, col2 = st.columns(2)
    with col1:
        st.write("Sentiment Distribution")
        pie_chart = generate_pie_chart(data, selected_company)
        st.pyplot(pie_chart)
        # Figures created through pyplot stay registered until closed; release
        # each one once rendered so they do not accumulate across reruns.
        plt.close(pie_chart)
    with col2:
        st.write("Word Cloud for Text")
        word_cloud = generate_word_cloud(data, selected_company)
        st.pyplot(word_cloud)
        plt.close(word_cloud)
    st.write('Sentiment Trend Over Time')
    time_series_chart = generate_time_series_chart(data, selected_company)
    st.pyplot(time_series_chart)
    plt.close(time_series_chart)
def display_sampled_data(data):
    """Show a small random sample of rows for every company.

    For each ``company_id`` up to five rows are drawn, at most one per
    ``profile_name``. When the company has manual Kinyarwanda translations,
    the displayed ``text`` uses the translation where one exists and keeps
    the original text otherwise.

    Args:
        data: DataFrame with at least the columns ``tweet_id``,
            ``company_id``, ``user_id``, ``profile_name``, ``text`` and
            ``date``. It is not modified.
    """
    samples = []
    for company in data['company_id'].unique():
        company_data = data[data['company_id'] == company]
        unique_profiles = company_data.drop_duplicates(subset='profile_name')
        # Copy so the 'text' overwrite below never touches the caller's frame.
        sampled_company_data = unique_profiles.sample(n=min(5, len(unique_profiles)), replace=False).copy()
        if 'translated_kinyarwanda_manual' in company_data.columns and company_data['translated_kinyarwanda_manual'].notna().any():
            # Only replace text where a translation exists; a plain overwrite
            # would blank out the rows that have none.
            sampled_company_data['text'] = sampled_company_data['translated_kinyarwanda_manual'].fillna(
                sampled_company_data['text']
            )
        samples.append(sampled_company_data)
    # Concatenate once at the end; with no companies fall back to an empty
    # frame that still carries the input's columns.
    sampled_data = pd.concat(samples, ignore_index=True) if samples else data.head(0)
    columns_to_display = ['tweet_id', 'company_id', 'user_id', 'profile_name', 'text', 'date']
    st.dataframe(sampled_data[columns_to_display])
def run_online_mode(data):
    """Let the user pick a company, score its rows live and show the charts.

    Args:
        data: DataFrame with a ``company_id`` column; the selected company's
            rows are passed to ``process_dataset_for_company``.
    """
    company_list = data['company_id'].unique()
    selected_company = st.selectbox('Select a Company', company_list)
    # Compare against None rather than relying on truthiness, so a valid but
    # falsy identifier (e.g. 0) is not silently skipped.
    if selected_company is not None:
        company_data = data[data['company_id'] == selected_company]
        st.write(f'Sample of the collected data for {selected_company}')
        st.dataframe(company_data.head(10))
        processed_data = process_dataset_for_company(company_data)
        display_charts(processed_data, selected_company)
def run_batch_processing_mode():
    """Show charts from precomputed predictions, computing them if needed.

    ``predictions.csv`` is reused when it exists; otherwise the full dataset
    is loaded, scored and saved to that file before display.
    """
    if os.path.exists('predictions.csv'):
        processed_data = pd.read_csv('predictions.csv')
    else:
        data = load_data()
        processed_data = process_dataset(data)
        processed_data.to_csv('predictions.csv', index=False)
    company_list = processed_data['company_id'].unique()
    selected_company = st.selectbox('Select a Company', company_list)
    # Compare against None rather than relying on truthiness, so a valid but
    # falsy identifier (e.g. 0) is not silently skipped.
    if selected_company is not None:
        company_data = processed_data[processed_data['company_id'] == selected_company]
        st.write(f'Sample of the collected data for {selected_company}')
        st.dataframe(company_data.head(10))
        display_charts(company_data, selected_company)
def sentiment_analysis_page():
    """Render the tweet sentiment page in the processing mode the user picks."""
    st.title('Company Sentiment Analysis')
    processing_mode = st.selectbox("Choose Processing Mode", ["Batch Processing", "Online"])

    # The sample table is shown in both modes.
    data = load_data()
    display_sampled_data(data)

    # Anything other than "Online" is handled as batch processing.
    if processing_mode != "Online":
        run_batch_processing_mode()
        return
    run_online_mode(data)
def main():
    """Draw the sidebar navigation and render the page the user selected."""
    st.sidebar.title('Navigation')
    page = st.sidebar.radio("Select a Page", ["Sentiment Analysis", "Audio Analysis"])

    # Page label -> function that renders it.
    pages = {
        "Sentiment Analysis": sentiment_analysis_page,
        "Audio Analysis": audio_analysis_page,
    }
    render_page = pages.get(page)
    if render_page is not None:
        render_page()


if __name__ == "__main__":
    main()