# webcrawler/trend_crawl2.py
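"""Scrape Google Trends daily trending searches per category with Selenium.

Each category tab on the trending-searches page is clicked in turn; the trend
table is parsed with BeautifulSoup, every row is expanded to collect the linked
news articles, and the combined data is written to results.json.
"""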
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import time
import json
def setup_driver():
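    """Create a headless Chrome WebDriver configured for a German-language UI (--lang=de)."""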
options = webdriver.ChromeOptions()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument("--lang=de")
wd = webdriver.Chrome(options=options)
return wd
def click_and_scrape(driver, url):
"""Click each li element and scrape data."""
result_dict = {}
try:
driver.get(url)
        # Wait for the category <ul> element (aria-label 'Kategorie' or 'Category') to load
        try:
ul_element = WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.XPATH, "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"))
)
li_elements = ul_element.find_elements(By.TAG_NAME, "li")
        except Exception as e:
            print(f"Error locating ul_element: {e}")
            return result_dict  # li_elements is undefined below, so bail out (the finally block still quits the driver)
        # Keep the third tab and everything from the fifth on; the skipped tabs are presumably not scrapeable categories
        selected_elements = [li_elements[2]] + li_elements[4:]
for index, li in enumerate(selected_elements):
try:
# Scroll each li element into view
driver.execute_script("arguments[0].scrollIntoView();", li)
# Click the <li> using JavaScript
driver.execute_script("arguments[0].click();", li)
print(f"Clicked LI {index} using JavaScript.")
time.sleep(2)
try:
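                    # NOTE: obfuscated, auto-generated Google class name; it may change and break this lookup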
span = li.find_element(By.CLASS_NAME, "W7g1Rb-rymPhb-fpDzbe-fmcmS")
span_content = span.get_attribute("innerText")
print(f"Extracted span content for LI {index}: {span_content}")
data = scrape_google_trends(driver)
result_dict[f"{span_content}"] = data
except Exception as e:
print(f"Could not find or extract span content in LI {index}: {e}")
span_content = f"iteration_{index}"
result_dict[f"{span_content}"] = []
except Exception as e:
print(f"Error interacting with LI {index}: {e}")
except Exception as e:
print(f"Error during click and scrape: {e}")
finally:
driver.quit()
return result_dict
def process_selenium_row(index, rows, driver):
"""Extract dynamic data using Selenium by clicking on the row."""
max_retries = 3
for attempt in range(max_retries):
try:
driver.execute_script("arguments[0].click();", rows[index]) # Use JavaScript click for stability
# Wait for the articles to load dynamically
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CLASS_NAME, "xZCHj"))
)
# Fetch only the newly loaded articles
articles = driver.find_elements(By.CLASS_NAME, "xZCHj")
# Extract data from the current row only
dynamic_data = {
"article": [
{
"href": article.get_attribute("href"),
"title": article.text
}
for article in articles
]
}
            # Return only the articles gathered for this row
return dynamic_data
        except Exception as e:
            error = e  # remember the most recent error and retry
    print(f"Failed to process row {index} after {max_retries} attempts: {error}")
    return {"article": []}
def scrape_google_trends(driver):
"""Scrape data dynamically from the current page."""
all_data = []
try:
selenium_rows = None
WebDriverWait(driver, 2).until(
EC.presence_of_element_located((By.CSS_SELECTOR, '[jsname="oKdM2c"]'))
)
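        # "oKdM2c" marks the trend rows and "cC57zf" the trend table; these are
        # Google-internal jsname attributes and may change without notice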
soup = BeautifulSoup(driver.page_source, "html.parser")
selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')
tables = soup.select('[jsname="cC57zf"]')
        row_counter = 0  # selenium_rows spans all tables, so keep one running index across them
        for table in tables:
            rows_bs = table.find_all("tr")
            for row_bs in rows_bs:
                static_data = [
                    [div.get_text(strip=True) for div in cell.find_all("div")]
                    for cell in row_bs.find_all("td")[1:4]
                ]
                dynamic_data = process_selenium_row(row_counter, selenium_rows, driver)
                row_counter += 1
combined_row = {
"static_data": static_data,
"dynamic_data": dynamic_data
}
all_data.append(combined_row)
return all_data
except Exception as e:
with open(f"page_source_debug.html", "w", encoding="utf-8") as f:
f.write(driver.page_source)
print(f"An error occurred during scraping: {e}")
return []
def crawl_url(url="https://trends.google.com/trends/trendingsearches/daily?geo=AT"):
"""Main function to crawl dynamically and scrape Google Trends."""
driver = setup_driver()
    results = click_and_scrape(driver, url)
return results
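# Example: crawl another region by passing a different geo code, e.g.
#   crawl_url("https://trends.google.com/trends/trendingsearches/daily?geo=DE")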
if __name__ == "__main__":
results = crawl_url()
try:
with open("results.json", "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=4)
except Exception as e:
print(f"Error writing results to JSON: {e}")