import json
import os
import random
import re
import time
import urllib.parse

import requests
from bs4 import BeautifulSoup
from flask import Flask, jsonify, request
from googlesearch import search

app = Flask(__name__)


def search_images(query, num_images=5):
    """Search Google Images and return up to num_images direct image URLs."""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
    }

    # Bias the query toward larger, higher-quality results.
    formatted_query = urllib.parse.quote(query + " high quality")
    url = f"https://www.google.com/search?q={formatted_query}&tbm=isch&safe=active"

    try:
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()

        # Pull candidate image URLs out of the page source and de-duplicate
        # them while preserving order.
        image_urls = re.findall(r'https?://[^"\']*?\.(?:jpg|jpeg|png|gif)', response.text)
        image_urls = list(dict.fromkeys(image_urls))

        results = []
        for img_url in image_urls:
            if len(results) >= num_images:
                break

            # Skip Google-hosted thumbnails, icons, and other low-value assets.
            if ('gstatic.com' in img_url or
                    'google.com' in img_url or
                    'icon' in img_url.lower() or
                    'thumb' in img_url.lower() or
                    'small' in img_url.lower()):
                continue

            try:
                # Verify the URL actually serves an image before keeping it.
                img_response = requests.head(img_url, headers=headers, timeout=5)
                if img_response.status_code == 200:
                    content_type = img_response.headers.get('Content-Type', '')
                    if content_type.startswith('image/'):
                        results.append({
                            'url': img_url,
                            'content_type': content_type
                        })
            except Exception as e:
                print(f"Error checking image URL: {str(e)}")
                continue

            # Small random delay between HEAD requests to avoid hammering hosts.
            time.sleep(random.uniform(0.2, 0.5))

        return results

    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return []


def get_cover_image(query):
    """Get a high-quality cover image URL for a given query."""
    try:
        images = search_images(query, num_images=3)
        if not images:
            return None
        return images[0]['url']
    except Exception as e:
        print(f"Error getting cover image: {str(e)}")
        return None


@app.route('/search_images', methods=['GET'])
def api_search_images():
    try:
        query = request.args.get('query', '')
        try:
            num_images = int(request.args.get('num_images', 5))
        except ValueError:
            return jsonify({'error': 'num_images must be an integer'}), 400

        if not query:
            return jsonify({'error': 'Query parameter is required'}), 400

        if num_images < 1 or num_images > 20:
            return jsonify({'error': 'Number of images must be between 1 and 20'}), 400

        results = search_images(query, num_images)

        return jsonify({
            'success': True,
            'query': query,
            'results': results
        })

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
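
# Example request for this endpoint (assuming the app is run locally on
# port 5000 as configured at the bottom of this file; the query value is
# illustrative):
#   curl "http://localhost:5000/search_images?query=mountain+landscape&num_images=3"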


def scrape_site_content(query, num_sites=5):
    """Google the query, fetch the top result pages, and extract their text and metadata."""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
    }

    results = []
    scraped = 0
    retries = 2
    timeout = 5

    try:
        # Fetch extra search results so failed or skipped pages can be replaced.
        search_results = list(search(query, num_results=num_sites * 2))

        for url in search_results:
            if scraped >= num_sites:
                break

            success = False
            for attempt in range(retries):
                try:
                    print(f"Trying {url} (attempt {attempt + 1}/{retries})")
                    # SSL verification is disabled deliberately so pages with
                    # broken certificates can still be scraped.
                    response = requests.get(
                        url,
                        headers=headers,
                        timeout=timeout,
                        verify=False
                    )
                    response.raise_for_status()

                    # Only HTML pages are worth parsing.
                    content_type = response.headers.get('Content-Type', '').lower()
                    if 'text/html' not in content_type:
                        print(f"Skipping {url} - not HTML content")
                        break

                    soup = BeautifulSoup(response.text, 'html.parser')

                    # Remove script and style blocks before extracting text.
                    for script in soup(["script", "style"]):
                        script.decompose()

                    # Keep at most 10,000 characters of visible text per page.
                    text_content = soup.get_text(separator='\n', strip=True)[:10000]

                    # Skip pages that are too thin to be useful.
                    if len(text_content.split()) < 100:
                        print(f"Skipping {url} - not enough content")
                        break

                    # Collect up to 10 absolute outbound links.
                    links = []
                    for link in soup.find_all('a', href=True)[:10]:
                        href = link['href']
                        if href.startswith('http'):
                            links.append({
                                'text': link.get_text(strip=True),
                                'url': href
                            })

                    # Page title and meta tags, when present.
                    title = soup.title.string if soup.title and soup.title.string else ''
                    meta_description = ''
                    meta_keywords = ''

                    meta_desc_tag = soup.find('meta', attrs={'name': 'description'})
                    if meta_desc_tag:
                        meta_description = meta_desc_tag.get('content', '')

                    meta_keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
                    if meta_keywords_tag:
                        meta_keywords = meta_keywords_tag.get('content', '')

                    results.append({
                        'url': url,
                        'title': title,
                        'meta_description': meta_description,
                        'meta_keywords': meta_keywords,
                        'text_content': text_content,
                        'links': links
                    })

                    scraped += 1
                    success = True

                    # Polite delay between successful page fetches.
                    time.sleep(random.uniform(0.5, 1))
                    break

                except requests.Timeout:
                    print(f"Timeout on {url} (attempt {attempt + 1}/{retries})")
                    if attempt == retries - 1:
                        print(f"Skipping {url} after {retries} timeout attempts")
                except requests.RequestException as e:
                    print(f"Error scraping {url} (attempt {attempt + 1}/{retries}): {str(e)}")
                    if attempt == retries - 1:
                        print(f"Skipping {url} after {retries} failed attempts")

                # Back off briefly before retrying this URL.
                if not success and attempt < retries - 1:
                    time.sleep(random.uniform(1, 2))

        return results

    except Exception as e:
        print(f"Error in search/scraping process: {str(e)}")
        return results


@app.route('/scrape_sites', methods=['GET'])
def api_scrape_sites():
    try:
        query = request.args.get('query', '')
        try:
            num_sites = int(request.args.get('num_sites', 10))
        except ValueError:
            return jsonify({'error': 'num_sites must be an integer'}), 400

        if not query:
            return jsonify({'error': 'Query parameter is required'}), 400

        if num_sites < 1 or num_sites > 20:
            return jsonify({'error': 'Number of sites must be between 1 and 20'}), 400

        results = scrape_site_content(query, num_sites)

        return jsonify({
            'success': True,
            'query': query,
            'results': results
        })

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
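
# Example request (assuming the app is run locally on port 5000 as configured
# at the bottom of this file; the query value is illustrative):
#   curl "http://localhost:5000/scrape_sites?query=renewable+energy&num_sites=5"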


def analyze_with_gpt(scraped_content, research_query):
    """Analyze scraped content using a Gemini model served through OpenRouter."""
    try:
        # Fail early if the OpenRouter credentials are missing.
        api_key = os.getenv("OPENROUTER_API_KEY")
        if not api_key:
            raise Exception("OPENROUTER_API_KEY environment variable is not set")

        # HTTP-Referer and X-Title are optional headers that identify this app to OpenRouter.
        headers = {
            'Authorization': f'Bearer {api_key}',
            'HTTP-Referer': 'http://localhost:5001',
            'X-Title': 'Research Assistant'
        }

        prompt = f"""You are a research assistant analyzing web content to provide comprehensive research.

Research Query: {research_query}

Below is content scraped from various web sources. Analyze this content and provide a detailed, well-structured research response.
Make sure to cite sources when making specific claims.

Scraped Content:
{json.dumps(scraped_content, indent=2)}

Please provide:
1. A comprehensive analysis of the topic
2. Key findings and insights
3. Supporting evidence from the sources
4. Any additional considerations or caveats

Format your response in markdown with proper headings and citations."""

        response = requests.post(
            'https://openrouter.ai/api/v1/chat/completions',
            headers=headers,
            json={
                'model': 'google/gemini-2.0-flash-thinking-exp:free',
                'messages': [{
                    'role': 'user',
                    'content': prompt
                }]
            },
            timeout=60
        )

        if response.status_code != 200:
            raise Exception(f"OpenRouter API error: {response.text}")

        return response.json()['choices'][0]['message']['content']
    except Exception as e:
        print(f"Error in analyze_with_gpt: {str(e)}")
        return f"Error analyzing content: {str(e)}"


def research_topic(query, num_sites=5):
    """Research a topic using web scraping and GPT analysis."""
    try:
        scraped_results = scrape_site_content(query, num_sites)

        # Trim each source down to the fields (and length) the model actually needs.
        formatted_content = []
        for result in scraped_results:
            formatted_content.append({
                'source': result['url'],
                'title': result['title'],
                'content': result['text_content'][:2000],
                'meta_info': {
                    'description': result['meta_description'],
                    'keywords': result['meta_keywords']
                }
            })

        analysis = analyze_with_gpt(formatted_content, query)

        return {
            'success': True,
            'query': query,
            'analysis': analysis,
            'sources': formatted_content
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e)
        }


@app.route('/research', methods=['GET'])
def api_research():
    try:
        query = request.args.get('query', '')
        # This endpoint always summarizes a fixed number of sites.
        num_sites = 5

        if not query:
            return jsonify({'error': 'Query parameter is required'}), 400

        results = research_topic(query, num_sites)
        return jsonify(results)

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
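
# Example request (assuming the app is run locally on port 5000 as configured
# below, and that OPENROUTER_API_KEY is set in the environment; the query
# value is illustrative):
#   curl "http://localhost:5000/research?query=impact+of+microplastics"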


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)