Skip to content
Go back

バルクCSV一括ダウンロード:バックフィル4時間を数分に短縮する

Edit page

日本株システムトレードのバックフィル処理(過去データ一括取得)は、J-Quants /prices/daily_quotes APIを1日ずつ呼び出す方式で約4時間かかっていた。J-Quants V2で追加された /bulk/ APIを活用し、月次gzip CSVを一括ダウンロードすることで、約50倍の高速化(4時間→数分)を達成した。

加えて、Standard プランの1,000リクエスト/日枠を日次取得に温存できるため、レートリミットの余裕も確保できた。


Before:1日ずつAPI呼び出しの問題

既存のバックフィルフロー

2015年1月〜2026年3月(約2,750営業日)をバックフィルする場合:

for date in date_range("2015-01-01", "2026-03-31"):
    response = jquants_client.get_daily_quotes(date=date)
    # → 1リクエスト/日 × 2,750日 = 2,750リクエスト
    # → レートリミット考慮で約4時間
    transform(response)
    validate(response)
    save_to_db(response)

問題点:

  1. 所要時間: 約4時間(レートリミット待ち含む)
  2. API枠消費: 2,750リクエスト = Standard プランの約3日分の枠
  3. エラーリカバリ: 途中で失敗すると最初からやり直し
  4. メモリ: 問題なし(1日分ずつ処理するため)

After:バルクCSV一括ダウンロード

J-Quants /bulk/ API

V2で追加されたバルクAPIは、月単位(historical)または日単位(live)のgzip圧縮CSVを一括ダウンロードできる。

API呼び出しフロー:

1. GET /bulk/list?endpoint=daily_quotes&mode=historical
   → ファイルキーの一覧を取得(約130件 = 10年分の月次ファイル)

2. GET /bulk/get?key={file_key}
   → 各ファイルのダウンロードURLを取得

3. GET {download_url}
   → gzip CSVをダウンロード(各ファイル 2-5MB)

合計リクエスト数: 130 × 2 + 1 = 約261リクエスト(従来の1/10)
合計所要時間: 約10分(従来の1/24)

ファイルキーの形式

# historical(月次、過去データ)
equities/bars/daily/historical/2024/equities_bars_daily_202401.csv.gz
equities/bars/daily/historical/2024/equities_bars_daily_202402.csv.gz
...

# live(日次、直近データ)
equities/bars/daily/live/equities_bars_daily_20260328.csv.gz
equities/bars/daily/live/equities_bars_daily_20260331.csv.gz

実装

BulkDataFetcher

# _getJQuantsStocks/data/bulk_fetcher.py

class BulkDataFetcher:
    def __init__(self, client: JQuantsClient, db_writer: DBWriter):
        self._client = client
        self._db_writer = db_writer

    def fetch_and_store(
        self,
        start_date: date | None = None,
        end_date: date | None = None,
    ) -> int:
        """バルク一括取得・DB保存"""
        total_rows = 0

        # 1. ファイルリスト取得
        file_keys = self._client.get_bulk_list("daily_quotes")
        logger.info(f"Bulk list: {len(file_keys)} files")

        # 2. 日付範囲でフィルタリング
        filtered_keys = [
            key for key in file_keys
            if self._is_in_range(key, start_date, end_date)
        ]
        logger.info(f"Filtered to {len(filtered_keys)} files")

        # 3. 各ファイルをダウンロード・パース・保存
        for i, key in enumerate(filtered_keys):
            try:
                df = self._download_and_parse_csv(key)
                if df is not None and len(df) > 0:
                    self._db_writer.write(df)
                    total_rows += len(df)
                    logger.info(
                        f"[{i+1}/{len(filtered_keys)}] "
                        f"{key}: {len(df)} rows"
                    )
            except Exception as e:
                logger.error(f"Failed to process {key}: {e}")
                # 個別ファイルの失敗は続行(エラーリカバリ)

            # メモリ管理: 各ファイル処理後にGC
            import gc
            gc.collect()

        return total_rows

    def _download_and_parse_csv(self, key: str) -> pd.DataFrame | None:
        """gzip CSV をダウンロードしてDataFrameに変換"""
        # ダウンロードURL取得
        url = self._client.get_bulk_download_url(key)

        # gzip CSVダウンロード・パース
        response = requests.get(url, timeout=120)
        response.raise_for_status()

        with gzip.open(io.BytesIO(response.content), 'rt') as f:
            df = pd.read_csv(f)

        return df

    def _parse_date_from_key(self, key: str) -> date | None:
        """ファイルキーから日付を抽出"""
        # historical: ...equities_bars_daily_202401.csv.gz → 2024-01-01
        # live: ...equities_bars_daily_20260328.csv.gz → 2026-03-28
        import re

        match = re.search(r'(\d{6,8})\.csv\.gz$', key)
        if match:
            digits = match.group(1)
            if len(digits) == 6:  # YYYYMM
                return date(int(digits[:4]), int(digits[4:6]), 1)
            elif len(digits) == 8:  # YYYYMMDD
                return date(int(digits[:4]), int(digits[4:6]), int(digits[6:8]))
        return None

