ss / app.py
arcticaurora's picture
Create app.py
e38e721 verified
raw
history blame
5.27 kB
import json
import os
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel
# ASGI application instance; the route decorator further down registers its
# handler on this object, and the script entry point serves it with uvicorn.
app = FastAPI()
class ArticleScraper:
    """Fetch article text from Bloomberg or the Financial Times.

    Bloomberg pages are fetched through the ScraperAPI proxy and rebuilt
    from the ``__NEXT_DATA__`` JSON embedded in the page; Financial Times
    pages are fetched directly and read from the page's ``ld+json`` block.

    Every public ``scrape_*`` method returns a ``str``: either the article
    text or a human-readable message describing why it could not be read.
    Network failures raised by ``requests`` are not caught here.
    """

    # Seconds to wait for any upstream HTTP call. Without a timeout
    # ``requests`` waits indefinitely; the value is generous because a
    # proxied fetch can be slow.
    REQUEST_TIMEOUT = 70

    # Messages shared by both site scrapers so callers see one vocabulary.
    NOT_FOUND_MESSAGE = "Article content not found in the expected format."
    UNSUPPORTED_MESSAGE = (
        "Unsupported website. Please provide a URL from Bloomberg or Financial Times."
    )

    def __init__(self):
        # SECURITY: a credential should not live in source control. The
        # SCRAPER_API_KEY environment variable takes precedence; the literal
        # is kept only as a backward-compatible fallback and should be
        # rotated and removed.
        self.scraper_api_key = os.environ.get(
            "SCRAPER_API_KEY", "24610cfe7680c5a15d77bd32cfd23fc3"
        )
        # NOTE(review): plain http sends the API key unencrypted; confirm the
        # provider's https endpoint and switch to it.
        self.scraper_api_url = "http://api.scraperapi.com/"

    def scrape_bloomberg(self, article_url):
        """Return the text of a Bloomberg article as markdown-flavoured text.

        Args:
            article_url: Address of the article; forwarded to ScraperAPI.

        Returns:
            The content items of the story joined by blank lines, or a
            message string when the page could not be fetched or does not
            contain the expected ``__NEXT_DATA__`` structure.
        """
        params = {
            "api_key": self.scraper_api_key,
            "url": article_url,
        }
        response = requests.get(
            self.scraper_api_url, params=params, timeout=self.REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            return f"Failed to retrieve the webpage. Status code: {response.status_code}"

        soup = BeautifulSoup(response.text, 'html.parser')
        script = soup.find('script', {'id': '__NEXT_DATA__'})
        if script is None:
            return self.NOT_FOUND_MESSAGE

        try:
            json_data = json.loads(script.text)
            contents = json_data['props']['pageProps']['story']['body']['content']
        except (ValueError, KeyError, TypeError):
            # Invalid JSON, or a page whose payload lacks the story body.
            return self.NOT_FOUND_MESSAGE
        if not isinstance(contents, list):
            return self.NOT_FOUND_MESSAGE

        rendered = (self.extract_text(item) for item in contents)
        return '\n\n'.join(text for text in rendered if text)

    def scrape_financial_times(self, article_url):
        """Return the ``articleBody`` of a Financial Times article.

        Args:
            article_url: Address of the article; requested directly.

        Returns:
            The ``articleBody`` string from the first ``application/ld+json``
            script (``''`` when that field is absent), or a message string
            when the request fails or the script is missing or unparsable.
        """
        # NOTE(review): the Referer and cookies presumably make the site
        # serve the full article without a consent wall; confirm.
        headers = {
            'Referer': 'https://twitter.com'
        }
        cookies = {
            'FTCookieConsentGDPR': 'true',
            'FTAllocation': '00000000-0000-0000-0000-000000000000'
        }
        response = requests.get(
            article_url,
            headers=headers,
            cookies=cookies,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            return f"Failed to retrieve the webpage. Status code: {response.status_code}"

        soup = BeautifulSoup(response.content, 'html.parser')
        article_script = soup.find('script', {'type': 'application/ld+json'})
        if article_script is None or not article_script.string:
            return self.NOT_FOUND_MESSAGE

        try:
            article_data = json.loads(article_script.string)
        except ValueError:
            return self.NOT_FOUND_MESSAGE

        # An ld+json payload may be a single object or an array of objects.
        candidates = article_data if isinstance(article_data, list) else [article_data]
        for candidate in candidates:
            if isinstance(candidate, dict) and 'articleBody' in candidate:
                return candidate['articleBody']
        return ''

    @staticmethod
    def _text_values(children):
        """Return the ``value`` of every ``text`` node in *children*, in order."""
        return [
            child.get('value', '')
            for child in (children or [])
            if child.get('type') == 'text'
        ]

    @classmethod
    def _join_text(cls, children):
        """Join the ``text`` node values in *children* with single spaces."""
        return ' '.join(cls._text_values(children))

    @staticmethod
    def _web_destination(holder):
        """Return ``holder['destination']['web']`` or ``''`` when any part is missing."""
        destination = (holder or {}).get('destination') or {}
        return destination.get('web') or ''

    def _render_paragraph(self, children):
        """Render the inline nodes of a paragraph, turning links into markdown."""
        text_parts = []
        for item in children:
            item_type = item.get('type')
            data = item.get('data') or {}
            if item_type == 'text':
                text_parts.append(item.get('value', ''))
            elif item_type == 'entity':
                url = self._web_destination(data.get('link'))
                if url:
                    text_parts.append(f"[{self._join_text(item.get('content'))}]({url})")
                else:
                    # No link target: keep the words as plain text.
                    text_parts.extend(self._text_values(item.get('content')))
            elif item_type == 'link':
                url = self._web_destination(data)
                text = self._join_text(item.get('content'))
                text_parts.append(f"[{text}]({url})" if url else text)
        return ' '.join(text_parts)

    def extract_text(self, content_item):
        """Render one Bloomberg content item as text.

        Handles ``paragraph`` items, ``entity``/``story`` items, and
        ``media`` items of sub-type ``photo`` or ``chart``. Missing optional
        keys are treated as empty instead of raising ``KeyError``.

        Args:
            content_item: A dict taken from the story body content list.

        Returns:
            The rendered text, or ``''`` for any other kind of item.
        """
        item_type = content_item.get('type')
        sub_type = content_item.get('subType')
        data = content_item.get('data') or {}

        if item_type == 'paragraph':
            return self._render_paragraph(content_item.get('content') or [])

        if item_type == 'entity' and sub_type == 'story':
            url = self._web_destination(data.get('link'))
            text = self._join_text(content_item.get('content'))
            return f"Read More: [{text}]({url})"

        if item_type == 'media' and sub_type == 'photo':
            photo_data = data.get('photo') or {}
            caption = photo_data.get('caption', '')
            credit = photo_data.get('credit', '')
            src = photo_data.get('src', '')
            alt = photo_data.get('alt', '')
            return f"\n![{alt}]({src})\n*{caption}* {credit}\n"

        if item_type == 'media' and sub_type == 'chart':
            chart_data = data.get('chart') or {}
            attachment = data.get('attachment') or {}
            title = attachment.get('title', '')
            subtitle = attachment.get('subtitle', '')
            source = attachment.get('source', '')
            fallback_image = chart_data.get('fallback', '')
            footnote = attachment.get('footnote', '')
            return f"\n![{title}]({fallback_image})\n**{title}**\n*{subtitle}*\n{footnote}\n{source}\n"

        return ''

    @staticmethod
    def _host_matches(host, domain):
        """Return True when *host* is *domain* itself or one of its subdomains."""
        return host == domain or host.endswith('.' + domain)

    def scrape_article(self, url):
        """Dispatch *url* to the scraper for its site.

        The decision is made on the parsed hostname, not on a substring of
        the whole URL, so an address that merely mentions a supported domain
        in its path or query string is rejected instead of being fetched.

        Args:
            url: Article address; a missing scheme is tolerated when
                identifying the host.

        Returns:
            The scraper's result, or the "Unsupported website" message for
            any other host, a non-http(s) scheme, or an unparsable URL.
        """
        if not isinstance(url, str):
            return self.UNSUPPORTED_MESSAGE
        candidate = url.strip()
        try:
            # urlparse only recognises the host when a scheme or '//' is present.
            parsed = urlparse(candidate if '://' in candidate else f'//{candidate}')
            host = (parsed.hostname or '').lower()
        except ValueError:
            return self.UNSUPPORTED_MESSAGE
        if parsed.scheme not in ('', 'http', 'https'):
            return self.UNSUPPORTED_MESSAGE

        if self._host_matches(host, 'bloomberg.com'):
            return self.scrape_bloomberg(url)
        if self._host_matches(host, 'ft.com'):
            return self.scrape_financial_times(url)
        return self.UNSUPPORTED_MESSAGE
# Request body accepted by the POST /scrape_article/ endpoint.
class ArticleRequest(BaseModel):
    # Address of the article to scrape; the endpoint passes it unchanged to
    # ArticleScraper.scrape_article.
    url: str
@app.post("/scrape_article/")
async def scrape_article(request: ArticleRequest):
    """Scrape the article at ``request.url`` and return its text.

    Returns:
        ``{"content": <text>}`` with whatever string the scraper produced.

    Raises:
        HTTPException: 400 when the URL is not from a supported site;
            502 when the upstream request fails or the fetched page cannot
            be parsed.
    """
    scraper = ArticleScraper()
    try:
        # The scraper uses the blocking ``requests`` library; running it in
        # the worker thread pool keeps the event loop free for other clients.
        content = await run_in_threadpool(scraper.scrape_article, request.url)
    except requests.RequestException as err:
        # The exception text is deliberately not echoed: it can contain the
        # full upstream URL, including the proxy API key.
        raise HTTPException(
            status_code=502, detail="Failed to retrieve the webpage."
        ) from err
    except (KeyError, TypeError, AttributeError, ValueError) as err:
        raise HTTPException(
            status_code=502,
            detail="Article content not found in the expected format.",
        ) from err

    # Match the scraper's message by prefix; a substring test would reject
    # any real article that merely contains the phrase somewhere in its text.
    if content.startswith("Unsupported website"):
        raise HTTPException(status_code=400, detail=content)
    return {"content": content}
if __name__ == "__main__":
    # Imported here because uvicorn is only needed when this file is run
    # directly as a script.
    import uvicorn

    # Listen on every interface, port 7860.
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)