ECサイトをスクレイピングする方法:2024年安全実践ガイド
ECサイトスクレイピングの法的ガイドライン、技術的ベストプラクティス、リスク回避方法を詳しく解説。安全で効果的なデータ収集を実現。
EC サイトスクレイピングは利用規約遵守、適切なレート制限、公開データのみの収集を基本とし、法的リスクを最小化して実施する。
EC サイトスクレイピングの基本
EC サイトのスクレイピングは、商品情報、価格データ、在庫状況などの貴重なビジネス情報を収集する強力な手法です。しかし、法的・技術的な課題も多く、適切な知識と準備が不可欠です。
まずスクレイピングの法的問題を理解してから、具体的な実装に進みましょう。
法的コンプライアンス
利用規約の確認
import requests
from urllib.robotparser import RobotFileParser
class LegalComplianceChecker:
def __init__(self, base_url):
self.base_url = base_url
self.robots_parser = RobotFileParser()
self.setup_robots()
def setup_robots(self):
robots_url = f"{self.base_url}/robots.txt"
self.robots_parser.set_url(robots_url)
try:
self.robots_parser.read()
print(f"robots.txt確認完了: {robots_url}")
except:
print("robots.txt読み込み失敗")
def can_scrape_url(self, url, user_agent='*'):
return self.robots_parser.can_fetch(user_agent, url)
def get_crawl_delay(self, user_agent='*'):
return self.robots_parser.crawl_delay(user_agent) or 1.0
# 使用例
checker = LegalComplianceChecker("https://example-shop.com")
if checker.can_scrape_url("/products"):
print("スクレイピング許可")
else:
print("スクレイピング禁止")
GDPR・個人情報保護対応
import re
class PersonalDataFilter:
def __init__(self):
self.sensitive_patterns = [
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', # Email
r'\b\d{3}-\d{4}-\d{4}\b', # 電話番号
r'\b\d{4}-\d{4}-\d{4}-\d{4}\b', # クレジットカード
]
def contains_personal_data(self, text):
for pattern in self.sensitive_patterns:
if re.search(pattern, text):
return True
return False
def sanitize_data(self, data):
if isinstance(data, dict):
return {k: v for k, v in data.items()
if not self.contains_personal_data(str(v))}
return data
# 使用例
filter = PersonalDataFilter()
clean_data = filter.sanitize_data(scraped_data)
技術的ベストプラクティス
レート制限の実装
import time
import random
from datetime import datetime, timedelta
class ECommerceRateLimiter:
def __init__(self, requests_per_minute=10, burst_limit=3):
self.requests_per_minute = requests_per_minute
self.burst_limit = burst_limit
self.request_times = []
self.last_request = 0
def wait_if_needed(self):
current_time = time.time()
# 1分以内のリクエスト数をチェック
minute_ago = current_time - 60
self.request_times = [t for t in self.request_times if t > minute_ago]
if len(self.request_times) >= self.requests_per_minute:
sleep_time = 60 - (current_time - self.request_times[0])
print(f"レート制限: {sleep_time:.1f}秒待機")
time.sleep(sleep_time)
# バースト制限のチェック
if current_time - self.last_request < (60 / self.requests_per_minute):
base_delay = 60 / self.requests_per_minute
jitter = random.uniform(0.5, 1.5)
sleep_time = base_delay * jitter
time.sleep(sleep_time)
self.request_times.append(current_time)
self.last_request = current_time
# 使用例
rate_limiter = ECommerceRateLimiter(requests_per_minute=15)
セッション管理と Cookie 処理
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class ECommerceScraper:
def __init__(self):
self.session = requests.Session()
self.setup_session()
self.rate_limiter = ECommerceRateLimiter()
def setup_session(self):
# リトライ戦略
retry_strategy = Retry(
total=3,
backoff_factor=2,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
# 現実的なヘッダー設定
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'ja,en-US;q=0.7,en;q=0.3',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
})
def scrape_product(self, product_url):
self.rate_limiter.wait_if_needed()
try:
response = self.session.get(product_url, timeout=10)
response.raise_for_status()
return response
except requests.RequestException as e:
print(f"エラー: {product_url} - {e}")
return None
# 使用例
scraper = ECommerceScraper()
商品データ収集の実装
商品情報の構造化
from dataclasses import dataclass
from typing import Optional, List
import json
@dataclass
class ProductData:
name: str
price: Optional[float]
currency: str
availability: str
description: Optional[str]
images: List[str]
ratings: Optional[float]
reviews_count: Optional[int]
category: Optional[str]
brand: Optional[str]
sku: Optional[str]
def to_dict(self):
return {
'name': self.name,
'price': self.price,
'currency': self.currency,
'availability': self.availability,
'description': self.description,
'images': self.images,
'ratings': self.ratings,
'reviews_count': self.reviews_count,
'category': self.category,
'brand': self.brand,
'sku': self.sku
}
class ProductParser:
def __init__(self):
pass
def parse_product_page(self, html_content, url):
# BeautifulSoupやlxmlを使用してパース
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_content, 'html.parser')
# 構造化データの抽出
product_data = self.extract_structured_data(soup)
# フォールバックとしてHTMLから直接抽出
if not product_data:
product_data = self.extract_from_html(soup)
return product_data
def extract_structured_data(self, soup):
# JSON-LDまたはMicrodataから抽出
json_ld = soup.find('script', type='application/ld+json')
if json_ld:
try:
data = json.loads(json_ld.string)
if data.get('@type') == 'Product':
return self.parse_json_ld_product(data)
except:
pass
return None
def parse_json_ld_product(self, data):
offers = data.get('offers', {})
if isinstance(offers, list):
offers = offers[0] if offers else {}
return ProductData(
name=data.get('name', ''),
price=float(offers.get('price', 0)) if offers.get('price') else None,
currency=offers.get('priceCurrency', 'JPY'),
availability=offers.get('availability', 'Unknown'),
description=data.get('description', ''),
images=[img.get('url', '') for img in data.get('image', [])],
ratings=float(data.get('aggregateRating', {}).get('ratingValue', 0)) or None,
reviews_count=int(data.get('aggregateRating', {}).get('reviewCount', 0)) or None,
category=data.get('category', ''),
brand=data.get('brand', {}).get('name', '') if isinstance(data.get('brand', {}), dict) else str(data.get('brand', '')),
sku=data.get('sku', '')
)
# 使用例
parser = ProductParser()
product = parser.parse_product_page(html_content, product_url)
プロキシと IP ローテーション
EC サイトスクレイピングでは、IP ブロック回避が重要です。
import random
from itertools import cycle
class ECommerceProxyManager:
def __init__(self, proxy_list):
self.proxy_cycle = cycle(proxy_list)
self.failed_proxies = set()
self.success_count = {}
def get_proxy(self):
for _ in range(len(self.proxy_list)):
proxy = next(self.proxy_cycle)
if proxy not in self.failed_proxies:
return proxy
return None # すべてのプロキシが失敗
def mark_proxy_success(self, proxy):
self.success_count[proxy] = self.success_count.get(proxy, 0) + 1
def mark_proxy_failure(self, proxy):
self.failed_proxies.add(proxy)
print(f"プロキシ失敗: {proxy}")
def get_proxy_stats(self):
return {
'total_proxies': len(self.proxy_list),
'failed_proxies': len(self.failed_proxies),
'success_counts': self.success_count
}
# 使用例
proxy_manager = ECommerceProxyManager([
"proxy1.example.com:8000",
"proxy2.example.com:8000"
])
エラーハンドリングと復旧
import logging
from datetime import datetime, timedelta
class ScrapingErrorHandler:
def __init__(self):
self.setup_logging()
self.failed_urls = []
self.retry_queue = []
def setup_logging(self):
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('ecommerce_scraping.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def handle_error(self, url, error, retry_count=0):
self.logger.error(f"URL: {url}, Error: {error}, Retry: {retry_count}")
if retry_count < 3:
# 指数バックオフで再試行
delay = (2 ** retry_count) * 60 # 1, 2, 4分
retry_time = datetime.now() + timedelta(seconds=delay)
self.retry_queue.append({
'url': url,
'retry_time': retry_time,
'retry_count': retry_count + 1
})
else:
self.failed_urls.append(url)
def get_ready_retries(self):
now = datetime.now()
ready = [item for item in self.retry_queue if item['retry_time'] <= now]
self.retry_queue = [item for item in self.retry_queue if item['retry_time'] > now]
return ready
# 使用例
error_handler = ScrapingErrorHandler()
データ品質とバリデーション
from typing import Union
import re
class DataValidator:
def __init__(self):
self.price_pattern = re.compile(r'[\d,]+\.?\d*')
def validate_product_data(self, product: ProductData) -> dict:
issues = []
# 必須フィールドの確認
if not product.name or len(product.name.strip()) < 3:
issues.append("商品名が無効")
if product.price is not None and (product.price < 0 or product.price > 10000000):
issues.append("価格が異常")
if not product.currency or len(product.currency) != 3:
issues.append("通貨コードが無効")
# 画像URLの検証
valid_images = []
for img_url in product.images:
if self.is_valid_image_url(img_url):
valid_images.append(img_url)
product.images = valid_images
return {
'valid': len(issues) == 0,
'issues': issues,
'product': product
}
def is_valid_image_url(self, url: str) -> bool:
if not url or not url.startswith(('http://', 'https://')):
return False
return url.lower().endswith(('.jpg', '.jpeg', '.png', '.webp', '.gif'))
def normalize_price(self, price_text: str) -> Union[float, None]:
if not price_text:
return None
# 数字以外を除去
clean_price = re.sub(r'[^\d.,]', '', price_text)
clean_price = clean_price.replace(',', '')
try:
return float(clean_price)
except ValueError:
return None
# 使用例
validator = DataValidator()
validation_result = validator.validate_product_data(product)
継続的監視とメンテナンス
import time
from datetime import datetime
import json
class ScrapingMonitor:
def __init__(self):
self.stats = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'products_scraped': 0,
'start_time': datetime.now()
}
def log_request(self, success: bool):
self.stats['total_requests'] += 1
if success:
self.stats['successful_requests'] += 1
else:
self.stats['failed_requests'] += 1
def log_product_scraped(self):
self.stats['products_scraped'] += 1
def get_success_rate(self) -> float:
if self.stats['total_requests'] == 0:
return 0.0
return self.stats['successful_requests'] / self.stats['total_requests']
def get_runtime_stats(self) -> dict:
runtime = datetime.now() - self.stats['start_time']
return {
**self.stats,
'runtime_minutes': runtime.total_seconds() / 60,
'success_rate': self.get_success_rate(),
'products_per_minute': self.stats['products_scraped'] / max(runtime.total_seconds() / 60, 1)
}
def should_stop_scraping(self) -> bool:
# 成功率が50%を下回ったら停止
if self.stats['total_requests'] > 50 and self.get_success_rate() < 0.5:
return True
return False
# 使用例
monitor = ScrapingMonitor()
よくある質問
Q1. EC サイトスクレイピングは合法ですか? A. 公開データの収集は一般的に合法ですが、利用規約の遵守と個人情報保護法の遵守が必要です。
Q2. どの程度の頻度でアクセスすべきですか? A. サイトの負荷を考慮し、1 分間に 10-15 リクエスト程度を上限とすることを推奨します。
Q3. API が提供されている場合は? A. 公式 API が利用可能な場合は、必ず API を使用してください。より安全で信頼性が高いです。
Q4. プロキシは必須ですか? A. 大規模なスクレイピングでは推奨されますが、小規模であれば適切なレート制限で対応可能です。
Q5. データの保存期間に制限はありますか? A. 個人情報を含む場合は、GDPR 等の規制に従って適切な保存期間を設定する必要があります。
まとめ
EC サイトスクレイピングは適切な法的・技術的配慮により、安全に実施可能です。常に最新の法規制と技術動向に注意を払い、責任あるデータ収集を心がけてください。
詳細な技術実装については、ローテーティングプロキシの活用法も合わせてご確認ください。