|
import requests |
|
from bs4 import BeautifulSoup |
|
import time |
|
import pandas as pd |
|
import re |
|
import datetime |
|
import os |
|
import random |
|
|
|
class ReedsyCrawler:
    """Crawl writing prompts and their submitted stories from blog.reedsy.com.

    Prompts are cached in ``prompt.csv`` and each prompt's stories are saved to
    ``stories/<prompt_id>.csv``; the ``done`` column makes the crawl resumable
    after an interruption.
    """

    BASE_URL = 'https://blog.reedsy.com'
    USER_AGENT = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/116.0.0.0 Safari/537.36')

    def __init__(self):
        path = './'
        # Make sure the output layout exists before any scraping starts.
        os.makedirs(os.path.join(path, 'stories'), exist_ok=True)
        self.folder = path
        self.prompt_df = self.get_prompts()

    def run(self):
        """Scrape stories for every prompt not yet marked done.

        Failures on a single prompt are logged and the crawl continues
        with the remaining prompts (best-effort).
        """
        for i, r in self.prompt_df[self.prompt_df['done'] == 0].iterrows():
            try:
                self.scrape_story(i, r)
            except Exception:
                # Log the offending prompt row; do not abort the whole crawl.
                print(r)

    def scrape_story(self, i, r):
        """Scrape all stories for prompt row *r* (at index *i*) and persist progress.

        Writes the story CSV, marks the prompt as done, records its posted
        date and checkpoints ``prompt.csv`` so the crawl can resume later.
        """
        story_df, posted_date = self.get_stories(r)
        story_df.to_csv(os.path.join(self.folder, 'stories', '{}.csv'.format(r['prompt_id'])), index=None)
        self.prompt_df.loc[i, 'done'] = 1
        self.prompt_df.loc[i, 'posted_date'] = posted_date
        self.prompt_df.to_csv(os.path.join(self.folder, 'prompt.csv'), index=None)

    def get_soup(self, url):
        """GET *url* and return it parsed as a BeautifulSoup document.

        Retries up to 5 times on non-200 responses, sleeping 3s between
        attempts, and raises RuntimeError when all attempts fail.  (The
        previous implementation looped forever, hammering the server,
        once 5 attempts were exhausted.)
        """
        headers = {'User-Agent': self.USER_AGENT}
        for attempt in range(1, 6):
            response = requests.get(url, headers=headers)
            if response.status_code == 200:
                time.sleep(0.2)  # polite crawl delay on success
                # Explicit parser avoids the bs4 "no parser specified" warning
                # and keeps parsing deterministic across environments.
                return BeautifulSoup(response.content, 'html.parser')
            if attempt >= 5:
                print('Attempt >= 5', url)
                raise RuntimeError('Failed to fetch {} after 5 attempts'.format(url))
            time.sleep(3)

    def get_prompts(self):
        """Load (or create) prompt.csv and append any newly published prompts.

        Returns the combined DataFrame with columns
        ``prompt_id, prompt, category, link, no_of_stories, done``.
        """
        prompt_path = os.path.join(self.folder, 'prompt.csv')
        if os.path.exists(prompt_path):
            prompt_df = pd.read_csv(prompt_path)
        else:
            prompt_df = pd.DataFrame(columns=['prompt_id', 'prompt', 'category', 'link', 'no_of_stories', 'done'])

        soup = self.get_soup('https://blog.reedsy.com/creative-writing-prompts/')
        # Last pagination widget entry holds the highest page number.
        prompt_last_page = int(soup.find_all('span', class_='page')[-1].find('a').text)

        # Continue the prompt_NNNN numbering after the highest saved id
        # (slice [8:] strips the "prompt_" prefix).
        if len(prompt_df) > 0:
            prompt_count = int(prompt_df['prompt_id'].max()[8:]) + 1
        else:
            prompt_count = 1

        prompt_result_list = []
        # Track titles already known so the same prompt is never added twice,
        # even if it shows up on more than one listing page in this crawl.
        known_prompts = set(prompt_df['prompt'])

        for page in range(1, prompt_last_page + 1):
            page_soup = self.get_soup(f'https://blog.reedsy.com/creative-writing-prompts/page/{page}')
            time.sleep(1)
            prompt_list = page_soup.find_all('div', class_='prompt panel panel-thin panel-white-bordered space-bottom-xs-md')
            for p in prompt_list:
                header = p.find('p').text
                if header.strip()[:4] == 'LIVE':
                    continue  # contest still running; skip until it closes
                # Header looks like "<category> – <N> stories".
                category = header.split('–')[0].strip()
                try:
                    count = int(header.split('–')[-1].replace(' stories', '').strip())
                except ValueError:
                    count = 0
                prompt = p.find('a').text
                if prompt not in known_prompts:
                    prompt_result_list.append({
                        'prompt_id': f"prompt_{prompt_count:04d}",
                        'prompt': prompt,
                        'category': category,
                        'link': 'https://blog.reedsy.com' + p.find('a').get('href'),
                        'no_of_stories': count,
                        'done': 0
                    })
                    known_prompts.add(prompt)
                    prompt_count += 1

        prompt_df = pd.concat([prompt_df, pd.DataFrame(prompt_result_list)])
        prompt_df.to_csv(prompt_path, index=None)
        return prompt_df

    def get_stories(self, r):
        """Scrape every story submitted for prompt row *r*, following pagination.

        Returns ``(story_df, posted_date)`` where *posted_date* is taken from
        the prompt page header (None if the prompt page had no submissions
        page at all).
        """
        prompt_story_list = []
        prompt_id = r['prompt_id']
        prompt = r['prompt']
        next_page = r['link']
        posted_date = None
        while next_page:
            prompt_soup = self.get_soup(next_page)
            posted_date = prompt_soup.find('section', class_='row-thin').find('p').text.split('on')[-1].strip()

            # Pagination link for further submissions; AttributeError means
            # the "load more" container or its anchor is absent (last page).
            try:
                next_page = 'https://blog.reedsy.com' + prompt_soup.find('div', id='submissions-load').find('a').get('href')
            except AttributeError:
                next_page = None

            for s in prompt_soup.find_all('div', class_='submission'):
                # First anchor is the story link, second is the author profile.
                title_links = s.find('h3', class_='mimic-h4').find_all('a')
                story_href = title_links[0].get('href')
                new_row = {
                    'prompt_id': prompt_id,
                    'prompt': prompt,
                    'story_id': story_href.split('/')[-2],
                    'story_title': title_links[0].text,
                    'story_author': title_links[1].text,
                    'story_url': 'https://blog.reedsy.com' + story_href,
                    'link': s.find('a').get('href'),
                    'genre': r['category'],
                    'is_sensitive': 0
                }
                new_row = self.get_story_info(new_row)
                sensitive_div = s.find('div', class_='panel panel-thinner panel-white-bordered space-bottom-xs-md space-top-xs-md')
                if sensitive_div and sensitive_div.text.strip() == 'This story contains sensitive content':
                    new_row['is_sensitive'] = 1
                prompt_story_list.append(new_row)

        prompt_story_df = pd.DataFrame(prompt_story_list)
        prompt_story_df.to_csv(os.path.join(self.folder, 'stories', '{}.csv'.format(r['prompt_id'])), index=None)
        return prompt_story_df, posted_date

    def get_story_info(self, r, comment=True):
        """Enrich story dict *r* with categories, likes, full text, date and comments.

        When *comment* is true, walks the comment pagination (rel="next"
        links) and stores the collected comments under ``r['comments']``.
        Returns the mutated dict.
        """
        story_comment_list = []
        story_soup = self.get_soup(r['story_url'])
        print(r['story_url'])
        r['categories'] = str([a.text for a in story_soup.find('p', class_='small space-top-xs-md').find_all('a')])
        r['likes'] = story_soup.find('p', class_='text-grey space-top-xs-md').find('button').text.strip()
        r['story_text'] = story_soup.find('article').text
        r['posted_date'] = story_soup.find('div', class_='grid gutter-xs-md no-response space-bottom-xs-md').find('div', class_='cell-shrink').text.strip()
        if comment:
            comment_soup = story_soup
            while comment_soup:
                story_comment_list += self.get_comment(comment_soup)
                next_page = comment_soup.find('a', attrs={'rel': 'next'})
                if next_page:
                    comment_soup = self.get_soup('https://blog.reedsy.com' + next_page.get('href'))
                    print('https://blog.reedsy.com' + next_page.get('href'))
                else:
                    comment_soup = None
            r['comments'] = story_comment_list
        return r

    def get_comment_info(self, c):
        """Extract a single comment element into a dict.

        Anonymous/deleted accounts have no profile link, hence the fallback
        author name.
        """
        try:
            author = c.find('a', class_='profile-link text-weight-normal text-blue').text.strip()
        except AttributeError:
            author = 'Unknown user'
        regex = re.compile('small text-wrap.*')
        comment_text = c.find('p', class_=regex).text.strip()
        # Renamed from `time` so it no longer shadows the time module.
        timestamp = c.find('small', class_='text-grey').text.strip()
        points = c.find('button', class_='btn-no-decoration trigger-signup-modal text-grey').text.replace('points', '').strip()
        # NOTE(review): the author name is used as the dict KEY for the
        # comment text (not {'author': ..., 'comment': ...}).  Kept as-is
        # because downstream CSV consumers may rely on this shape — confirm
        # whether this was intended before changing it.
        return {
            author: comment_text,
            'time': timestamp,
            'points': points
        }

    def get_comment(self, s):
        """Collect all comment threads on page *s*.

        Returns a list of ``[main_comment, [nested_replies...]]`` pairs; the
        first comment in each container is the thread root, the rest are
        replies.
        """
        story_comment_list = []
        for cc in s.find_all('div', class_='comment-container'):
            comments = cc.find_all('div', class_='comment')
            if not comments:
                continue
            main_comment = self.get_comment_info(comments[0])
            nested_comments = [self.get_comment_info(c) for c in comments[1:]]
            story_comment_list.append([main_comment, nested_comments])
        return story_comment_list
|
|
|
if __name__ == '__main__':
    # Entry point: building the crawler loads/refreshes prompt.csv, then the
    # run loop scrapes stories for every prompt not yet marked done.
    ReedsyCrawler().run()