jpskill.com
🛠️ 開発・MCP コミュニティ

Model Deployment

Deploy machine learning models to production using Flask, FastAPI, Docker, cloud platforms (AWS, GCP, Azure), and model serving frameworks

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o model-deployment.zip https://jpskill.com/download/21474.zip && unzip -o model-deployment.zip && rm model-deployment.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/21474.zip -OutFile "$d\model-deployment.zip"; Expand-Archive "$d\model-deployment.zip" -DestinationPath $d -Force; ri "$d\model-deployment.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して model-deployment.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → model-deployment フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
2

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

[Skill 名] モデルデプロイ

モデルデプロイ

概要

モデルデプロイとは、訓練済みの機械学習モデルを、API、ウェブサービス、またはバッチ処理システムを通じて本番環境で利用できるようにするプロセスです。

使用する場面

  • 訓練済みモデルを実世界の推論や予測のために本番環境に投入する場合
  • モデルサービングのためのREST APIやウェブサービスを構築する場合
  • 複数のユーザーやアプリケーションにサービスを提供するために予測をスケールする場合
  • モデルをクラウドプラットフォーム、エッジデバイス、またはコンテナにデプロイする場合
  • MLモデルの更新のためにCI/CDパイプラインを実装する場合
  • 大規模な予測のためのバッチ処理システムを作成する場合

デプロイのアプローチ

  • REST API: 同期推論のためのFlask、FastAPI
  • バッチ処理: 大規模な予測のためのスケジュールされたジョブ
  • リアルタイムストリーミング: 継続的なデータのためのKafka、Spark Streaming
  • サーバーレス: AWS Lambda、Google Cloud Functions
  • エッジデプロイ: エッジデバイスのためのTensorFlow Lite、ONNX
  • モデルサービング: TensorFlow Serving、Seldon Core、BentoML

主要な考慮事項

  • モデル形式: Pickle、SavedModel、ONNX、PMML
  • スケーラビリティ: ロードバランシング、オートスケーリング
  • レイテンシ: 応答時間の要件
  • モニタリング: モデルドリフト、パフォーマンスメトリクス
  • バージョニング: 本番環境での複数のモデルバージョン

Python実装

import numpy as np
import pandas as pd
import pickle
import json
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
import joblib

# FastAPI for REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
import uvicorn

# For model serving
import mlflow.pyfunc
import mlflow.sklearn

# Docker and deployment
import logging
import time
from typing import List, Dict

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Train and Save Model ===")

# Create dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)

# Save model and preprocessing
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'

joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)

print(f"Model saved to {model_path}")
print(f"Scaler saved to {scaler_path}")

# 2. Model Serving Class
print("\n=== 2. Model Serving Class ===")

class ModelPredictor:
    def __init__(self, model_path, scaler_path):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.load_time = time.time()
        self.predictions_count = 0
        logger.info("Model loaded successfully")

    def predict(self, features: List[List[float]]) -> Dict:
        try:
            X = np.array(features)
            X_scaled = self.scaler.transform(X)
            predictions = self.model.predict(X_scaled)
            probabilities = self.model.predict_proba(X_scaled)

            self.predictions_count += len(X)

            return {
                'predictions': predictions.tolist(),
                'probabilities': probabilities.tolist(),
                'count': len(X),
                'timestamp': time.time()
            }
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            raise

    def health_check(self) -> Dict:
        return {
            'status': 'healthy',
            'uptime': time.time() - self.load_time,
            'predictions': self.predictions_count
        }

# Initialize predictor
predictor = ModelPredictor(model_path, scaler_path)

# 3. FastAPI Application
print("\n=== 3. FastAPI Application ===")

app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0"
)

class PredictionRequest(BaseModel):
    features: List[List[float]] = Field(..., example=[[1.0, 2.0, 3.0]])

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]
    count: int
    timestamp: float

