import requests
from bs4 import BeautifulSoup
import pandas as pd
from urllib.parse import quote
import streamlit as st
import logging

from selenium import webdriver
from selenium.common.exceptions import WebDriverException

logging.basicConfig(level=logging.DEBUG)

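# Scrape product listings from klikindomaret.com search pages.
# Cached by Streamlit so a repeated query does not re-fetch the site.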
@st.cache_data
def scrape_klikindomaret(nama_barang, num_items):
    products = []
    page = 1
    query = quote(nama_barang)

    while len(products) < num_items:
        url = f"https://www.klikindomaret.com/search/?key={query}&categories=&productbrandid=&sortcol=&pagesize=54&page={page}&startprice=&endprice=&attributes=&ShowItem="
        response = requests.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')

        product_list = soup.find_all('a', href=True)
        found_on_page = 0

        for product in product_list:
            product_href = product['href']
            if '/product/' not in product_href:
                continue
            name_element = product.find('div', class_='title')
            price_element = product.find('span', class_='normal price-value')
            if name_element is None or price_element is None:
                continue
            product_name = name_element.text.strip()
            product_price = price_element.text.strip()

            # Discounted items carry a struck-out price plus a discount badge;
            # everything else is treated as 0% off.
            discount_element = product.find('span', class_='strikeout disc-price')
            if discount_element:
                discount_percentage = discount_element.find('span', class_='discount').text.strip()
                original_price = discount_element.text.replace(discount_percentage, '').strip()
            else:
                discount_percentage = "0%"
                original_price = product_price

            product_link = f"https://www.klikindomaret.com{product_href}"
            products.append({
                'product': product_name,
                'original_price': original_price,
                'discount_percentage': discount_percentage,
                'price': product_price,
                'link': product_link
            })
            found_on_page += 1
            if len(products) >= num_items:
                break

        if found_on_page == 0:
            # No products on this page: stop instead of paging forever.
            break
        page += 1

    return products[:num_items]

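# Scrape Tokopedia search results with plain requests + BeautifulSoup.
# Tokopedia renders listings client-side, so this may fail or come back empty;
# on error the app shows a standalone snippet the user can run elsewhere.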
@st.cache_data
def scrape_tokped(nama_barang, num_items):
    products = []
    page = 1
    query = quote(nama_barang)
    while len(products) < num_items:
        url = f'https://www.tokopedia.com/search?navsource=&page={page}&q={query}&srp_component_id=02.01.00.00&srp_page_id=&srp_page_title=&st='

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'en-US,en;q=0.9,id-ID;q=0.8,id;q=0.7,ja;q=0.6,ru;q=0.5,zh-CN;q=0.4,zh;q=0.3,af;q=0.2,nl;q=0.1',
            'Cache-Control': 'max-age=0',
            'Upgrade-Insecure-Requests': '1',
        }
        timeout = 10
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            product_container_list = soup.find_all('a', class_="pcv3__info-content css-gwkf0u", href=True)

            for product_info in product_container_list:
                link = product_info['href']
                title_element = product_info.find('div', class_="prd_link-product-name css-3um8ox")
                title = title_element.text.strip() if title_element else None

                harga_element = product_info.find('div', class_="prd_link-product-price css-h66vau")
                harga = harga_element.text.strip() if harga_element else None

                terjual_element = product_info.find('span', class_="prd_label-integrity css-1sgek4h")
                terjual = terjual_element.text if terjual_element else None

                rating_element = product_info.find('span', class_='prd_rating-average-text css-t70v7i')
                rating = rating_element.text if rating_element else None

                toko_element = product_info.find('span', class_="prd_link-shop-name css-1kdc32b flip")
                toko = toko_element.text.strip() if toko_element else None

                asal_product_element = product_info.find('span', class_="prd_link-shop-loc css-1kdc32b flip")
                asal_product = asal_product_element.text.strip() if asal_product_element else None

                products.append({
                    'link': link,
                    'produk': title,
                    'harga': harga,
                    'terjual': terjual,
                    'rating': rating,
                    'toko': toko,
                    'asal_product': asal_product,
                })
                if len(products) >= num_items:
                    products = products[:num_items]
                    break

        except Exception as e:
            st.error(f"An unexpected error occurred: {e}")
            st.write("Run this script in your IDE or on colab.research.google.com:")
            code = '''
!pip install beautifulsoup4
!pip install requests
!pip install pandas

from bs4 import BeautifulSoup
import requests
from urllib.parse import quote
import pandas as pd
import logging

def scrape_tokped(nama_barang, num_items):
    products = []
    page = 1
    query = quote(nama_barang)
    while len(products) < num_items:
        url = f'https://www.tokopedia.com/search?navsource=&page={page}&q={query}&srp_component_id=02.01.00.00&srp_page_id=&srp_page_title=&st='

        headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        }
        timeout = 10
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            product_container_list = soup.find_all('a', class_="pcv3__info-content css-gwkf0u", href=True)

            for product_info in product_container_list:
                link = product_info['href']
                title_element = product_info.find('div', class_="prd_link-product-name css-3um8ox")
                title = title_element.text.strip() if title_element else None

                harga_element = product_info.find('div', class_="prd_link-product-price css-h66vau")
                harga = harga_element.text.strip() if harga_element else None

                terjual_element = product_info.find('span', class_="prd_label-integrity css-1sgek4h")
                terjual = terjual_element.text if terjual_element else None

                rating_element = product_info.find('span', class_='prd_rating-average-text css-t70v7i')
                rating = rating_element.text if rating_element else None

                toko_element = product_info.find('span', class_="prd_link-shop-name css-1kdc32b flip")
                toko = toko_element.text.strip() if toko_element else None

                asal_product_element = product_info.find('span', class_="prd_link-shop-loc css-1kdc32b flip")
                asal_product = asal_product_element.text.strip() if asal_product_element else None

                products.append({
                    'link': link,
                    'produk': title,
                    'harga': harga,
                    'terjual': terjual,
                    'rating': rating,
                    'toko': toko,
                    'asal_product': asal_product,
                })
                if len(products) >= num_items:
                    products = products[:num_items]
                    break

        # HTTPError is a subclass of RequestException, so it must be caught first.
        except requests.exceptions.HTTPError as e:
            logging.error(f"HTTP error: {e}")
            break
        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed: {e}")
            break
        except Exception as e:
            logging.error(f"An unexpected error occurred: {e}")
            break

        page += 1
    return products

nama_barang = input("Enter the product name: ")
num_items = int(input("Enter how many items to scrape: "))

# Scrape with scrape_tokped and show the results
hasil = scrape_tokped(nama_barang, num_items)
pd.DataFrame(hasil)'''
            st.code(code, language='python')
            break
        page += 1
    return products

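# Scrape Tokopedia with Selenium instead, so client-side rendered
# listings are present in the DOM before parsing.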
@st.cache_data
def scrape_tokped_with_selenium(nama_barang, num_items):
    products = []
    page = 1
    query = quote(nama_barang)

    options = webdriver.ChromeOptions()
    options.add_argument('--no-sandbox')
    options.add_argument('--headless')
    options.add_argument('--disable-notifications')
    options.add_argument('--disable-infobars')
    options.add_argument('--disable-dev-shm-usage')

    driver = webdriver.Chrome(options=options)

    try:
        while len(products) < num_items:
            url = f'https://www.tokopedia.com/search?navsource=&page={page}&q={query}&srp_component_id=02.01.00.00&srp_page_id=&srp_page_title=&st='
            driver.get(url)
            driver.implicitly_wait(20)

            # Hand the rendered DOM to BeautifulSoup for the same parsing as above.
            html = driver.execute_script("return document.getElementsByTagName('html')[0].innerHTML")
            soup = BeautifulSoup(html, "html.parser")

            product_container_list = soup.find_all('a', class_="pcv3__info-content css-gwkf0u", href=True)
            if not product_container_list:
                # Nothing rendered on this page: stop instead of paging forever.
                break

            for product_info in product_container_list:
                link = product_info['href']
                title_element = product_info.find('div', class_="prd_link-product-name css-3um8ox")
                title = title_element.text.strip() if title_element else None

                harga_element = product_info.find('div', class_="prd_link-product-price css-h66vau")
                harga = harga_element.text.strip() if harga_element else None

                terjual_element = product_info.find('span', class_="prd_label-integrity css-1sgek4h")
                terjual = terjual_element.text if terjual_element else None

                rating_element = product_info.find('span', class_='prd_rating-average-text css-t70v7i')
                rating = rating_element.text if rating_element else None

                toko_element = product_info.find('span', class_="prd_link-shop-name css-1kdc32b flip")
                toko = toko_element.text.strip() if toko_element else None

                asal_product_element = product_info.find('span', class_="prd_link-shop-loc css-1kdc32b flip")
                asal_product = asal_product_element.text.strip() if asal_product_element else None

                products.append({
                    'link': link,
                    'produk': title,
                    'harga': harga,
                    'terjual': terjual,
                    'rating': rating,
                    'toko': toko,
                    'asal_product': asal_product,
                })
                if len(products) >= num_items:
                    products = products[:num_items]
                    break

            page += 1
    # WebDriverException is a subclass of Exception, so it must be caught first.
    except WebDriverException as e:
        logging.error(f"WebDriver error: {e}")
        st.error(f"WebDriver error: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        st.error(f"An unexpected error occurred: {e}")
    finally:
        # Quit the browser once, after scraping finishes, not inside the page loop.
        driver.quit()

    return products

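# --- Streamlit UI: pick a site and a query, scrape, then download ---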
st.title("Scraping E-Commerce")

with st.expander("Settings:"):
    selected_site = st.selectbox("Choose a website:", ["klikindomaret.com", "tokopedia.com", "tokopedia.com(selenium)"])
    nama_barang = st.text_input("Enter a product name:")
    num_items = st.number_input("Enter the approximate number of items:", min_value=1, step=1, placeholder="Type a number...")
    download_format = st.selectbox("Choose a download format:", ["XLSX", "CSV", "JSON"])
    st.info('Press "Start Scraping" again if the view disappears', icon="ℹ️")

hidden_data = []
scraping_done = False

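# Run the scraper that matches the selected site when the button is pressed.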
if selected_site == "klikindomaret.com":
    if st.button("Start Scraping"):
        if not nama_barang:
            st.error("Please enter a product name.")
        else:
            scraped_products = scrape_klikindomaret(nama_barang, num_items)
            hidden_data = scraped_products
            scraping_done = True

if selected_site == "tokopedia.com":
    st.warning("This scraper is still under development and may fail; please pick another site if you run into errors.", icon="⚠️")
    if st.button("Start Scraping"):
        if not nama_barang:
            st.error("Please enter a product name.")
        else:
            scraped_products = scrape_tokped(nama_barang, num_items)
            hidden_data = scraped_products
            scraping_done = True

if selected_site == "tokopedia.com(selenium)":
    st.warning("This scraper is still under development and may fail; please pick another site if you run into errors.", icon="⚠️")
    if st.button("Start Scraping"):
        if not nama_barang:
            st.error("Please enter a product name.")
        else:
            scraped_products = scrape_tokped_with_selenium(nama_barang, num_items)
            hidden_data = scraped_products
            scraping_done = True

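# Output filenames incorporate the selected site and the query.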
output_file = f"scraped_{selected_site}_{nama_barang}.xlsx"
output_file_csv = f"scraped_{selected_site}_{nama_barang}.csv"
output_file_json = f"scraped_{selected_site}_{nama_barang}.json"

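# Show the results and offer a download in the chosen format.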
if scraping_done:
    if hidden_data:
        with st.expander(f"Scraping results for {selected_site}:"):
            st.write(pd.DataFrame(hidden_data))
            if download_format == "XLSX":
                df = pd.DataFrame(hidden_data)
                df.to_excel(output_file, index=False)
                with open(output_file, "rb") as f:
                    st.download_button(label=f"Download XLSX ({len(hidden_data)} rows)", data=f.read(), key="xlsx_download", file_name=output_file)
            elif download_format == "CSV":
                df = pd.DataFrame(hidden_data)
                csv = df.to_csv(index=False)
                st.download_button(label=f"Download CSV ({len(hidden_data)} rows)", data=csv, key="csv_download", file_name=output_file_csv)
            elif download_format == "JSON":
                json_data = pd.DataFrame(hidden_data).to_json(orient="records")
                st.download_button(label=f"Download JSON ({len(hidden_data)} rows)", data=json_data, key="json_download", file_name=output_file_json)
    else:
        st.warning(f"No data found for query '{nama_barang}'", icon="⚠️")

if not scraping_done:
    st.write("No data to download yet.")

st.divider()
github_link = "https://github.com/naufalnashif/"
st.markdown(f"GitHub: [{github_link}]({github_link})")
instagram_link = "https://www.instagram.com/naufal.nashif/"
st.markdown(f"Instagram: [{instagram_link}]({instagram_link})")
st.write('Thank you for trying this demo!')