File size: 8,051 Bytes
b4b645c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import os
import re
from playwright.sync_api import sync_playwright
import requests
import sys
from PIL import Image, UnidentifiedImageError
from io import BytesIO
log_file = "app_log.txt" # ログファイルのパス
# ログフォーマットの定義
log_format = '%(asctime)s - %(levelname)s - %(message)s'
import logging
file_handler = logging.FileHandler(log_file, encoding='utf-8')
# ログの設定
logging.basicConfig(
level=logging.INFO, # ログレベルをINFOに設定
format='%(asctime)s - %(levelname)s - %(message)s', # ログのフォーマットを指定
handlers=[
logging.StreamHandler(sys.stdout), # 標準出力にログを出力
file_handler,
]
)
logger = logging.getLogger(__name__)
# 安全なフォルダ名を生成する関数
def generate_safe_folder_name(url):
# URLから安全なフォルダ名を生成(ファイル名に使えない文字を除去)
safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '_', url)
return safe_name
# 画像を保存する関数 (JPG 80%の品質で保存)
def save_image_as_jpg(image_url, save_folder, image_name):
if not os.path.exists(save_folder):
os.makedirs(save_folder)
logger.info(f"フォルダを作成しました: {save_folder}")
try:
response = requests.get(image_url, timeout=10)
response.raise_for_status() # HTTPエラーが発生した場合例外を投げる
except requests.exceptions.RequestException as e:
logger.error(f"画像のダウンロード中にエラーが発生しました: {e}")
return
try:
image = Image.open(BytesIO(response.content))
except UnidentifiedImageError:
logger.warning(f"未識別の画像ファイル: {image_url}. スキップします。")
return
except Exception as e:
logger.error(f"画像のオープン中にエラーが発生しました: {e}")
return
# 保存時に JPG に変換し、品質80%で保存
image_path = os.path.join(save_folder, image_name)
try:
image.convert("RGB").save(image_path, "JPEG", quality=80)
logger.info(f"画像を保存しました: {image_path}")
except Exception as e:
logger.error(f"画像の保存中にエラーが発生しました: {e}")
# 画像の再帰的取得
def scrape_images_by_page(url, folder_name='scraped_images'):
# URLが"/"で終わっている場合、スラッシュを削除
original_url = url
url = url.rstrip('/')
logger.info(f"処理するURL: {url}")
with sync_playwright() as p:
browser = p.chromium.launch(headless=False) # ブラウザを表示して操作
page = browser.new_page()
# 初期ページにアクセス
page.goto(url)
logger.info(f"ページにアクセスしました: {url}")
# ページが完全に読み込まれるまで待機
page.wait_for_load_state('networkidle')
logger.info("ページの読み込みが完了しました。")
# lazy-loading属性を無効にするためのJavaScriptを挿入
try:
page.evaluate("""
document.querySelectorAll('img[loading="lazy"]').forEach(img => {
img.setAttribute('loading', 'eager');
img.src = img.src; // 画像を強制的にリロード
});
""")
logger.info("lazy-loadingを無効化しました。")
except Exception as eval_error:
logger.warning(f"JavaScriptの評価中にエラーが発生しました: {eval_error}")
# フォルダ名を生成
safe_folder_name = generate_safe_folder_name(url)
folder_path = os.path.join(folder_name, safe_folder_name)
logger.info(f"保存先フォルダ: {folder_path}")
# ページ数を取得
try:
# ページ数が格納されているセレクタからテキストを取得
page_count_selector = 'div.tag-container:nth-child(8) > span:nth-child(1) > a:nth-child(1) > span:nth-child(1)'
page_count_text = page.locator(page_count_selector).text_content().strip()
num_pages = int(re.search(r'\d+', page_count_text).group())
logger.info(f"セレクタ '{page_count_selector}' からページ数を取得: {num_pages}")
except Exception as e:
logger.warning(f"セレクタ '{page_count_selector}' からページ数を取得できませんでした: {e}")
# セレクタが見つからない場合のフォールバック
try:
fallback_selector = 'section.reader-bar:nth-child(2) > div:nth-child(2) > button:nth-child(3) > span:nth-child(3)'
page.wait_for_selector(fallback_selector, timeout=5000)
num_pages_text = page.locator(fallback_selector).text_content().strip()
num_pages = int(re.search(r'\d+', num_pages_text).group())
logger.info(f"セレクタ '{fallback_selector}' からページ数を取得: {num_pages}")
except Exception as e2:
logger.error(f"ページ数の取得に失敗しました: {e2}")
num_pages = 1 # デフォルトで1ページとする
logger.info(f"総ページ数: {num_pages}")
# 各ページにアクセスして画像を取得
for i in range(1, num_pages + 1):
page_url = f"{url}/{i}"
page.goto(page_url)
logger.info(f"ページにアクセスしました: {page_url}")
# ページが完全に読み込まれるまで待機
page.wait_for_load_state('networkidle')
logger.info(f"ページ {i} の読み込みが完了しました。")
try:
# 画像を取得するセレクタ
img_selector = '#image-container > a > img'
img_elements = page.locator(img_selector)
img_count = img_elements.count()
logger.info(f"ページ {i} の画像数: {img_count}")
if img_count == 0:
logger.warning(f"ページ {i} に画像が見つかりません。")
continue
for j in range(img_count):
try:
image_element = img_elements.nth(j)
image_url = image_element.get_attribute('src')
if not image_url:
# data-srcなどに画像URLが格納されている場合
image_url = image_element.get_attribute('data-src')
logger.info(f"取得した画像URL (ページ {i}, 画像 {j + 1}): {image_url}")
if image_url:
# ファイル名にページ番号と画像番号を含め、位取りを適用
image_name = f'page_{str(i).zfill(5)}_img_{str(j + 1).zfill(5)}.jpg'
save_image_as_jpg(image_url, folder_path, image_name)
except Exception as e:
logger.error(f"ページ {i}, 画像 {j + 1} の処理中にエラーが発生しました: {e}")
continue
except Exception as e:
logger.error(f"ページ {i} の画像取得中にエラーが発生しました: {e}")
continue
browser.close()
logger.info("ブラウザを閉じました。")
if __name__ == "__main__":
if len(sys.argv) < 2:
logger.error("使用方法: python scrape_images_worker.py <URL>")
sys.exit(1)
url = sys.argv[1] # コマンドライン引数でURLを受け取る
folder_name = 'scraped_images' # デフォルトのフォルダ名
scrape_images_by_page(url, folder_name)
|