web-crawling / main.py
pvanand's picture
Update main.py
11417cc verified
raw
history blame
7.16 kB
from fastapi import FastAPI, HTTPException, Query, Depends
from fastapi.responses import Response
from pydantic import BaseModel
import hrequests
import trafilatura
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
from pytrends.request import TrendReq
from datetime import datetime, timedelta
from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache
import pdfkit
app = FastAPI()
class URLRequest(BaseModel):
url: str
@app.post("/scrape")
async def scrape(url_request: URLRequest):
try:
response = hrequests.get(url_request.url, browser='chrome')
return {"content": response.text}
except Exception as e:
raise e
@app.get("/extract-article")
def extract_article(
url: str,
record_id: Optional[str] = Query(None, description="Add an ID to the metadata."),
no_fallback: Optional[bool] = Query(False, description="Skip the backup extraction with readability-lxml and justext."),
favor_precision: Optional[bool] = Query(False, description="Prefer less text but correct extraction."),
favor_recall: Optional[bool] = Query(False, description="When unsure, prefer more text."),
include_comments: Optional[bool] = Query(True, description="Extract comments along with the main text."),
output_format: Optional[str] = Query('txt', description="Define an output format: 'csv', 'json', 'markdown', 'txt', 'xml', 'xmltei'.", enum=["csv", "json", "markdown", "txt", "xml", "xmltei"]),
target_language: Optional[str] = Query(None, description="Define a language to discard invalid documents (ISO 639-1 format)."),
include_tables: Optional[bool] = Query(True, description="Take into account information within the HTML <table> element."),
include_images: Optional[bool] = Query(False, description="Take images into account (experimental)."),
include_links: Optional[bool] = Query(False, description="Keep links along with their targets (experimental)."),
deduplicate: Optional[bool] = Query(False, description="Remove duplicate segments and documents."),
max_tree_size: Optional[int] = Query(None, description="Discard documents with too many elements.")
):
response = hrequests.get(url)
filecontent = response.text
extracted = trafilatura.extract(
filecontent,
url=url,
record_id=record_id,
no_fallback=no_fallback,
favor_precision=favor_precision,
favor_recall=favor_recall,
include_comments=include_comments,
output_format=output_format,
target_language=target_language,
include_tables=include_tables,
include_images=include_images,
include_links=include_links,
deduplicate=deduplicate,
max_tree_size=max_tree_size
)
if extracted:
return {"article": trafilatura.utils.sanitize(extracted)}
else:
return {"error": "Could not extract the article"}
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
pytrends = TrendReq()
@app.on_event("startup")
async def startup():
FastAPICache.init(InMemoryBackend(), prefix="fastapi-cache")
@app.get("/realtime_trending_searches")
@cache(expire=3600)
async def get_realtime_trending_searches(pn: str = Query('US', description="Country code for trending searches")):
trending_searches = pytrends.realtime_trending_searches(pn=pn)
return trending_searches.to_dict(orient='records')
@app.get("/", tags=["Home"])
def api_home():
return {'detail': 'Welcome to Web-Scraping API! Visit https://pvanand-web-scraping.hf.space/docs to test'}
class HTMLRequest(BaseModel):
html_content: str
@app.post("/html_to_pdf")
async def convert_to_pdf(request: HTMLRequest):
try:
options = {
'page-size': 'A4',
'margin-top': '0.75in',
'margin-right': '0.75in',
'margin-bottom': '0.75in',
'margin-left': '0.75in',
'encoding': "UTF-8",
}
pdf = pdfkit.from_string(request.html_content, False, options=options)
return Response(content=pdf, media_type="application/pdf")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
from fastapi import FastAPI, HTTPException, Response
from pydantic import BaseModel
from html4docx import HtmlToDocx
import os
class HTMLInput(BaseModel):
html: str
# Define the path to the temporary folder
TEMP_FOLDER = "/app/temp"
@app.post("/convert")
async def convert_html_to_docx(input_data: HTMLInput):
temp_filename = None
try:
# Create a new HtmlToDocx parser
parser = HtmlToDocx()
# Parse the HTML string to DOCX
docx = parser.parse_html_string(input_data.html)
# Create a unique filename in the temporary folder
temp_filename = os.path.join(TEMP_FOLDER, f"temp_{os.urandom(8).hex()}.docx")
# Save the DOCX to the temporary file
docx.save(temp_filename)
# Open the file and read its contents
with open(temp_filename, 'rb') as file:
file_contents = file.read()
# Return the DOCX file as a response
return Response(
content=file_contents,
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
headers={"Content-Disposition": "attachment; filename=converted.docx"}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
# Clean up: remove the temporary file
if temp_filename and os.path.exists(temp_filename):
os.remove(temp_filename)
@app.post("/html_to_docx")
async def convert_html_to_docx(input_data: HTMLRequest):
temp_filename = None
try:
# Create a new HtmlToDocx parser
parser = HtmlToDocx()
# Parse the HTML string to DOCX
docx = parser.parse_html_string(input_data.html_content)
# Create a unique filename in the temporary folder
temp_filename = os.path.join(TEMP_FOLDER, f"temp_{os.urandom(8).hex()}.docx")
# Save the DOCX to the temporary file
docx.save(temp_filename)
# Open the file and read its contents
with open(temp_filename, 'rb') as file:
file_contents = file.read()
# Return the DOCX file as a response
return Response(
content=file_contents,
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
headers={"Content-Disposition": "attachment; filename=converted.docx"}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
# Clean up: remove the temporary file
if temp_filename and os.path.exists(temp_filename):
os.remove(temp_filename)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)