"""Streamlit demo app that scrapes product listings from Indonesian e-commerce sites.

Three scrapers are provided (klikindomaret.com via ``requests``, tokopedia.com
via ``requests`` and tokopedia.com via Selenium).  The UI at the bottom of the
file lets the user pick a site, a search term and a target item count, shows
the results as a table and offers them as an XLSX / CSV / JSON download.
"""
import io
import json
import logging
import time
from urllib.parse import quote

import pandas as pd
import requests
import streamlit as st
from bs4 import BeautifulSoup
from requests_html import HTMLSession
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By

logging.basicConfig(level=logging.DEBUG)

# Seconds to wait for any single HTTP request before giving up.
_REQUEST_TIMEOUT = 10

# Headers sent with the plain-requests Tokopedia scraper.
_TOKPED_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36',
    'Accept-Language': 'en-US,en;q=0.5',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive',
}


def _tokped_search_url(query, page):
    """Return the Tokopedia search URL for an already URL-quoted query and page number."""
    return f'https://www.tokopedia.com/search?navsource=&page={page}&q={query}&srp_component_id=02.01.00.00&srp_page_id=&srp_page_title=&st='


def _element_text(element, strip=True):
    """Return the text of a BeautifulSoup element, or None when the element is missing."""
    if element is None:
        return None
    return element.text.strip() if strip else element.text


def _parse_tokped_page(soup, limit):
    """Parse at most ``limit`` product cards from a Tokopedia search-result page.

    Each card's link is also written to the Streamlit page, as the scrapers
    did before this helper was extracted.

    Returns:
        A list of product dicts; empty when the page contains no product cards.
    """
    cards = soup.find_all('a', class_="pcv3__info-content css-gwkf0u", href=True)
    parsed = []
    for card in cards[:limit]:
        link = card['href']
        st.write(link)
        parsed.append({
            'link': link,
            'produk': _element_text(card.find('div', class_="prd_link-product-name css-3um8ox")),
            'harga': _element_text(card.find('div', class_="prd_link-product-price css-h66vau")),
            # Sold-count and rating are kept unstripped, as in the original output.
            'terjual': _element_text(card.find('span', class_="prd_label-integrity css-1sgek4h"), strip=False),
            'rating': _element_text(card.find('span', class_='prd_rating-average-text css-t70v7i'), strip=False),
            'toko': _element_text(card.find('span', class_="prd_link-shop-name css-1kdc32b flip")),
            'asal_product': _element_text(card.find('span', class_="prd_link-shop-loc css-1kdc32b flip")),
        })
    return parsed


