jpskill.com
🛠️ 開発・MCP コミュニティ

logstash

Logstashを使って、ログの取り込みから解析、変換、Elasticsearchなどへの出力までをスムーズに行い、ログ処理のパイプライン構築や複雑なログ解析を効率化するSkill。

📜 元の英語説明(参考)

Configure Logstash for log ingestion, parsing, transformation, and output to Elasticsearch and other destinations. Use when a user needs to build log processing pipelines, write Grok patterns, parse unstructured logs, enrich events, or set up multi-pipeline Logstash deployments.

🇯🇵 日本人クリエイター向け解説

一言でいうと

Logstashを使って、ログの取り込みから解析、変換、Elasticsearchなどへの出力までをスムーズに行い、ログ処理のパイプライン構築や複雑なログ解析を効率化するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o logstash.zip https://jpskill.com/download/15086.zip && unzip -o logstash.zip && rm logstash.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/15086.zip -OutFile "$d\logstash.zip"; Expand-Archive "$d\logstash.zip" -DestinationPath $d -Force; ri "$d\logstash.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して logstash.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → logstash フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

Logstash

概要

ログデータを取り込み、解析、変換、ルーティングするための Logstash パイプラインを構築します。Grok パターンの作成、マルチパイプライン構成、input/output プラグイン、および本番環境へのデプロイメントのためのパフォーマンスチューニングについて説明します。

手順

タスク A: 基本的なパイプライン構成

# /etc/logstash/conf.d/main.conf — 基本的なログ処理パイプライン
input {
  beats {
    port => 5044
    ssl_enabled => false
  }

  tcp {
    port => 5000
    codec => json_lines
    tags => ["tcp-input"]
  }
}

filter {
  if [fields][type] == "nginx" {
    grok {
      match => {
        "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status:int} %{NUMBER:bytes:int} "%{DATA:referrer}" "%{DATA:user_agent}"'
      }
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "client_ip"
      target => "geo"
    }
    useragent {
      source => "user_agent"
      target => "ua"
    }
    mutate {
      remove_field => ["message", "timestamp", "user_agent"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{[fields][type]}-%{+YYYY.MM.dd}"
    manage_template => true
    template_overwrite => true
  }
}

タスク B: 高度な Grok パターン

# /etc/logstash/conf.d/app-logs.conf — アプリケーションログ形式の解析
filter {
  if [fields][type] == "app" {
    # 構造化された JSON ログの解析
    if [message] =~ /^\{/ {
      json {
        source => "message"
        target => "app"
      }
    } else {
      # カスタムアプリケーションログ形式の解析: 2026-02-19 12:00:00.123 [INFO] [req-abc123] OrderService - Order created
      grok {
        match => {
          "message" => "^%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] \[%{DATA:request_id}\] %{DATA:class} - %{GREEDYDATA:log_message}"
        }
      }
    }

    # 重要度マッピングによるエンリッチ
    translate {
      field => "level"
      destination => "severity_number"
      dictionary => {
        "TRACE" => "1"
        "DEBUG" => "5"
        "INFO" => "9"
        "WARN" => "13"
        "ERROR" => "17"
        "FATAL" => "21"
      }
    }

    # 「processed in 245ms」のようなログメッセージから期間を抽出
    grok {
      match => { "log_message" => "processed in %{NUMBER:duration_ms:float}ms" }
      tag_on_failure => []
    }

    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS", "ISO8601"]
      target => "@timestamp"
    }
  }
}
# /etc/logstash/patterns/custom — カスタム Grok パターン定義
JAVA_STACKTRACE (?:(?:\s+at\s+[\w.$]+\([^)]*\)\n?)+)
POSTGRES_LOG %{TIMESTAMP_ISO8601:timestamp} %{WORD:timezone} \[%{INT:pid}\] %{WORD:user}@%{WORD:database} %{LOGLEVEL:level}:  %{GREEDYDATA:message}
SPRING_LOG %{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:level}\s+%{INT:pid}\s+---\s+\[%{DATA:thread}\]\s+%{DATA:logger}\s+:\s+%{GREEDYDATA:message}

タスク C: マルチパイプライン構成

# /etc/logstash/pipelines.yml — 複数の独立したパイプラインの実行
- pipeline.id: nginx-pipeline
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 2
  pipeline.batch.size: 250

- pipeline.id: app-pipeline
  path.config: "/etc/logstash/conf.d/app-logs.conf"
  pipeline.workers: 4
  pipeline.batch.size: 500

- pipeline.id: audit-pipeline
  path.config: "/etc/logstash/conf.d/audit.conf"
  pipeline.workers: 1
  queue.type: persisted
  queue.max_bytes: 4gb
# /etc/logstash/conf.d/audit.conf — デッドレターキューを使用した監査ログパイプライン
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["audit-events"]
    group_id => "logstash-audit"
    codec => json
    consumer_threads => 3
  }
}

