# webcrawler/trend_crawl2.py
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.common.exceptions import ElementClickInterceptedException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Configure Chrome options
def setup_driver():
    """Create a headless Chrome WebDriver configured for German-language pages."""
    options = Options()
    options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--lang=de")
    return webdriver.Chrome(options=options)
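# Example usage (an assumed illustration, not part of the original script):
# setup_driver() returns a plain selenium.webdriver.Chrome instance, so it can
# be driven and torn down like any other WebDriver, e.g.:
#
#     driver = setup_driver()
#     try:
#         driver.get(url)  # url: the Trends page to crawl
#     finally:
#         driver.quit()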
def click_and_scrape(driver, url):
    """Click each li element and scrape data."""
    result_dict = {}
    try:
        driver.get(url)
        # Open the category dropdown; retry a few times because the button can
        # be covered by overlays while the page is still rendering.
        for attempt in range(4):
            try:
                button = WebDriverWait(driver, 20).until(
                    EC.element_to_be_clickable((
                        By.XPATH,
                        "//button[@aria-label='Alle Kategorien, Kategorie auswählen']"
                    ))
                )
                print("Button located.")
                driver.execute_script("arguments[0].scrollIntoView();", button)
                print(button.get_attribute("outerHTML"))
                button.click()
                print("Button clicked successfully.")
                break
            except ElementClickInterceptedException:
                print(f"Attempt {attempt + 1}: Click intercepted. Retrying...")
        # Collect the category list items from the opened dropdown.
        li_elements = []
        try:
            ul_element = WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((
                    By.XPATH,
                    "//ul[@aria-label='Kategorie']"
                ))
            )
            li_elements = ul_element.find_elements(By.TAG_NAME, "li")
        except Exception as e:
            print(f"Error locating ul_element: {e}")
        # Keep the third item and everything from the fifth item onward.
        selected_elements = [li_elements[2]] + li_elements[4:]
        for index, li in enumerate(selected_elements):
            try:
                driver.execute_script("arguments[0].scrollIntoView();", li)
                driver.execute_script("arguments[0].click();", li)
                print(f"Clicked LI {index} using JavaScript.")
                time.sleep(2)
                try:
                    span = li.find_element(By.CLASS_NAME, "W7g1Rb-rymPhb-fpDzbe-fmcmS")
                    span_content = span.get_attribute("innerText")
                    print(f"Extracted span content for LI {index}: {span_content}")
                    data = scrape_google_trends(driver)
                    result_dict[span_content] = data
                except Exception as e:
                    print(f"Could not find or extract span content in LI {index}: {e}")
                    result_dict[f"iteration_{index}"] = []
            except Exception as e:
                print(f"Error interacting with LI {index}: {e}")
    except Exception as e:
        print(f"Error during click and scrape: {e}")
    finally:
        driver.quit()
    return result_dict
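# Sketch (an assumption; not shown in this file): json is imported above but not
# used in the functions here, so the scraped dictionary is presumably serialized
# elsewhere. A minimal way to persist the output of click_and_scrape() would be:
#
#     results = click_and_scrape(setup_driver(), url)
#     with open("trends.json", "w", encoding="utf-8") as f:
#         json.dump(results, f, ensure_ascii=False, indent=2)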
def process_selenium_row(index, rows, driver):
    """Extract dynamic data using Selenium by clicking on the row."""
    max_retries = 3
    error = None
    for attempt in range(max_retries):
        try:
            driver.execute_script("arguments[0].click();", rows[index])
            WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.CLASS_NAME, "xZCHj"))
            )
            # Keep only the first three linked articles for the clicked row.
            articles = driver.find_elements(By.CLASS_NAME, "xZCHj")[:3]
            return {
                "article": [
                    {
                        "href": article.get_attribute("href"),
                        "title": article.text
                    }
                    for article in articles
                ]
            }
        except Exception as e:
            error = e
    print(f"Failed to process row {index} after {max_retries} attempts: {error}")
    return {"article": []}
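# For reference, process_selenium_row() returns a dict of the form
# (illustrative shape, not real scrape output):
#
#     {"article": [{"href": "https://...", "title": "..."}, ...]}
#
# with at most three articles per row, or {"article": []} on failure.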
def scrape_google_trends(driver):
    """Scrape data dynamically from the current page."""
    all_data = []
    try:
        WebDriverWait(driver, 2).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, '[jsname="oKdM2c"]'))
        )
        # Parse the static table content with BeautifulSoup, and keep the
        # matching Selenium row elements for the click-driven detail scraping.
        soup = BeautifulSoup(driver.page_source, "html.parser")
        selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')
        tables = soup.select('[jsname="cC57zf"]')
        for table in tables:
            rows_bs = table.find_all("tr")
            for index, row_bs in enumerate(rows_bs):
                static_data = [
                    [div.get_text(strip=True) for div in cell.find_all("div")]
                    for cell in row_bs.find_all("td")[1:4]
                ]
                dynamic_data = process_selenium_row(index, selenium_rows, driver)
                all_data.append({
                    "static_data": static_data,
                    "dynamic_data": dynamic_data
                })
        return all_data
    except Exception as e:
        # Dump the page source to make selector breakage easier to debug.
        with open("page_source_debug.html", "w", encoding="utf-8") as f:
            f.write(driver.page_source)
        print(f"An error occurred during scraping: {e}")
        return []
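# Each entry appended by scrape_google_trends() combines both sources, roughly
# (illustrative shape, not real data):
#
#     {
#         "static_data": [[...], [...], [...]],   # div texts of table columns 1-3
#         "dynamic_data": {"article": [{"href": "...", "title": "..."}]}
#     }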
def process_li_element(index, li_data, url):
    """Process a single li element in its own browser instance."""
    driver = setup_driver()
    try:
        driver.get(url)
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((
                By.XPATH,
                "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"
            ))
        )
        ul_element = driver.find_element(
            By.XPATH,
            "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"
        )
        li_elements = ul_element.find_elements(By.TAG_NAME, "li")
        selected_li = li_elements[li_data['index']]
        driver.execute_script("arguments[0].scrollIntoView();", selected_li)
        driver.execute_script("arguments[0].click();", selected_li)
        time.sleep(2)
        span_content = selected_li.find_element(
            By.CLASS_NAME, "W7g1Rb-rymPhb-fpDzbe-fmcmS"
        ).get_attribute("innerText")
        print(f"LI {li_data['index']} clicked: {span_content}")
        data = scrape_google_trends(driver)
        return {span_content: data}
    except Exception as e:
        print(f"Error processing LI {index}: {e}")
        return {}
    finally:
        driver.quit()
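# Note: process_li_element() creates and quits its own headless Chrome instance,
# so it can safely run from multiple worker threads; memory use grows with the
# number of concurrent workers (one browser per in-flight task).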
def crawl_url(url):
    """Click each li element and scrape data in parallel."""
    driver = setup_driver()
    result_dict = {}
    try:
        driver.get(url)
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((
                By.XPATH,
                "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"
            ))
        )
        ul_element = driver.find_element(
            By.XPATH,
            "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"
        )
        li_elements = ul_element.find_elements(By.TAG_NAME, "li")
        # Categories start at index 2; index 3 is skipped.
        selected_elements = [{"index": i} for i in range(2, len(li_elements)) if i != 3]
        # Each li is handled in its own headless browser so the category pages
        # can be scraped concurrently.
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(process_li_element, idx, li_data, url)
                for idx, li_data in enumerate(selected_elements)
            ]
            for future in as_completed(futures):
                result_dict.update(future.result())
    except Exception as e:
        print(f"Error during click and scrape: {e}")
    finally:
        driver.quit()
    return result_dict
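# Minimal entry-point sketch (an assumption; the original file's entry point is
# not shown here). The URL below is a placeholder for the Google Trends page
# being crawled and may need to be adjusted.
if __name__ == "__main__":
    trends_url = "https://trends.google.de/trending?geo=DE"  # placeholder (assumption)
    results = crawl_url(trends_url)
    print(json.dumps(results, ensure_ascii=False, indent=2))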