jpskill.com
🛠️ 開発・MCP コミュニティ

worldmonitor

AIを活用し、ニュースや地政学的な出来事、インフラ情報を集約したリアルタイムな情報ダッシュボードを構築することで、グローバルな出来事を監視し、状況認識を高めるシステムを構築するSkill。

📜 元の英語説明(参考)

Build real-time intelligence dashboards that aggregate news, geopolitical events, and infrastructure data using AI. Use when: building news aggregation systems, monitoring global events, creating situational awareness dashboards.

🇯🇵 日本人クリエイター向け解説

一言でいうと

AIを活用し、ニュースや地政学的な出来事、インフラ情報を集約したリアルタイムな情報ダッシュボードを構築することで、グローバルな出来事を監視し、状況認識を高めるシステムを構築するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o worldmonitor.zip https://jpskill.com/download/15574.zip && unzip -o worldmonitor.zip && rm worldmonitor.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/15574.zip -OutFile "$d\worldmonitor.zip"; Expand-Archive "$d\worldmonitor.zip" -DestinationPath $d -Force; ri "$d\worldmonitor.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して worldmonitor.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → worldmonitor フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

World Monitor

概要

複数のソース(RSS、ニュースAPI、ソーシャルメディア)からデータを取り込み、AIを使用して分類、要約、重複排除、重要度スコアリングを行い、ライブダッシュボードに更新をプッシュする、リアルタイムのインテリジェンスダッシュボードを構築します。AIを活用した独自の状況室と考えてください。

指示

ユーザーがニュース監視システム、インテリジェンスダッシュボード、またはイベント集約フィードの構築を依頼した場合:

  1. スコープの定義 — 監視するトピック/地域/業界は?
  2. ソースの選択 — RSSフィード、NewsAPI、ソーシャルAPI、カスタムスクレイパー
  3. パイプラインのセットアップ — 取り込み → 重複排除 → 分類 → 要約 → スコアリング
  4. 出力の構築 — リアルタイムプッシュ用のAPI + WebSocket、アラートルール

ソース取り込み (Python)

"""Fetch from multiple source types in parallel."""
import asyncio, hashlib, feedparser, httpx
from datetime import datetime, timezone

class NewsItem:
    def __init__(self, title: str, content: str, source: str, url: str, published: datetime):
        self.title, self.content, self.source, self.url = title, content, source, url
        self.published = published
        self.id = hashlib.sha256(f"{title}{url}".encode()).hexdigest()[:16]

async def fetch_rss(feeds: list[str]) -> list[NewsItem]:
    items = []
    async with httpx.AsyncClient(timeout=15) as client:
        responses = await asyncio.gather(*[client.get(url) for url in feeds], return_exceptions=True)
    for resp in responses:
        if isinstance(resp, Exception):
            continue
        feed = feedparser.parse(resp.text)
        for entry in feed.entries[:20]:
            items.append(NewsItem(
                title=entry.get("title", ""), content=entry.get("summary", ""),
                source=feed.feed.get("title", "RSS"), url=entry.get("link", ""),
                published=datetime.now(timezone.utc),
            ))
    return items

async def fetch_newsapi(query: str, api_key: str) -> list[NewsItem]:
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://newsapi.org/v2/everything",
            params={"q": query, "sortBy": "publishedAt", "pageSize": 50},
            headers={"X-Api-Key": api_key})
        data = resp.json()
    return [
        NewsItem(title=a["title"], content=a.get("description", ""),
                 source=a["source"]["name"], url=a["url"],
                 published=datetime.fromisoformat(a["publishedAt"].replace("Z", "+00:00")))
        for a in data.get("articles", [])
    ]

重複排除

from difflib import SequenceMatcher

def deduplicate(items: list[NewsItem], threshold: float = 0.75) -> list[NewsItem]:
    unique, seen_titles = [], []
    for item in sorted(items, key=lambda x: x.published, reverse=True):
        is_dup = any(SequenceMatcher(None, item.title.lower(), s.lower()).ratio() > threshold for s in seen_titles)
        if not is_dup:
            unique.append(item)
            seen_titles.append(item.title)
    return unique

AI分類と要約

import json
from openai import OpenAI

client = OpenAI()
CATEGORIES = ["geopolitics", "technology", "finance", "security", "climate", "health", "regulation", "market-move"]

def analyze_article(item: NewsItem) -> dict:
    response = client.chat.completions.create(
        model="gpt-4o-mini", response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": f"Analyze this news. Return JSON: {{category: one of {CATEGORIES}, severity: 1-10, summary: '2-3 sentences', entities: [], sentiment: 'positive|negative|neutral', actionable: bool}}"},
            {"role": "user", "content": f"Title: {item.title}\n\nContent: {item.content[:2000]}"},
        ],
    )
    analysis = json.loads(response.choices[0].message.content)
    return {**analysis, "id": item.id, "title": item.title, "url": item.url, "source": item.source}

ダッシュボードAPI (Node.js)