class HealthResponse(BaseModel):
    status: str
    uptime: float
    predictions: int

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    return predictor.health_check()

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make predictions"""
    try:
        result = predictor.predict(request.features)
        return result
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest], background_tasks: BackgroundTasks):
    """Batch prediction with background processing"""
    all_features = []
    for req in requests:
        all_features.extend(req.features)

    result = predictor.predict(all_features)
    background_tasks.add_task(logger.info, f"Batch prediction processed: {result['count']} samples")
    return result

@app.get("/stats")
async def get_stats():
    """Get model statistics"""
    return {
        'model_type': type(predictor.model).__name__,
        'n_estimators': predictor.model.n_estimators,
        'max_depth': predictor.model.max_depth,
        'feature_importance': predictor.model.feature_importances_.tolist(),
        'total_predictions': predictor.predictions_count
    }

# 4. Dockerfile template
print("\n=== 4. Dockerfile Template ===")

dockerfile_content = '''FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY model.pkl .
COPY scaler.pkl .
COPY app.py .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''

print("Dockerfile content:")
print(dockerfile_content)

# 5. Requirements file
print("\n=== 5. Requiremen
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Model Deployment

Overview

Model deployment is the process of taking a trained machine learning model and making it available for production use through APIs, web services, or batch processing systems.

When to Use

  • When productionizing trained models for real-world inference and predictions
  • When building REST APIs or web services for model serving
  • When scaling predictions to serve multiple users or applications
  • When deploying models to cloud platforms, edge devices, or containers
  • When implementing CI/CD pipelines for ML model updates
  • When creating batch processing systems for large-scale predictions

Deployment Approaches

  • REST APIs: Flask, FastAPI for synchronous inference
  • Batch Processing: Scheduled jobs for large-scale predictions
  • Real-time Streaming: Kafka, Spark Streaming for continuous data
  • Serverless: AWS Lambda, Google Cloud Functions
  • Edge Deployment: TensorFlow Lite, ONNX for edge devices
  • Model Serving: TensorFlow Serving, Seldon Core, BentoML

Key Considerations

  • Model Format: Pickle, SavedModel, ONNX, PMML
  • Scalability: Load balancing, auto-scaling
  • Latency: Response time requirements
  • Monitoring: Model drift, performance metrics
  • Versioning: Multiple model versions in production

Python Implementation

import numpy as np
import pandas as pd
import pickle
import json
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
import joblib

# FastAPI for REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
import uvicorn

# For model serving
import mlflow.pyfunc
import mlflow.sklearn

# Docker and deployment
import logging
import time
from typing import List, Dict

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Train and Save Model ===")

# Create dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)

# Save model and preprocessing
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'

joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)

print(f"Model saved to {model_path}")
print(f"Scaler saved to {scaler_path}")

# 2. Model Serving Class
print("\n=== 2. Model Serving Class ===")

class ModelPredictor:
    def __init__(self, model_path, scaler_path):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.load_time = time.time()
        self.predictions_count = 0
        logger.info("Model loaded successfully")

    def predict(self, features: List[List[float]]) -> Dict:
        try:
            X = np.array(features)
            X_scaled = self.scaler.transform(X)
            predictions = self.model.predict(X_scaled)
            probabilities = self.model.predict_proba(X_scaled)

            self.predictions_count += len(X)

            return {
                'predictions': predictions.tolist(),
                'probabilities': probabilities.tolist(),
                'count': len(X),
                'timestamp': time.time()
            }
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            raise

    def health_check(self) -> Dict:
        return {
            'status': 'healthy',
            'uptime': time.time() - self.load_time,
            'predictions': self.predictions_count
        }

# Initialize predictor
predictor = ModelPredictor(model_path, scaler_path)

# 3. FastAPI Application
print("\n=== 3. FastAPI Application ===")

app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0"
)

class PredictionRequest(BaseModel):
    features: List[List[float]] = Field(..., example=[[1.0, 2.0, 3.0]])

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]
    count: int
    timestamp: float

class HealthResponse(BaseModel):
    status: str
    uptime: float
    predictions: int

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    return predictor.health_check()

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make predictions"""
    try:
        result = predictor.predict(request.features)
        return result
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest], background_tasks: BackgroundTasks):
    """Batch prediction with background processing"""
    all_features = []
    for req in requests:
        all_features.extend(req.features)

    result = predictor.predict(all_features)
    background_tasks.add_task(logger.info, f"Batch prediction processed: {result['count']} samples")
    return result

@app.get("/stats")
async def get_stats():
    """Get model statistics"""
    return {
        'model_type': type(predictor.model).__name__,
        'n_estimators': predictor.model.n_estimators,
        'max_depth': predictor.model.max_depth,
        'feature_importance': predictor.model.feature_importances_.tolist(),
        'total_predictions': predictor.predictions_count
    }

