jpskill.com

audit-logging

A Skill for implementing tamper-evident audit logs that track who did what and when, and for retaining event logs as regulations require, so that compliance requirements are met.

📜 Original English description (reference)

Implement tamper-evident audit logs for compliance (SOC 2, HIPAA, PCI DSS). Use when building compliance audit trails, tracking who did what and when, or implementing immutable event logs that satisfy regulatory retention requirements.


⚡ Recommended: install with a single command (60 seconds)

Copy the command below and paste it into a terminal (Mac/Linux) or PowerShell (Windows). Download, extraction, and placement are fully automatic.

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o audit-logging.zip https://jpskill.com/download/14657.zip && unzip -o audit-logging.zip && rm audit-logging.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14657.zip -OutFile "$d\audit-logging.zip"; Expand-Archive "$d\audit-logging.zip" -DestinationPath $d -Force; ri "$d\audit-logging.zip"

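When it finishes, restart Claude Code. The Skill then activates automatically when you make a related request in plain language, for example "add a tamper-evident audit log to this service".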

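💾 Prefer a manual download? (for anyone uncomfortable with the command line)
  1. Press the blue button below to download audit-logging.zip
  2. Double-click the ZIP file to extract it; this creates an audit-logging folder
  3. Move that folder to C:\Users\<your user name>\.claude\skills\ (Windows) or ~/.claude/skills/ (Mac)
  4. Restart Claude Code

⚠️ Download and use at your own risk. This site accepts no responsibility for the content, behavior, or safety of the Skill.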

🎯 What this Skill does

The description below explains what this Skill does for you. It activates automatically when you ask Claude for help in this area.

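📦 Installation (3 steps)

  1. Press the "Download" button above to get the .skill file
  2. Change the file extension from .skill to .zip and extract it (macOS can extract it automatically)
  3. Place the extracted folder in .claude/skills/ under your home folder
    • macOS / Linux: ~/.claude/skills/
    • Windows: %USERPROFILE%\.claude\skills\

Restart Claude Code to finish. You do not need to say "use this Skill"; it is invoked automatically for related requests.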

Last updated: 2026-05-18
Retrieved: 2026-05-18
Bundled files: 1

📜 Original SKILL.md (the text Claude reads)

Audit Logging

Overview

Compliance audit logs must answer: who did what to which resource, when, from where, and with what result. They must also be tamper-evident — an auditor must be able to verify logs haven't been modified.

Regulatory retention requirements:

| Standard | Retention |
|----------|-----------|
| HIPAA | 6 years |
| PCI DSS | 12 months minimum, with the most recent 3 months immediately available |
| SOC 2 (typical) | 1 year |
| GDPR | Defined by purpose (often 1-3 years) |
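To make the retention periods actionable, the sketch below computes the earliest date an entry may be purged when several standards apply at once. The names `RETENTION_YEARS`, `add_years`, and `earliest_purge_date` are illustrative and not part of the Skill, and only the year-based periods (HIPAA, typical SOC 2) are encoded. Confirm the periods against the regulations that actually apply to you.

```python
from datetime import datetime, timezone

# Illustrative values for the year-based retention periods only (assumption:
# each applicable standard is expressed in whole years).
RETENTION_YEARS = {"HIPAA": 6, "SOC2": 1}

def add_years(moment: datetime, years: int) -> datetime:
    """Shift a datetime by whole years, mapping Feb 29 to Feb 28 if needed."""
    try:
        return moment.replace(year=moment.year + years)
    except ValueError:
        # Feb 29 does not exist in the target year
        return moment.replace(year=moment.year + years, day=28)

def earliest_purge_date(created_at: datetime, standards: list[str]) -> datetime:
    """When several standards apply, the longest retention period wins."""
    years = max(RETENTION_YEARS[name] for name in standards)
    return add_years(created_at, years)

created = datetime(2024, 2, 29, 12, 0, tzinfo=timezone.utc)
print(earliest_purge_date(created, ["HIPAA", "SOC2"]).isoformat())
# → 2030-02-28T12:00:00+00:00
```

Taking the maximum over the applicable standards keeps a record that falls under both HIPAA and SOC 2 for the full six years.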

Required Log Fields

interface AuditLogEntry {
  // Core required fields
  id: string;             // UUID — unique event identifier
  timestamp: string;      // ISO 8601 UTC — when the event occurred
  actor_id: string;       // User/service that performed the action
  actor_type: 'user' | 'service' | 'admin' | 'system';
  action: string;         // What happened: "read" | "write" | "delete" | "login" | "export"
  resource_type: string;  // "patient_record" | "invoice" | "user_account"
  resource_id: string;    // ID of the affected resource
  result: 'success' | 'failure' | 'denied';

  // Context fields
  ip_address: string;     // Source IP
  user_agent?: string;    // Browser/client identifier
  session_id?: string;    // Session identifier

  // Optional fields
  changed_fields?: string[];    // For write operations: which fields changed
  old_values?: Record<string, unknown>;  // Previous values (be careful with PII)
  reason?: string;              // Clinical justification (HIPAA), business reason

  // Tamper-evidence
  prev_hash?: string;     // Hash of previous log entry (hash chaining)
  hash: string;           // SHA-256 of this entry
}
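As a rough illustration of how the field list above might be enforced before an entry is written, the following sketch checks for missing required fields and for values outside the allowed sets. `validate_entry` and the constant names are hypothetical helpers, not part of the Skill.

```python
# Field names and allowed values mirror the AuditLogEntry interface above.
REQUIRED_FIELDS = (
    "id", "timestamp", "actor_id", "actor_type", "action",
    "resource_type", "resource_id", "result", "ip_address", "hash",
)
ACTOR_TYPES = {"user", "service", "admin", "system"}
RESULTS = {"success", "failure", "denied"}

def validate_entry(entry: dict) -> list[str]:
    """Return a list of problems; an empty list means the entry is well-formed."""
    problems = [
        f"missing field: {name}" for name in REQUIRED_FIELDS if not entry.get(name)
    ]
    actor_type = entry.get("actor_type")
    if actor_type and actor_type not in ACTOR_TYPES:
        problems.append(f"invalid actor_type: {actor_type!r}")
    result = entry.get("result")
    if result and result not in RESULTS:
        problems.append(f"invalid result: {result!r}")
    return problems

entry = {
    "id": "evt-1", "timestamp": "2024-01-01T00:00:00+00:00",
    "actor_id": "user-123", "actor_type": "anonymous", "action": "read",
    "resource_type": "invoice", "resource_id": "inv-9", "result": "success",
    "ip_address": "203.0.113.7", "hash": "abc123",
}
print(validate_entry(entry))  # → ["invalid actor_type: 'anonymous'"]
```

Rejecting malformed entries at write time matters more than usual here, because an append-only log cannot be corrected afterwards.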

Hash Chain Implementation

Hash chaining makes log tampering detectable: each entry stores the hash of the previous entry and includes it in its own hash. Modifying any entry changes its hash, which breaks the link from every later entry.

import asyncio
import json
import hashlib
import uuid
from datetime import datetime, timezone
from typing import Optional

class AuditLogger:
    # Assumes a Motor-style async MongoDB client: insert_one() and find_one()
    # are awaitable, and find() returns a cursor that offers to_list().
    def __init__(self, db_connection):
        self.db = db_connection
        self._last_hash: Optional[str] = None
        # Serializes writers within this process so two concurrent log() calls
        # cannot chain onto the same prev_hash and fork the chain.
        self._lock = asyncio.Lock()

    def _compute_hash(self, entry: dict, prev_hash: Optional[str]) -> str:
        """Compute SHA-256 of the log entry for tamper-evidence."""
        # Only the fields listed here are covered by the hash. Add any other
        # field (actor_type, ip_address, ...) that must be tamper-evident.
        hashable = {
            "id": entry["id"],
            "timestamp": entry["timestamp"],
            "actor_id": entry["actor_id"],
            "action": entry["action"],
            "resource_type": entry["resource_type"],
            "resource_id": entry["resource_id"],
            "result": entry["result"],
            "prev_hash": prev_hash or "GENESIS"
        }
        content = json.dumps(hashable, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    async def log(
        self,
        actor_id: str,
        actor_type: str,
        action: str,
        resource_type: str,
        resource_id: str,
        result: str,
        ip_address: str,
        **kwargs
    ) -> dict:
        """Create and persist a tamper-evident audit log entry."""
        async with self._lock:
            # Get last hash for chaining
            prev_hash = self._last_hash or await self._get_last_hash()

            entry = {
                # Extra fields go first so they can never overwrite core fields
                **kwargs,
                "id": str(uuid.uuid4()),
                # Fixed-width timestamps keep string ordering chronological
                "timestamp": datetime.now(timezone.utc).isoformat(timespec="microseconds"),
                "actor_id": actor_id,
                "actor_type": actor_type,
                "action": action,
                "resource_type": resource_type,
                "resource_id": resource_id,
                "result": result,
                "ip_address": ip_address,
                "prev_hash": prev_hash or "GENESIS",
            }

            entry["hash"] = self._compute_hash(entry, prev_hash)

            # Persist to append-only storage
            await self.db.audit_logs.insert_one(entry)
            # Advance the cached head only after the write has succeeded
            self._last_hash = entry["hash"]
            return entry

    async def verify_chain(self, limit: int = 1000) -> bool:
        """Verify the hash chain integrity, starting from the oldest entry."""
        cursor = self.db.audit_logs.find(sort=[("timestamp", 1)], limit=limit)
        entries = await cursor.to_list(length=limit)

        prev_hash = "GENESIS"
        for i, entry in enumerate(entries):
            expected_hash = self._compute_hash(entry, prev_hash)
            if entry["hash"] != expected_hash:
                print(f"❌ Chain broken at entry {i}: id={entry['id']}")
                return False
            if entry["prev_hash"] != prev_hash:
                print(f"❌ prev_hash mismatch at entry {i}")
                return False
            prev_hash = entry["hash"]

        print(f"✅ Chain verified: {len(entries)} entries intact")
        return True

    async def _get_last_hash(self) -> Optional[str]:
        last = await self.db.audit_logs.find_one(sort=[("timestamp", -1)])
        return last["hash"] if last else None
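To see the chain detect tampering without a database, here is a minimal in-memory sketch that hashes the same fields as the class above. The helpers `entry_hash`, `append`, and `first_broken_index` are illustrative names, and the list stands in for the append-only store.

```python
import hashlib
import json

# Same hashed fields as the AuditLogger above
FIELDS = ("id", "timestamp", "actor_id", "action",
          "resource_type", "resource_id", "result")

def entry_hash(entry: dict, prev_hash: str) -> str:
    """SHA-256 over the hashed fields plus the previous entry's hash."""
    hashable = {name: entry[name] for name in FIELDS}
    hashable["prev_hash"] = prev_hash
    return hashlib.sha256(json.dumps(hashable, sort_keys=True).encode()).hexdigest()

def append(chain: list, **fields) -> None:
    """Link a new entry to the current head of the chain."""
    prev_hash = chain[-1]["hash"] if chain else "GENESIS"
    entry = {**fields, "prev_hash": prev_hash}
    entry["hash"] = entry_hash(entry, prev_hash)
    chain.append(entry)

def first_broken_index(chain: list):
    """Return the index of the first invalid entry, or None if the chain is intact."""
    prev_hash = "GENESIS"
    for index, entry in enumerate(chain):
        if entry["prev_hash"] != prev_hash or entry["hash"] != entry_hash(entry, prev_hash):
            return index
        prev_hash = entry["hash"]
    return None

chain = []
for n, action in enumerate(["login", "read", "export"]):
    append(chain, id=f"evt-{n}", timestamp=f"2024-01-01T00:00:0{n}+00:00",
           actor_id="user-123", action=action,
           resource_type="patient_record", resource_id="patient-456",
           result="success")

print(first_broken_index(chain))  # → None
chain[1]["action"] = "delete"     # tamper with the middle entry
print(first_broken_index(chain))  # → 1
```

An attacker who can rewrite every entry from the tampered one onward can still rebuild a consistent chain, so in practice the latest hash is usually also anchored somewhere the attacker cannot reach, such as WORM storage.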

Express.js Audit Middleware

const { v4: uuidv4 } = require('uuid');
const crypto = require('crypto');

class AuditLogger {
  constructor(db) {
    this.db = db;
    // Tail of a promise queue. Writes run one at a time so that concurrent
    // requests cannot read the same prev_hash and fork the chain.
    this.queue = Promise.resolve();
  }

  log(fields) {
    const write = this.queue.then(() => this.append(fields));
    // A failed write must not block the entries queued behind it
    this.queue = write.catch(() => {});
    return write;
  }

  async append({ actorId, actorType = 'user', action, resourceType, resourceId,
                 result, ipAddress, sessionId, reason, changedFields }) {
    const prevHash = await this.getLastHash();

    const entry = {
      id: uuidv4(),
      timestamp: new Date().toISOString(),
      actor_id: actorId,
      actor_type: actorType,
      action,
      resource_type: resourceType,
      resource_id: resourceId,
      result,
      ip_address: ipAddress,
      session_id: sessionId,
      reason,
      changed_fields: changedFields,
      prev_hash: prevHash || 'GENESIS',
    };

    entry.hash = this.computeHash(entry);
    await this.db.auditLogs.create(entry);
    return entry;
  }

  computeHash(entry) {
    // Key order is fixed here, so the serialized form is deterministic
    const hashable = JSON.stringify({
      id: entry.id,
      timestamp: entry.timestamp,
      actor_id: entry.actor_id,
      action: entry.action,
      resource_type: entry.resource_type,
      resource_id: entry.resource_id,
      result: entry.result,
      prev_hash: entry.prev_hash,
    });
    return crypto.createHash('sha256').update(hashable).digest('hex');
  }

  async getLastHash() {
    const last = await this.db.auditLogs.findOne({ order: [['timestamp', 'DESC']] });
    return last?.hash || null;
  }
}

// Express middleware: auto-log all requests
const createAuditMiddleware = (auditLogger) => (req, res, next) => {
  // 'finish' fires once the response has been handed off for sending,
  // whichever of res.send / res.json / res.end produced it
  res.on('finish', async () => {
    try {
      await auditLogger.log({
        actorId: req.user?.id || 'anonymous',
        // actor_type must be one of the schema's values; unauthenticated
        // callers are identified by the actorId 'anonymous'
        actorType: 'user',
        action: `${req.method.toLowerCase()}_${req.route?.path || req.path}`,
        resourceType: req.params?.resourceType || 'api_request',
        resourceId: req.params?.id || req.path,
        result: res.statusCode < 400 ? 'success' :
                res.statusCode === 403 ? 'denied' : 'failure',
        ipAddress: req.ip,
        sessionId: req.session?.id,
      });
    } catch (err) {
      // Never let audit logging errors break the main flow
      console.error('Audit log failed:', err);
    }
  });

  next();
};

PostgreSQL Append-Only Table

-- Append-only audit log table
-- The trigger below rejects UPDATE/DELETE; the CHECK constraints only validate column values
CREATE TABLE audit_logs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  actor_id TEXT NOT NULL,
  actor_type TEXT NOT NULL CHECK (actor_type IN ('user', 'service', 'admin', 'system')),
  action TEXT NOT NULL,
  resource_type TEXT NOT NULL,
  resource_id TEXT NOT NULL,
  result TEXT NOT NULL CHECK (result IN ('success', 'failure', 'denied')),
  ip_address INET,
  session_id TEXT,
  reason TEXT,
  changed_fields TEXT[],
  prev_hash TEXT,
  hash TEXT NOT NULL UNIQUE
);

-- Create append-only trigger — prevent modification
CREATE OR REPLACE FUNCTION prevent_audit_modification()
RETURNS TRIGGER AS $$
BEGIN
  RAISE EXCEPTION 'Audit logs are immutable — modification not allowed';
END;
$$ LANGUAGE plpgsql;

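CREATE TRIGGER audit_logs_immutable
BEFORE UPDATE OR DELETE ON audit_logs
FOR EACH ROW EXECUTE FUNCTION prevent_audit_modification();

-- Row-level triggers do not fire on TRUNCATE, so block it at statement level
CREATE TRIGGER audit_logs_no_truncate
BEFORE TRUNCATE ON audit_logs
FOR EACH STATEMENT EXECUTE FUNCTION prevent_audit_modification();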

-- Indexes for common queries
CREATE INDEX idx_audit_actor_id ON audit_logs(actor_id);
CREATE INDEX idx_audit_resource ON audit_logs(resource_type, resource_id);
CREATE INDEX idx_audit_timestamp ON audit_logs(timestamp DESC);
CREATE INDEX idx_audit_action ON audit_logs(action);

-- Query: All actions by a user in the last 30 days
SELECT timestamp, action, resource_type, resource_id, result, ip_address
FROM audit_logs
WHERE actor_id = 'user-123'
  AND timestamp > NOW() - INTERVAL '30 days'
ORDER BY timestamp DESC;

-- Query: All access to a specific patient record
SELECT timestamp, actor_id, action, result, ip_address, reason
FROM audit_logs
WHERE resource_type = 'patient_record'
  AND resource_id = 'patient-456'
ORDER BY timestamp DESC;

-- Query: Failed access attempts (security monitoring)
SELECT actor_id, COUNT(*) as failed_attempts, MAX(timestamp) as last_attempt
FROM audit_logs
WHERE result IN ('failure', 'denied')
  AND timestamp > NOW() - INTERVAL '24 hours'
GROUP BY actor_id
HAVING COUNT(*) > 5
ORDER BY failed_attempts DESC;
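The failed-access query above can also be expressed in application code, for example when alerting from a stream of entries rather than from the database. This is a sketch under the assumption that each entry is a dict whose `timestamp` is a timezone-aware `datetime`; `suspicious_actors` is a hypothetical helper name.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

def suspicious_actors(entries, now, threshold=5, window=timedelta(hours=24)):
    """Actors with more than `threshold` failed or denied events inside the window."""
    cutoff = now - window
    failures = Counter(
        entry["actor_id"]
        for entry in entries
        if entry["result"] in ("failure", "denied") and entry["timestamp"] > cutoff
    )
    # most_common() sorts by count, matching ORDER BY failed_attempts DESC
    return [(actor, count) for actor, count in failures.most_common() if count > threshold]

now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
entries = (
    [{"actor_id": "user-1", "result": "denied",
      "timestamp": now - timedelta(minutes=m)} for m in range(6)]
    + [{"actor_id": "user-2", "result": "failure",
        "timestamp": now - timedelta(hours=1)} for _ in range(2)]
    + [{"actor_id": "user-3", "result": "failure",
        "timestamp": now - timedelta(days=2)} for _ in range(10)]
)
print(suspicious_actors(entries, now))  # → [('user-1', 6)]
```

Here `user-2` stays below the threshold and `user-3` falls outside the 24-hour window, so only `user-1` is reported.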

AWS CloudTrail + Immutable S3

# Create S3 bucket with Object Lock (WORM - Write Once Read Many)
# Outside us-east-1, also pass --create-bucket-configuration LocationConstraint=<region>
aws s3api create-bucket \
  --bucket my-audit-logs-bucket \
  --object-lock-enabled-for-bucket

# Set default retention: compliance mode (cannot delete even by admin)
aws s3api put-object-lock-configuration \
  --bucket my-audit-logs-bucket \
  --object-lock-configuration '{
    "ObjectLockEnabled": "Enabled",
    "Rule": {
      "DefaultRetention": {
        "Mode": "COMPLIANCE",
        "Years": 6
      }
    }
  }'

# Create CloudTrail that logs to the immutable S3 bucket
# (the bucket policy must first allow cloudtrail.amazonaws.com to write to it)
aws cloudtrail create-trail \
  --name compliance-audit-trail \
  --s3-bucket-name my-audit-logs-bucket \
  --include-global-service-events \
  --is-multi-region-trail \
  --enable-log-file-validation  # Cryptographic log integrity

# A trail created from the CLI records nothing until logging is started
aws cloudtrail start-logging --name compliance-audit-trail

# Verify log file integrity (CloudTrail); the account ID is a 12-digit placeholder
aws cloudtrail validate-logs \
  --trail-arn arn:aws:cloudtrail:us-east-1:123456789012:trail/compliance-audit-trail \
  --start-time 2024-01-01T00:00:00Z

FastAPI Audit Middleware (Python)

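from fastapi import FastAPI, Request
import asyncio
import time

app = FastAPI()

# Assumes the Python AuditLogger class from this document and an async
# database handle named `db` created elsewhere in the application
audit_logger = AuditLogger(db)

# The event loop keeps only weak references to tasks, so hold our own
# reference until each log write has finished
background_tasks = set()

@app.middleware("http")
async def audit_middleware(request: Request, call_next):
    start_time = time.perf_counter()

    response = await call_next(request)
    duration_ms = int((time.perf_counter() - start_time) * 1000)

    # Read the user after the request ran, so an identity set by
    # authentication further down the stack is visible here
    user = getattr(request.state, "user", None)
    actor_id = user.id if user is not None else None

    # Determine result from status code
    if response.status_code < 400:
        result = "success"
    elif response.status_code == 403:
        result = "denied"
    else:
        result = "failure"

    # Log asynchronously to avoid blocking response
    task = asyncio.create_task(audit_logger.log(
        actor_id=actor_id or "anonymous",
        # actor_type must be one of the schema's values; unauthenticated
        # callers are identified by the actor_id "anonymous"
        actor_type="user",
        action=f"{request.method.lower()}:{request.url.path}",
        resource_type="api_endpoint",
        resource_id=str(request.url.path),
        result=result,
        # request.client is None when the server cannot determine the peer
        ip_address=request.client.host if request.client else "unknown",
        duration_ms=duration_ms,
        http_status=response.status_code,
    ))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    return response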
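Both middleware examples map HTTP status codes to an audit result inline. One way to keep that mapping testable is to factor it into a small pure function, sketched below. `classify_result` and its `denied_statuses` parameter are illustrative names; treating 401 as "denied" is an optional policy choice, not something the Skill prescribes.

```python
def classify_result(status_code: int, denied_statuses=frozenset({403})) -> str:
    """Map an HTTP status code to the audit log's result values."""
    if status_code < 400:
        return "success"
    if status_code in denied_statuses:
        return "denied"
    return "failure"

print(classify_result(200))                    # → success
print(classify_result(403))                    # → denied
print(classify_result(401))                    # → failure
print(classify_result(401, frozenset({401, 403})))  # → denied
```

Keeping the policy in one place lets the Express and FastAPI paths stay consistent when the set of "denied" statuses changes.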

Compliance Checklist

  • [ ] All required fields logged (actor, action, resource, timestamp, IP, result)
  • [ ] Tamper-evidence implemented (hash chaining or digital signatures)
  • [ ] Append-only storage (trigger prevents UPDATE/DELETE, or S3 Object Lock)
  • [ ] Log aggregation centralized (avoid local file logs that can be deleted)
  • [ ] Retention policy matches regulation (HIPAA: 6yr, PCI: 12 months with the latest 3 months immediately available)
  • [ ] Alerts for anomalies (failed logins, access spikes, unusual hours)
  • [ ] Logs reviewed periodically (monthly for SOC 2, daily for high-security)
  • [ ] Log access itself is audited (who viewed audit logs)
  • [ ] Backup of audit logs separate from primary storage
  • [ ] WORM storage for regulatory compliance archives
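
The tamper-evidence item in the checklist mentions digital signatures as an alternative to hash chaining. As a rough standard-library sketch of the signing idea, the block below attaches an HMAC-SHA256 tag to each entry. HMAC is a symmetric keyed MAC, not a true digital signature: anyone holding the key can forge tags, so an asymmetric scheme is the stronger choice when verifiers must not be able to sign. `sign_entry`, `verify_entry`, and the demo key are hypothetical.

```python
import hashlib
import hmac
import json

def sign_entry(entry: dict, key: bytes) -> str:
    """HMAC-SHA256 over a canonical JSON form of the entry."""
    payload = json.dumps(entry, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(key, payload, hashlib.sha256).hexdigest()

def verify_entry(entry: dict, signature: str, key: bytes) -> bool:
    """Constant-time comparison avoids leaking how many characters matched."""
    return hmac.compare_digest(sign_entry(entry, key), signature)

# Demo key only; a real key would live in a KMS or HSM, away from the log store.
key = b"demo-key-not-for-production"
entry = {"id": "evt-1", "actor_id": "user-123", "action": "export",
         "resource_type": "invoice", "resource_id": "inv-9", "result": "success"}

signature = sign_entry(entry, key)
print(verify_entry(entry, signature, key))                          # → True
print(verify_entry({**entry, "result": "denied"}, signature, key))  # → False
```

Unlike a bare hash, the tag cannot be recomputed by someone who can edit the log but does not hold the key.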