filter {
  fingerprint {
    source => ["user_id", "action", "@timestamp"]
    target => "event_fingerprint"
    method => "SHA256"
  }
  mutate {
    add_field => {
      "[@metadata][index]" => "audit-%{+YYYY.MM}"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "%{[@metadata][index]}"
    document_id => "%{event_fingerprint}"
    action => "create"
  }
}

タスク D: Docker デプロイメント

# docker-compose.yml — カスタム構成とパターンを使用した Logstash
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
    environment:
      - LS_JAVA_OPTS=-Xms1g -Xmx1g
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    volumes:
      - ./logstash/conf.d:/etc/logstash/conf.d
      - ./logstash/patterns:/etc/logstash/patterns
      - ./logstash/pipelines.yml:/usr/share/logstash/config/pipelines.yml
    ports:
      - "5044:5044"
      - "5000:5000"
      - "9600:9600"   # Monitoring API

タスク E: パフォーマンス監視

# Logstash パイプラインの統計情報の確認
curl -s "http://localhost:9600/_node/stats/pipelines" | \
  jq '.pipelines | to_entries[] | {
    pipeline: .key,
    events_in: .value.events.in,
    events_out: .value.events.out,
    events_filtered: .value.events.filtered,
    queue_size: .value.queue.events_count
  }'
# パフォーマンスの問題に関するホットスレッドの確認
curl -s "http://localhost:9600/_node/hot_threads?human=true"

ベストプラクティス

  • 重要なデータを処理するパイプラインには、永続キュー (queue.type: persisted) を使用します。
  • デプロイする前に、Kibana の grokdebugger で Grok パターンをテストします。
  • オプションの Grok マッチングには tag_on_failure => [] を使用して、_grokparsefailure タグを回避します。
  • 障害を分離し、ワーカーを個別に調整するために、データソースごとにパイプラインを分離します。
  • スループットを向上させるには pipeline.batch.size を大きく (500-1000) 設定し、レイテンシーを短縮するには小さく (125) 設定します。
  • [@metadata] フィールドはルーティングロジックに使用します。これらは出力に送信されません。
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Logstash

Overview

Build Logstash pipelines to ingest, parse, transform, and route log data. Covers Grok pattern writing, multi-pipeline configuration, input/output plugins, and performance tuning for production deployments.

Instructions

Task A: Basic Pipeline Configuration

# /etc/logstash/conf.d/main.conf — Basic log processing pipeline
input {
  beats {
    port => 5044
    ssl_enabled => false
  }

  tcp {
    port => 5000
    codec => json_lines
    tags => ["tcp-input"]
  }
}

