File size: 6,366 Bytes
c0f4283 e29f82e 0fb5970 c0f4283 0fb5970 c0f4283 e29f82e c0f4283 0fb5970 fed0e7f 0fb5970 2183c09 c0f4283 d545750 c0f4283 527c569 c0f4283 c1c6bfc d3aafaf c0f4283 c1c6bfc 0fb5970 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
import asyncpraw
import asyncio
import re
import csv
import matplotlib.pyplot as plt
import requests
import asyncprawcore
import os
import ast
from typing import List, Dict, Any
from collections import defaultdict
from asyncpraw.models import MoreComments, Submission
from tqdm import tqdm
from huggingface_hub import InferenceClient, login
def get_access_to_reddit(user_agent="financial sentiment analysis project (research phase) (by u/ditalinianalysis)"):
    """Build an asyncpraw client from credentials stored in the environment.

    Args:
        user_agent: User-agent string handed to ``asyncpraw.Reddit``.

    Returns:
        The ``asyncpraw.Reddit`` instance created from the
        ``REDDIT_CLIENT_ID`` and ``REDDIT_SECRET`` environment variables
        together with ``user_agent``.
    """
    credentials = {
        "client_id": os.getenv("REDDIT_CLIENT_ID"),
        "client_secret": os.getenv("REDDIT_SECRET"),
        "user_agent": user_agent,
    }
    return asyncpraw.Reddit(**credentials)
def get_write_access_to_hf():
    """Log in to the Hugging Face Hub with the token from the environment.

    The token is read from the ``REDDIT_WRITE`` environment variable and
    passed to ``huggingface_hub.login`` with ``write_permission=True``.
    """
    hf_token = os.getenv("REDDIT_WRITE")
    login(token=hf_token, write_permission=True)
async def search_subreddits_by_keyword_in_name_or_description(reddit, search_string: str) -> List[asyncpraw.models.Subreddit]:
    """Collect every subreddit yielded by ``reddit.subreddits.search``.

    The search call returns an asynchronous iterator; this helper drains it
    into a plain list. According to the asyncpraw documentation quoted by
    the original author, subreddits are matched on both their title and
    their description.

    Args:
        reddit: Client object exposing ``subreddits.search(query)``.
        search_string: The query string to filter subreddits by.

    Returns:
        A list with the subreddits in the order the search yielded them.
    """
    return [match async for match in reddit.subreddits.search(search_string)]
async def filter_subreddits_by_keywords(subreddits: List[asyncpraw.models.Subreddit], keywords: List[str], min_keyword_count: int = 2) -> List[asyncpraw.models.Subreddit]:
    """Keep the subreddits whose title or description mention enough keywords.

    Matching is a case-insensitive substring test. Each entry of
    ``keywords`` counts once per subreddit if it occurs in the title or in
    the description.

    Args:
        subreddits: Objects exposing ``title`` and ``description``.
        keywords: Keywords to look for.
        min_keyword_count: Minimum number of matching keywords required for
            a subreddit to be kept.

    Returns:
        The matching subreddits, in their original order.
    """
    # Lower-case the keywords once instead of once per subreddit.
    lowered_keywords = [keyword.lower() for keyword in keywords]

    filtered_subreddits = []
    for subreddit in subreddits:
        # Treat a missing title the same way as a missing description:
        # as empty text rather than a crash on ``None.lower()``.
        title = (subreddit.title or "").lower()
        description = (subreddit.description or "").lower()
        keyword_count = sum(
            keyword in title or keyword in description
            for keyword in lowered_keywords
        )
        if keyword_count >= min_keyword_count:
            filtered_subreddits.append(subreddit)
    return filtered_subreddits
def get_subreddits_name_title_description(subreddits: List[asyncpraw.models.Subreddit]) -> Dict[asyncpraw.models.Subreddit,str]:
    """Build a short text summary for each subreddit.

    The summary has the form
    ``"Name:<display_name>\\nTitle: <title>\\nDescription: <description>"``
    and is truncated to its first 512 characters.

    Args:
        subreddits: Hashable objects exposing ``display_name``, ``title``
            and ``description``.

    Returns:
        A dict mapping each subreddit object to its truncated summary.
    """
    subreddit_name_title_descriptions = {}
    for subreddit in subreddits:
        # A missing title is rendered as empty text, as the description
        # already was, instead of raising during string building.
        title = subreddit.title or ""
        description = subreddit.description or ""
        text = f"Name:{subreddit.display_name}\nTitle: {title}\nDescription: {description}"
        # NOTE(review): 512 presumably matches the input limit of the
        # downstream classifier; confirm against the caller.
        subreddit_name_title_descriptions[subreddit] = text[:512]
    return subreddit_name_title_descriptions
def process_output(output):
    """Turn subreddit topic-classifier predictions into a label -> score dict.

    Args:
        output: Iterable of mappings, each with a ``'label'`` and a
            ``'score'`` entry.

    Returns:
        A dict with exactly the keys ``'TECHNOLOGY RELATED'`` and
        ``'NOT TECHNOLOGY RELATED'``. Each value is the score of the last
        prediction carrying that label, or ``0.0`` if there was none.
        Predictions with any other label are ignored.
    """
    tracked_labels = ('TECHNOLOGY RELATED', 'NOT TECHNOLOGY RELATED')
    scores = dict.fromkeys(tracked_labels, 0.0)
    for entry in output:
        entry_label, entry_score = entry['label'], entry['score']
        if entry_label in tracked_labels:
            scores[entry_label] = entry_score
    return scores
