from flask import Flask, jsonify, request import undetected_chromedriver as uc from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.chrome.options import Options from bs4 import BeautifulSoup import base64 import time import random import re import requests from io import BytesIO from PIL import Image import json import threading from urllib.parse import quote, urlparse import html2text app = Flask(__name__) # Thread-local storage for the browser instance thread_local = threading.local() def get_browser(): """Get or create thread-local browser instance""" if not hasattr(thread_local, "browser"): chrome_options = uc.ChromeOptions() chrome_options.add_argument('--headless') chrome_options.add_argument('--no-sandbox') chrome_options.add_argument('--disable-dev-shm-usage') chrome_options.add_argument('--disable-gpu') chrome_options.add_argument('--window-size=1920,1080') thread_local.browser = uc.Chrome(options=chrome_options) return thread_local.browser def search_images(query, num_images=5): """Enhanced image search using selenium""" browser = get_browser() results = [] try: # Google Images search search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch" browser.get(search_url) # Wait for images to load time.sleep(2) # Scroll to load more images for _ in range(3): browser.execute_script("window.scrollTo(0, document.body.scrollHeight);") time.sleep(1) # Find image elements image_elements = browser.find_elements(By.CSS_SELECTOR, 'img.rg_i') for img in image_elements[:num_images]: try: # Click image to get full resolution img.click() time.sleep(1) # Wait for full resolution image wait = WebDriverWait(browser, 10) full_img = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'img.n3VNCb'))) img_url = full_img.get_attribute('src') # Skip data URLs and unwanted domains if (img_url.startswith('data:') or any(domain in img_url.lower() for domain in ['gstatic.com', 'google.com'])): continue # Download and process image response = requests.get(img_url, timeout=10) img_content = response.content # Verify it's an image and get format img = Image.open(BytesIO(img_content)) img_format = img.format.lower() # Convert to base64 buffered = BytesIO() img.save(buffered, format=img_format) img_base64 = base64.b64encode(buffered.getvalue()).decode() results.append({ 'image_url': img_url, 'base64_data': f"data:image/{img_format};base64,{img_base64}", 'size': len(img_content), 'dimensions': img.size, 'format': img_format }) time.sleep(random.uniform(0.5, 1.0)) except Exception as e: print(f"Error processing image: {str(e)}") continue if len(results) >= num_images: break except Exception as e: print(f"Search error: {str(e)}") return results def scrape_website(url): """Enhanced website scraping using selenium""" browser = get_browser() try: browser.get(url) time.sleep(2) # Wait for dynamic content # Get page source after JavaScript execution page_source = browser.page_source soup = BeautifulSoup(page_source, 'html.parser') # Extract metadata and content meta_data = { 'title': soup.title.string if soup.title else '', 'description': '', 'keywords': '', 'author': '', 'published_date': '' } # Meta tags meta_tags = { 'description': ['description', 'og:description'], 'keywords': ['keywords'], 'author': ['author', 'og:author'], 'published_date': ['article:published_time', 'datePublished'] } for key, meta_names in meta_tags.items(): for name in meta_names: meta_tag = soup.find('meta', attrs={'name': name}) or soup.find('meta', attrs={'property': name}) if meta_tag and meta_tag.get('content'): meta_data[key] = meta_tag.get('content') break # Get main content main_content = '' content_tags = soup.find_all(['article', 'main', 'div'], class_=re.compile(r'(content|article|post|entry)')) if content_tags: main_content = ' '.join(tag.get_text(strip=True) for tag in content_tags) else: main_content = ' '.join(p.get_text(strip=True) for p in soup.find_all('p')) return { 'title': clean_text(meta_data['title']), 'meta_description': clean_text(meta_data['description']), 'keywords': clean_text(meta_data['keywords']), 'author': clean_text(meta_data['author']), 'published_date': meta_data['published_date'], 'content': clean_text(main_content)[:2000], 'url': url, 'domain': get_domain(url) } except Exception as e: print(f"Error scraping {url}: {str(e)}") return None def search_and_scrape(query, num_results=5): """Enhanced search and scrape using selenium""" browser = get_browser() results = [] try: # Perform Google search search_url = f"https://www.google.com/search?q={quote(query)}&num={num_results + 5}" browser.get(search_url) time.sleep(2) # Get search results search_results = browser.find_elements(By.CSS_SELECTOR, 'div.g') seen_domains = set() for result in search_results: if len(results) >= num_results: break try: link = result.find_element(By.CSS_SELECTOR, 'a') href = link.get_attribute('href') # Skip unwanted URLs if not href or not href.startswith('http') or \ any(x in href.lower() for x in ['google.', 'youtube.', 'facebook.', 'twitter.']): continue # Check for duplicate domains domain = get_domain(href) if domain in seen_domains: continue seen_domains.add(domain) # Scrape website site_data = scrape_website(href) if site_data and site_data['content']: results.append(site_data) time.sleep(random.uniform(1, 2)) except Exception as e: print(f"Error processing result: {str(e)}") continue except Exception as e: print(f"Search error: {str(e)}") return results def clean_text(text): """Clean extracted text""" if not text: return '' text = str(text) text = re.sub(r'\s+', ' ', text) text = re.sub(r'[^\w\s.,!?-]', '', text) return text.strip() def get_domain(url): """Extract domain from URL""" try: return urlparse(url).netloc.replace('www.', '') except: return url @app.route('/search_images', methods=['GET']) def api_search_images(): try: query = request.args.get('query', '') num_images = int(request.args.get('num_images', 5)) if not query: return jsonify({'error': 'Query parameter is required'}), 400 if num_images < 1 or num_images > 20: return jsonify({'error': 'Number of images must be between 1 and 20'}), 400 results = search_images(query, num_images) return jsonify({ 'success': True, 'query': query, 'count': len(results), 'results': results }) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/scrape_sites', methods=['GET']) def api_scrape_sites(): try: query = request.args.get('query', '') num_results = int(request.args.get('num_results', 5)) if not query: return jsonify({'error': 'Query parameter is required'}), 400 if num_results < 1 or num_results > 10: return jsonify({'error': 'Number of results must be between 1 and 10'}), 400 results = search_and_scrape(query, num_results) return jsonify({ 'success': True, 'query': query, 'count': len(results), 'results': results }) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 @app.teardown_appcontext def cleanup(exception=None): """Clean up browser instances""" if hasattr(thread_local, "browser"): thread_local.browser.quit() if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)