naufalnashif's picture
Update app.py
486ab89 verified
raw
history blame
19.5 kB
import requests
from bs4 import BeautifulSoup
import pandas as pd
from urllib.parse import quote
import streamlit as st
import json
import time
import logging
import random
from lxml_html_clean import Cleaner
from requests_html import HTMLSession
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By
# Emit verbose DEBUG-level logs from the scrapers (useful while developing).
logging.basicConfig(level=logging.DEBUG)
@st.cache_data
def scrape_klikindomaret(nama_barang, num_items):
    """Scrape product listings from klikindomaret.com search results.

    Args:
        nama_barang: Search keyword; URL-encoded before being placed in the query.
        num_items: Maximum number of products to collect.

    Returns:
        A list of dicts with keys 'product', 'original_price',
        'discount_percentage', 'price', and 'link' (at most ``num_items`` long).
    """
    products = []
    page = 1
    query = quote(nama_barang)

    while len(products) < num_items:
        url = f"https://www.klikindomaret.com/search/?key={query}&categories=&productbrandid=&sortcol=&pagesize=54&page={page}&startprice=&endprice=&attributes=&ShowItem="
        try:
            # Timeout so a stalled connection cannot hang the Streamlit app.
            response = requests.get(url, timeout=10)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logging.error(f"Request to klikindomaret failed: {e}")
            break

        soup = BeautifulSoup(response.text, 'html.parser')
        product_list = soup.find_all('a', href=True)

        found_any = False
        for product in product_list:
            product_href = product['href']
            if '/product/' not in product_href:
                continue

            # Guard every .find(): a site-layout change must not crash the
            # scraper with AttributeError on None.
            name_el = product.find('div', class_='title')
            price_el = product.find('span', class_='normal price-value')
            if name_el is None or price_el is None:
                continue
            product_name = name_el.text.strip()
            product_price = price_el.text.strip()

            # Pre-discount price and discount percentage, when present.
            discount_element = product.find('span', class_='strikeout disc-price')
            if discount_element:
                discount_span = discount_element.find('span', class_='discount')
                discount_percentage = discount_span.text.strip() if discount_span else "0%"
                original_price = discount_element.text.replace(discount_percentage, '').strip()
            else:
                # No discount: report "0%" and reuse the displayed price.
                discount_percentage = "0%"
                original_price = product_price

            found_any = True
            products.append({
                'product': product_name,
                'original_price': original_price,
                'discount_percentage': discount_percentage,
                'price': product_price,
                'link': f"https://www.klikindomaret.com{product_href}",
            })
            if len(products) >= num_items:
                break

        if not found_any:
            # Empty results page: stop paging instead of looping forever
            # (the original spun indefinitely when no products matched).
            break
        page += 1

    return products[:num_items]
