Python + Selenium ウェブスクレイピング完全ガイド:2025年版

約 17 分
python
selenium
ウェブスクレイピング
自動化

PythonとSeleniumによる高度なウェブスクレイピングを習得。動的コンテンツの取得、プロキシ統合、ベストプラクティスまで実践的に解説します。

なぜ Python + Selenium なのか?

2025 年のウェブ環境では、多くのサイトが JavaScript で動的にコンテンツを生成しています。従来の requests + BeautifulSoup では取得できないデータも、Selenium ならブラウザーレンダリングで JavaScript 実行後のコンテンツが取得可能です。

Selenium の主な利点

  • JavaScript 完全対応: React、Vue.js、Angular サイトも対応
  • リアルブラウザー操作: 人間のユーザー行動を完全模倣
  • インタラクション豊富: クリック、スクロール、フォーム入力
  • スクリーンショット機能: デバッグとモニタリングに活用

環境セットアップ

必要なライブラリインストール

# Python 3.12以上推奨
pip install selenium webdriver-manager requests beautifulsoup4

ChromeDriver の自動管理

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import json

# Chrome オプション設定
def setup_chrome_options():
    chrome_options = Options()
    chrome_options.add_argument('--headless')  # ヘッドレスモード
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--window-size=1920,1080')

    # User-Agent設定(検出回避)
    chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')

    return chrome_options

# WebDriver初期化
def initialize_driver():
    chrome_options = setup_chrome_options()
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    return driver

基本的なスクレイピング実装

静的コンテンツの取得

def scrape_basic_content(url):
    driver = initialize_driver()

    try:
        # ページアクセス
        driver.get(url)

        # ページ読み込み待機
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )

        # タイトル取得
        title = driver.title

        # 特定要素の取得
        elements = driver.find_elements(By.CLASS_NAME, "product-item")

        results = []
        for element in elements:
            name = element.find_element(By.TAG_NAME, "h3").text
            price = element.find_element(By.CLASS_NAME, "price").text

            results.append({
                "name": name,
                "price": price
            })

        return {
            "title": title,
            "products": results
        }

    finally:
        driver.quit()

# 使用例
url = "https://example-ecommerce.com/products"
data = scrape_basic_content(url)
print(json.dumps(data, ensure_ascii=False, indent=2))

動的コンテンツの高度な取得

無限スクロール対応

def scrape_infinite_scroll(url, max_scrolls=5):
    driver = initialize_driver()

    try:
        driver.get(url)

        # 初期コンテンツ読み込み待機
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "content-item"))
        )

        all_items = []
        scroll_count = 0

        while scroll_count < max_scrolls:
            # 現在のアイテム数を記録
            current_items = driver.find_elements(By.CLASS_NAME, "content-item")
            items_before = len(current_items)

            # ページ最下部までスクロール
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # 新しいコンテンツの読み込み待機
            time.sleep(2)

            # 新しいアイテムが追加されたかチェック
            new_items = driver.find_elements(By.CLASS_NAME, "content-item")
            items_after = len(new_items)

            # 新しいアイテムがない場合は終了
            if items_after == items_before:
                break

            scroll_count += 1

        # 全アイテムのデータ抽出
        final_items = driver.find_elements(By.CLASS_NAME, "content-item")

        for item in final_items:
            try:
                title = item.find_element(By.TAG_NAME, "h2").text
                description = item.find_element(By.CLASS_NAME, "description").text

                all_items.append({
                    "title": title,
                    "description": description
                })
            except Exception as e:
                continue  # エラーアイテムはスキップ

        return all_items

    finally:
        driver.quit()

JavaScript 実行と AJAX 待機

def scrape_ajax_content(url):
    driver = initialize_driver()

    try:
        driver.get(url)

        # ボタンクリックでAJAXローディング
        load_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.ID, "load-more-button"))
        )
        load_button.click()

        # AJAXレスポンス待機(カスタムJavaScript実行)
        WebDriverWait(driver, 15).until(
            lambda driver: driver.execute_script(
                "return document.querySelector('.ajax-content') && "
                "document.querySelector('.ajax-content').children.length > 0"
            )
        )

        # 動的に追加されたコンテンツ取得
        ajax_content = driver.find_element(By.CLASS_NAME, "ajax-content")

        # データ抽出
        items = ajax_content.find_elements(By.CLASS_NAME, "dynamic-item")

        results = []
        for item in items:
            # JavaScript経由でデータ属性取得
            data_id = driver.execute_script("return arguments[0].dataset.id;", item)
            data_value = driver.execute_script("return arguments[0].dataset.value;", item)

            results.append({
                "id": data_id,
                "value": data_value,
                "text": item.text
            })

        return results

    finally:
        driver.quit()