@st.cache_data
def scrape_klikindomaret(nama_barang, num_items):
    """Scrape up to ``num_items`` products matching ``nama_barang`` from klikindomaret.com.

    Args:
        nama_barang: Search term entered by the user.
        num_items: Maximum number of products to return.

    Returns:
        A list of dicts with the keys ``product``, ``original_price``,
        ``discount_percentage``, ``price`` and ``link``.  The list is shorter
        than ``num_items`` when the site has fewer results or a request fails.
    """
    products = []
    page = 1
    query = quote(nama_barang)

    while len(products) < num_items:
        url = f"https://www.klikindomaret.com/search/?key={query}&categories=&productbrandid=&sortcol=&pagesize=54&page={page}&startprice=&endprice=&attributes=&ShowItem="
        try:
            response = requests.get(url, timeout=_REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as e:
            logging.error(f"Terjadi kesalahan saat mengirim permintaan: {e}")
            st.error(f"Terjadi kesalahan saat mengirim permintaan: {e}")
            break

        soup = BeautifulSoup(response.text, 'html.parser')
        count_before_page = len(products)

        for product in soup.find_all('a', href=True):
            product_href = product['href']
            if '/product/' not in product_href:
                continue

            title_element = product.find('div', class_='title')
            price_element = product.find('span', class_='normal price-value')
            if title_element is None or price_element is None:
                # A product link that is not a full product card; nothing to extract.
                continue

            product_name = title_element.text.strip()
            product_price = price_element.text.strip()

            # Check for a pre-discount price and a discount percentage.
            discount_element = product.find('span', class_='strikeout disc-price')
            if discount_element:
                percentage_element = discount_element.find('span', class_='discount')
                discount_percentage = percentage_element.text.strip() if percentage_element else ""
                original_price = discount_element.text
                if discount_percentage:
                    original_price = original_price.replace(discount_percentage, '')
                original_price = original_price.strip()
            else:
                # No discount: report "0%" and use the current price as the original price.
                discount_percentage = "0%"
                original_price = product_price

            products.append({
                'product': product_name,
                'original_price': original_price,
                'discount_percentage': discount_percentage,
                'price': product_price,
                'link': f"https://www.klikindomaret.com{product_href}",
            })
            if len(products) >= num_items:
                break

        if len(products) == count_before_page:
            # The page added nothing new: results are exhausted, so stop
            # instead of requesting further pages forever.
            break
        page += 1

    return products[:num_items]


@st.cache_data
def scrape_tokped(nama_barang, num_items):
    """Scrape up to ``num_items`` products matching ``nama_barang`` from tokopedia.com using ``requests``.

    Args:
        nama_barang: Search term entered by the user.
        num_items: Maximum number of products to return.

    Returns:
        A list of dicts with the keys ``link``, ``produk``, ``harga``,
        ``terjual``, ``rating``, ``toko`` and ``asal_product``.  Scraping stops
        early (after reporting the error in the UI) when a request fails, and
        also when a page contains no product cards.
    """
    products = []
    page = 1
    query = quote(nama_barang)

    while len(products) < num_items:
        url = _tokped_search_url(query, page)
        try:
            response = requests.get(url, headers=_TOKPED_HEADERS, timeout=_REQUEST_TIMEOUT)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            page_products = _parse_tokped_page(soup, num_items - len(products))
        # HTTPError is a subclass of RequestException, so it must be handled first.
        except requests.exceptions.HTTPError as e:
            logging.error(f"HTTP Error: {e}")
            st.error(f"HTTP Error: {e}")
            break
        except requests.exceptions.RequestException as e:
            logging.error(f"Terjadi kesalahan saat mengirim permintaan: {e}")
            st.error(f"Terjadi kesalahan saat mengirim permintaan: {e}")
            break
        except Exception as e:
            logging.error(f"Terjadi kesalahan yang tidak diketahui: {e}")
            st.error(f"Terjadi kesalahan yang tidak diketahui: {e}")
            break

        if not page_products:
            # No product cards on this page: stop instead of paging forever.
            break
        products.extend(page_products)
        page += 1

    return products


@st.cache_data
def scrape_tokped_with_selenium(nama_barang, num_items):
    """Scrape up to ``num_items`` products matching ``nama_barang`` from tokopedia.com using Selenium/Chrome.

    Args:
        nama_barang: Search term entered by the user.
        num_items: Maximum number of products to return.

    Returns:
        A list of dicts with the same keys as :func:`scrape_tokped`.  Scraping
        stops early (after reporting the error in the UI) when the browser
        raises, and also when a page contains no product cards.
    """
    products = []
    page = 1
    query = quote(nama_barang)

    options = webdriver.ChromeOptions()
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-notifications')
    options.add_argument('--disable-infobars')
    options.add_argument('--disable-dev-shm-usage')
    driver = webdriver.Chrome(options=options)

    # One browser session serves every page; it is closed exactly once below.
    try:
        while len(products) < num_items:
            url = _tokped_search_url(query, page)
            driver.get(url)

            # Issue a synchronous XHR carrying a custom User-Agent header.
            # NOTE(review): the XHR response is never read, so this does not
            # change what driver.page_source returns below.
            driver.execute_script(
                """
                var xhr = new XMLHttpRequest();
                xhr.open('GET', arguments[0], false);
                xhr.setRequestHeader('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36');
                xhr.send(null);
                """,
                url,
            )

            # Parse the page source the browser currently holds.
            soup = BeautifulSoup(driver.page_source, "html.parser")
            page_products = _parse_tokped_page(soup, num_items - len(products))
            if not page_products:
                # No product cards on this page: stop instead of paging forever.
                break
            products.extend(page_products)
            page += 1
    # WebDriverException must come before the generic handler to be reachable.
    except WebDriverException as e:
        logging.error(f"An error occurred: {e}")
        st.error(f"An error occurred: {e}")
    except Exception as e:
        logging.error(f"Terjadi kesalahan yang tidak diketahui: {e}")
        st.error(f"Terjadi kesalahan yang tidak diketahui: {e}")
    finally:
        driver.quit()

    return products


# --------------------------------------------------- User Interface ---------------------------------------------------
# Streamlit UI
st.title("Scraping E-Commerce")

# Maps each selectable site to its scraper function.
_SCRAPERS = {
    "klikindomaret.com": scrape_klikindomaret,
    "tokopedia.com": scrape_tokped,
    "tokopedia.com(selenium)": scrape_tokped_with_selenium,
}
# Sites whose scraper is still experimental and gets a notice above the button.
_EXPERIMENTAL_SITES = ("tokopedia.com", "tokopedia.com(selenium)")

with st.expander("Settings :"):
    # Website selection.
    selected_site = st.selectbox("Pilih Situs Web :", ["klikindomaret.com", "tokopedia.com", "tokopedia.com(selenium)"])
    nama_barang = st.text_input("Masukkan Nama Barang :")
    num_items = st.number_input("Masukkan Estimasi Banyak Data :", min_value=1, step=1, placeholder="Type a number...")
    download_format = st.selectbox("Pilih Format Unduhan :", ["XLSX", "CSV", "JSON"])
    st.info('Tekan "Mulai Scraping" kembali jika tampilan menghilang ', icon="ℹ️")

# Holds the scraping result for the current script run.
hidden_data = []
scraping_done = False

if selected_site in _EXPERIMENTAL_SITES:
    st.error("Jika mengalami error karena sedang dalam pengembangan. Silahkan pilih situs yang lain")

if st.button("Mulai Scraping"):
    if not nama_barang:
        st.error("Mohon isi Nama Barang.")
    else:
        hidden_data = _SCRAPERS[selected_site](nama_barang, num_items)
        scraping_done = True

# Download file names for each supported format.
output_file = f"scraped_{selected_site}_{nama_barang}.xlsx"
output_file_csv = f"scraped_{selected_site}_{nama_barang}.csv"
output_file_json = f"scraped_{selected_site}_{nama_barang}.json"

# --------------------------------------------------- Download File & Scraping Results ---------------------------------------------------
if scraping_done:
    if hidden_data:
        df = pd.DataFrame(hidden_data)

        # Show the scraping results inside an expandable box.
        with st.expander(f"Hasil Scraping {selected_site} :"):
            st.write(df)

        if download_format == "XLSX":
            # Build the workbook in memory: nothing is written to the server's
            # disk and no file handle is left open.
            excel_buffer = io.BytesIO()
            df.to_excel(excel_buffer, index=False)
            st.download_button(label=f"Unduh XLSX ({len(hidden_data)} data)", data=excel_buffer.getvalue(), key="xlsx_download", file_name=output_file)
        elif download_format == "CSV":
            csv = df.to_csv(index=False)
            st.download_button(label=f"Unduh CSV ({len(hidden_data)} data)", data=csv, key="csv_download", file_name=output_file_csv)
        elif download_format == "JSON":
            json_data = df.to_json(orient="records")
            st.download_button(label=f"Unduh JSON ({len(hidden_data)} data)", data=json_data, key="json_download", file_name=output_file_json)
    else:
        st.warning(f"Tidak ada data pada query '{nama_barang}'", icon="⚠️")
else:
    st.write("Tidak ada data untuk diunduh.")

st.divider()
github_link = "https://github.com/naufalnashif/"
st.markdown(f"GitHub: [{github_link}]({github_link})")
instagram_link = "https://www.instagram.com/naufal.nashif/"
st.markdown(f"Instagram: [{instagram_link}]({instagram_link})")
st.write('Terima kasih telah mencoba demo ini!')