jpskill.com

🛠️ Redis Patterns Collection

redis-patterns

Use the high-speed in-memory data store Redis for caching, rate limiting, distributed locks, and other production data patterns.

⏱ Code review: 1 hour → 10 minutes


📜 Original English Description (for Reference)

Redis data structure patterns, caching strategies, distributed locks, rate limiting, pub/sub, and connection management for production applications.

🇯🇵 Notes for Japanese Creators

In a Nutshell

Use the high-speed in-memory data store Redis for caching, rate limiting, distributed locks, and other production data patterns.

※ Supplementary notes by the jpskill.com editorial team for Japanese business settings. This reference information is independent of the Skill's actual behavior.

⚡ Recommended: One-Command Install (60 Seconds)

Copy the command below into Terminal (Mac/Linux) or PowerShell (Windows). Download → extract → install, fully automated.

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o redis-patterns.zip https://jpskill.com/download/1023.zip && unzip -o redis-patterns.zip && rm redis-patterns.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/1023.zip -OutFile "$d\redis-patterns.zip"; Expand-Archive "$d\redis-patterns.zip" -DestinationPath $d -Force; ri "$d\redis-patterns.zip"

Once finished, restart Claude Code → then just ask naturally, e.g. "add Redis caching to this endpoint", and the Skill activates automatically.

💾 Manual Download (If the Command Feels Difficult)
  1. Click the blue button below to download redis-patterns.zip
  2. Double-click the ZIP file to extract it → a redis-patterns folder appears
  3. Move that folder to C:\Users\<your name>\.claude\skills\ (Windows) or ~/.claude/skills/ (Mac)
  4. Restart Claude Code

⚠️ Download and use at your own risk. This site takes no responsibility for the content, behavior, or safety of the Skill.

🎯 What This Skill Can Do

Reading the description below shows what this Skill will do for you. It activates automatically when you ask Claude for help in this area.

📦 Installation (3 Steps)

  1. Click the "Download" button above to get the .skill file
  2. Rename the extension from .skill to .zip and extract it (macOS can auto-extract)
  3. Place the extracted folder in .claude/skills/ under your home folder
    • macOS / Linux: ~/.claude/skills/
    • Windows: %USERPROFILE%\.claude\skills\

Restart Claude Code and you are done. You do not need to say "use this Skill"; it is invoked automatically for related requests.

See the detailed usage guide →
Last updated: 2026-05-17
Retrieved: 2026-05-17
Bundled files: 1

💬 Just Say This — Sample Prompts

  • Use Redis Patterns to show me a minimal sample setup
  • Explain the main uses and caveats of Redis Patterns
  • Show me how to integrate Redis Patterns into an existing project

Paste any of these into Claude Code and the Skill activates automatically.

📖 The Original SKILL.md That Claude Reads (Expanded)

This body text is the original (English or Chinese) that the AI (Claude) reads. Japanese translations are being added over time.

Redis Patterns

Quick reference for Redis best practices across common backend use cases.

How It Works

Redis is an in-memory data structure store that supports strings, hashes, lists, sets, sorted sets, streams, and more. Individual Redis commands are atomic on a single instance; multi-step workflows require Lua scripts, MULTI/EXEC transactions, or explicit synchronization to stay atomic. Data is optionally persisted via RDB snapshots or AOF logs. Clients communicate over TCP using the RESP protocol; connection pools are essential to avoid per-request handshake overhead.
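The MULTI/EXEC transactions mentioned above map to `pipeline(transaction=True)` in redis-py. A minimal sketch (the `transfer_points` helper and key names are illustrative, not part of this Skill):

```python
def transfer_points(r, src: str, dst: str, amount: int):
    """Atomically move `amount` between two counters via MULTI/EXEC."""
    pipe = r.pipeline(transaction=True)  # queues commands inside MULTI/EXEC
    pipe.decrby(src, amount)
    pipe.incrby(dst, amount)
    return pipe.execute()  # one result per queued command, applied atomically
```

Because both commands commit together, no other client can observe the decremented source without the incremented destination.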

When to Activate

  • Adding caching to an application
  • Implementing rate limiting or throttling
  • Building distributed locks or coordination
  • Setting up session or token storage
  • Using Pub/Sub or Redis Streams for messaging
  • Configuring Redis in production (pooling, eviction, clustering)

Data Structure Cheat Sheet

Use Case                Structure      Example Key
Simple cache            String         product:123
User session            Hash           session:abc
Leaderboard             Sorted Set     scores:weekly
Unique visitors         Set            visitors:2024-01-01
Activity feed           List           feed:user:456
Event stream            Stream         events:orders
Counters / rate limits  String (INCR)  ratelimit:user:123
Approx. unique counts   HyperLogLog    hll:pageviews
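The HyperLogLog row deserves a concrete sketch, since PFADD/PFCOUNT look unlike the other commands. Function names and the key prefix here are illustrative, assuming a redis-py client `r`:

```python
def record_visitor(r, day: str, visitor_id: str):
    # PFADD: each HyperLogLog key uses ~12 KB regardless of cardinality
    r.pfadd(f"hll:visitors:{day}", visitor_id)

def unique_visitors(r, day: str) -> int:
    # PFCOUNT: approximate distinct count (standard error ~0.81%)
    return r.pfcount(f"hll:visitors:{day}")
```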

Core Patterns

Cache-Aside (Lazy Loading)

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_product(product_id: int):
    cache_key = f"product:{product_id}"
    cached = r.get(cache_key)

    if cached:
        return json.loads(cached)

    product = db.query("SELECT * FROM products WHERE id = %s", product_id)
    r.setex(cache_key, 3600, json.dumps(product))  # TTL: 1 hour
    return product

Write-Through Cache

def update_product(product_id: int, data: dict):
    # Write to DB first
    db.execute("UPDATE products SET ... WHERE id = %s", product_id)

    # Immediately update cache
    cache_key = f"product:{product_id}"
    r.setex(cache_key, 3600, json.dumps(data))

Cache Invalidation

# Tag-based invalidation — group related keys under a set
def cache_product(product_id: int, category_id: int, data: dict):
    key = f"product:{product_id}"
    tag = f"tag:category:{category_id}"
    pipe = r.pipeline(transaction=True)
    pipe.setex(key, 3600, json.dumps(data))
    pipe.sadd(tag, key)
    pipe.expire(tag, 3600)
    pipe.execute()

def invalidate_category(category_id: int):
    tag = f"tag:category:{category_id}"
    keys = r.smembers(tag)
    if keys:
        r.delete(*keys)
    r.delete(tag)

Session Storage

import time
import uuid

def create_session(user_id: int, ttl: int = 86400) -> str:
    session_id = str(uuid.uuid4())
    key = f"session:{session_id}"
    pipe = r.pipeline(transaction=True)
    pipe.hset(key, mapping={
        "user_id": user_id,
        "created_at": int(time.time()),
    })
    pipe.expire(key, ttl)
    pipe.execute()
    return session_id

def get_session(session_id: str) -> dict | None:
    data = r.hgetall(f"session:{session_id}")
    return data if data else None

def delete_session(session_id: str):
    r.delete(f"session:{session_id}")

Rate Limiting

Fixed Window (Simple)

def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
    key = f"ratelimit:{user_id}:{int(time.time()) // window}"
    pipe = r.pipeline(transaction=True)
    pipe.incr(key)
    pipe.expire(key, window)
    count, _ = pipe.execute()
    return count > limit

Sliding Window (Lua — Atomic)

-- sliding_window.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)

if count < limit then
    -- Use unique member (now + sequence) to avoid collisions within the same millisecond
    local seq_key = key .. ':seq'
    local seq = redis.call('INCR', seq_key)
    redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
    redis.call('ZADD', key, now, now .. '-' .. seq)
    redis.call('EXPIRE', key, math.ceil(window / 1000))
    return 1
end
return 0

# Python side: register the script once, then call it
sliding_window = r.register_script(open('sliding_window.lua').read())

def allow_request(user_id: int) -> bool:
    key = f"ratelimit:sliding:{user_id}"
    now = int(time.time() * 1000)
    return bool(sliding_window(keys=[key], args=[now, 60000, 100]))

Distributed Locks

Distributed Lock (Single Node — SET NX PX)

import uuid

def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
    lock_key = f"lock:{resource}"
    token = str(uuid.uuid4())
    acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
    return token if acquired else None

def release_lock(resource: str, token: str) -> bool:
    release_script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    result = r.eval(release_script, 1, f"lock:{resource}", token)
    return bool(result)

# Usage
token = acquire_lock("order:payment:123")
if token:
    try:
        process_payment()
    finally:
        release_lock("order:payment:123", token)

For multi-node setups, use the redlock-py library, which implements the full Redlock algorithm.

Pub/Sub & Streams

Pub/Sub (Fire-and-Forget)

# Publisher
def publish_event(channel: str, payload: dict):
    r.publish(channel, json.dumps(payload))

# Subscriber (blocking — run in separate thread/process)
def subscribe_events(channel: str):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            handle(json.loads(message['data']))

Redis Streams (Durable Queue)

# Producer
def emit(stream: str, event: dict):
    r.xadd(stream, event, maxlen=10000)  # Cap stream length

# Consumer group — guarantees at-least-once delivery
try:
    r.xgroup_create('events:orders', 'processor', id='0', mkstream=True)
except redis.exceptions.ResponseError:
    pass  # BUSYGROUP: the group already exists

def consume(stream: str, group: str, consumer: str):
    while True:
        messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000)
        for _, entries in (messages or []):
            for msg_id, data in entries:
                process(data)
                r.xack(stream, group, msg_id)

Prefer Streams over Pub/Sub when you need delivery guarantees, consumer groups, or replay.

Key Design

Naming Conventions

# Pattern: resource:id:field
user:123:profile
order:456:status
cache:product:789

# Pattern: namespace:resource:id
myapp:session:abc123
myapp:ratelimit:user:123

# Pattern: resource:date (time-bound keys)
stats:pageviews:2024-01-01

TTL Strategy

Data Type              Suggested TTL
User session           24 h (86400 s)
API response cache     5–15 min
Rate limit window      Match the window size
Short-lived tokens     5–10 min
Leaderboard            1 h – 24 h
Static/reference data  1 h – 1 week

Always set a TTL. Keys without TTL accumulate indefinitely and cause memory pressure.
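One way to audit for such keys is a SCAN pass that checks each key's TTL. A sketch (the `keys_without_ttl` helper is illustrative; `TTL` returns -1 for a key that exists but never expires):

```python
def keys_without_ttl(r, pattern: str = "*", limit: int = 100) -> list:
    """List keys with no expiry, using SCAN rather than KEYS."""
    missing = []
    for key in r.scan_iter(match=pattern, count=500):
        if r.ttl(key) == -1:  # -1 = key exists but has no TTL set
            missing.append(key)
            if len(missing) >= limit:
                break
    return missing
```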

Connection Management

Connection Pooling

from redis import ConnectionPool, Redis

pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,
    decode_responses=True,
    socket_connect_timeout=2,
    socket_timeout=2,
)

r = Redis(connection_pool=pool)

Cluster Mode

from redis.cluster import RedisCluster

r = RedisCluster(
    startup_nodes=[{"host": "redis-1", "port": 6379}],
    decode_responses=True,
    skip_full_coverage_check=True,
)

Sentinel (High Availability)

from redis.sentinel import Sentinel

sentinel = Sentinel(
    [('sentinel-1', 26379), ('sentinel-2', 26379)],
    socket_timeout=0.5,
)
master = sentinel.master_for('mymaster', decode_responses=True)
replica = sentinel.slave_for('mymaster', decode_responses=True)

Eviction Policies

Policy        Behavior                      Best For
noeviction    Error on write when full      Queues / critical data
allkeys-lru   Evict least recently used     General cache
volatile-lru  LRU only among keys with TTL  Mixed data store
allkeys-lfu   Evict least frequently used   Skewed access patterns
volatile-ttl  Evict soonest-to-expire       Prioritize long-lived data

Set via redis.conf: maxmemory-policy allkeys-lru
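The same settings can also be applied at runtime via CONFIG SET. A sketch assuming a redis-py client (the helper name is illustrative; runtime changes are lost on restart unless persisted):

```python
def apply_cache_eviction(r, max_memory: str = "256mb"):
    # Runtime equivalent of the redis.conf lines; settings are lost on
    # restart unless also written back with CONFIG REWRITE.
    r.config_set("maxmemory", max_memory)
    r.config_set("maxmemory-policy", "allkeys-lru")
```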

Anti-Patterns

Anti-Pattern                      Problem                              Fix
Keys with no TTL                  Memory grows unbounded               Always set TTL
KEYS * in production              Blocks the server (O(N))             Use SCAN cursor
Storing large blobs (>100 KB)     Slow serialization, memory pressure  Store reference + fetch from object store
Single Redis for everything       No isolation between cache & queue   Use separate DBs or instances
Ignoring connection pool limits   Connection exhaustion under load     Size pool to workload
Not handling cache miss stampede  Thundering herd on cold start        Use locks or probabilistic early expiry
FLUSHALL without thought          Wipes entire instance                Scope deletes by key pattern
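The "scope deletes by key pattern" fix can be sketched with SCAN plus UNLINK (assumes Redis ≥ 4.0 for UNLINK; the helper name is illustrative):

```python
def delete_by_pattern(r, pattern: str, batch: int = 500) -> int:
    """Scoped alternative to FLUSHALL: delete only keys matching a pattern."""
    deleted = 0
    buf = []
    for key in r.scan_iter(match=pattern, count=batch):
        buf.append(key)
        if len(buf) >= batch:
            deleted += r.unlink(*buf)  # UNLINK frees memory asynchronously
            buf = []
    if buf:
        deleted += r.unlink(*buf)
    return deleted
```

Batching keeps each UNLINK call bounded, so the server is never blocked the way `KEYS *` + `DEL` would block it.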

Cache Miss Stampede Prevention

import threading

_locks: dict[str, threading.Lock] = {}
_locks_mutex = threading.Lock()

def get_with_lock(key: str, fetch_fn, ttl: int = 300):
    cached = r.get(key)
    if cached:
        return json.loads(cached)

    with _locks_mutex:
        if key not in _locks:
            _locks[key] = threading.Lock()
        lock = _locks[key]
    with lock:
        cached = r.get(key)  # Re-check after acquiring lock
        if cached:
            return json.loads(cached)
        value = fetch_fn()
        r.setex(key, ttl, json.dumps(value))
        return value

Note: for multi-process deployments, replace the in-process lock with acquire_lock/release_lock from the Distributed Locks section above.

Examples

Add caching to a Django/Flask API endpoint: Use cache-aside with setex and a 5-minute TTL on the response. Key on the request parameters.
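One possible way to "key on the request parameters" is to canonicalize and hash them, so equivalent requests hit the same cache entry. A sketch (the helper and key prefix are illustrative):

```python
import hashlib
import json

def response_cache_key(path: str, params: dict) -> str:
    """Deterministic cache key for an endpoint + query parameters.

    Sorting the params means ?page=1&sort=name and ?sort=name&page=1
    produce the same key.
    """
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode()).hexdigest()[:16]
    return f"cache:response:{path}:{digest}"
```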

Rate-limit an API by user: Use fixed-window with pipeline(transaction=True) for low-traffic endpoints; use sliding-window Lua for accurate per-user throttling.

Coordinate a background job across workers: Use acquire_lock with a TTL that exceeds the expected job duration. Always release in a finally block.

Fan-out notifications to multiple subscribers: Use Pub/Sub for fire-and-forget. Switch to Streams if you need guaranteed delivery or replay for late consumers.

Quick Reference

Pattern                    When to Use
Cache-aside                Read-heavy, tolerate slight staleness
Write-through              Strong consistency required
Distributed lock           Prevent concurrent access to a resource
Sliding window rate limit  Accurate per-user throttling
Redis Streams              Durable event queue with consumer groups
Pub/Sub                    Broadcast with no delivery guarantees needed
Sorted Set leaderboard     Ranked scoring, pagination
HyperLogLog                Approximate unique count at low memory
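The Sorted Set leaderboard row can be sketched with ZADD/ZREVRANGE/ZREVRANK (function names and the board key are illustrative, assuming a redis-py client `r`):

```python
def record_score(r, board: str, player: str, score: float):
    r.zadd(board, {player: score})  # insert or update the player's score

def top_players(r, board: str, page: int = 0, per_page: int = 10):
    start = page * per_page
    # Highest score first; withscores pairs each member with its score
    return r.zrevrange(board, start, start + per_page - 1, withscores=True)

def player_rank(r, board: str, player: str):
    rank = r.zrevrank(board, player)  # 0-based rank, None if absent
    return rank + 1 if rank is not None else None
```

Pagination falls out of the start/end offsets, so page N is a single O(log(N)+M) range query.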

Related

  • Skill: postgres-patterns — relational data patterns
  • Skill: backend-patterns — API and service layer patterns
  • Skill: database-migrations — schema versioning
  • Skill: django-patterns — Django cache framework integration
  • Agent: database-reviewer — full database review workflow