プロキシ統合と IP 制限回避

Bright Data プロキシとの連携

def setup_proxy_driver(proxy_host, proxy_port, proxy_user, proxy_pass):
    chrome_options = Options()
    chrome_options.add_argument('--headless')

    # プロキシ設定
    chrome_options.add_argument(f'--proxy-server=http://{proxy_host}:{proxy_port}')

    # 認証が必要な場合の拡張機能作成
    proxy_auth_extension = create_proxy_auth_extension(proxy_user, proxy_pass)
    chrome_options.add_extension(proxy_auth_extension)

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)

    return driver

def create_proxy_auth_extension(username, password):
    """プロキシ認証用のChrome拡張機能を作成"""
    import zipfile
    import os

    manifest_json = """
    {
        "version": "1.0.0",
        "manifest_version": 2,
        "name": "Proxy Authentication",
        "permissions": [
            "proxy",
            "tabs",
            "unlimitedStorage",
            "storage",
            "<all_urls>",
            "webRequest",
            "webRequestBlocking"
        ],
        "background": {
            "scripts": ["background.js"]
        }
    }
    """

    background_js = f"""
    var config = {{
        mode: "fixed_servers",
        rules: {{
            singleProxy: {{
                scheme: "http",
                host: "{proxy_host}",
                port: parseInt({proxy_port})
            }},
            bypassList: ["localhost"]
        }}
    }};

    chrome.proxy.settings.set({{value: config, scope: "regular"}}, function() {{}});

    function callbackFn(details) {{
        return {{
            authCredentials: {{
                username: "{username}",
                password: "{password}"
            }}
        }};
    }}

    chrome.webRequest.onAuthRequired.addListener(
        callbackFn,
        {{urls: ["<all_urls>"]}},
        ['blocking']
    );
    """

    # 一時的な拡張機能ファイル作成
    plugin_file = 'proxy_auth_plugin.zip'
    with zipfile.ZipFile(plugin_file, 'w') as zp:
        zp.writestr("manifest.json", manifest_json)
        zp.writestr("background.js", background_js)

    return plugin_file

# プロキシ使用例
def scrape_with_proxy(urls):
    # Bright Data の設定例
    proxy_config = {
        "proxy_host": "brd-customer-c_xxxxxxx-zone-residential.brd.superproxy.io",
        "proxy_port": 22225,
        "proxy_user": "your-username",
        "proxy_pass": "your-password"
    }

    results = []

    for url in urls:
        driver = setup_proxy_driver(**proxy_config)

        try:
            driver.get(url)

            # IP確認(デバッグ用)
            driver.get("http://httpbin.org/ip")
            ip_info = driver.find_element(By.TAG_NAME, "pre").text
            print(f"Current IP: {ip_info}")

            # メインコンテンツの取得
            driver.get(url)
            data = extract_page_data(driver)
            results.append(data)

            # リクエスト間隔調整
            time.sleep(2)

        finally:
            driver.quit()

    return results

エラーハンドリングと最適化

堅牢なスクレイピング実装

from selenium.common.exceptions import TimeoutException, NoSuchElementException
import logging

class RobustScraper:
    def __init__(self, headless=True, timeout=10):
        self.headless = headless
        self.timeout = timeout
        self.driver = None

        # ログ設定
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
        self.driver = self.initialize_driver()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.driver:
            self.driver.quit()

    def initialize_driver(self):
        chrome_options = Options()
        if self.headless:
            chrome_options.add_argument('--headless')

        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=chrome_options)

        # WebDriver検出対策
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

        return driver

    def safe_find_element(self, by, value, timeout=None):
        """安全な要素取得"""
        timeout = timeout or self.timeout

        try:
            element = WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((by, value))
            )
            return element
        except TimeoutException:
            self.logger.warning(f"Element not found: {by}={value}")
            return None

    def safe_click(self, by, value, timeout=None):
        """安全なクリック操作"""
        try:
            element = WebDriverWait(self.driver, timeout or self.timeout).until(
                EC.element_to_be_clickable((by, value))
            )

            # JavaScriptクリックを試行
            self.driver.execute_script("arguments[0].click();", element)
            return True

        except Exception as e:
            self.logger.error(f"Click failed: {e}")
            return False

    def scrape_with_retry(self, url, max_retries=3):
        """リトライ機能付きスクレイピング"""
        for attempt in range(max_retries):
            try:
                self.driver.get(url)

                # ページ読み込み完了待機
                WebDriverWait(self.driver, self.timeout).until(
                    lambda driver: driver.execute_script("return document.readyState") == "complete"
                )

                # メインコンテンツの存在確認
                main_content = self.safe_find_element(By.TAG_NAME, "main")
                if main_content:
                    return self.extract_data()
                else:
                    raise Exception("Main content not found")

            except Exception as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # 指数バックオフ

    def extract_data(self):
        """データ抽出のメインロジック"""
        try:
            # タイトル取得
            title = self.driver.title

            # メタ情報取得
            meta_description = ""
            meta_element = self.safe_find_element(By.XPATH, "//meta[@name='description']")
            if meta_element:
                meta_description = meta_element.get_attribute("content")

            # スクリーンショット保存(デバッグ用)
            self.driver.save_screenshot(f"screenshot_{int(time.time())}.png")

            return {
                "title": title,
                "meta_description": meta_description,
                "url": self.driver.current_url,
                "timestamp": time.time()
            }

        except Exception as e:
            self.logger.error(f"Data extraction failed: {e}")
            return None