filter {
  if [fields][type] == "nginx" {
    grok {
      match => {
        "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status:int} %{NUMBER:bytes:int} "%{DATA:referrer}" "%{DATA:user_agent}"'
      }
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "client_ip"
      target => "geo"
    }
    useragent {
      source => "user_agent"
      target => "ua"
    }
    mutate {
      remove_field => ["message", "timestamp", "user_agent"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{[fields][type]}-%{+YYYY.MM.dd}"
    manage_template => true
    template_overwrite => true
  }
}

Task B: Advanced Grok Patterns

# /etc/logstash/conf.d/app-logs.conf — Parse application log formats
filter {
  if [fields][type] == "app" {
    # Parse structured JSON logs
    if [message] =~ /^\{/ {
      json {
        source => "message"
        target => "app"
      }
    } else {
      # Parse custom app log format: 2026-02-19 12:00:00.123 [INFO] [req-abc123] OrderService - Order created
      grok {
        match => {
          "message" => "^%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] \[%{DATA:request_id}\] %{DATA:class} - %{GREEDYDATA:log_message}"
        }
      }
    }

    # Enrich with severity mapping
    translate {
      field => "level"
      destination => "severity_number"
      dictionary => {
        "TRACE" => "1"
        "DEBUG" => "5"
        "INFO" => "9"
        "WARN" => "13"
        "ERROR" => "17"
        "FATAL" => "21"
      }
    }

    # Extract duration from log messages like "processed in 245ms"
    grok {
      match => { "log_message" => "processed in %{NUMBER:duration_ms:float}ms" }
      tag_on_failure => []
    }

    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS", "ISO8601"]
      target => "@timestamp"
    }
  }
}
# /etc/logstash/patterns/custom — Custom Grok pattern definitions
JAVA_STACKTRACE (?:(?:\s+at\s+[\w.$]+\([^)]*\)\n?)+)
POSTGRES_LOG %{TIMESTAMP_ISO8601:timestamp} %{WORD:timezone} \[%{INT:pid}\] %{WORD:user}@%{WORD:database} %{LOGLEVEL:level}:  %{GREEDYDATA:message}
SPRING_LOG %{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:level}\s+%{INT:pid}\s+---\s+\[%{DATA:thread}\]\s+%{DATA:logger}\s+:\s+%{GREEDYDATA:message}

Task C: Multi-Pipeline Configuration

# /etc/logstash/pipelines.yml — Run multiple independent pipelines
- pipeline.id: nginx-pipeline
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 2
  pipeline.batch.size: 250

- pipeline.id: app-pipeline
  path.config: "/etc/logstash/conf.d/app-logs.conf"
  pipeline.workers: 4
  pipeline.batch.size: 500

- pipeline.id: audit-pipeline
  path.config: "/etc/logstash/conf.d/audit.conf"
  pipeline.workers: 1
  queue.type: persisted
  queue.max_bytes: 4gb
# /etc/logstash/conf.d/audit.conf — Audit log pipeline with dead letter queue
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["audit-events"]
    group_id => "logstash-audit"
    codec => json
    consumer_threads => 3
  }
}

filter {
  fingerprint {
    source => ["user_id", "action", "@timestamp"]
    target => "event_fingerprint"
    method => "SHA256"
  }
  mutate {
    add_field => {
      "[@metadata][index]" => "audit-%{+YYYY.MM}"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "%{[@metadata][index]}"
    document_id => "%{event_fingerprint}"
    action => "create"
  }
}

Task D: Docker Deployment

# docker-compose.yml — Logstash with custom config and patterns
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
    environment:
      - LS_JAVA_OPTS=-Xms1g -Xmx1g
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    volumes:
      - ./logstash/conf.d:/etc/logstash/conf.d
      - ./logstash/patterns:/etc/logstash/patterns
      - ./logstash/pipelines.yml:/usr/share/logstash/config/pipelines.yml
    ports:
      - "5044:5044"
      - "5000:5000"
      - "9600:9600"   # Monitoring API

Task E: Performance Monitoring

# Check Logstash pipeline stats
curl -s "http://localhost:9600/_node/stats/pipelines" | \
  jq '.pipelines | to_entries[] | {
    pipeline: .key,
    events_in: .value.events.in,
    events_out: .value.events.out,
    events_filtered: .value.events.filtered,
    queue_size: .value.queue.events_count
  }'
# Check hot threads for performance issues
curl -s "http://localhost:9600/_node/hot_threads?human=true"

Best Practices

  • Use persisted queues (queue.type: persisted) for pipelines processing critical data
  • Test Grok patterns with grokdebugger in Kibana before deploying
  • Use tag_on_failure => [] for optional Grok matches to avoid _grokparsefailure tags
  • Separate pipelines by data source to isolate failures and tune workers independently
  • Set pipeline.batch.size higher (500-1000) for throughput, lower (125) for latency
  • Use [@metadata] fields for routing logic — they are not sent to outputs