jpskill.com
💼 ビジネス コミュニティ

batch-processor

複数のドキュメントを並行処理でまとめて処理し、ファイルの一括変換や名前変更、ディレクトリ内のファイルを同時に処理する際に、エラー処理や進捗管理を行いながら効率的に作業を進めるSkill。

📜 元の英語説明(参考)

Process multiple documents in bulk with parallel execution. Use when a user asks to batch process files, convert many documents at once, run parallel file operations, bulk rename, bulk transform, or process a directory of files concurrently. Covers parallel execution, error handling, and progress tracking.

🇯🇵 日本人クリエイター向け解説

一言でいうと

複数のドキュメントを並行処理でまとめて処理し、ファイルの一括変換や名前変更、ディレクトリ内のファイルを同時に処理する際に、エラー処理や進捗管理を行いながら効率的に作業を進めるSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o batch-processor.zip https://jpskill.com/download/14682.zip && unzip -o batch-processor.zip && rm batch-processor.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14682.zip -OutFile "$d\batch-processor.zip"; Expand-Archive "$d\batch-processor.zip" -DestinationPath $d -Force; ri "$d\batch-processor.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して batch-processor.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → batch-processor フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

バッチプロセッサ

概要

並列実行を使用して、複数のドキュメントとファイルをまとめて処理します。構成可能な並行性、エラーリカバリ、および進捗状況のレポートにより、数百または数千のファイルにわたるフォーマット変換、データ抽出、変換、および検証を含む大規模なファイル操作を処理します。

指示

ユーザーがバッチ処理を要求した場合、どの方法が彼らのニーズに合っているかを判断します。

タスク A: シェルツールを使用した並列ファイル処理

単純な変換には、xargs または GNU parallel を使用します。

# ImageMagick を使用してすべての PNG ファイルを JPEG に変換する (8 並列ジョブ)
find ./images -name "*.png" | xargs -P 8 -I {} bash -c \
  'convert "$1" "${1%.png}.jpg"' _ {}

# GNU parallel とプログレスバーを使用してファイルを処理する
find ./docs -name "*.csv" | parallel --bar --jobs 8 \
  'python transform.py {} {.}_processed.csv'

# PDF をまとめて圧縮する (4 並列ジョブ)
find ./reports -name "*.pdf" | xargs -P 4 -I {} bash -c \
  'gs -sDEVICE=pdfwrite -dCompatibilityLevel=1.4 -dPDFSETTINGS=/ebook \
   -dNOPAUSE -dBATCH -sOutputFile="{}.compressed" "{}" && mv "{}.compressed" "{}"'

タスク B: 並行性制御を備えた Python バッチプロセッサ

再利用可能なバッチ処理スクリプトを作成します。

import asyncio
import os
from pathlib import Path
from dataclasses import dataclass, field

@dataclass
class BatchResult:
    total: int = 0
    success: int = 0
    failed: int = 0
    errors: list = field(default_factory=list)

async def process_file(filepath: Path, semaphore: asyncio.Semaphore) -> tuple[bool, str]:
    async with semaphore:
        try:
            # 実際の処理ロジックに置き換えてください
            content = filepath.read_text()
            output = content.upper()  # 変換の例
            out_path = filepath.with_suffix('.processed' + filepath.suffix)
            out_path.write_text(output)
            return True, str(filepath)
        except Exception as e:
            return False, f"{filepath}: {e}"

async def batch_process(
    input_dir: str,
    pattern: str = "*.*",
    max_concurrent: int = 10
) -> BatchResult:
    semaphore = asyncio.Semaphore(max_concurrent)
    files = list(Path(input_dir).glob(pattern))
    result = BatchResult(total=len(files))

    tasks = [process_file(f, semaphore) for f in files]
    for coro in asyncio.as_completed(tasks):
        success, msg = await coro
        if success:
            result.success += 1
        else:
            result.failed += 1
            result.errors.append(msg)
        # 進捗状況のレポート
        done = result.success + result.failed
        print(f"\rProgress: {done}/{result.total}", end="", flush=True)

    print()  # 進捗状況の後の改行
    return result

if __name__ == "__main__":
    result = asyncio.run(batch_process("./input", pattern="*.txt", max_concurrent=8))
    print(f"Done: {result.success} succeeded, {result.failed} failed")
    for err in result.errors:
        print(f"  ERROR: {err}")

タスク C: エラーリカバリによるバッチ処理

長時間実行されるジョブの場合は、進捗状況を追跡し、再開を許可します。

import json
from pathlib import Path

PROGRESS_FILE = ".batch_progress.json"

def load_progress() -> set:
    if Path(PROGRESS_FILE).exists():
        return set(json.loads(Path(PROGRESS_FILE).read_text()))
    return set()

def save_progress(completed: set):
    Path(PROGRESS_FILE).write_text(json.dumps(list(completed)))