@st.cache_data
def scrape_tokped(nama_barang, num_items):
    """Scrape Tokopedia search results over plain HTTP.

    NOTE(review): Tokopedia appears to render listings with JavaScript, so
    this request-based scraper may find nothing; on error a runnable
    fallback snippet is shown so users can try it in their own environment.

    Args:
        nama_barang: Search keyword; URL-encoded before being placed in the query.
        num_items: Maximum number of products to collect.

    Returns:
        List of dicts with keys: link, produk, harga, terjual, rating,
        toko, asal_product.
    """
    products = []
    page = 1
    query = quote(nama_barang)
    # Loop-invariant request settings, hoisted out of the while loop.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'en-US,en;q=0.9,id-ID;q=0.8,id;q=0.7,ja;q=0.6,ru;q=0.5,zh-CN;q=0.4,zh;q=0.3,af;q=0.2,nl;q=0.1',
        'Cache-Control': 'max-age=0',
        'Upgrade-Insecure-Requests': '1',
    }
    timeout = 10

    while len(products) < num_items:
        url = f'https://www.tokopedia.com/search?navsource=&page={page}&q={query}&srp_component_id=02.01.00.00&srp_page_id=&srp_page_title=&st='
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            product_container_list = soup.find_all('a', class_="pcv3__info-content css-gwkf0u", href=True)
            if not product_container_list:
                # Empty result page (content is likely JS-rendered): stop
                # instead of requesting ever-higher page numbers forever.
                break
            for product_info in product_container_list:
                link = product_info['href']
                title_element = product_info.find('div', class_="prd_link-product-name css-3um8ox")
                title = title_element.text.strip() if title_element else None
                harga_element = product_info.find('div', class_="prd_link-product-price css-h66vau")
                harga = harga_element.text.strip() if harga_element else None
                terjual_element = product_info.find('span', class_="prd_label-integrity css-1sgek4h")
                terjual = terjual_element.text if terjual_element else None
                rating_element = product_info.find('span', class_='prd_rating-average-text css-t70v7i')
                rating = rating_element.text if rating_element else None
                toko_element = product_info.find('span', class_="prd_link-shop-name css-1kdc32b flip")
                toko = toko_element.text.strip() if toko_element else None
                asal_product_element = product_info.find('span', class_="prd_link-shop-loc css-1kdc32b flip")
                asal_product = asal_product_element.text.strip() if asal_product_element else None
                products.append({
                    'link': link,
                    'produk': title,
                    'harga': harga,
                    'terjual': terjual,
                    'rating': rating,
                    'toko': toko,
                    'asal_product': asal_product,
                })
                if len(products) >= num_items:
                    products = products[:num_items]
                    break
        except Exception as e:
            st.error(f"Terjadi kesalahan yang tidak diketahui: {e}")
            st.write("Jalankan script ini di IDE/colab.research.google.com Anda :")
            # Fallback snippet for the user. The original snippet was not
            # runnable: stray ')' after `return products`, missing `logging`
            # import, and an unreachable HTTPError handler (HTTPError is a
            # subclass of RequestException) — fixed below.
            code = '''
!pip install beautifulsoup4
!pip install requests
!pip install pandas

from bs4 import BeautifulSoup
import requests
import logging
from urllib.parse import quote
import pandas as pd

def scrape_tokped(nama_barang, num_items):
    products = []
    page = 1
    query = quote(nama_barang)
    while len(products) < num_items:
        url = f'https://www.tokopedia.com/search?navsource=&page={page}&q={query}&srp_component_id=02.01.00.00&srp_page_id=&srp_page_title=&st='
        headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        }
        timeout = 10
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            product_container_list = soup.find_all('a', class_="pcv3__info-content css-gwkf0u", href=True)
            for product_info in product_container_list:
                link = product_info['href']
                title_element = product_info.find('div', class_="prd_link-product-name css-3um8ox")
                title = title_element.text.strip() if title_element else None
                harga_element = product_info.find('div', class_="prd_link-product-price css-h66vau")
                harga = harga_element.text.strip() if harga_element else None
                terjual_element = product_info.find('span', class_="prd_label-integrity css-1sgek4h")
                terjual = terjual_element.text if terjual_element else None
                rating_element = product_info.find('span', class_='prd_rating-average-text css-t70v7i')
                rating = rating_element.text if rating_element else None
                toko_element = product_info.find('span', class_="prd_link-shop-name css-1kdc32b flip")
                toko = toko_element.text.strip() if toko_element else None
                asal_product_element = product_info.find('span', class_="prd_link-shop-loc css-1kdc32b flip")
                asal_product = asal_product_element.text.strip() if asal_product_element else None
                products.append({
                    'link': link,
                    'produk': title,
                    'harga': harga,
                    'terjual': terjual,
                    'rating': rating,
                    'toko': toko,
                    'asal_product': asal_product,
                })
                if len(products) >= num_items:
                    products = products[:num_items]
                    break
        except requests.exceptions.HTTPError as e:
            logging.error(f"HTTP Error: {e}")
            break
        except requests.exceptions.RequestException as e:
            logging.error(f"Terjadi kesalahan saat mengirim permintaan: {e}")
            break
        except Exception as e:
            logging.error(f"Terjadi kesalahan yang tidak diketahui: {e}")
            break
        page += 1
    return products

nama_barang = input("Masukkan nama barang: ")
num_items = int(input("Masukkan jumlah barang yang ingin diambil: "))

# Melakukan scraping menggunakan fungsi scrape_tokped
hasil = scrape_tokped(nama_barang, num_items)
pd.DataFrame(hasil)'''
            st.code(code, language='python')
            break
        page += 1

    return products
