Add1E committed
Commit 2dc16bf · verified · 1 Parent(s): edb3c11

Update trend_crawl2.py

Files changed (1)
  1. trend_crawl2.py +187 -202
trend_crawl2.py CHANGED
@@ -1,202 +1,187 @@
- from selenium import webdriver
- from selenium.webdriver.common.by import By
- from selenium.webdriver.chrome.service import Service as ChromeService
- from selenium.common.exceptions import ElementClickInterceptedException
- from selenium.webdriver.chrome.options import Options
- from selenium.webdriver.support.ui import WebDriverWait
- from selenium.webdriver.support import expected_conditions as EC
- from bs4 import BeautifulSoup
- from webdriver_manager.chrome import ChromeDriverManager
- import time
- import json
-
- # Configure Chrome options
- chrome_options = Options()
- chrome_options.add_argument("--headless")  # Run in headless mode
- chrome_options.add_argument("--disable-gpu")
-
-
- def setup_driver():
-     options = webdriver.ChromeOptions()
-     options.add_argument('--headless')
-     options.add_argument('--no-sandbox')
-     options.add_argument('--disable-dev-shm-usage')
-     wd = webdriver.Chrome(options=options)
-     return wd
-
- def click_and_scrape(driver, url):
-     """Click each li element and scrape data."""
-     result_dict = {}
-     try:
-         driver.get(url)
-
-         for attempt in range(4):
-             try:
-                 button = WebDriverWait(driver, 20).until(
-                     EC.element_to_be_clickable((
-                         By.XPATH,
-                         "//button[@aria-label='Alle Kategorien, Kategorie auswählen']"
-                     ))
-                 )
-                 print("Button located.")
-
-                 # Scroll into view to ensure visibility
-                 driver.execute_script("arguments[0].scrollIntoView();", button)
-                 print(button.get_attribute("outerHTML"))
-
-
-                 button.click()
-                 print("Button clicked successfully.")
-                 break
-             except ElementClickInterceptedException:
-                 print(f"Attempt {attempt + 1}: Click intercepted. Retrying...")
-
-
-
-         # Wait for the ul element to load
-         try:
-             # Wait for the ul element with the specific aria-label to load
-             ul_element = WebDriverWait(driver, 20).until(
-                 EC.presence_of_element_located((
-                     By.XPATH,
-                     "//ul[@aria-label='Kategorie']"
-                 ))
-             )
-             li_elements = ul_element.find_elements(By.TAG_NAME, "li")
-         except Exception as e:
-             print(f"Error locating ul_element: {e}")
-         selected_elements = [li_elements[2]] + li_elements[4:]
-         for index, li in enumerate(selected_elements):
-             try:
-                 # Scroll each li element into view
-                 driver.execute_script("arguments[0].scrollIntoView();", li)
-                 # Click the <li> using JavaScript
-                 driver.execute_script("arguments[0].click();", li)
-                 print(f"Clicked LI {index} using JavaScript.")
-                 time.sleep(2)
-                 try:
-                     span = li.find_element(By.CLASS_NAME, "W7g1Rb-rymPhb-fpDzbe-fmcmS")
-                     span_content = span.get_attribute("innerText")
-                     print(f"Extracted span content for LI {index}: {span_content}")
-                     data = scrape_google_trends(driver)
-                     result_dict[f"{span_content}"] = data
-                 except Exception as e:
-                     print(f"Could not find or extract span content in LI {index}: {e}")
-                     span_content = f"iteration_{index}"
-                     result_dict[f"{span_content}"] = []
-
-             except Exception as e:
-                 print(f"Error interacting with LI {index}: {e}")
-
-         # for index, li in enumerate(li_elements):
-         #     try:
-         #         # Click each li element
-         #         driver.execute_script("arguments[0].scrollIntoView();", li)  # Ensure li is in view
-         #         li.click()
-         #         time.sleep(1)  # Slight delay to ensure loading
-
-         #         # Wait for content to load dynamically
-         #         WebDriverWait(driver, 10).until(
-         #             EC.presence_of_all_elements_located((By.CLASS_NAME, "xZCHj"))
-         #         )
-
-         #         # Extract data using scrape_google_trends logic
-         #         data = scrape_google_trends(driver)
-
-         #         # Save results to the dictionary
-         #         result_dict[f"iteration_{index}"] = data
-
-         #     except Exception as e:
-         #         print(f"Error processing li element {index}: {e}")
-
-     except Exception as e:
-         print(f"Error during click and scrape: {e}")
-
-     finally:
-         driver.quit()
-
-     return result_dict
-
- def process_selenium_row(index, rows, driver):
-     """Extract dynamic data using Selenium by clicking on the row."""
-     max_retries = 3
-     for attempt in range(max_retries):
-         try:
-             articles = {}
-
-             driver.execute_script("arguments[0].click();", rows[index])  # Use JavaScript click for stability
-
-             # Wait for the articles to load dynamically
-             WebDriverWait(driver, 10).until(
-                 EC.presence_of_all_elements_located((By.CLASS_NAME, "xZCHj"))
-             )
-
-             # Fetch only the newly loaded articles
-             articles = driver.find_elements(By.CLASS_NAME, "xZCHj")
-             # Extract data from the current row only
-             dynamic_data = {
-                 "article": [
-                     {
-                         "href": article.get_attribute("href"),
-                         "title": article.text
-                     }
-                     for article in articles
-                 ]
-             }
-
-             # Clear previously fetched articles and return current ones
-             return dynamic_data
-
-         except Exception as e:
-             error = e
-
-     print(f"Failed to process row {index} after {max_retries} attempts.")
-     return {"article": []}
-
- def scrape_google_trends(driver):
-     """Scrape data dynamically from the current page."""
-     all_data = []
-     try:
-         selenium_rows = None
-         WebDriverWait(driver, 2).until(
-             EC.presence_of_element_located((By.CSS_SELECTOR, '[jsname="oKdM2c"]'))
-         )
-         soup = BeautifulSoup(driver.page_source, "html.parser")
-         selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')
-         tables = soup.select('[jsname="cC57zf"]')
-
-         for table in tables:
-             rows_bs = table.find_all("tr")
-             for index, row_bs in enumerate(rows_bs):
-                 static_data = [
-                     [div.get_text(strip=True) for div in cell.find_all("div")]
-                     for cell in row_bs.find_all("td")[1:4]
-                 ]
-                 dynamic_data = process_selenium_row(index, selenium_rows, driver)
-                 combined_row = {
-                     "static_data": static_data,
-                     "dynamic_data": dynamic_data
-                 }
-                 all_data.append(combined_row)
-
-         return all_data
-
-     except Exception as e:
-         with open(f"page_source_debug.html", "w", encoding="utf-8") as f:
-             f.write(driver.page_source)
-         print(f"An error occurred during scraping: {e}")
-         return []
-
- def crawl_url(url="https://trends.google.com/trends/trendingsearches/daily?geo=AT"):
-     """Main function to crawl dynamically and scrape Google Trends."""
-     driver = setup_driver()
-     results = click_and_scrape(driver, url)
-     return results
-
- if __name__ == "__main__":
-     results = crawl_url()
-     try:
-         with open("results.json", "w", encoding="utf-8") as f:
-             json.dump(results, f, ensure_ascii=False, indent=4)
-     except Exception as e:
-         print(f"Error writing results to JSON: {e}")
 