def batch_with_resume(input_dir: str, pattern: str = "*.*"):
    completed = load_progress()
    files = [f for f in Path(input_dir).glob(pattern) if str(f) not in completed]
    print(f"Resuming: {len(completed)} done, {len(files)} remaining")

    for i, filepath in enumerate(files):
        try:
            process_single_file(filepath)  # あなたの処理関数
            completed.add(str(filepath))
            if i % 10 == 0:  # 10 ファイルごとにチェックポイント
                save_progress(completed)
        except KeyboardInterrupt:
            save_progress(completed)
            print(f"\nSaved progress at {len(completed)} files")
            raise
        except Exception as e:
            print(f"Error on {filepath}: {e}")

    save_progress(completed)
    Path(PROGRESS_FILE).unlink()  # 完了時にクリーンアップします

タスク D: ロギングによるシェルベースのバッチ

#!/bin/bash
INPUT_DIR="$1"
OUTPUT_DIR="$2"
LOG_FILE="batch_$(date +%Y%m%d_%H%M%S).log"
PARALLEL_JOBS=8
TOTAL=$(find "$INPUT_DIR" -type f | wc -l)
COUNT=0

mkdir -p "$OUTPUT_DIR"

process_file() {
    local file="$1"
    local outfile="$OUTPUT_DIR/$(basename "$file")"
    # 処理コマンドに置き換えてください
    cp "$file" "$outfile" 2>&1
    echo $?
}

export -f process_file
export OUTPUT_DIR

find "$INPUT_DIR" -type f | parallel --jobs "$PARALLEL_JOBS" --bar \
  --joblog "$LOG_FILE" process_file {}

echo "Results logged to $LOG_FILE"
awk 'NR>1 {if($7!=0) fail++; else ok++} END {print ok" succeeded, "fail" failed"}' "$LOG_FILE"

例 1: Markdown ファイルのディレクトリを PDF に変換する

ユーザーリクエスト: "docs/ 内のすべての 200 個の Markdown ファイルを PDF に変換する"

# 必要に応じて pandoc をインストールします
# 6 ワーカーで並行して処理します
find ./docs -name "*.md" | parallel --bar --jobs 6 \
  'pandoc {} -o {.}.pdf --pdf-engine=xelatex'
echo "Conversion complete. Check for errors above."

例 2: 数百の画像からテキストを抽出する

ユーザーリクエスト: "scans/ フォルダ内のすべてのスキャンされたドキュメントを OCR 処理する"

# tesseract を並列処理で使用する
find ./scans -name "*.png" -o -name "*.jpg" | parallel --bar --jobs 4 \
  'tesseract {} {.} -l eng 2>/dev/null && echo "OK: {}"'

例 3: Web 用に画像をまとめてリサイズする

ユーザーリクエスト: "すべての製品画像を幅 800px にリサイズし、アスペクト比を維持する"

mkdir -p ./resized
find ./products -name "*.jpg" | xargs -P 8 -I {} bash -c \
  'convert "$1" -resize 800x -quality 85 "./resized/$(basename $1)"' _ {}
echo "Resized $(ls ./resized | wc -l) images"

ガイドライン

  • 常にバッチ操作をテストしてください

(原文がここで切り詰められています)

📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Batch Processor

Overview

Process multiple documents and files in bulk using parallel execution. Handles large-scale file operations including format conversion, data extraction, transformation, and validation across hundreds or thousands of files with configurable concurrency, error recovery, and progress reporting.

Instructions

When a user asks for batch processing, determine which approach fits their needs:

Task A: Parallel file processing with shell tools

For simple transformations, use xargs or GNU parallel:

# Convert all PNG files to JPEG using ImageMagick (8 parallel jobs)
find ./images -name "*.png" | xargs -P 8 -I {} bash -c \
  'convert "$1" "${1%.png}.jpg"' _ {}

# Process files with GNU parallel and progress bar
find ./docs -name "*.csv" | parallel --bar --jobs 8 \
  'python transform.py {} {.}_processed.csv'

# Bulk compress PDFs (4 parallel jobs)
find ./reports -name "*.pdf" | xargs -P 4 -I {} bash -c \
  'gs -sDEVICE=pdfwrite -dCompatibilityLevel=1.4 -dPDFSETTINGS=/ebook \
   -dNOPAUSE -dBATCH -sOutputFile="{}.compressed" "{}" && mv "{}.compressed" "{}"'

Task B: Python batch processor with concurrency control

Create a reusable batch processing script:

import asyncio
import os
from pathlib import Path
from dataclasses import dataclass, field

@dataclass
class BatchResult:
    total: int = 0
    success: int = 0
    failed: int = 0
    errors: list = field(default_factory=list)

async def process_file(filepath: Path, semaphore: asyncio.Semaphore) -> tuple[bool, str]:
    async with semaphore:
        try:
            # Replace with actual processing logic
            content = filepath.read_text()
            output = content.upper()  # Example transformation
            out_path = filepath.with_suffix('.processed' + filepath.suffix)
            out_path.write_text(output)
            return True, str(filepath)
        except Exception as e:
            return False, f"{filepath}: {e}"