# 使用例
def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]

    results = []

    with RobustScraper(headless=True, timeout=15) as scraper:
        for url in urls:
            try:
                data = scraper.scrape_with_retry(url)
                if data:
                    results.append(data)
                    print(f"Successfully scraped: {url}")
                else:
                    print(f"Failed to scrape: {url}")
            except Exception as e:
                print(f"Error scraping {url}: {e}")
                continue

    # 結果保存
    with open("scraped_data.json", "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    print(f"Scraped {len(results)} pages successfully")

if __name__ == "__main__":
    main()

パフォーマンス最適化のベストプラクティス

1. メモリ使用量の最適化

# ページリソース制限
chrome_options.add_argument('--disable-images')  # 画像読み込み無効
chrome_options.add_argument('--disable-javascript')  # JS無効(必要に応じて)
chrome_options.add_argument('--disable-plugins')
chrome_options.add_argument('--disable-extensions')

2. 並列処理の実装

from concurrent.futures import ThreadPoolExecutor
import queue

def parallel_scraping(urls, max_workers=3):
    results = []

    def scrape_single_url(url):
        with RobustScraper() as scraper:
            return scraper.scrape_with_retry(url)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(scrape_single_url, url) for url in urls]

        for future in futures:
            try:
                result = future.result(timeout=60)
                if result:
                    results.append(result)
            except Exception as e:
                print(f"Parallel scraping error: {e}")

    return results

3. レート制限の実装

import time
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, max_requests_per_minute=10):
        self.max_requests = max_requests_per_minute
        self.requests = []

    def wait_if_needed(self):
        now = datetime.now()
        # 1分以内のリクエストをフィルタ
        self.requests = [req_time for req_time in self.requests
                        if now - req_time < timedelta(minutes=1)]

        if len(self.requests) >= self.max_requests:
            # 最古のリクエストから1分経過するまで待機
            sleep_time = 60 - (now - self.requests[0]).total_seconds()
            if sleep_time > 0:
                time.sleep(sleep_time)

        self.requests.append(now)

よくある問題と対策

CAPTCHA 検出対策

def setup_stealth_driver():
    chrome_options = Options()

    # 検出回避設定
    chrome_options.add_argument('--disable-blink-features=AutomationControlled')
    chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
    chrome_options.add_experimental_option('useAutomationExtension', False)

    # ランダムUser-Agent
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ]

    chrome_options.add_argument(f'--user-agent={random.choice(user_agents)}')

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)

    # WebDriver痕跡除去
    driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

    return driver

まとめ:効果的な Selenium スクレイピング

成功の鍵

  1. 適切な待機戦略: WebDriverWait を活用
  2. エラーハンドリング: 堅牢なリトライ機構
  3. プロキシ活用: IP 制限回避
  4. レート制限: サーバー負荷軽減
  5. 検出対策: 人間らしい動作模倣

2025 年のトレンド

  • ヘッドレスブラウザ高速化
  • AI CAPTCHA ソリューション統合
  • クラウドベースの実行環境
  • リアルタイムプロキシローテーション

Selenium + Python の組み合わせは、動的ウェブサイトのスクレイピングにおいて最も効果的なソリューションです。適切な実装と最適化により、安定したデータ取得が可能になります。

実践的な学習: この記事のコード例を基に、実際のプロジェクトで段階的に機能を追加していくことをお勧めします。


本記事のコード例は 2025 年 1 月時点の Selenium 4.x 系で動作確認済みです。最新バージョンでの変更点は公式ドキュメントをご確認ください。

この記事が役に立ちましたら、ぜひシェアしてください。

関連記事

関連記事の実装を準備中です。