from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.common.exceptions import ElementClickInterceptedException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from webdriver_manager.chrome import ChromeDriverManager
import time
import json
# Configure Chrome options
chrome_options = Options()
chrome_options.add_argument("--headless") # Run in headless mode
chrome_options.add_argument("--disable-gpu")
def setup_driver():
options = webdriver.ChromeOptions()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument("--lang=de")
wd = webdriver.Chrome(options=options)
return wd
def click_and_scrape(driver, url):
"""Click each li element and scrape data."""
result_dict = {}
try:
driver.get(url)
# Wait for the ul element to load
try:
# Wait for the ul element with the specific aria-label to load
ul_element = WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.XPATH, "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"))
)
li_elements = ul_element.find_elements(By.TAG_NAME, "li")
except Exception as e:
print(f"Error locating ul_element: {e}")
selected_elements = [li_elements[2]] + li_elements[4:]
for index, li in enumerate(selected_elements):
try:
# Scroll each li element into view
driver.execute_script("arguments[0].scrollIntoView();", li)
# Click the
using JavaScript
driver.execute_script("arguments[0].click();", li)
print(f"Clicked LI {index} using JavaScript.")
time.sleep(2)
try:
span = li.find_element(By.CLASS_NAME, "W7g1Rb-rymPhb-fpDzbe-fmcmS")
span_content = span.get_attribute("innerText")
print(f"Extracted span content for LI {index}: {span_content}")
data = scrape_google_trends(driver)
result_dict[f"{span_content}"] = data
except Exception as e:
print(f"Could not find or extract span content in LI {index}: {e}")
span_content = f"iteration_{index}"
result_dict[f"{span_content}"] = []
except Exception as e:
print(f"Error interacting with LI {index}: {e}")
except Exception as e:
print(f"Error during click and scrape: {e}")
finally:
driver.quit()
return result_dict
def process_selenium_row(index, rows, driver):
"""Extract dynamic data using Selenium by clicking on the row."""
max_retries = 3
for attempt in range(max_retries):
try:
articles = {}
driver.execute_script("arguments[0].click();", rows[index]) # Use JavaScript click for stability
# Wait for the articles to load dynamically
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CLASS_NAME, "xZCHj"))
)
# Fetch only the newly loaded articles
articles = driver.find_elements(By.CLASS_NAME, "xZCHj")
# Extract data from the current row only
dynamic_data = {
"article": [
{
"href": article.get_attribute("href"),
"title": article.text
}
for article in articles
]
}
# Clear previously fetched articles and return current ones
return dynamic_data
except Exception as e:
error = e
print(f"Failed to process row {index} after {max_retries} attempts.")
return {"article": []}
def scrape_google_trends(driver):
"""Scrape data dynamically from the current page."""
all_data = []
try:
selenium_rows = None
WebDriverWait(driver, 2).until(
EC.presence_of_element_located((By.CSS_SELECTOR, '[jsname="oKdM2c"]'))
)
soup = BeautifulSoup(driver.page_source, "html.parser")
selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')
tables = soup.select('[jsname="cC57zf"]')
for table in tables:
rows_bs = table.find_all("tr")
for index, row_bs in enumerate(rows_bs):
static_data = [
[div.get_text(strip=True) for div in cell.find_all("div")]
for cell in row_bs.find_all("td")[1:4]
]
dynamic_data = process_selenium_row(index, selenium_rows, driver)
combined_row = {
"static_data": static_data,
"dynamic_data": dynamic_data
}
all_data.append(combined_row)
return all_data
except Exception as e:
with open(f"page_source_debug.html", "w", encoding="utf-8") as f:
f.write(driver.page_source)
print(f"An error occurred during scraping: {e}")
return []
def crawl_url(url="https://trends.google.com/trends/trendingsearches/daily?geo=AT"):
"""Main function to crawl dynamically and scrape Google Trends."""
driver = setup_driver()
results = click_and_scrape(driver,url)
return results
if __name__ == "__main__":
results = crawl_url()
try:
with open("results.json", "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=4)
except Exception as e:
print(f"Error writing results to JSON: {e}")