ohlcv-processing
Market data preparation including OHLCV resampling, gap handling, anomaly detection, normalization, and multi-source merging
Copy the command below and paste it into a terminal (Mac/Linux) or PowerShell (Windows). Download → extract → install, all automatic.
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o ohlcv-processing.zip https://jpskill.com/download/10422.zip && unzip -o ohlcv-processing.zip && rm ohlcv-processing.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/10422.zip -OutFile "$d\ohlcv-processing.zip"; Expand-Archive "$d\ohlcv-processing.zip" -DestinationPath $d -Force; ri "$d\ohlcv-processing.zip"
When it finishes, restart Claude Code → simply make a related request, such as "clean up this OHLCV data", and the skill activates automatically.
💾 Manual download (if the commands are too much)
1. Press the blue button below to download ohlcv-processing.zip
2. Double-click the ZIP file to extract it → an ohlcv-processing folder appears
3. Move that folder to C:\Users\<your name>\.claude\skills\ (Windows) or ~/.claude/skills/ (Mac)
4. Restart Claude Code
⚠️ Download and use at your own risk. This site accepts no responsibility for the skill's content, behavior, or safety.
🎯 What this Skill does
The description below explains what this Skill will do for you. When you ask Claude for help in this area, it activates automatically.
📦 Installation (3 steps)
1. Press the "Download" button above to get the .skill file
2. Rename the extension from .skill to .zip and extract it (macOS can extract it automatically)
3. Put the extracted folder under your home directory at .claude/skills/
   - macOS / Linux: ~/.claude/skills/
   - Windows: %USERPROFILE%\.claude\skills\

Restart Claude Code and you are done. You do not need to say "use this Skill…" — it is invoked automatically for related requests.
- Last updated: 2026-05-18
- Fetched: 2026-05-18
- Bundled files: 1
📖 Original SKILL.md that Claude reads
The text below is the original (in English or Chinese) that the AI (Claude) reads. A Japanese translation is being added progressively.
OHLCV Processing — Market Data Preparation
Clean, consistent OHLCV data is the foundation of every trading analysis. Garbage in, garbage out — a single anomalous candle can trigger false signals, corrupt indicator calculations, and produce misleading backtest results. This skill covers the full data preparation pipeline: validation, cleaning, resampling, normalization, and multi-source merging.
Why this matters: Crypto OHLCV data is messier than traditional markets. 24/7 trading means no official close, DEX aggregators disagree on prices, low-liquidity tokens produce impossible candles, and API outages create gaps. Every analysis workflow should start with this pipeline.
Quick Start
1. Install Dependencies
uv pip install pandas numpy httpx
2. Standard OHLCV DataFrame Format
All processing functions expect this canonical format:
import pandas as pd
# Canonical OHLCV DataFrame
# - DatetimeIndex in UTC
# - Columns: open, high, low, close, volume (lowercase)
# - Sorted ascending by timestamp
# - No duplicate timestamps
df = pd.DataFrame({
"open": [1.10, 1.12, 1.11],
"high": [1.15, 1.14, 1.13],
"low": [1.08, 1.10, 1.09],
"close": [1.12, 1.11, 1.12],
"volume": [50000, 48000, 52000],
}, index=pd.to_datetime([
"2025-01-01 00:00:00",
"2025-01-01 00:01:00",
"2025-01-01 00:02:00",
], utc=True))
df.index.name = "timestamp"
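A small helper can assert that a frame meets this contract before it enters the pipeline. This is an illustrative sketch — the `assert_canonical` name is not part of the skill's files:

```python
import pandas as pd

def assert_canonical(df: pd.DataFrame) -> None:
    """Raise AssertionError if df does not match the canonical OHLCV format."""
    assert isinstance(df.index, pd.DatetimeIndex), "index must be a DatetimeIndex"
    assert str(df.index.tz) == "UTC", "index must be UTC"
    assert list(df.columns) == ["open", "high", "low", "close", "volume"]
    assert df.index.is_monotonic_increasing, "index must be sorted ascending"
    assert not df.index.duplicated().any(), "no duplicate timestamps"

df = pd.DataFrame(
    {"open": [1.0], "high": [1.1], "low": [0.9], "close": [1.05], "volume": [100.0]},
    index=pd.to_datetime(["2025-01-01"], utc=True),
)
assert_canonical(df)  # passes silently
```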
3. Full Processing Pipeline
import pandas as pd
import numpy as np
def process_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
"""Run complete OHLCV processing pipeline."""
df = standardize_columns(df)
df = validate_ohlcv(df)
df = handle_gaps(df, method="ffill")
    df = flag_anomalies(df)
return df
Data Validation
Column Checks
REQUIRED_COLUMNS = {"open", "high", "low", "close", "volume"}
def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Normalize column names to lowercase standard."""
df.columns = df.columns.str.lower().str.strip()
# Common renames
rename_map = {"vol": "volume", "v": "volume", "o": "open",
"h": "high", "l": "low", "c": "close"}
df = df.rename(columns=rename_map)
missing = REQUIRED_COLUMNS - set(df.columns)
if missing:
raise ValueError(f"Missing columns: {missing}")
return df[["open", "high", "low", "close", "volume"]]
Structural Validation
def validate_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
"""Validate OHLCV structural integrity."""
# Ensure DatetimeIndex in UTC
if not isinstance(df.index, pd.DatetimeIndex):
df.index = pd.to_datetime(df.index, utc=True)
if df.index.tz is None:
df.index = df.index.tz_localize("UTC")
# Sort and deduplicate
df = df.sort_index()
dupes = df.index.duplicated(keep="last")
if dupes.any():
print(f"Warning: Removed {dupes.sum()} duplicate timestamps")
df = df[~dupes]
# Type enforcement
for col in ["open", "high", "low", "close", "volume"]:
df[col] = pd.to_numeric(df[col], errors="coerce")
return df
Impossible Candle Detection
def find_impossible_candles(df: pd.DataFrame) -> pd.DataFrame:
"""Find candles that violate OHLC constraints."""
issues = pd.DataFrame(index=df.index)
issues["high_lt_low"] = df["high"] < df["low"]
issues["high_lt_open"] = df["high"] < df["open"]
issues["high_lt_close"] = df["high"] < df["close"]
issues["low_gt_open"] = df["low"] > df["open"]
issues["low_gt_close"] = df["low"] > df["close"]
issues["negative_price"] = (df[["open", "high", "low", "close"]] < 0).any(axis=1)
issues["negative_volume"] = df["volume"] < 0
issues["any_issue"] = issues.any(axis=1)
return issues[issues["any_issue"]]
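The core constraint the function checks — `high` must be the per-bar maximum of OHLC and `low` the minimum — can be demonstrated on a two-bar frame with one deliberately broken candle (self-contained sketch of the same logic):

```python
import pandas as pd

df = pd.DataFrame(
    {"open": [1.0, 1.0], "high": [1.2, 0.8], "low": [0.9, 0.9],
     "close": [1.1, 1.0], "volume": [100.0, 100.0]},
    index=pd.to_datetime(["2025-01-01 00:00", "2025-01-01 00:01"], utc=True),
)

# A candle is impossible if high is not the max, or low not the min, of OHLC
bad = (df["high"] < df[["open", "low", "close"]].max(axis=1)) | \
      (df["low"] > df[["open", "high", "close"]].min(axis=1))
print(df.index[bad])  # only the second candle (high=0.8 < low=0.9) is flagged
```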
Gap Handling
Crypto trades 24/7, but gaps still occur from API outages, low liquidity, or aggregator downtime.
Detect Gaps
def detect_gaps(df: pd.DataFrame, expected_freq: str = "1min") -> pd.DatetimeIndex:
"""Find missing timestamps based on expected frequency."""
full_index = pd.date_range(
start=df.index.min(), end=df.index.max(), freq=expected_freq, tz="UTC"
)
missing = full_index.difference(df.index)
return missing
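Applied to a minute series with one bar absent, the `difference` of the full expected index and the actual index recovers the missing timestamp:

```python
import pandas as pd

idx = pd.to_datetime(
    ["2025-01-01 00:00", "2025-01-01 00:01", "2025-01-01 00:03"], utc=True
)
df = pd.DataFrame({"close": [1.0, 1.1, 1.2]}, index=idx)

# Same logic as detect_gaps above, inlined for a self-contained demo
full_index = pd.date_range(df.index.min(), df.index.max(), freq="1min", tz="UTC")
missing = full_index.difference(df.index)
print(missing)  # the 00:02 bar is missing
```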
Fill Gaps
def handle_gaps(
df: pd.DataFrame,
freq: str = "1min",
method: str = "ffill",
max_gap: int = 5,
) -> pd.DataFrame:
"""Fill gaps in OHLCV data.
Args:
df: OHLCV DataFrame with DatetimeIndex.
freq: Expected bar frequency.
method: 'ffill' (forward fill) or 'interpolate'.
max_gap: Maximum consecutive bars to fill. Larger gaps are left as NaN.
"""
full_index = pd.date_range(
start=df.index.min(), end=df.index.max(), freq=freq, tz="UTC"
)
df = df.reindex(full_index)
df.index.name = "timestamp"
# Mark which bars were filled
df["is_filled"] = df["close"].isna()
if method == "ffill":
# Forward fill OHLC (flat candle), zero volume
df[["open", "high", "low", "close"]] = (
df[["open", "high", "low", "close"]].ffill(limit=max_gap)
)
        # Zero volume only where a bar was actually filled; larger gaps stay NaN
        df["volume"] = df["volume"].fillna(0).where(df["close"].notna())
elif method == "interpolate":
df[["open", "high", "low", "close"]] = (
df[["open", "high", "low", "close"]].interpolate(
method="time", limit=max_gap
)
)
        df["volume"] = df["volume"].fillna(0).where(df["close"].notna())
return df
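The forward-fill path produces a flat candle that repeats the last known prices with zero volume, while `is_filled` records which bars are synthetic. A minimal inline demo of that behavior (same steps as `handle_gaps`, without the `max_gap` bookkeeping):

```python
import pandas as pd

idx = pd.to_datetime(["2025-01-01 00:00", "2025-01-01 00:02"], utc=True)
df = pd.DataFrame({"open": [1.0, 1.2], "high": [1.1, 1.3],
                   "low": [0.9, 1.1], "close": [1.05, 1.25],
                   "volume": [100.0, 120.0]}, index=idx)

# Reindex to the full minute grid; the 00:01 bar is missing
full = pd.date_range(idx.min(), idx.max(), freq="1min", tz="UTC")
df = df.reindex(full)
df["is_filled"] = df["close"].isna()
df[["open", "high", "low", "close"]] = df[["open", "high", "low", "close"]].ffill(limit=5)
df["volume"] = df["volume"].fillna(0)
print(df.iloc[1][["close", "volume", "is_filled"]])  # 1.05, 0.0, True
```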
Anomaly Detection
See references/data_quality.md for the complete anomaly taxonomy.
Price Spike Detection
def detect_price_spikes(
df: pd.DataFrame, window: int = 20, threshold: float = 3.0
) -> pd.Series:
"""Flag bars where return exceeds threshold * rolling std."""
returns = df["close"].pct_change()
rolling_std = returns.rolling(window, min_periods=5).std()
spike = returns.abs() > (threshold * rolling_std)
return spike.fillna(False)
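On a series of small oscillations followed by one large jump, only the jump clears the rolling threshold (self-contained demo of the same logic):

```python
import pandas as pd

closes = [100.0, 100.1, 100.0, 100.1, 100.0, 100.1,
          100.0, 100.1, 100.0, 100.1, 150.0]
idx = pd.date_range("2025-01-01", periods=len(closes), freq="1min", tz="UTC")
df = pd.DataFrame({"close": closes}, index=idx)

returns = df["close"].pct_change()
rolling_std = returns.rolling(20, min_periods=5).std()
spike = returns.abs() > (3.0 * rolling_std)
print(int(spike.sum()))  # 1 — only the final ~50% jump is flagged
```

Note that the spike inflates its own rolling std; shifting the std by one bar (`rolling_std.shift(1)`) avoids that, at the cost of reacting one bar later.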
Zero Volume Detection
def detect_zero_volume(df: pd.DataFrame, min_volume: float = 0) -> pd.Series:
"""Flag bars with zero or below-minimum volume."""
return df["volume"] <= min_volume
Composite Anomaly Flagging
def flag_anomalies(df: pd.DataFrame) -> pd.DataFrame:
"""Add anomaly flag columns to DataFrame."""
df["anomaly_spike"] = detect_price_spikes(df)
df["anomaly_zero_vol"] = detect_zero_volume(df)
impossible = find_impossible_candles(df)
df["anomaly_impossible"] = False
if not impossible.empty:
df.loc[impossible.index, "anomaly_impossible"] = True
df["anomaly_any"] = (
df["anomaly_spike"] | df["anomaly_zero_vol"] | df["anomaly_impossible"]
)
return df
Resampling
See references/resampling_guide.md for detailed guidance.
Standard Resample
OHLCV_RESAMPLE_RULES = {
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"volume": "sum",
}
def resample_ohlcv(df: pd.DataFrame, target_freq: str) -> pd.DataFrame:
"""Resample OHLCV to a coarser timeframe.
Args:
df: OHLCV DataFrame (must be finer than target_freq).
target_freq: Pandas frequency string ('5min', '15min', '1h', '4h', '1D').
Returns:
Resampled OHLCV DataFrame with no NaN rows.
"""
ohlcv_cols = ["open", "high", "low", "close", "volume"]
resampled = df[ohlcv_cols].resample(target_freq).agg(OHLCV_RESAMPLE_RULES)
return resampled.dropna(subset=["close"])
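Five 1-minute bars aggregated into a single 5-minute bar illustrate the rules: first open, max high, min low, last close, summed volume. With pandas's default left-closed, left-labeled bins for these frequencies, 00:00–00:04 all land in the 00:00 bin:

```python
import pandas as pd

idx = pd.date_range("2025-01-01", periods=5, freq="1min", tz="UTC")
df = pd.DataFrame({
    "open": [1.0, 1.1, 1.2, 1.3, 1.4],
    "high": [1.1, 1.2, 1.3, 1.4, 1.5],
    "low": [0.9, 1.0, 1.1, 1.2, 1.3],
    "close": [1.05, 1.15, 1.25, 1.35, 1.45],
    "volume": [10.0, 20.0, 30.0, 40.0, 50.0],
}, index=idx)

rules = {"open": "first", "high": "max", "low": "min",
         "close": "last", "volume": "sum"}
bar = df.resample("5min").agg(rules)
print(bar)  # one row: open=1.0, high=1.5, low=0.9, close=1.45, volume=150
```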
Common Timeframe Ladder
TIMEFRAME_LADDER = ["1min", "5min", "15min", "1h", "4h", "1D"]
def resample_ladder(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
"""Resample 1-minute data to all standard timeframes."""
results = {"1min": df.copy()}
for tf in TIMEFRAME_LADDER[1:]:
results[tf] = resample_ohlcv(df, tf)
return results
VWAP Calculation
def compute_vwap(df: pd.DataFrame) -> pd.Series:
"""Compute cumulative VWAP over the DataFrame."""
typical_price = (df["high"] + df["low"] + df["close"]) / 3
cum_vol = df["volume"].cumsum()
cum_tp_vol = (typical_price * df["volume"]).cumsum()
return cum_tp_vol / cum_vol
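With hand-checkable numbers — typical prices 10 and 20 at volumes 100 and 300 — the cumulative VWAP after two bars is (10·100 + 20·300) / 400 = 17.5:

```python
import pandas as pd

idx = pd.date_range("2025-01-01", periods=2, freq="1min", tz="UTC")
df = pd.DataFrame({"high": [12.0, 22.0], "low": [8.0, 18.0],
                   "close": [10.0, 20.0], "volume": [100.0, 300.0]}, index=idx)

tp = (df["high"] + df["low"] + df["close"]) / 3            # typical prices: 10, 20
vwap = (tp * df["volume"]).cumsum() / df["volume"].cumsum()
print(vwap.iloc[-1])  # 17.5
```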
Normalization
def normalize_prices(
df: pd.DataFrame, method: str = "returns"
) -> pd.DataFrame:
"""Normalize OHLCV price columns.
Methods:
'returns' — Percentage returns (close-to-close).
'log_returns' — Log returns.
'minmax' — Min-max scale to [0, 1].
'zscore' — Z-score normalization.
"""
price_cols = ["open", "high", "low", "close"]
result = df.copy()
if method == "returns":
for col in price_cols:
result[f"{col}_ret"] = result[col].pct_change()
elif method == "log_returns":
for col in price_cols:
result[f"{col}_logret"] = np.log(result[col] / result[col].shift(1))
elif method == "minmax":
for col in price_cols:
cmin, cmax = result[col].min(), result[col].max()
result[f"{col}_norm"] = (result[col] - cmin) / (cmax - cmin)
elif method == "zscore":
for col in price_cols:
result[f"{col}_z"] = (
(result[col] - result[col].mean()) / result[col].std()
)
return result
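A useful property to keep in mind when choosing between the two return methods: log returns sum across bars to the log return of the whole period, while simple percentage returns do not. A quick check:

```python
import numpy as np
import pandas as pd

close = pd.Series([100.0, 110.0, 99.0])
ret = close.pct_change()                  # +0.10, then -0.10
logret = np.log(close / close.shift(1))   # ln(1.1), ln(0.9)

# Simple returns of +10% then -10% do NOT cancel; log returns telescope
print(logret.sum(), np.log(close.iloc[-1] / close.iloc[0]))  # both = ln(0.99)
```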
Multi-Source Merging
When combining data from multiple sources (e.g., Birdeye + DexScreener), timestamps may not align and prices may differ due to different DEX aggregation.
def merge_ohlcv_sources(
primary: pd.DataFrame,
secondary: pd.DataFrame,
tolerance: str = "30s",
) -> pd.DataFrame:
"""Merge two OHLCV sources, preferring the higher-volume source per bar.
Args:
primary: First OHLCV source.
secondary: Second OHLCV source.
tolerance: Maximum time difference for alignment.
"""
merged = pd.merge_asof(
primary.sort_index(),
secondary.sort_index(),
left_index=True, right_index=True,
tolerance=pd.Timedelta(tolerance),
suffixes=("_pri", "_sec"),
)
# Use higher-volume source per bar
use_secondary = merged["volume_sec"] > merged["volume_pri"]
for col in ["open", "high", "low", "close", "volume"]:
merged[col] = np.where(
use_secondary, merged[f"{col}_sec"], merged[f"{col}_pri"]
)
merged["source"] = np.where(use_secondary, "secondary", "primary")
return merged[["open", "high", "low", "close", "volume", "source"]]
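The `tolerance` behavior of `pd.merge_asof` can be seen on a tiny example. Note this demo passes `direction="nearest"` to make the tolerance window symmetric for illustration; `merge_asof` defaults to `"backward"`, which is what the function above uses:

```python
import pandas as pd

pri = pd.DataFrame({"close": [1.0, 1.1]},
                   index=pd.to_datetime(["2025-01-01 00:00:00",
                                         "2025-01-01 00:01:00"], utc=True))
sec = pd.DataFrame({"close": [0.99]},
                   index=pd.to_datetime(["2025-01-01 00:00:10"], utc=True))

merged = pd.merge_asof(pri, sec, left_index=True, right_index=True,
                       direction="nearest",
                       tolerance=pd.Timedelta("30s"), suffixes=("_pri", "_sec"))
# First bar: secondary is 10s away → matched. Second bar: 50s away → NaN.
print(merged["close_sec"].tolist())
```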
Timezone Handling
Standard: Always store and process in UTC. Convert only for display.
def ensure_utc(df: pd.DataFrame) -> pd.DataFrame:
"""Ensure DatetimeIndex is UTC."""
if df.index.tz is None:
df.index = df.index.tz_localize("UTC")
elif str(df.index.tz) != "UTC":
df.index = df.index.tz_convert("UTC")
return df
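The two branches do different things, and confusing them silently shifts every timestamp: `tz_localize` attaches UTC to a naive label without changing it, while `tz_convert` rebases an aware timestamp to the same instant in UTC:

```python
import pandas as pd

naive = pd.DatetimeIndex(["2025-01-01 09:00"])                    # no timezone
tokyo = pd.DatetimeIndex(["2025-01-01 09:00"]).tz_localize("Asia/Tokyo")

print(naive.tz_localize("UTC")[0])  # 09:00 UTC — label kept, tz attached
print(tokyo.tz_convert("UTC")[0])   # 00:00 UTC — same instant, rebased (JST = UTC+9)
```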
Data Quality Report
def quality_report(df: pd.DataFrame) -> dict:
"""Generate a data quality summary."""
total = len(df)
return {
"total_bars": total,
"date_range": f"{df.index.min()} → {df.index.max()}",
"missing_values": int(df[["open", "high", "low", "close"]].isna().sum().sum()),
"zero_volume_bars": int((df["volume"] == 0).sum()),
"impossible_candles": int((df["high"] < df["low"]).sum()),
"duplicate_timestamps": int(df.index.duplicated().sum()),
"negative_prices": int((df[["open", "high", "low", "close"]] < 0).any(axis=1).sum()),
"completeness_pct": round((1 - df["close"].isna().mean()) * 100, 2),
}
Files

References
- references/data_quality.md — Anomaly types, detection methods, correction strategies, crypto-specific data issues
- references/resampling_guide.md — Resample rules, timeframe use cases, partial bar handling, VWAP resampling, multi-timeframe alignment

Scripts
- scripts/process_ohlcv.py — Full processing pipeline: validate, clean, resample, normalize with anomaly reporting (run with --demo for synthetic data)
- scripts/merge_sources.py — Multi-source OHLCV merging with conflict resolution and discrepancy reporting (run with --demo)