APIクライアントの拡張

# _getJQuantsStocks/api/client.py(拡張部分)

class JQuantsClient:
    def get_bulk_list(self, endpoint: str) -> list[str]:
        """バルクファイルリストを取得"""
        response = self._request(
            "GET",
            "/bulk/list",
            params={"endpoint": endpoint},
        )
        return [item["key"] for item in response.get("files", [])]

    def get_bulk_download_url(self, key: str) -> str:
        """バルクファイルのダウンロードURLを取得"""
        response = self._request(
            "GET",
            "/bulk/get",
            params={"key": key},
        )
        return response["download_url"]

CLIインターフェース

# _getJQuantsStocks/main.py

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--use-bulk", action="store_true",
                        help="Use bulk API for backfill (faster)")
    parser.add_argument("--start-date", type=str, default=None)
    parser.add_argument("--end-date", type=str, default=None)
    args = parser.parse_args()

    if args.use_bulk:
        # バルクAPI使用(バックフィル向け)
        fetcher = BulkDataFetcher(client, db_writer)
        total = fetcher.fetch_and_store(
            start_date=parse_date(args.start_date),
            end_date=parse_date(args.end_date),
        )
        logger.info(f"Bulk fetch completed: {total} rows")
    else:
        # 既存の日次API呼び出し(日常運用向け)
        daily_fetcher.fetch_and_store()

V2認証への移行

V1(Bearer token)からV2(x-api-key)への認証方式の変更にも対応した。

# V1(旧)
headers = {"Authorization": f"Bearer {self._refresh_token()}"}

# V2(新)
headers = {"x-api-key": self._api_key}
# → トークンリフレッシュ不要、シンプルかつ高速

パフォーマンス比較

指標Before(日次API)After(バルクAPI)
10年分バックフィル約4時間約10分
APIリクエスト数約2,750約261
ネットワーク転送量約500MB(JSON)約250MB(gzip CSV)
メモリピーク低い(1日ずつ)約500MB(月ごとにGC)
エラーリカバリ最初からやり直し失敗ファイルのみスキップ
API枠消費3日分の枠1日分の枠以下

テスト戦略

テスト構成(36テスト + E2E検証):
├── BulkDataFetcher テスト(18件)
│   ├── fetch_and_store 正常系
│   ├── 日付範囲フィルタリング
│   ├── 空のファイルリスト
│   ├── ダウンロード失敗時の続行
│   └── メモリ管理(GC呼び出し確認)
├── ファイルキーパース テスト(8件)
│   ├── historical形式(YYYYMM)
│   ├── live形式(YYYYMMDD)
│   ├── 不正なキー形式
│   └── 境界値テスト
├── APIクライアント テスト(6件)
│   ├── get_bulk_list 正常系
│   ├── get_bulk_download_url 正常系
│   ├── 認証エラー
│   └── タイムアウト
├── 統合テスト(4件)
│   ├── 既存DataTransformerとの互換性
│   ├── 既存DataValidatorとの互換性
│   ├── DBWriter書き込み確認
│   └── --use-bulk フラグ動作
└── E2E検証
    └── 実際のJ-Quants APIでの動作確認

まとめ

  1. J-Quants V2の /bulk/ APIを活用し、バックフィル処理を4時間→約10分(約24倍高速化)に短縮した。月次gzip CSVの一括ダウンロードにより、APIリクエスト数も2,750→261に削減

  2. --use-bulk フラグで有効化する設計により、日常運用(日次API呼び出し)とバックフィル(バルクAPI)を明確に分離。既存のDataTransformer・DataValidator・DBWriterをそのまま再利用し、変更箇所を最小化

  3. 各ファイル処理後のGC実行でメモリピークを500MB以下に抑制。個別ファイルの失敗をスキップして続行するエラーリカバリにより、途中失敗時もやり直しが不要


関連記事


Edit page
Share this post on:

Previous Post
適応的ドローダウン回復モード:3段階ソフトストップで資産を守る
Next Post
デイトレードのシグナル改善:セッション限定フィルターで勝率30%→37%に引き上げた