jpskill.com
apscheduler

Advanced Python Scheduler - Task scheduling and job queue system

⚡ Recommended: install with a single command (60 seconds)

Copy the command below and paste it into a terminal (Mac/Linux) or PowerShell (Windows). Download, extraction, and placement are fully automatic.

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o apscheduler.zip https://jpskill.com/download/17562.zip && unzip -o apscheduler.zip && rm apscheduler.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/17562.zip -OutFile "$d\apscheduler.zip"; Expand-Archive "$d\apscheduler.zip" -DestinationPath $d -Force; ri "$d\apscheduler.zip"

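When the command finishes, restart Claude Code. The Skill then triggers automatically when you make a related request in plain language (for example, asking for help scheduling a Python job).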

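💾 Manual download (if the commands are difficult)
  1. Press the blue button below to download apscheduler.zip
  2. Double-click the ZIP file to extract it; this creates an apscheduler folder
  3. Move that folder to C:\Users\<your name>\.claude\skills\ (Windows) or ~/.claude/skills/ (Mac)
  4. Restart Claude Code

⚠️ Download and use at your own risk. This site takes no responsibility for the content, behavior, or safety of the Skill.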

🎯 What this Skill can do

The description below explains what this Skill does for you. When you ask Claude for something in this area, the Skill triggers automatically.

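📦 Installation (3 steps)

  1. Press the "Download" button above to get the .skill file
  2. Change the file extension from .skill to .zip and extract it (macOS can extract it automatically)
  3. Place the extracted folder in .claude/skills/ under your home folder
    • macOS / Linux: ~/.claude/skills/
    • Windows: %USERPROFILE%\.claude\skills\

Restart Claude Code to finish. You do not need to say "use this Skill"; it is invoked automatically for related requests.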

Last updated: 2026-05-18
Retrieved: 2026-05-18
Bundled files: 1

APScheduler

APScheduler is a flexible task scheduling and job queue system for Python applications. It supports both synchronous and asynchronous execution, with cron-style, interval-based, and one-off scheduling. The examples below use the APScheduler 4.x API (Scheduler, AsyncScheduler, add_schedule), which is not compatible with the 3.x API.

Quick Start

Basic Synchronous Scheduler

from datetime import datetime
from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger

def tick():
    print(f"Tick: {datetime.now()}")

# Create and start scheduler with memory datastore
with Scheduler() as scheduler:
    scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
    scheduler.run_until_stopped()

Async Scheduler with FastAPI

from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger

def cleanup_task():
    print("Running cleanup task...")

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = AsyncScheduler()
    async with scheduler:
        await scheduler.add_schedule(
            cleanup_task,
            IntervalTrigger(hours=1),
            id="cleanup"
        )
        await scheduler.start_in_background()
        yield

app = FastAPI(lifespan=lifespan)

Common Patterns

Schedulers

In-memory scheduler (development):

from apscheduler import AsyncScheduler

async def main():
    async with AsyncScheduler() as scheduler:
        # Jobs lost on restart
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()

Persistent scheduler (production):

from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore

async def main():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    data_store = SQLAlchemyDataStore(engine)

    async with AsyncScheduler(data_store) as scheduler:
        # Jobs survive restarts
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()

Distributed scheduler:

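from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

# Every node builds its own data store and event broker against the same database
# (task and trigger below are placeholders for your job function and trigger)
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)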

# Scheduler node - creates jobs from schedules
async def scheduler_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.scheduler
    ) as scheduler:
        await scheduler.add_schedule(task, trigger)
        await scheduler.run_until_stopped()

# Worker node - executes jobs only
async def worker_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.worker
    ) as scheduler:
        await scheduler.run_until_stopped()

Jobs

Simple function jobs:

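from apscheduler.triggers.cron import CronTrigger

# generate_report and email_report are placeholders for your own functions
def send_daily_report():
    generate_report()
    email_report("admin@example.com")

# Synchronous Scheduler shown; with AsyncScheduler, await add_schedule()
scheduler.add_schedule(
    send_daily_report,
    CronTrigger(hour=9, minute=0)  # 9 AM daily
)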

Jobs with arguments:

def process_data(source: str, destination: str, batch_size: int):
    # Data processing logic
    pass

scheduler.add_schedule(
    process_data,
    IntervalTrigger(hours=1),
    kwargs={
        'source': 's3://incoming',
        'destination': 's3://processed',
        'batch_size': 1000
    }
)

Async jobs:

import aiohttp

# save_to_database is a placeholder for your own coroutine
async def fetch_external_api():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            data = await resp.json()
            await save_to_database(data)

scheduler.add_schedule(
    fetch_external_api,
    IntervalTrigger(minutes=5)
)

Triggers

Interval trigger:

from apscheduler.triggers.interval import IntervalTrigger

# Every 30 seconds
IntervalTrigger(seconds=30)

# Every 2 hours and 15 minutes
IntervalTrigger(hours=2, minutes=15)

# Every 3 days
IntervalTrigger(days=3)
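
To make the interval semantics concrete, here is a minimal standard-library sketch that lists the first few fire times of a fixed interval. It is not APScheduler's implementation; the function name interval_fire_times and the start time are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta
from itertools import islice
from typing import Iterator


def interval_fire_times(start: datetime, interval: timedelta) -> Iterator[datetime]:
    """Yield start, start + interval, start + 2 * interval, and so on."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    current = start
    while True:
        yield current
        current += interval


# Hypothetical start time; the interval mirrors IntervalTrigger(hours=2, minutes=15)
start = datetime(2024, 1, 1, 9, 0, 0)
first_three = list(islice(interval_fire_times(start, timedelta(hours=2, minutes=15)), 3))
for fire_time in first_three:
    print(fire_time.isoformat())
```

The spacing between consecutive fire times stays fixed no matter what the calendar does, which is the main difference from the cron and calendar interval triggers described next.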

Cron trigger:

from apscheduler.triggers.cron import CronTrigger

# 9:00 AM Monday-Friday
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')

# Every 15 minutes
CronTrigger(minute='*/15')

# Last day of month at midnight
CronTrigger(day='last', hour=0, minute=0)

# Using crontab syntax
CronTrigger.from_crontab('0 9 * * 1-5')  # 9 AM weekdays
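
As a rough illustration of what the expressions above select, the following standard-library sketch expands a '*/N' step field and looks up the last day of a month. The helpers expand_step and last_day_of_month are hypothetical names for this example only and say nothing about how APScheduler parses cron fields internally.

```python
import calendar
from typing import List


def expand_step(field: str, low: int, high: int) -> List[int]:
    """Expand a '*' or '*/N' cron field into the matching values in [low, high]."""
    if field == "*":
        return list(range(low, high + 1))
    if field.startswith("*/"):
        step = int(field[2:])
        if step <= 0:
            raise ValueError("step must be positive")
        return list(range(low, high + 1, step))
    raise ValueError(f"unsupported field in this sketch: {field!r}")


def last_day_of_month(year: int, month: int) -> int:
    """Return the number of the last day of the given month."""
    return calendar.monthrange(year, month)[1]


minutes = expand_step("*/15", 0, 59)
print(minutes)
print(last_day_of_month(2024, 2))
```

The step field matches minutes 0, 15, 30, and 45 of every hour, and "last" depends on the month and on leap years, which is why it cannot be written as a fixed day number.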

Date trigger (one-time):

from datetime import datetime, timedelta
from apscheduler.triggers.date import DateTrigger

# 5 minutes from now
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)

# Specific datetime
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))
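
The datetimes above are naive (they carry no timezone), so how they are interpreted depends on the scheduler's configuration. A minimal sketch, assuming you prefer to make the timezone explicit before building a one-time run time; run_time_in is a hypothetical helper, not part of APScheduler.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def run_time_in(minutes: int, now: Optional[datetime] = None) -> datetime:
    """Return a timezone-aware datetime `minutes` after `now` (default: current UTC time)."""
    if now is None:
        now = datetime.now(timezone.utc)
    if now.tzinfo is None or now.utcoffset() is None:
        raise ValueError("now must be timezone-aware")
    return now + timedelta(minutes=minutes)


# Fixed reference time so the result is reproducible
fixed_now = datetime(2024, 12, 31, 23, 55, tzinfo=timezone.utc)
run_time = run_time_in(5, fixed_now)
print(run_time.isoformat())
```

The resulting value could then be passed as the run_time of a DateTrigger in place of a naive datetime.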

Calendar interval:

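from datetime import date
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger

# Every month at 9 AM, on the same day of the month as start_date (here the 1st)
CalendarIntervalTrigger(months=1, hour=9, minute=0, start_date=date(2024, 1, 1))

# Every week at 10 AM, on the weekday of start_date (2024-01-01 is a Monday)
CalendarIntervalTrigger(weeks=1, hour=10, minute=0, start_date=date(2024, 1, 1))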
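
The reason a calendar interval is distinct from a fixed interval is that months differ in length. The standard-library sketch below compares stepping by calendar months with stepping by 30-day blocks. The helper add_months is hypothetical, and clamping to the last day of a shorter month is a choice made for this sketch; APScheduler's own handling of non-existent dates may differ.

```python
import calendar
from datetime import date, timedelta


def add_months(day: date, months: int) -> date:
    """Return the same day-of-month `months` later, clamped to the month's last day."""
    index = day.year * 12 + (day.month - 1) + months
    year, month_index = divmod(index, 12)
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(day.day, last_day))


start = date(2024, 1, 15)
calendar_steps = [add_months(start, n) for n in range(4)]
fixed_steps = [start + timedelta(days=30 * n) for n in range(4)]

for by_calendar, by_days in zip(calendar_steps, fixed_steps):
    print(by_calendar.isoformat(), by_days.isoformat())
```

The calendar steps stay on the 15th of each month, while the 30-day steps drift to the 14th in some months.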

Persistence

SQLite:

engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)

PostgreSQL:

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)

Redis (event broker):

from apscheduler.eventbrokers.redis import RedisEventBroker

event_broker = RedisEventBroker.from_url("redis://localhost:6379")

Job Management

Get job results:

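from datetime import timedelta
from apscheduler import AsyncScheduler

# calculate_result is a placeholder for your own importable task function
async def main():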
    async with AsyncScheduler() as scheduler:
        await scheduler.start_in_background()

        # Add job with result retention
        job_id = await scheduler.add_job(
            calculate_result,
            args=(10, 20),
            result_expiration_time=timedelta(hours=1)
        )

        # Wait for result
        result = await scheduler.get_job_result(job_id, wait=True)
        print(f"Result: {result.return_value}")

Schedule management:

# Pause schedule
await scheduler.pause_schedule("my_schedule")

# Resume schedule
await scheduler.unpause_schedule("my_schedule")

# Remove schedule
await scheduler.remove_schedule("my_schedule")

# Get schedule info
schedule = await scheduler.get_schedule("my_schedule")
print(f"Next run: {schedule.next_fire_time}")

Event handling:

from apscheduler import JobOutcome, JobReleased

def on_job_completed(event: JobReleased):
    if event.outcome == JobOutcome.success:
        print(f"Job {event.job_id} completed successfully")
    else:
        # Any other JobOutcome member (for example, error) means the job did not succeed
        print(f"Job {event.job_id} did not succeed: {event.outcome.name}")

scheduler.subscribe(on_job_completed, JobReleased)

Configuration

Task defaults:

from datetime import timedelta
from apscheduler import AsyncScheduler, TaskDefaults

task_defaults = TaskDefaults(
    job_executor='threadpool',
    max_running_jobs=3,
    misfire_grace_time=timedelta(minutes=5)
)

scheduler = AsyncScheduler(task_defaults=task_defaults)
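
To show what a misfire grace time is meant to decide, here is a minimal standard-library sketch of the check: a job that starts later than its scheduled time is still run only if the delay is within the grace period. The function should_still_run is hypothetical, and treating None as "no limit" is an assumption of this sketch rather than a statement about APScheduler's internals.

```python
from datetime import datetime, timedelta
from typing import Optional


def should_still_run(
    scheduled: datetime,
    now: datetime,
    misfire_grace_time: Optional[timedelta],
) -> bool:
    """Return True if a job scheduled at `scheduled` may still start at `now`."""
    if misfire_grace_time is None:
        # Assumption in this sketch: no grace time configured means no lateness limit
        return True
    return now - scheduled <= misfire_grace_time


scheduled = datetime(2024, 1, 1, 9, 0, 0)
grace = timedelta(minutes=5)

print(should_still_run(scheduled, datetime(2024, 1, 1, 9, 3, 0), grace))
print(should_still_run(scheduled, datetime(2024, 1, 1, 9, 10, 0), grace))
```

A job that is 3 minutes late falls inside the 5-minute grace period, while one that is 10 minutes late does not.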

Job execution options:

from datetime import timedelta
from apscheduler import CoalescePolicy

# Configure task behavior
await scheduler.configure_task(
    my_function,
    job_executor='processpool',
    max_running_jobs=5,
    misfire_grace_time=timedelta(minutes=10)
)

# Override per schedule
await scheduler.add_schedule(
    my_function,
    trigger,
    job_executor='threadpool',  # Override default
    coalesce=CoalescePolicy.latest
)
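
The coalesce option matters when several fire times have piled up, for example after downtime. The sketch below models the three choices with a hypothetical Coalesce enum and a fire_times_to_run helper; it illustrates the idea only and is not APScheduler's CoalescePolicy implementation.

```python
from datetime import datetime
from enum import Enum
from typing import List


class Coalesce(Enum):
    """Hypothetical stand-in for a coalescing policy."""

    EARLIEST = "earliest"  # run once, for the oldest missed fire time
    LATEST = "latest"      # run once, for the newest missed fire time
    ALL = "all"            # run once per missed fire time


def fire_times_to_run(missed: List[datetime], policy: Coalesce) -> List[datetime]:
    """Pick which accumulated fire times should produce a job run."""
    if not missed:
        return []
    ordered = sorted(missed)
    if policy is Coalesce.EARLIEST:
        return [ordered[0]]
    if policy is Coalesce.LATEST:
        return [ordered[-1]]
    return ordered


missed = [datetime(2024, 1, 1, hour) for hour in (9, 10, 11)]
print(len(fire_times_to_run(missed, Coalesce.ALL)))
print(fire_times_to_run(missed, Coalesce.LATEST)[0].isoformat())
```

Choosing the latest fire time suits jobs where only the current state matters (such as a cache refresh), while running all of them suits jobs that must account for every period.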

Requirements

# Core package
uv add apscheduler

# Database backends
uv add "apscheduler[postgresql]"  # PostgreSQL
uv add "apscheduler[mongodb]"     # MongoDB
uv add "apscheduler[sqlite]"      # SQLite

# Event brokers
uv add "apscheduler[redis]"       # Redis
uv add "apscheduler[mqtt]"        # MQTT

Dependencies by use case:

  • Basic scheduling: apscheduler
  • PostgreSQL persistence: asyncpg, sqlalchemy
  • Redis distributed: redis
  • MongoDB: motor
  • SQLite: aiosqlite

Best Practices

  1. Use persistent storage for production to survive restarts
  2. Set appropriate misfire_grace_time to handle system delays
  3. Use conflict policies when updating schedules
  4. Subscribe to events for monitoring and debugging
  5. Choose appropriate executors (threadpool for I/O, processpool for CPU)
  6. Implement error handling in job functions
  7. Use unique schedule IDs for management operations
  8. Set result expiration to prevent memory leaks
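
For item 6, one possible way to add error handling without repeating try/except in every job is a small decorator that logs the failure and re-raises it, so the scheduler still sees the job as failed. This is a sketch using only the standard library; log_failures, ok_job, and failing_job are hypothetical names.

```python
import functools
import logging

logger = logging.getLogger("jobs")


def log_failures(func):
    """Log any exception raised by a job function, then re-raise it."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the message together with the traceback
            logger.exception("Job %s failed", func.__name__)
            raise

    return wrapper


@log_failures
def ok_job() -> int:
    return 42


@log_failures
def failing_job() -> None:
    raise RuntimeError("boom")


print(ok_job())
try:
    failing_job()
except RuntimeError as exc:
    print(f"caught: {exc}")
```

Re-raising rather than swallowing the exception is a deliberate choice here: event subscribers and stored job results can then still report the failure.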