import express from "express";
import { WebSocketServer, WebSocket } from "ws";
import { createServer } from "http";

interface IntelItem {
  id: string; title: string; summary: string; category: string;
  severity: number; source: string; url: string; timestamp: string;
}

const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ server });
let feed: IntelItem[] = [];
const clients = new Set<WebSocket>();

wss.on("connection", (ws) => {
  clients.add(ws);
  ws.send(JSON.stringify({ type: "init", items: feed.slice(0, 50) }));
  ws.on("close", () => clients.delete(ws));
});

function broadcast(item: IntelItem) {
  const msg = JSON.stringify({ type: "new", item });
  clients.forEach((ws) => { if (ws.readyState === WebSocket.OPEN) ws.send(msg); });
}

app.post("/api/ingest", express.json(), (req, res) => {
  const item: IntelItem = { ...req.body, timestamp: new Date().toISOString() };
  feed.unshift(item);
  feed = feed.slice(0, 1000);
  broadcast(item);
  res.json({ ok: true });
});

app.get("/api/feed", (req, res) => {
  let items = feed;
  const { category, minSeverity, limit } = req.query;
  if (category) items = items.filter((i) => i.category === category);
  if (minSeverity) items = items.filter((i) => i.severity >= Number(minSeverity));
  res.json(items.slice(0, Number(limit) || 50));
});

server.listen(3000, () => console.log("Intelligence dashboard on :3000"));

アラートルール


import httpx

ALERT_RULES = [
    {"category": "security", "min_severity": 7, "channel": "slack"},
    {"category": "market-move", "min_severity": 8, "channel": "email"},
    {"category": "*", "min_severity": 9, "channel": "all"},
]

async def check_alerts(item: dict):
    for rule in ALERT_RULES:
        cat_match = rule["category"] == "*" or rule["category"] == item["category"]
        if cat_match and item["severity"] >= rule["min_severity"]:
            await send_alert(rule["channel"], item)

async def send_alert(channel: str, item: dict):
    msg = f"[{item['category'].upper()}] Severity {item[
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

World Monitor

Overview

Build real-time intelligence dashboards that ingest data from multiple sources (RSS, news APIs, social media), use AI to categorize, summarize, deduplicate, and score severity, then push updates to a live dashboard. Think of it as your own AI-powered situation room.

Instructions

When a user asks to build a news monitoring system, intelligence dashboard, or event aggregation feed:

  1. Define scope — What topics/regions/industries to monitor?
  2. Select sources — RSS feeds, NewsAPI, social APIs, custom scrapers
  3. Set up pipeline — Ingest → Deduplicate → Classify → Summarize → Score
  4. Build output — API + WebSocket for real-time push, alert rules

Source Ingestion (Python)

"""Fetch from multiple source types in parallel."""
import asyncio, hashlib, feedparser, httpx
from datetime import datetime, timezone

class NewsItem:
    def __init__(self, title: str, content: str, source: str, url: str, published: datetime):
        self.title, self.content, self.source, self.url = title, content, source, url
        self.published = published
        self.id = hashlib.sha256(f"{title}{url}".encode()).hexdigest()[:16]

async def fetch_rss(feeds: list[str]) -> list[NewsItem]:
    items = []
    async with httpx.AsyncClient(timeout=15) as client:
        responses = await asyncio.gather(*[client.get(url) for url in feeds], return_exceptions=True)
    for resp in responses:
        if isinstance(resp, Exception):
            continue
        feed = feedparser.parse(resp.text)
        for entry in feed.entries[:20]:
            items.append(NewsItem(
                title=entry.get("title", ""), content=entry.get("summary", ""),
                source=feed.feed.get("title", "RSS"), url=entry.get("link", ""),
                published=datetime.now(timezone.utc),
            ))
    return items

async def fetch_newsapi(query: str, api_key: str) -> list[NewsItem]:
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://newsapi.org/v2/everything",
            params={"q": query, "sortBy": "publishedAt", "pageSize": 50},
            headers={"X-Api-Key": api_key})
        data = resp.json()
    return [
        NewsItem(title=a["title"], content=a.get("description", ""),
                 source=a["source"]["name"], url=a["url"],
                 published=datetime.fromisoformat(a["publishedAt"].replace("Z", "+00:00")))
        for a in data.get("articles", [])
    ]

Deduplication

from difflib import SequenceMatcher

def deduplicate(items: list[NewsItem], threshold: float = 0.75) -> list[NewsItem]:
    unique, seen_titles = [], []
    for item in sorted(items, key=lambda x: x.published, reverse=True):
        is_dup = any(SequenceMatcher(None, item.title.lower(), s.lower()).ratio() > threshold for s in seen_titles)
        if not is_dup:
            unique.append(item)
            seen_titles.append(item.title)
    return unique

AI Classification & Summarization

import json
from openai import OpenAI

client = OpenAI()
CATEGORIES = ["geopolitics", "technology", "finance", "security", "climate", "health", "regulation", "market-move"]

