AWS LambdaとBright Dataでスケーリング:大規模スクレイピングシステム構築ガイド

約 16 分
AWS Lambda
Bright Data
大規模スクレイピング
サーバーレス

AWS LambdaとBright Dataを組み合わせた大規模スクレイピングシステムの構築方法を解説。コスト最適化、エラーハンドリング、スケーラビリティを実現する実装例とベストプラクティスを紹介します。

なぜAWS Lambda + Bright Dataなのか?

大規模なWebスクレイピングにおいて、従来のEC2ベースのソリューションではインフラ管理の複雑さとコストが課題でした。AWS LambdaとBright Dataの組み合わせは、これらの問題を根本から解決します。

従来の課題と解決策

従来の課題

  • 24/7稼働するEC2インスタンスの高額な運用コスト
  • スケーリング時の複雑な設定管理
  • IPブロック対策の手動運用

AWS Lambda + Bright Data の解決策

  • 使用した分だけの完全従量課金
  • 自動スケーリング(最大1,000同時実行)
  • グローバルプロキシネットワークによる自動IP管理

AWS Lambdaスケーリングの仕組み

AWS Lambda スケーリング動作

スケーリング仕様(2025年最新)

AWSの公式ドキュメントによると、Lambdaのスケーリング動作は以下の通りです:

  • 同時実行スケーリングレート: 10秒ごとに1,000インスタンス
  • リクエスト処理能力: 1秒あたり10,000リクエスト(10秒間隔)
  • 関数レベル制限: 各関数が独立してスケール可能

これにより、理論上は100万リクエスト/分の処理が可能です。

実装アーキテクチャ

基本構成

SQS Queue ──> Lambda Function ──> Bright Data ──> Target Websites
    ↓              ↓                    ↓
DynamoDB      CloudWatch          Result Storage
(State)       (Monitoring)         (S3/RDS)

コンポーネント解説

  1. SQS: スクレイピングタスクのキュー管理
  2. Lambda: 実際のスクレイピング処理
  3. Bright Data: プロキシとCAPTCHA回避
  4. DynamoDB: 状態管理とレート制限
  5. S3/RDS: 結果データの永続化

Lambda関数の実装

メイン処理関数

import json
import boto3
import requests
from datetime import datetime, timedelta
import os

# 環境変数
BRIGHT_DATA_USERNAME = os.environ['BRIGHT_DATA_USERNAME']
BRIGHT_DATA_PASSWORD = os.environ['BRIGHT_DATA_PASSWORD']
BRIGHT_DATA_ENDPOINT = os.environ['BRIGHT_DATA_ENDPOINT']

def lambda_handler(event, context):
    """
    メインのLambda関数
    SQSからのメッセージを処理してスクレイピングを実行
    """
    
    results = []
    
    # SQSバッチメッセージ処理
    for record in event['Records']:
        try:
            # メッセージ解析
            message = json.loads(record['body'])
            url = message['url']
            scraping_config = message.get('config', {})
            
            # スクレイピング実行
            result = scrape_with_bright_data(url, scraping_config)
            
            # 結果保存
            save_result_to_s3(result, url)
            
            results.append({
                'url': url,
                'status': 'success',
                'data_size': len(result)
            })
            
        except Exception as e:
            print(f"エラー処理: {url} - {str(e)}")
            results.append({
                'url': url,
                'status': 'error',
                'error': str(e)
            })
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'processed': len(results),
            'results': results
        })
    }

