from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import requests
from bs4 import BeautifulSoup
import json

app = FastAPI()


class ArticleScraper:
    def __init__(self):
        self.scraper_api_key = "24610cfe7680c5a15d77bd32cfd23fc3"
        self.scraper_api_url = "http://api.scraperapi.com/"

    def scrape_bloomberg(self, article_url):
        # Fetch the page through ScraperAPI, then read the article JSON embedded
        # in the page's __NEXT_DATA__ script tag.
        params = {
            "api_key": self.scraper_api_key,
            "url": article_url,
        }
        response = requests.get(self.scraper_api_url, params=params)
        html = response.text
        soup = BeautifulSoup(html, 'html.parser')
        script = soup.find('script', {'id': '__NEXT_DATA__'})
        json_data = json.loads(script.text)
        props = json_data['props']['pageProps']
        contents = props['story']['body']['content']
        article_text = []
        for item in contents:
            text = self.extract_text(item)
            if text:
                article_text.append(text)
        return '\n\n'.join(article_text)

    def scrape_financial_times(self, article_url):
        # Request the page with a Twitter referer and consent cookies, then read
        # the article body from the embedded ld+json metadata.
        headers = {
            'Referer': 'https://twitter.com'
        }
        cookies = {
            'FTCookieConsentGDPR': 'true',
            'FTAllocation': '00000000-0000-0000-0000-000000000000'
        }
        response = requests.get(article_url, headers=headers, cookies=cookies)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            article_script = soup.find('script', {'type': 'application/ld+json'})
            if article_script:
                article_data = json.loads(article_script.string)
                return article_data.get('articleBody', '')
            else:
                return "Article content not found in the expected format."
        else:
            return f"Failed to retrieve the webpage. Status code: {response.status_code}"

    def extract_text(self, content_item):
        # Convert a single node of Bloomberg's structured story body into text,
        # rendering links, photos, and charts as Markdown.
        if content_item['type'] == 'paragraph':
            text_parts = []
            for item in content_item['content']:
                if item['type'] == 'text':
                    text_parts.append(item['value'])
                elif item['type'] == 'entity':
                    if 'link' in item['data'] and item['data']['link']['destination'].get('web'):
                        url = item['data']['link']['destination']['web']
                        text = ' '.join([sub_item['value'] for sub_item in item['content'] if sub_item['type'] == 'text'])
                        text_parts.append(f"[{text}]({url})")
                    else:
                        text_parts.extend([sub_item['value'] for sub_item in item['content'] if sub_item['type'] == 'text'])
                elif item['type'] == 'link':
                    url = item['data']['destination'].get('web', '')
                    text = ' '.join([sub_item['value'] for sub_item in item['content'] if sub_item['type'] == 'text'])
                    if url:
                        text_parts.append(f"[{text}]({url})")
                    else:
                        text_parts.append(text)
            return ' '.join(text_parts)
        elif content_item['type'] == 'entity' and content_item.get('subType') == 'story':
            url = content_item['data']['link']['destination'].get('web', '')
            text = ' '.join([sub_item['value'] for sub_item in content_item['content'] if sub_item['type'] == 'text'])
            return f"Read More: [{text}]({url})"
        elif content_item['type'] == 'media' and content_item.get('subType') == 'photo':
            photo_data = content_item['data']['photo']
            caption = photo_data.get('caption', '')
            credit = photo_data.get('credit', '')
            src = photo_data.get('src', '')
            alt = photo_data.get('alt', '')
            return f"\n![{alt}]({src})\n*{caption}* {credit}\n"
        elif content_item['type'] == 'media' and content_item.get('subType') == 'chart':
            chart_data = content_item['data']['chart']
            attachment = content_item['data']['attachment']
            title = attachment.get('title', '')
            subtitle = attachment.get('subtitle', '')
            source = attachment.get('source', '')
            fallback_image = chart_data.get('fallback', '')
            footnote = attachment.get('footnote', '')
            return f"\n![{title}]({fallback_image})\n**{title}**\n*{subtitle}*\n{footnote}\n{source}\n"
        return ''

    def scrape_article(self, url):
        if 'bloomberg.com' in url:
            return self.scrape_bloomberg(url)
        elif 'ft.com' in url:
            return self.scrape_financial_times(url)
        else:
            return "Unsupported website. Please provide a URL from Bloomberg or Financial Times."


class ArticleRequest(BaseModel):
    url: str


# Route decorator added here; the original file defined this handler without one,
# so the "/scrape" path is an assumption.
@app.post("/scrape")
async def scrape_article(request: ArticleRequest):
    scraper = ArticleScraper()
    content = scraper.scrape_article(request.url)
    if "Unsupported website" in content:
        raise HTTPException(status_code=400, detail=content)
    return {"content": content}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
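
# Example request (a minimal sketch, assuming the app is running locally on port 7860
# and the handler is mounted at the assumed "/scrape" path above):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/scrape",
#       json={"url": "https://www.bloomberg.com/news/articles/..."},
#   )
#   print(resp.json()["content"])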