def analyze_article(item: NewsItem) -> dict:
    response = client.chat.completions.create(
        model="gpt-4o-mini", response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": f"Analyze this news. Return JSON: {{category: one of {CATEGORIES}, severity: 1-10, summary: '2-3 sentences', entities: [], sentiment: 'positive|negative|neutral', actionable: bool}}"},
            {"role": "user", "content": f"Title: {item.title}\n\nContent: {item.content[:2000]}"},
        ],
    )
    analysis = json.loads(response.choices[0].message.content)
    return {**analysis, "id": item.id, "title": item.title, "url": item.url, "source": item.source}

Dashboard API (Node.js)

import express from "express";
import { WebSocketServer, WebSocket } from "ws";
import { createServer } from "http";

interface IntelItem {
  id: string; title: string; summary: string; category: string;
  severity: number; source: string; url: string; timestamp: string;
}

const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ server });
let feed: IntelItem[] = [];
const clients = new Set<WebSocket>();

wss.on("connection", (ws) => {
  clients.add(ws);
  ws.send(JSON.stringify({ type: "init", items: feed.slice(0, 50) }));
  ws.on("close", () => clients.delete(ws));
});

function broadcast(item: IntelItem) {
  const msg = JSON.stringify({ type: "new", item });
  clients.forEach((ws) => { if (ws.readyState === WebSocket.OPEN) ws.send(msg); });
}

app.post("/api/ingest", express.json(), (req, res) => {
  const item: IntelItem = { ...req.body, timestamp: new Date().toISOString() };
  feed.unshift(item);
  feed = feed.slice(0, 1000);
  broadcast(item);
  res.json({ ok: true });
});

app.get("/api/feed", (req, res) => {
  let items = feed;
  const { category, minSeverity, limit } = req.query;
  if (category) items = items.filter((i) => i.category === category);
  if (minSeverity) items = items.filter((i) => i.severity >= Number(minSeverity));
  res.json(items.slice(0, Number(limit) || 50));
});

server.listen(3000, () => console.log("Intelligence dashboard on :3000"));

Alert Rules

import httpx

ALERT_RULES = [
    {"category": "security", "min_severity": 7, "channel": "slack"},
    {"category": "market-move", "min_severity": 8, "channel": "email"},
    {"category": "*", "min_severity": 9, "channel": "all"},
]

async def check_alerts(item: dict):
    for rule in ALERT_RULES:
        cat_match = rule["category"] == "*" or rule["category"] == item["category"]
        if cat_match and item["severity"] >= rule["min_severity"]:
            await send_alert(rule["channel"], item)

async def send_alert(channel: str, item: dict):
    msg = f"[{item['category'].upper()}] Severity {item['severity']}/10\n{item['title']}\n{item['summary']}"
    if channel in ("slack", "all"):
        await httpx.AsyncClient().post("https://hooks.slack.com/services/YOUR/WEBHOOK", json={"text": msg})

Examples

Example 1: Tech Industry News Monitor

RSS_FEEDS = [
    "https://feeds.arstechnica.com/arstechnica/index",
    "https://techcrunch.com/feed/",
    "https://www.theverge.com/rss/index.xml",
]

async def run_tech_monitor():
    items = await fetch_rss(RSS_FEEDS)
    unique = deduplicate(items)
    for item in unique[:20]:
        analysis = analyze_article(item)
        async with httpx.AsyncClient() as c:
            await c.post("http://localhost:3000/api/ingest", json=analysis)
        await check_alerts(analysis)
        # Output: {"category": "technology", "severity": 4, "summary": "Apple announced..."}

Example 2: Geopolitical Event Monitoring with Scheduled Polling

async def monitor_loop(interval_minutes: int = 15):
    """Run the full pipeline on a schedule."""
    while True:
        items = await fetch_rss(RSS_FEEDS)
        newsapi_items = await fetch_newsapi("geopolitics OR sanctions", NEWSAPI_KEY)
        all_items = items + newsapi_items
        unique = deduplicate(all_items)
        for item in unique:
            analysis = analyze_article(item)
            async with httpx.AsyncClient() as c:
                await c.post("http://localhost:3000/api/ingest", json=analysis)
            await check_alerts(analysis)
        await asyncio.sleep(interval_minutes * 60)
        # Runs every 15 min, deduplicates across sources, alerts on severity >= 7

Guidelines

  1. Dedup aggressively — The same story appears across 20+ outlets. Dedup by title similarity
  2. Batch AI calls — Process 10 articles per LLM call instead of 1 to save ~90% on API costs
  3. Severity calibration — Periodically review severity scores. LLMs tend to over-rate severity
  4. Source diversity — Mix mainstream, niche, and social sources for balanced coverage
  5. Rate limit respect — Cache RSS feeds for 15-30 min. Don't hammer free APIs
  6. Historical storage — Keep analyzed articles in a DB for trend analysis over time

Dependencies

pip install feedparser httpx openai     # Python pipeline
npm install express ws                   # Node.js dashboard