def scrape_with_bright_data(url, config):
    """
    Bright Dataプロキシを使用したスクレイピング
    """
    
    # Bright Data プロキシ設定
    proxy_config = {
        'http': f'http://{BRIGHT_DATA_USERNAME}:{BRIGHT_DATA_PASSWORD}@{BRIGHT_DATA_ENDPOINT}',
        'https': f'http://{BRIGHT_DATA_USERNAME}:{BRIGHT_DATA_PASSWORD}@{BRIGHT_DATA_ENDPOINT}'
    }
    
    # ヘッダー設定(検出回避)
    headers = {
        'User-Agent': config.get('user_agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'ja,en-US;q=0.7,en;q=0.3',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive'
    }
    
    try:
        response = requests.get(
            url,
            proxies=proxy_config,
            headers=headers,
            timeout=30,
            verify=True
        )
        
        response.raise_for_status()
        
        return {
            'url': url,
            'status_code': response.status_code,
            'content': response.text,
            'headers': dict(response.headers),
            'timestamp': datetime.now().isoformat()
        }
        
    except requests.exceptions.RequestException as e:
        raise Exception(f"スクレイピングエラー: {str(e)}")

エラーハンドリングとリトライ機能

import time
import random
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """
    指数バックオフでリトライするデコレーター
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise e
                    
                    # 指数バックオフ + ジッター
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, 0.1) * delay
                    time.sleep(delay + jitter)
                    
                    print(f"リトライ {attempt + 1}/{max_retries}: {str(e)}")
            
            return None
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def robust_scrape(url, config):
    """
    エラーハンドリング強化版スクレイピング
    """
    return scrape_with_bright_data(url, config)

DynamoDBによる状態管理

テーブル設計

import boto3
from boto3.dynamodb.conditions import Key, Attr
from decimal import Decimal

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('scraping-state')

def manage_scraping_state(url, action='get'):
    """
    スクレイピング状態の管理
    重複実行防止とレート制限に使用
    """
    
    url_hash = hashlib.md5(url.encode()).hexdigest()
    
    if action == 'lock':
        # 実行中状態をセット
        try:
            table.put_item(
                Item={
                    'url_hash': url_hash,
                    'url': url,
                    'status': 'processing',
                    'timestamp': Decimal(str(time.time())),
                    'ttl': int(time.time() + 3600)  # 1時間TTL
                },
                ConditionExpression='attribute_not_exists(url_hash)'
            )
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                return False  # 既に処理中
            raise
    
    elif action == 'complete':
        # 完了状態に更新
        table.update_item(
            Key={'url_hash': url_hash},
            UpdateExpression='SET #status = :status, completion_time = :time',
            ExpressionAttributeNames={'#status': 'status'},
            ExpressionAttributeValues={
                ':status': 'completed',
                ':time': Decimal(str(time.time()))
            }
        )
        return True
    
    elif action == 'get':
        # 状態取得
        response = table.get_item(Key={'url_hash': url_hash})
        return response.get('Item')

コスト最適化戦略

1. Lambdaコスト削減

def optimize_lambda_performance():
    """
    Lambda関数のパフォーマンス最適化
    """
    
    # メモリ設定最適化(1769MB推奨)
    # - CPU性能とメモリのバランス
    # - ネットワーク帯域幅の最適化
    
    # 同時実行数制限(コスト制御)
    RESERVED_CONCURRENCY = 100  # 初期設定
    
    # バッチ処理でSQSコスト削減
    MAX_BATCH_SIZE = 10  # SQSメッセージ
    
    return {
        'memory': 1769,
        'timeout': 120,  # 2分
        'reserved_concurrency': RESERVED_CONCURRENCY,
        'batch_size': MAX_BATCH_SIZE
    }

2. Bright Dataコスト管理

def track_bright_data_usage():
    """
    Bright Data使用量の追跡と制限
    """
    
    # 使用量トラッキング
    usage_table = dynamodb.Table('bright-data-usage')
    
    def log_request(url, bytes_used):
        today = datetime.now().strftime('%Y-%m-%d')
        
        usage_table.update_item(
            Key={'date': today},
            UpdateExpression='ADD total_bytes :bytes, request_count :count',
            ExpressionAttributeValues={
                ':bytes': bytes_used,
                ':count': 1
            }
        )
    
    def check_daily_limit():
        today = datetime.now().strftime('%Y-%m-%d')
        response = usage_table.get_item(Key={'date': today})
        
        if 'Item' in response:
            total_bytes = response['Item'].get('total_bytes', 0)
            # 日次制限: 100GB
            return total_bytes < (100 * 1024 * 1024 * 1024)
        
        return True
    
    return {
        'log_request': log_request,
        'check_limit': check_daily_limit
    }

3. 費用対効果の計算

構成要素従来(EC2)Lambda + Bright Data削減率
インフラ$500/月$50/月90% ↓
プロキシ$300/月$200/月33% ↓
管理工数40時間/月5時間/月87% ↓
合計$800/月$250/月69% ↓

監視とアラート設定

CloudWatch監視

import boto3

cloudwatch = boto3.client('cloudwatch')

def setup_monitoring():
    """
    CloudWatch監視設定
    """
    
    # カスタムメトリクス
    metrics = [
        {
            'MetricName': 'ScrapingSuccessRate',
            'Namespace': 'ScrapingSystem',
            'Value': success_rate,
            'Unit': 'Percent'
        },
        {
            'MetricName': 'AverageResponseTime',
            'Namespace': 'ScrapingSystem',
            'Value': avg_response_time,
            'Unit': 'Milliseconds'
        },
        {
            'MetricName': 'BrightDataUsage',
            'Namespace': 'ScrapingSystem',
            'Value': bytes_used,
            'Unit': 'Bytes'
        }
    ]
    
    for metric in metrics:
        cloudwatch.put_metric_data(
            Namespace=metric['Namespace'],
            MetricData=[metric]
        )

def create_alarms():
    """
    CloudWatchアラーム設定
    """
    
    alarms = [
        {
            'AlarmName': 'HighErrorRate',
            'MetricName': 'Errors',
            'Threshold': 10,
            'ComparisonOperator': 'GreaterThanThreshold'
        },
        {
            'AlarmName': 'HighCostAlert',
            'MetricName': 'BrightDataUsage',
            'Threshold': 80 * 1024 * 1024 * 1024,  # 80GB
            'ComparisonOperator': 'GreaterThanThreshold'
        }
    ]
    
    return alarms

デプロイとCI/CD

SAMテンプレート例

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Parameters:
  BrightDataUsername:
    Type: String
    Description: Bright Data Username
  BrightDataPassword:
    Type: String
    NoEcho: true
    Description: Bright Data Password

Resources:
  ScrapingFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: lambda_function.lambda_handler
      Runtime: python3.9
      MemorySize: 1769
      Timeout: 120
      ReservedConcurrencyLimit: 100
      Environment:
        Variables:
          BRIGHT_DATA_USERNAME: !Ref BrightDataUsername
          BRIGHT_DATA_PASSWORD: !Ref BrightDataPassword
          BRIGHT_DATA_ENDPOINT: brd-customer-hl_123456-zone-datacenter.brd.superproxy.io:22225
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt ScrapingQueue.Arn
            BatchSize: 10

  ScrapingQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeoutSeconds: 180
      MessageRetentionPeriod: 1209600  # 14日

  StateTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: scraping-state
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: url_hash
          AttributeType: S
      KeySchema:
        - AttributeName: url_hash
          KeyType: HASH
      TimeToLiveSpecification:
        AttributeName: ttl
        Enabled: true

パフォーマンスチューニング

1. 同時実行数の最適化

def calculate_optimal_concurrency(target_rps, avg_duration):
    """
    最適な同時実行数を計算
    
    Args:
        target_rps: 目標リクエスト/秒
        avg_duration: 平均実行時間(秒)
    
    Returns:
        推奨同時実行数
    """
    
    # Little's Law: L = λ × W
    # L = 同時実行数, λ = 到着率, W = 処理時間
    optimal_concurrency = target_rps * avg_duration
    
    # AWS Lambda制限を考慮
    max_concurrency = min(optimal_concurrency, 1000)
    
    return {
        'recommended': max_concurrency,
        'scaling_time': max_concurrency / 100,  # 100/秒でスケール
        'max_rps': max_concurrency / avg_duration
    }

# 使用例
performance = calculate_optimal_concurrency(
    target_rps=100,  # 100リクエスト/秒
    avg_duration=5   # 5秒/リクエスト
)
print(f"推奨同時実行数: {performance['recommended']}")

2. メモリ使用量最適化

import psutil
import tracemalloc

def memory_profiler(func):
    """
    メモリ使用量プロファイラー
    """
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        
        # 実行前メモリ
        process = psutil.Process()
        memory_before = process.memory_info().rss / 1024 / 1024  # MB
        
        try:
            result = func(*args, **kwargs)
        finally:
            # 実行後メモリ
            memory_after = process.memory_info().rss / 1024 / 1024  # MB
            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            
            print(f"メモリ使用量: {memory_after - memory_before:.2f}MB")
            print(f"ピークメモリ: {peak / 1024 / 1024:.2f}MB")
        
        return result
    return wrapper

よくある問題と解決策

Q1: Lambda関数がタイムアウトする

原因: 大きなページの処理時間が長い 解決策:

def handle_large_pages(url, max_size=10*1024*1024):  # 10MB制限
    """
    大きなページの処理制限
    """
    response = requests.get(url, stream=True)
    
    content = b""
    for chunk in response.iter_content(chunk_size=8192):
        content += chunk
        if len(content) > max_size:
            raise Exception(f"ページサイズが制限を超過: {len(content)} bytes")
    
    return content.decode('utf-8')

Q2: Bright Dataの料金が予想より高い

原因: 帯域幅の計算方法を理解していない 解決策:

def optimize_bandwidth_usage():
    """
    帯域幅使用量最適化
    """
    
    # 必要なヘッダーのみ取得
    headers = {
        'Range': 'bytes=0-1023'  # 最初の1KBのみ
    }
    
    # 圧縮を有効化
    headers['Accept-Encoding'] = 'gzip, deflate, br'
    
    # 不要なリソースをフィルタ
    headers['Accept'] = 'text/html'  # HTMLのみ
    
    return headers

Q3: 同時実行制限に達する

原因: 急激なトラフィック増加 解決策:

def implement_rate_limiting():
    """
    レート制限の実装
    """
    
    # SQSでメッセージ配信速度を制御
    # DelaySeconds とVisibilityTimeoutを調整
    
    sqs_config = {
        'DelaySeconds': 1,  # 1秒遅延
        'VisibilityTimeoutSeconds': 180,  # 3分
        'MaxReceiveCount': 3  # 最大リトライ回数
    }
    
    return sqs_config

実際の導入事例

EC物価監視システム

要件:

  • 1,000サイト × 10,000商品の価格監視
  • 1時間ごとの更新
  • 99.9%の可用性

実装結果:

  • 処理能力: 10万リクエスト/時間
  • 成功率: 99.7%
  • 月額コスト: $180(従来の$800から削減)
  • レスポンス時間: 平均3.2秒
# 実際の設定例
PRODUCTION_CONFIG = {
    'lambda_memory': 1769,
    'lambda_timeout': 120,
    'reserved_concurrency': 200,
    'sqs_batch_size': 10,
    'retry_attempts': 3,
    'bright_data_pool': 'datacenter',
    'monitoring_interval': 300  # 5分
}

まとめ:次世代スクレイピングアーキテクチャ

AWS LambdaとBright Dataの組み合わせは、大規模スクレイピングの新しいスタンダードです。

主な利点:

  1. コスト効率: 従来比69%のコスト削減
  2. スケーラビリティ: 最大100万リクエスト/分の処理能力
  3. 運用負荷軽減: サーバー管理不要
  4. 高い可用性: AWSの99.99%SLAを活用

推奨される用途:

  • E-commerce価格監視
  • 不動産情報収集
  • ニュース・SNS監視
  • SEO競合分析

この技術スタックにより、これまで大企業でしか実現できなかった大規模データ収集が、中小企業でも手軽に導入できるようになりました。

今すぐ始める: Bright Data無料トライアルでプロキシサービスを試し、AWS LambdaとBright Dataを使った次世代スクレイピングシステムを構築しましょう。詳細な実装についてはBright Data料金体系の解説もご参照ください。


本記事の情報は2025年1月6日時点のものです。AWS Lambda及びBright Dataのサービス仕様は変更される可能性があるため、最新情報は各公式ドキュメントをご確認ください。

この記事が役に立ちましたら、ぜひシェアしてください。

関連記事

関連記事の実装を準備中です。