import os from PyPDF2 import PdfReader import pandas as pd from dotenv import load_dotenv import groq import json from datetime import datetime from sklearn.decomposition import NMF from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from sklearn.cluster import KMeans import random from joblib import Parallel, delayed class TweetDatasetProcessor: def __init__(self): load_dotenv() self.groq_client = groq.Groq(api_key=os.getenv('Groq_api')) self.tweets = [] self.personality_profile = {} self.vectorizer = TfidfVectorizer(stop_words='english') self.used_tweets = set() # Track used tweets to avoid repetition @staticmethod def _process_line(line): """Process a single line.""" line = line.strip() if not line or line.startswith('http'): # Skip empty lines and URLs return None return { 'content': line, 'timestamp': datetime.now(), 'mentions': [word for word in line.split() if word.startswith('@')], 'hashtags': [word for word in line.split() if word.startswith('#')] } def extract_text_from_pdf(self, pdf_path): """Extract text content from PDF file.""" reader = PdfReader(pdf_path) text = "" for page in reader.pages: text += page.extract_text() return text def process_pdf_content(self, text): """Process PDF content and clean extracted tweets.""" if not text.strip(): raise ValueError("The uploaded PDF appears to be empty.") lines = text.split('\n') # Pass the static method explicitly clean_tweets = Parallel(n_jobs=-1)(delayed(TweetDatasetProcessor._process_line)(line) for line in lines) self.tweets = [tweet for tweet in clean_tweets if tweet] if not self.tweets: raise ValueError("No tweets were extracted from the PDF. Ensure the content is properly formatted.") # Save the processed tweets to a CSV df = pd.DataFrame(self.tweets) df.to_csv('processed_tweets.csv', index=False) return df def _extract_mentions(self, text): """Extract mentioned users from tweet.""" return [word for word in text.split() if word.startswith('@')] def _extract_hashtags(self, text): """Extract hashtags from tweet.""" return [word for word in text.split() if word.startswith('#')] def categorize_tweets(self): """Cluster tweets into categories using KMeans.""" all_tweets = [tweet['content'] for tweet in self.tweets] if not all_tweets: raise ValueError("No tweets available for clustering.") tfidf_matrix = self.vectorizer.fit_transform(all_tweets) kmeans = KMeans(n_clusters=5, random_state=1) kmeans.fit(tfidf_matrix) for i, tweet in enumerate(self.tweets): tweet['category'] = f"Category {kmeans.labels_[i]}" return pd.DataFrame(self.tweets) def analyze_personality(self, max_tweets=50): """Comprehensive personality analysis using a limited subset of tweets.""" if not self.tweets: raise ValueError("No tweets available for personality analysis.") all_tweets = [tweet['content'] for tweet in self.tweets][:max_tweets] analysis_prompt = f"""Perform a deep psychological analysis of the author based on these tweets: Core beliefs, emotional tendencies, cognitive patterns, etc. Tweets for analysis: {json.dumps(all_tweets, indent=2)} """ try: response = self.groq_client.chat.completions.create( messages=[ {"role": "system", "content": "You are an expert psychologist."}, {"role": "user", "content": analysis_prompt}, ], model="llama-3.1-70b-versatile", temperature=0.1, ) self.personality_profile = response.choices[0].message.content return self.personality_profile except Exception as e: return f"Error during personality analysis: {str(e)}" def analyze_topics(self, n_topics=None): """Extract and identify different topics the author has tweeted about.""" all_tweets = [tweet['content'] for tweet in self.tweets] if not all_tweets: return [] n_topics = n_topics or min(5, len(all_tweets) // 10) tfidf_matrix = self.vectorizer.fit_transform(all_tweets) nmf_model = NMF(n_components=n_topics, random_state=1) nmf_model.fit(tfidf_matrix) topics = [] for topic_idx, topic in enumerate(nmf_model.components_): topic_words = [self.vectorizer.get_feature_names_out()[i] for i in topic.argsort()[:-n_topics - 1:-1]] topics.append(" ".join(topic_words)) return list(set(topics)) # Remove duplicates def count_tokens(self, text): """Estimate the number of tokens in the given text.""" return len(text.split()) def generate_tweet(self, context="", sample_size=3): """Generate a new tweet by sampling random tweets and avoiding repetition.""" if not self.tweets: return "Error: No tweets available for generation." # Randomly sample unique tweets available_tweets = [tweet for tweet in self.tweets if tweet['content'] not in self.used_tweets] if len(available_tweets) < sample_size: self.used_tweets.clear() # Reset used tweets if all have been used available_tweets = self.tweets sampled_tweets = random.sample(available_tweets, sample_size) sampled_contents = [tweet['content'] for tweet in sampled_tweets] # Update the used tweets tracker self.used_tweets.update(sampled_contents) # Truncate personality profile to avoid token overflow personality_profile_excerpt = self.personality_profile[:400] if len(self.personality_profile) > 400 else self.personality_profile # Construct the prompt prompt = f"""Based on this personality profile: {personality_profile_excerpt} Current context or topic (if any): {context} Tweets for context: {', '.join(sampled_contents)} **Only generate the tweet. Do not include analysis, explanation, or any other content.** """ try: response = self.groq_client.chat.completions.create( messages=[ {"role": "system", "content": "You are an expert in replicating writing and thinking patterns."}, {"role": "user", "content": prompt}, ], model="llama-3.1-70b-versatile", temperature=1.0, max_tokens=150, ) tweet = response.choices[0].message.content.strip() return tweet except Exception as e: return f"Error generating tweet: {str(e)}"