import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.common.exceptions import ElementClickInterceptedException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Configure Chrome options
def setup_driver():
    options = Options()
    options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--lang=de")
    return webdriver.Chrome(options=options)
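
# Hedged sketch (not in the original script): setup_driver relies on
# chromedriver being on PATH. Selenium 4 also accepts an explicit Service
# object; the driver path used below is a hypothetical placeholder.
def setup_driver_with_service(driver_path="/usr/local/bin/chromedriver"):
    """Alternative setup that passes an explicit chromedriver path (sketch)."""
    from selenium.webdriver.chrome.service import Service
    options = Options()
    options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--lang=de")
    return webdriver.Chrome(service=Service(driver_path), options=options)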
def click_and_scrape(driver, url):
    """Open the category dropdown, then click each li element and scrape data."""
    result_dict = {}
    try:
        driver.get(url)

        # Open the category dropdown; retry because an overlay can intercept
        # the first click while the page is still rendering.
        for attempt in range(4):
            try:
                button = WebDriverWait(driver, 20).until(
                    EC.element_to_be_clickable((
                        By.XPATH,
                        "//button[@aria-label='Alle Kategorien, Kategorie auswählen']"
                    ))
                )
                print("Button located.")
                driver.execute_script("arguments[0].scrollIntoView();", button)
                button.click()
                print("Button clicked successfully.")
                break
            except ElementClickInterceptedException:
                print(f"Attempt {attempt + 1}: Click intercepted. Retrying...")

        # Locate the category list; bail out early if it never appears,
        # since the loop below cannot run without it.
        try:
            ul_element = WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((
                    By.XPATH,
                    "//ul[@aria-label='Kategorie']"
                ))
            )
            li_elements = ul_element.find_elements(By.TAG_NAME, "li")
        except Exception as e:
            print(f"Error locating ul_element: {e}")
            return result_dict

        # Keep li index 2 and indices 4 onward; indices 0, 1 and 3 are
        # skipped (presumably non-category entries).
        selected_elements = [li_elements[2]] + li_elements[4:]
        for index, li in enumerate(selected_elements):
            try:
                driver.execute_script("arguments[0].scrollIntoView();", li)
                driver.execute_script("arguments[0].click();", li)
                print(f"Clicked LI {index} using JavaScript.")
                time.sleep(2)
                try:
                    span = li.find_element(By.CLASS_NAME, "W7g1Rb-rymPhb-fpDzbe-fmcmS")
                    span_content = span.get_attribute("innerText")
                    print(f"Extracted span content for LI {index}: {span_content}")
                    result_dict[span_content] = scrape_google_trends(driver)
                except Exception as e:
                    print(f"Could not find or extract span content in LI {index}: {e}")
                    result_dict[f"iteration_{index}"] = []
            except Exception as e:
                print(f"Error interacting with LI {index}: {e}")
    except Exception as e:
        print(f"Error during click and scrape: {e}")
    finally:
        driver.quit()
    return result_dict
def process_selenium_row(index, rows, driver):
    """Extract dynamic data using Selenium by clicking on the row."""
    max_retries = 3
    last_error = None
    for attempt in range(max_retries):
        try:
            driver.execute_script("arguments[0].click();", rows[index])
            WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.CLASS_NAME, "xZCHj"))
            )
            # Keep only the first three linked articles for this row.
            articles = driver.find_elements(By.CLASS_NAME, "xZCHj")[:3]
            return {
                "article": [
                    {
                        "href": article.get_attribute("href"),
                        "title": article.text
                    }
                    for article in articles
                ]
            }
        except Exception as e:
            last_error = e
    print(f"Failed to process row {index} after {max_retries} attempts: {last_error}")
    return {"article": []}
def scrape_google_trends(driver):
    """Scrape data dynamically from the current page."""
    all_data = []
    try:
        WebDriverWait(driver, 2).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, '[jsname="oKdM2c"]'))
        )
        # Parse static cell text with BeautifulSoup while keeping Selenium
        # handles to the same rows for the click-driven dynamic data.
        soup = BeautifulSoup(driver.page_source, "html.parser")
        selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')

        tables = soup.select('[jsname="cC57zf"]')
        for table in tables:
            rows_bs = table.find_all("tr")
            for index, row_bs in enumerate(rows_bs):
                static_data = [
                    [div.get_text(strip=True) for div in cell.find_all("div")]
                    for cell in row_bs.find_all("td")[1:4]
                ]
                dynamic_data = process_selenium_row(index, selenium_rows, driver)
                all_data.append({
                    "static_data": static_data,
                    "dynamic_data": dynamic_data
                })
        return all_data
    except Exception as e:
        # Dump the page source to ease debugging of selector changes.
        with open("page_source_debug.html", "w", encoding="utf-8") as f:
            f.write(driver.page_source)
        print(f"An error occurred during scraping: {e}")
        return []
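
# Hedged sketch (not in the original script): the class names and jsname
# attributes above are assumed to match the current Google Trends DOM and may
# change without notice. If a click re-renders the results, the row handles
# can go stale; a helper like this re-locates them before the next click.
def refresh_rows(driver, timeout=10):
    """Re-locate the Selenium row handles after the page re-renders (sketch)."""
    WebDriverWait(driver, timeout).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, '[jsname="oKdM2c"]'))
    )
    return driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')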
def process_li_element(index, li_data, url):
    """Process a single li element in its own driver instance."""
    driver = setup_driver()
    try:
        driver.get(url)
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.XPATH, "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"))
        )
        ul_element = driver.find_element(By.XPATH, "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]")
        li_elements = ul_element.find_elements(By.TAG_NAME, "li")
        selected_li = li_elements[li_data['index']]
        driver.execute_script("arguments[0].scrollIntoView();", selected_li)
        driver.execute_script("arguments[0].click();", selected_li)
        time.sleep(2)
        span_content = selected_li.find_element(By.CLASS_NAME, "W7g1Rb-rymPhb-fpDzbe-fmcmS").get_attribute("innerText")
        print(f"LI {li_data['index']} clicked: {span_content}")
        data = scrape_google_trends(driver)
        return {span_content: data}
    except Exception as e:
        print(f"Error processing LI {index}: {e}")
        return {}
    finally:
        driver.quit()
def crawl_url(url):
    """Click each li element and scrape data in parallel."""
    driver = setup_driver()
    result_dict = {}
    try:
        driver.get(url)
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.XPATH, "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]"))
        )
        ul_element = driver.find_element(By.XPATH, "//ul[contains(@aria-label, 'Kategorie') or contains(@aria-label, 'Category')]")
        li_elements = ul_element.find_elements(By.TAG_NAME, "li")
        # Same selection as click_and_scrape: indices 0, 1 and 3 are skipped.
        selected_elements = [{"index": i} for i in range(2, len(li_elements)) if i != 3]
        # Each worker opens its own headless driver, so categories can be
        # scraped concurrently without sharing browser state.
        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(process_li_element, idx, li_data, url) for idx, li_data in enumerate(selected_elements)]
            for future in as_completed(futures):
                result_dict.update(future.result())
    except Exception as e:
        print(f"Error during click and scrape: {e}")
    finally:
        driver.quit()
    return result_dict
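
# Example entry point (sketch): the original script does not show how
# crawl_url is invoked, and the URL below is a hypothetical placeholder.
# Results are serialized with the json module imported above.
if __name__ == "__main__":
    url = "https://trends.google.com/trends/trendingsearches/daily?geo=DE"  # placeholder assumption
    results = crawl_url(url)
    with open("trends_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print(f"Saved {len(results)} categories to trends_results.json")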