@st.cache_data
def scrape_tokped_with_selenium(nama_barang, num_items):
    """Scrape Tokopedia search results with a headless Chrome browser.

    Selenium is used here so JavaScript-rendered listings become visible,
    which plain ``requests`` cannot see.

    Args:
        nama_barang: Search keyword; URL-encoded before being placed in the query.
        num_items: Maximum number of products to collect.

    Returns:
        List of dicts with keys: link, produk, harga, terjual, rating,
        toko, asal_product (at most ``num_items`` long).
    """
    products = []
    page = 1
    query = quote(nama_barang)

    options = webdriver.ChromeOptions()
    options.add_argument('--no-sandbox')
    options.add_argument('--headless')
    options.add_argument('--disable-notifications')
    options.add_argument('--disable-infobars')
    options.add_argument('--disable-dev-shm-usage')
    driver = webdriver.Chrome(options=options)

    try:
        while len(products) < num_items:
            try:
                url = f'https://www.tokopedia.com/search?navsource=&page={page}&q={query}&srp_component_id=02.01.00.00&srp_page_id=&srp_page_title=&st='
                driver.get(url)
                driver.implicitly_wait(20)  # wait up to 20 s for elements to appear

                # Grab the rendered DOM and parse it with BeautifulSoup.
                html = driver.execute_script("return document.getElementsByTagName('html')[0].innerHTML")
                st.write(html)
                soup = BeautifulSoup(html, "html.parser")

                product_container_list = soup.find_all('a', class_="pcv3__info-content css-gwkf0u", href=True)
                if not product_container_list:
                    # Empty page: stop instead of paging forever.
                    break

                for product_info in product_container_list:
                    link = product_info['href']
                    st.write(link)
                    title_element = product_info.find('div', class_="prd_link-product-name css-3um8ox")
                    title = title_element.text.strip() if title_element else None
                    harga_element = product_info.find('div', class_="prd_link-product-price css-h66vau")
                    harga = harga_element.text.strip() if harga_element else None
                    terjual_element = product_info.find('span', class_="prd_label-integrity css-1sgek4h")
                    terjual = terjual_element.text if terjual_element else None
                    rating_element = product_info.find('span', class_='prd_rating-average-text css-t70v7i')
                    rating = rating_element.text if rating_element else None
                    toko_element = product_info.find('span', class_="prd_link-shop-name css-1kdc32b flip")
                    toko = toko_element.text.strip() if toko_element else None
                    asal_product_element = product_info.find('span', class_="prd_link-shop-loc css-1kdc32b flip")
                    asal_product = asal_product_element.text.strip() if asal_product_element else None
                    products.append({
                        'link': link,
                        'produk': title,
                        'harga': harga,
                        'terjual': terjual,
                        'rating': rating,
                        'toko': toko,
                        'asal_product': asal_product,
                    })
                    if len(products) >= num_items:
                        break
            except WebDriverException as e:
                # Narrow handler first: in the original this clause came after
                # the broad `except Exception` and was unreachable.
                logging.error(f"WebDriver error: {e}")
                st.error(f"An error occurred: {e}")
                break
            except Exception as e:
                logging.error(f"Terjadi kesalahan yang tidak diketahui: {e}")
                st.error(f"Terjadi kesalahan yang tidak diketahui: {e}")
                break
            page += 1
    finally:
        # Quit exactly once, after all pages are done. The original had this
        # `finally` INSIDE the while loop, killing the browser after page 1 so
        # every later iteration ran against a dead session.
        driver.quit()

    return products[:num_items]