# 4. Dockerfile template
print("\n=== 4. Dockerfile Template ===")

dockerfile_content = '''FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY model.pkl .
COPY scaler.pkl .
COPY app.py .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''

print("Dockerfile content:")
print(dockerfile_content)

# 5. Requirements file
print("\n=== 5. Requirements.txt ===")

requirements = """fastapi==0.104.1
uvicorn[standard]==0.24.0
numpy==1.24.0
pandas==2.1.0
scikit-learn==1.3.2
joblib==1.3.2
pydantic==2.5.0
mlflow==2.8.1
"""

print("Requirements:")
print(requirements)

# 6. Docker Compose for deployment
print("\n=== 6. Docker Compose Template ===")

docker_compose = '''version: '3.8'

services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=info
      - WORKERS=4
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  ml-monitor:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"

  ml-dashboard:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
'''

print("Docker Compose content:")
print(docker_compose)

# 7. Testing the API
print("\n=== 7. Testing the API ===")

def test_predictor():
    # Test single prediction
    test_features = [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
                     1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1]]

    result = predictor.predict(test_features)
    print(f"Prediction result: {result}")

    # Health check
    health = predictor.health_check()
    print(f"Health status: {health}")

    # Batch predictions
    batch_features = [
        [1.0] * 20,
        [2.0] * 20,
        [3.0] * 20,
    ]
    batch_result = predictor.predict(batch_features)
    print(f"Batch prediction: {batch_result['count']} samples processed")

test_predictor()

# 8. Model versioning and registry
print("\n=== 8. Model Registry with MLflow ===")

# Log model to MLflow
with mlflow.start_run():
    mlflow.sklearn.log_model(model, "model")
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("accuracy", 0.95)

    model_uri = "runs:/" + mlflow.active_run().info.run_id + "/model"
    print(f"Model logged to MLflow: {model_uri}")

# 9. Deployment monitoring code
print("\n=== 9. Monitoring Setup ===")

class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []

    def log_prediction(self, features, prediction, latency):
        self.predictions.append({
            'timestamp': time.time(),
            'features_mean': np.mean(features),
            'prediction': prediction,
            'latency_ms': latency * 1000
        })

    def check_model_drift(self):
        if len(self.predictions) < 100:
            return {'drift_detected': False}

        recent_predictions = [p['prediction'] for p in self.predictions[-100:]]
        historical_mean = np.mean([p['prediction'] for p in self.predictions[:-100]])
        recent_mean = np.mean(recent_predictions)

        drift = abs(recent_mean - historical_mean) > 0.1

        return {
            'drift_detected': drift,
            'historical_mean': float(historical_mean),
            'recent_mean': float(recent_mean),
            'threshold': 0.1
        }

    def get_stats(self):
        if not self.latencies:
            return {}

        return {
            'avg_latency_ms': np.mean(self.latencies) * 1000,
            'p95_latency_ms': np.percentile(self.latencies, 95) * 1000,
            'p99_latency_ms': np.percentile(self.latencies, 99) * 1000,
            'total_predictions': len(self.predictions)
        }

monitor = ModelMonitor()

print("\nDeployment setup completed!")
print("To run FastAPI server: uvicorn app:app --reload")

Deployment Checklist

  • Model format and serialization
  • Input/output validation
  • Error handling and logging
  • Authentication and security
  • Rate limiting and throttling
  • Health check endpoints
  • Monitoring and alerting
  • Version management
  • Rollback procedures

Cloud Deployment Options

  • AWS: SageMaker, Lambda, EC2
  • GCP: Vertex AI, Cloud Run, App Engine
  • Azure: Machine Learning, App Service
  • Kubernetes: Self-managed on-premises

Performance Optimization

  • Model quantization for smaller size
  • Caching predictions
  • Batch processing
  • GPU acceleration
  • Request pooling

Deliverables

  • Deployed model endpoint
  • API documentation
  • Docker configuration
  • Monitoring dashboard
  • Deployment guide
  • Performance benchmarks
  • Scaling recommendations

同梱ファイル

※ ZIPに含まれるファイル一覧。`SKILL.md` 本体に加え、参考資料・サンプル・スクリプトが入っている場合があります。