from flask import Flask, jsonify, request
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import base64
import time
import random
import re
import requests
from io import BytesIO
from PIL import Image
import json
import threading
from urllib.parse import quote, urlparse
import html2text

app = Flask(__name__)

# Thread-local storage for the browser instance
thread_local = threading.local()
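# Why thread-local: Selenium WebDriver objects are not safe to share across threads,
# and a WSGI server may dispatch requests on several threads, so each thread lazily
# creates and reuses its own headless Chrome instance in get_browser() below.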

def get_browser():
    """Get or create a thread-local browser instance."""
    if not hasattr(thread_local, "browser"):
        chrome_options = uc.ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--window-size=1920,1080')
        thread_local.browser = uc.Chrome(options=chrome_options)
    return thread_local.browser

def search_images(query, num_images=5):
    """Enhanced image search using Selenium."""
    browser = get_browser()
    results = []
    try:
        # Google Images search
        search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch"
        browser.get(search_url)

        # Wait for images to load
        time.sleep(2)

        # Scroll to load more images
        for _ in range(3):
            browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(1)

        # Find thumbnail elements
        image_elements = browser.find_elements(By.CSS_SELECTOR, 'img.rg_i')

        for thumbnail in image_elements[:num_images]:
            try:
                # Click the thumbnail to get the full-resolution image
                thumbnail.click()
                time.sleep(1)

                # Wait for the full-resolution image
                wait = WebDriverWait(browser, 10)
                full_img = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'img.n3VNCb')))
                img_url = full_img.get_attribute('src')

                # Skip data URLs and unwanted domains
                if (img_url.startswith('data:') or
                        any(domain in img_url.lower() for domain in ['gstatic.com', 'google.com'])):
                    continue

                # Download and process the image
                response = requests.get(img_url, timeout=10)
                img_content = response.content

                # Verify it is an image and get its format
                img = Image.open(BytesIO(img_content))
                img_format = img.format.lower()

                # Convert to base64
                buffered = BytesIO()
                img.save(buffered, format=img_format)
                img_base64 = base64.b64encode(buffered.getvalue()).decode()

                results.append({
                    'image_url': img_url,
                    'base64_data': f"data:image/{img_format};base64,{img_base64}",
                    'size': len(img_content),
                    'dimensions': img.size,
                    'format': img_format
                })

                time.sleep(random.uniform(0.5, 1.0))
            except Exception as e:
                print(f"Error processing image: {str(e)}")
                continue

            if len(results) >= num_images:
                break
    except Exception as e:
        print(f"Search error: {str(e)}")
    return results

def scrape_website(url):
    """Enhanced website scraping using Selenium."""
    browser = get_browser()
    try:
        browser.get(url)
        time.sleep(2)  # Wait for dynamic content

        # Get the page source after JavaScript execution
        page_source = browser.page_source
        soup = BeautifulSoup(page_source, 'html.parser')

        # Extract metadata and content
        meta_data = {
            'title': soup.title.string if soup.title else '',
            'description': '',
            'keywords': '',
            'author': '',
            'published_date': ''
        }

        # Meta tags to look up, by both name and property attributes
        meta_tags = {
            'description': ['description', 'og:description'],
            'keywords': ['keywords'],
            'author': ['author', 'og:author'],
            'published_date': ['article:published_time', 'datePublished']
        }

        for key, meta_names in meta_tags.items():
            for name in meta_names:
                meta_tag = soup.find('meta', attrs={'name': name}) or soup.find('meta', attrs={'property': name})
                if meta_tag and meta_tag.get('content'):
                    meta_data[key] = meta_tag.get('content')
                    break

        # Get the main content
        main_content = ''
        content_tags = soup.find_all(['article', 'main', 'div'],
                                     class_=re.compile(r'(content|article|post|entry)'))
        if content_tags:
            main_content = ' '.join(tag.get_text(strip=True) for tag in content_tags)
        else:
            main_content = ' '.join(p.get_text(strip=True) for p in soup.find_all('p'))

        return {
            'title': clean_text(meta_data['title']),
            'meta_description': clean_text(meta_data['description']),
            'keywords': clean_text(meta_data['keywords']),
            'author': clean_text(meta_data['author']),
            'published_date': meta_data['published_date'],
            'content': clean_text(main_content)[:2000],
            'url': url,
            'domain': get_domain(url)
        }
    except Exception as e:
        print(f"Error scraping {url}: {str(e)}")
        return None

def search_and_scrape(query, num_results=5):
    """Enhanced search and scrape using Selenium."""
    browser = get_browser()
    results = []
    try:
        # Perform a Google search
        search_url = f"https://www.google.com/search?q={quote(query)}&num={num_results + 5}"
        browser.get(search_url)
        time.sleep(2)

        # Get search results
        search_results = browser.find_elements(By.CSS_SELECTOR, 'div.g')
        seen_domains = set()

        for result in search_results:
            if len(results) >= num_results:
                break
            try:
                link = result.find_element(By.CSS_SELECTOR, 'a')
                href = link.get_attribute('href')

                # Skip unwanted URLs
                if not href or not href.startswith('http') or \
                        any(x in href.lower() for x in ['google.', 'youtube.', 'facebook.', 'twitter.']):
                    continue

                # Skip duplicate domains
                domain = get_domain(href)
                if domain in seen_domains:
                    continue
                seen_domains.add(domain)

                # Scrape the website
                site_data = scrape_website(href)
                if site_data and site_data['content']:
                    results.append(site_data)

                time.sleep(random.uniform(1, 2))
            except Exception as e:
                print(f"Error processing result: {str(e)}")
                continue
    except Exception as e:
        print(f"Search error: {str(e)}")
    return results

def clean_text(text):
    """Clean extracted text."""
    if not text:
        return ''
    text = str(text)
    text = re.sub(r'\s+', ' ', text)
    text = re.sub(r'[^\w\s.,!?-]', '', text)
    return text.strip()


def get_domain(url):
    """Extract the domain from a URL."""
    try:
        return urlparse(url).netloc.replace('www.', '')
    except Exception:
        return url
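
# Illustrative behaviour of the two helpers above (example values, not from the original):
#   clean_text("  Hello,\n  world! <b>")          -> 'Hello, world! b'
#   get_domain("https://www.example.com/a/page")  -> 'example.com'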

# Route path and methods are assumed here; the original snippet omits the
# @app.route decorators, without which the handlers are never reachable.
@app.route('/search_images', methods=['GET'])
def api_search_images():
    try:
        query = request.args.get('query', '')
        num_images = int(request.args.get('num_images', 5))

        if not query:
            return jsonify({'error': 'Query parameter is required'}), 400
        if num_images < 1 or num_images > 20:
            return jsonify({'error': 'Number of images must be between 1 and 20'}), 400

        results = search_images(query, num_images)
        return jsonify({
            'success': True,
            'query': query,
            'count': len(results),
            'results': results
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

# Route path assumed, matching the handler name.
@app.route('/scrape_sites', methods=['GET'])
def api_scrape_sites():
    try:
        query = request.args.get('query', '')
        num_results = int(request.args.get('num_results', 5))

        if not query:
            return jsonify({'error': 'Query parameter is required'}), 400
        if num_results < 1 or num_results > 10:
            return jsonify({'error': 'Number of results must be between 1 and 10'}), 400

        results = search_and_scrape(query, num_results)
        return jsonify({
            'success': True,
            'query': query,
            'count': len(results),
            'results': results
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

# Written in the style of a Flask teardown callback (it accepts an optional
# exception), but it is not registered anywhere in this snippet; hook it up
# (e.g. atexit.register(cleanup)) before relying on it.
def cleanup(exception=None):
    """Clean up browser instances."""
    if hasattr(thread_local, "browser"):
        thread_local.browser.quit()


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
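
# A minimal client-side sketch of how the two endpoints could be called once the
# server is running; the '/search_images' and '/scrape_sites' paths are the ones
# assumed above, not confirmed by the original snippet.
#
#   import requests
#
#   resp = requests.get(
#       "http://localhost:5000/search_images",
#       params={"query": "golden retriever", "num_images": 3},
#       timeout=60,
#   )
#   print(resp.json()["count"])
#
#   resp = requests.get(
#       "http://localhost:5000/scrape_sites",
#       params={"query": "flask selenium scraping", "num_results": 2},
#       timeout=120,
#   )
#   for site in resp.json()["results"]:
#       print(site["domain"], site["title"])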