async def probe_subs_for_posts(subs: List[asyncpraw.models.Subreddit],
                               num_posts: int,
                               time_filter: str = "day"):
    """Collect the top posts of each subreddit on a best-effort basis.

    Each subreddit's ``top`` listing is consumed independently. If a
    subreddit raises while being read, the error is printed, the subreddit
    is recorded as failed and the remaining subreddits are still processed.
    Posts received before the failure are kept.

    Args:
        subs: Subreddit objects exposing ``top(limit=..., time_filter=...)``
            as an async iterator, plus ``display_name``.
        num_posts: Maximum number of top posts to request per subreddit.
        time_filter: Time window passed to ``top``. Default is "day".
            Possible values: "all", "day", "hour", "month", "week", "year".

    Returns:
        A ``(posts, failed_subreddits)`` tuple. ``posts`` is a
        ``defaultdict(list)`` keyed by the subreddit objects themselves,
        holding the submissions retrieved for each. ``failed_subreddits``
        is a list with the display names of subreddits that raised.
    """
    # key -> subreddit object, value -> list of submissions
    posts = defaultdict(list)
    failed_subreddits = []

    for sub in subs:
        try:
            async for submission in sub.top(limit=num_posts, time_filter=time_filter):
                posts[sub].append(submission)
        except Exception as e:
            # Deliberately broad: one bad subreddit must not abort the probe.
            # Fall back to str(sub) so the handler itself cannot raise when
            # the object has no ``display_name``.
            sub_name = getattr(sub, "display_name", str(sub))
            # Include the exception so the failure can be diagnosed.
            print(f"Error processing posts from subreddit {sub_name}: {e!r}")
            failed_subreddits.append(sub_name)

    return posts, failed_subreddits
def default_dict_list():
    """Return a new, empty ``defaultdict`` whose missing keys start as ``[]``."""
    list_by_key = defaultdict(list)
    return list_by_key
def default_dict_dict_list():
    """Return a new two-level ``defaultdict``: key -> key -> list.

    Missing outer keys are created by calling ``default_dict_list``.
    """
    inner_factory = default_dict_list
    return defaultdict(inner_factory)
def default_dict_dict_dict_list():
    """Return a new three-level ``defaultdict``: key -> key -> key -> list.

    Missing outer keys are created by calling ``default_dict_dict_list``.
    """
    inner_factory = default_dict_dict_list
    return defaultdict(inner_factory)
async def probe_submissions_for_comments(submission: asyncpraw.models.Submission,
                                         num_comments: int,
                                         sort_type: str) -> List[asyncpraw.models.Comment]:
    """Load a submission's comments and return them as a flat list.

    Args:
        submission (asyncpraw.models.Submission): The Reddit submission object.
        num_comments (int): Value assigned to ``submission.comment_limit``
            before the submission is loaded.
        sort_type (str): Value assigned to ``submission.comment_sort``.
            Possible values: 'confidence', 'top', 'new', 'controversial', 'old', 'random', 'qa'.

    Returns:
        List[asyncpraw.models.Comment]: The entries of
        ``submission.comments.list()`` that are not ``MoreComments``.

    Note:
        - ``MoreComments`` placeholders are dropped, not expanded, so no
          extra requests are made for collapsed comment threads.
    """
    # The sort order and limit must be set before load() fetches the comments.
    submission.comment_sort = sort_type
    submission.comment_limit = num_comments
    await submission.load()

    # replace_more() is a coroutine in asyncpraw. The previous un-awaited
    # call never ran and only produced a "never awaited" warning.
    # limit=0 strips MoreComments placeholders without additional requests,
    # which is the documented behaviour of this function.
    await submission.comments.replace_more(limit=0)

    # The isinstance filter is kept as a defensive guard.
    return [
        comment
        for comment in submission.comments.list()
        if not isinstance(comment, MoreComments)
    ]
def results_str_to_dict(input_results):
    """Convert string values of ``input_results`` to ints or dicts, in place.

    For every string value:
      * an all-digit string becomes an ``int``;
      * a string containing ``{`` has its first brace-delimited span parsed
        with ``ast.literal_eval`` and is replaced by the result;
      * any other string is left unchanged.

    Non-string values (ints, floats, ``None``, already-parsed dicts, ...)
    are left untouched.

    Args:
        input_results: Dict whose values should be normalised. It is
            mutated in place.

    Returns:
        The same ``input_results`` object.

    Raises:
        SyntaxError, ValueError: Propagated from ``ast.literal_eval`` when
            the extracted span is not a valid Python literal.
    """
    # Only existing keys are re-assigned, so the dict keeps its size and
    # iterating over items() while updating is safe.
    for key, value in input_results.items():
        if not isinstance(value, str):
            continue
        if value.isdigit():
            input_results[key] = int(value)
        elif "{" in value:
            input_results[key] = ast.literal_eval(_first_braced_span(value))
    return input_results


def _first_braced_span(text):
    """Return the first balanced ``{...}`` span of ``text``.

    Braces are counted so that nested dict literals are returned whole
    rather than cut at the first ``}``. Braces inside quoted strings are
    not treated specially. If the braces never balance, everything from
    the first ``{`` onwards is returned and the caller's parser reports
    the error.
    """
    start = text.find("{")
    depth = 0
    for index in range(start, len(text)):
        char = text[index]
        if char == "{":
            depth += 1
        elif char == "}":
            depth -= 1
            if depth == 0:
                return text[start:index + 1]
    return text[start:]