async def batch_process(
    input_dir: str,
    pattern: str = "*.*",
    max_concurrent: int = 10
) -> BatchResult:
    semaphore = asyncio.Semaphore(max_concurrent)
    files = list(Path(input_dir).glob(pattern))
    result = BatchResult(total=len(files))

    tasks = [process_file(f, semaphore) for f in files]
    for coro in asyncio.as_completed(tasks):
        success, msg = await coro
        if success:
            result.success += 1
        else:
            result.failed += 1
            result.errors.append(msg)
        # Progress reporting
        done = result.success + result.failed
        print(f"\rProgress: {done}/{result.total}", end="", flush=True)

    print()  # Newline after progress
    return result

if __name__ == "__main__":
    result = asyncio.run(batch_process("./input", pattern="*.txt", max_concurrent=8))
    print(f"Done: {result.success} succeeded, {result.failed} failed")
    for err in result.errors:
        print(f"  ERROR: {err}")

Task C: Batch processing with error recovery

For long-running jobs, track progress and allow resuming:

import json
from pathlib import Path

PROGRESS_FILE = ".batch_progress.json"

def load_progress() -> set:
    if Path(PROGRESS_FILE).exists():
        return set(json.loads(Path(PROGRESS_FILE).read_text()))
    return set()

def save_progress(completed: set):
    Path(PROGRESS_FILE).write_text(json.dumps(list(completed)))

def batch_with_resume(input_dir: str, pattern: str = "*.*"):
    completed = load_progress()
    files = [f for f in Path(input_dir).glob(pattern) if str(f) not in completed]
    print(f"Resuming: {len(completed)} done, {len(files)} remaining")

    for i, filepath in enumerate(files):
        try:
            process_single_file(filepath)  # Your processing function
            completed.add(str(filepath))
            if i % 10 == 0:  # Checkpoint every 10 files
                save_progress(completed)
        except KeyboardInterrupt:
            save_progress(completed)
            print(f"\nSaved progress at {len(completed)} files")
            raise
        except Exception as e:
            print(f"Error on {filepath}: {e}")

    save_progress(completed)
    Path(PROGRESS_FILE).unlink()  # Clean up on completion

Task D: Shell-based batch with logging

#!/bin/bash
INPUT_DIR="$1"
OUTPUT_DIR="$2"
LOG_FILE="batch_$(date +%Y%m%d_%H%M%S).log"
PARALLEL_JOBS=8
TOTAL=$(find "$INPUT_DIR" -type f | wc -l)
COUNT=0

mkdir -p "$OUTPUT_DIR"

process_file() {
    local file="$1"
    local outfile="$OUTPUT_DIR/$(basename "$file")"
    # Replace with your processing command
    cp "$file" "$outfile" 2>&1
    echo $?
}

export -f process_file
export OUTPUT_DIR

find "$INPUT_DIR" -type f | parallel --jobs "$PARALLEL_JOBS" --bar \
  --joblog "$LOG_FILE" process_file {}

echo "Results logged to $LOG_FILE"
awk 'NR>1 {if($7!=0) fail++; else ok++} END {print ok" succeeded, "fail" failed"}' "$LOG_FILE"

Examples

Example 1: Convert a directory of Markdown files to PDF

User request: "Convert all 200 Markdown files in docs/ to PDF"

# Install pandoc if needed
# Process in parallel with 6 workers
find ./docs -name "*.md" | parallel --bar --jobs 6 \
  'pandoc {} -o {.}.pdf --pdf-engine=xelatex'
echo "Conversion complete. Check for errors above."

Example 2: Extract text from hundreds of images

User request: "OCR all scanned documents in the scans/ folder"

# Using tesseract with parallel processing
find ./scans -name "*.png" -o -name "*.jpg" | parallel --bar --jobs 4 \
  'tesseract {} {.} -l eng 2>/dev/null && echo "OK: {}"'

Example 3: Bulk resize images for web

User request: "Resize all product images to 800px wide, keep aspect ratio"

mkdir -p ./resized
find ./products -name "*.jpg" | xargs -P 8 -I {} bash -c \
  'convert "$1" -resize 800x -quality 85 "./resized/$(basename $1)"' _ {}
echo "Resized $(ls ./resized | wc -l) images"

Guidelines

  • Always test batch operations on a small subset (5-10 files) before processing the full set.
  • Set a reasonable concurrency limit. Start with CPU core count for CPU-bound tasks, or 2-4x for I/O-bound tasks.
  • Implement progress reporting so users can monitor long-running jobs.
  • Write errors to a log file rather than stopping the entire batch.
  • Create a checkpoint/resume mechanism for batches over 100 files.
  • Back up original files or write output to a separate directory; never overwrite in place without confirmation.
  • Use --dry-run flags in scripts to preview operations before executing.
  • Monitor system resources (RAM, disk space) during large batch operations.