#---------------------------------------------------User Interface----------------------------------------------------------------------
# Streamlit UI
st.title("Scraping E-Commerce")

with st.expander("Settings :"):
    # Site selection plus scraping parameters.
    selected_site = st.selectbox("Pilih Situs Web :", ["klikindomaret.com", "tokopedia.com", "tokopedia.com(selenium)"])
    nama_barang = st.text_input("Masukkan Nama Barang :")
    num_items = st.number_input("Masukkan Estimasi Banyak Data :", min_value=1, step=1, placeholder="Type a number...")
    download_format = st.selectbox("Pilih Format Unduhan :", ["XLSX", "CSV", "JSON"])
    st.info('Tekan "Mulai Scraping" kembali jika tampilan menghilang ', icon="ℹ️")

# Hidden state: scraping results and completion flag for this run.
hidden_data = []
scraping_done = False

# One scraper per site; dispatching through this dict replaces the three
# near-identical button branches the original duplicated.
_SCRAPERS = {
    "klikindomaret.com": scrape_klikindomaret,
    "tokopedia.com": scrape_tokped,
    "tokopedia.com(selenium)": scrape_tokped_with_selenium,
}

if selected_site in ("tokopedia.com", "tokopedia.com(selenium)"):
    st.warning("Jika mengalami error karena sedang dalam pengembangan. Silahkan pilih situs yang lain", icon="⚠️")

if st.button("Mulai Scraping"):
    if not nama_barang:
        st.error("Mohon isi Nama Barang.")
    else:
        # Store results in the hidden variable so display/download below
        # read from a single source of truth.
        hidden_data = _SCRAPERS[selected_site](nama_barang, num_items)
        scraping_done = True

# Output filenames, one per supported download format.
output_file = f"scraped_{selected_site}_{nama_barang}.xlsx"
output_file_csv = f"scraped_{selected_site}_{nama_barang}.csv"
output_file_json = f"scraped_{selected_site}_{nama_barang}.json"

#---------------------------------------------------Download File & Hasil Scraping----------------------------------------------------------------------
# Show scraping results and offer the chosen download format.
if scraping_done:
    if hidden_data:
        df = pd.DataFrame(hidden_data)
        with st.expander(f"Hasil Scraping {selected_site} :"):
            st.write(df)
        if download_format == "XLSX":
            df.to_excel(output_file, index=False)
            # Context manager closes the handle; the original leaked it via
            # open(...).read().
            with open(output_file, "rb") as f:
                xlsx_data = f.read()
            st.download_button(label=f"Unduh XLSX ({len(hidden_data)} data)", data=xlsx_data, key="xlsx_download", file_name=output_file)
        elif download_format == "CSV":
            csv = df.to_csv(index=False)
            st.download_button(label=f"Unduh CSV ({len(hidden_data)} data)", data=csv, key="csv_download", file_name=output_file_csv)
        elif download_format == "JSON":
            json_data = df.to_json(orient="records")
            st.download_button(label=f"Unduh JSON ({len(hidden_data)} data)", data=json_data, key="json_download", file_name=output_file_json)
    else:
        st.warning(f"Tidak ada data pada query '{nama_barang}'", icon="⚠️")
else:
    st.write("Tidak ada data untuk diunduh.")

st.divider()
github_link = "https://github.com/naufalnashif/"
st.markdown(f"GitHub: [{github_link}]({github_link})")
instagram_link = "https://www.instagram.com/naufal.nashif/"
st.markdown(f"Instagram: [{instagram_link}]({instagram_link})")
st.write('Terima kasih telah mencoba demo ini!')