+ from selenium import webdriver
+ from selenium.webdriver.common.by import By
+ from selenium.webdriver.chrome.service import Service as ChromeService
+ from selenium.common.exceptions import ElementClickInterceptedException
+ from selenium.webdriver.chrome.options import Options
+ from selenium.webdriver.support.ui import WebDriverWait
+ from selenium.webdriver.support import expected_conditions as EC
+ from bs4 import BeautifulSoup
+ from webdriver_manager.chrome import ChromeDriverManager
+ import time
+ import json
+
+ # Configure Chrome options
+ chrome_options = Options()
+ chrome_options.add_argument("--headless")  # Run in headless mode
+ chrome_options.add_argument("--disable-gpu")
+
+
+ def setup_driver():
+     options = webdriver.ChromeOptions()
+     options.add_argument('--headless')
+     options.add_argument('--no-sandbox')
+     options.add_argument('--disable-dev-shm-usage')
+     wd = webdriver.Chrome(options=options)
+     return wd
+
+ def click_and_scrape(driver, url):
+     """Click each li element and scrape data."""
+     result_dict = {}
+     try:
+         driver.get(url)
+
+         for attempt in range(4):
+             try:
+                 # Write the page source into an HTML file
+                 with open("output.html", "w", encoding="utf-8") as file:
+                     file.write(driver.page_source)
+
+                 print("Page source has been written to output.html.")
+                 button = WebDriverWait(driver, 20).until(
+                     EC.element_to_be_clickable((
+                         By.XPATH,
+                         "//button[@aria-label='Alle Kategorien, Kategorie auswählen']"
+                     ))
+                 )
+                 print("Button located.")
+
+                 # Scroll into view to ensure visibility
+                 driver.execute_script("arguments[0].scrollIntoView();", button)
+                 print(button.get_attribute("outerHTML"))
+
+
+                 button.click()
+                 print("Button clicked successfully.")
+                 break
+             except ElementClickInterceptedException:
+                 print(f"Attempt {attempt + 1}: Click intercepted. Retrying...")
+
+
+
+         # Wait for the ul element to load
+         try:
+             # Wait for the ul element with the specific aria-label to load
+             ul_element = WebDriverWait(driver, 20).until(
+                 EC.presence_of_element_located((
+                     By.XPATH,
+                     "//ul[@aria-label='Kategorie']"
+                 ))
+             )
+             li_elements = ul_element.find_elements(By.TAG_NAME, "li")
+         except Exception as e:
+             print(f"Error locating ul_element: {e}")
+         selected_elements = [li_elements[2]] + li_elements[4:]
+         for index, li in enumerate(selected_elements):
+             try:
+                 # Scroll each li element into view
+                 driver.execute_script("arguments[0].scrollIntoView();", li)
+                 # Click the <li> using JavaScript
+                 driver.execute_script("arguments[0].click();", li)
+                 print(f"Clicked LI {index} using JavaScript.")
+                 time.sleep(2)
+                 try:
+                     span = li.find_element(By.CLASS_NAME, "W7g1Rb-rymPhb-fpDzbe-fmcmS")
+                     span_content = span.get_attribute("innerText")
+                     print(f"Extracted span content for LI {index}: {span_content}")
+                     data = scrape_google_trends(driver)
+                     result_dict[f"{span_content}"] = data
+                 except Exception as e:
+                     print(f"Could not find or extract span content in LI {index}: {e}")
+                     span_content = f"iteration_{index}"
+                     result_dict[f"{span_content}"] = []
+
+             except Exception as e:
+                 print(f"Error interacting with LI {index}: {e}")
+
+
+     except Exception as e:
+         print(f"Error during click and scrape: {e}")
+
+     finally:
+         driver.quit()
+
+     return result_dict
+
+ def process_selenium_row(index, rows, driver):
+     """Extract dynamic data using Selenium by clicking on the row."""
+     max_retries = 3
+     for attempt in range(max_retries):
+         try:
+             articles = {}
+
+             driver.execute_script("arguments[0].click();", rows[index])  # Use JavaScript click for stability
+
+             # Wait for the articles to load dynamically
+             WebDriverWait(driver, 10).until(
+                 EC.presence_of_all_elements_located((By.CLASS_NAME, "xZCHj"))
+             )
+
+             # Fetch only the newly loaded articles
+             articles = driver.find_elements(By.CLASS_NAME, "xZCHj")
+             # Extract data from the current row only
+             dynamic_data = {
+                 "article": [
+                     {
+                         "href": article.get_attribute("href"),
+                         "title": article.text
+                     }
+                     for article in articles
+                 ]
+             }
+
+             # Clear previously fetched articles and return current ones
+             return dynamic_data
+
+         except Exception as e:
+             error = e
+
+     print(f"Failed to process row {index} after {max_retries} attempts.")
+     return {"article": []}
+
+ def scrape_google_trends(driver):
+     """Scrape data dynamically from the current page."""
+     all_data = []
+     try:
+         selenium_rows = None
+         WebDriverWait(driver, 2).until(
+             EC.presence_of_element_located((By.CSS_SELECTOR, '[jsname="oKdM2c"]'))
+         )
+         soup = BeautifulSoup(driver.page_source, "html.parser")
+         selenium_rows = driver.find_elements(By.CSS_SELECTOR, '[jsname="oKdM2c"]')
+         tables = soup.select('[jsname="cC57zf"]')
+
+         for table in tables:
+             rows_bs = table.find_all("tr")
+             for index, row_bs in enumerate(rows_bs):
+                 static_data = [
+                     [div.get_text(strip=True) for div in cell.find_all("div")]
+                     for cell in row_bs.find_all("td")[1:4]
+                 ]
+                 dynamic_data = process_selenium_row(index, selenium_rows, driver)
+                 combined_row = {
+                     "static_data": static_data,
+                     "dynamic_data": dynamic_data
+                 }
+                 all_data.append(combined_row)
+
+         return all_data
+
+     except Exception as e:
+         with open(f"page_source_debug.html", "w", encoding="utf-8") as f:
+             f.write(driver.page_source)
+         print(f"An error occurred during scraping: {e}")
+         return []
+
+ def crawl_url(url="https://trends.google.com/trends/trendingsearches/daily?geo=AT"):
+     """Main function to crawl dynamically and scrape Google Trends."""
+     driver = setup_driver()
+     results = click_and_scrape(driver, url)
+     return results
+
+ if __name__ == "__main__":
+     results = crawl_url()
+     try:
+         with open("results.json", "w", encoding="utf-8") as f:
+             json.dump(results, f, ensure_ascii=False, indent=4)
+     except Exception as e:
+         print(f"Error writing results to JSON: {e}")