PythonとChatGPTで自己流株価分析

今回は、ChatGPTとPythonを使った株価分析に挑戦してみました。

私は日経先物を今年の3月頃から始めました。
とはいえ、途中で約7か月の空白期間があり、実質は1か月前に再スタートしたようなものです。

正直に言うと、私は統計の知識も投資の知識もほとんどありません
ただし、直感にはそれなりに自信があり、プログラミングは得意です。
それなら「自分なりのやり方で分析してみよう」と思ったのが今回のきっかけです。

値動きには「癖」がある

日経先物を触っていて、以前から強く感じていたことがあります。

それは、値動きが完全なランダムウォークではないということです。

雰囲気や癖のようなものが確かに存在し、

  • 上り調子のときは上がりやすい
  • 裏をかこうとすると、さらに裏をかかれる
  • 大きく振れる局面では全体的に下がりやすい
  • 日中と夜間で動き方が明らかに違う

といった特徴があるように感じていました。

「流れに逆らってはいけない」という言葉がありますが、
まるで大口という先導者がいて、その後ろについていけば勝てるような感覚です。

うまく言葉にできないのですが、
とにかく「何かしらの癖がある」という感触だけは確かにありました。

急激な値動きは“サイン”ではないか?

では、その癖を何を基準に捉えるべきか

考えた末に行き着いたのが、
急激な値動きそのものが大きなサインなのではないか、という仮説です。

最初に作ったプログラムはとてもシンプルで、

「1分間に60円以上の上昇または下落があったら、スマホに通知する」

というものでした。

この通知ソフトを動かしながら数日分の株価データを蓄積し、
「次はこのデータを使って分析してみよう」と考えました。

それが、今回の記事の内容です。

確率を数値で可視化する

最初に考えたのは、次のような指標です。

1分間にT円以上動いたとき、N時間以内にU円動く確率

これをCSVとして出力してみました。

例えば、

「1分間に60円急上昇したとき、2時間以内に100円上がる確率は70%」

といった数値を求めるイメージです。

以下が、ChatGPTに出力してもらったソースコードです。

import glob
import pandas as pd

def movement_trigger_reach_prob_pd(df, trigger=60, hours=2, target=200, verbose=False):
    df["delta_1m"] = df["Price"] - df["Price"].shift(freq="1min")
    if verbose:
        # 自然言語の内部生成
        if trigger > 30:
            trigger_desc = "急騰"
        elif trigger < -30:
            trigger_desc = "急落"
        else:
            trigger_desc = "上昇" if trigger > 0 else "下落"

        target_desc = "上昇" if target > 0 else "下落"
        hour_word = "時間以内"

        t = abs(trigger)
        u = abs(target)

        print(f"1分間に{t}円の{trigger_desc}が発生したとき、{hours}{hour_word}に{u}円{target_desc}する確率")
    
    if trigger > 0:
        trigger_mask = df["delta_1m"] >= trigger
    else:
        trigger_mask = df["delta_1m"] <= trigger

    trigger_times = df.index[trigger_mask]
    if verbose:
        print(f"トリガー発生回数: {len(trigger_times)}")

    success = 0
    total = 0

    for t in trigger_times:
        price_at_t = df.at[t, "Price"]

        # t から 2時間後までのデータ
        window = df.loc[t : t + pd.Timedelta(hours=hours)]

        # データの最後近くで 2時間分ない場合は除外(好みで変えてOK)
        if len(window) == 0:
            continue

        if target > 0:
            # hour時間以内に +target円以上上がったか?
            max_future = window["Price"].max()
            if max_future - price_at_t >= target:
                success += 1
        else:
            # hour時間以内に +target円以上下がったか?
            min_future = window["Price"].min()
            if min_future - price_at_t <= target:
                success += 1

        total += 1

    if total > 0:
        prob = success / total
        if verbose:
            print(f"成功: {success} / {total}")
            print(f"条件付き確率 ≒ {prob:.3f} ({prob*100:.2f}%)")
        return prob
    else:
        if verbose:
            print("トリガーが1件もありませんでした")
        return np.nan

データ収集の仕組み

株価監視プログラムでは、

  • 10秒おきに「時刻」と「株価」をペアで記録
  • 株価の動きが止まったらCSVとして出力
  • データをリセット
  • 再び動き出したら記録を再開

という仕組みになっています。

次に、フォルダ内のCSVファイルをすべて連結し、
空白時間を補完したうえで、確率表を作成するプログラムをChatGPTに書いてもらいました。

# フォルダ内のcsvデータを読み込み・結合処理
def load_csv_folder(folder):
    files = glob.glob(folder + "/*.csv")   # data フォルダの CSV 全部
    df_list = []
    for f in files:
        df = pd.read_csv(f, parse_dates=["Timestamp"])
        df_list.append(df)
    df = pd.concat(df_list, ignore_index=True)
    df = df.sort_values("Timestamp").reset_index(drop=True)
    df.resample("10s", on="Timestamp").last().ffill()
    return df

# ---------------------------------------------------------
# 総当たり計算
# ---------------------------------------------------------
# 結果を格納する DataFrame(行=T、列=UP)
def make_reach_prob_csv(df, hours, session_tag="day"):
    T_values = np.arange(-100, 101, 10)
        # df から価格配列を取得
    prices = df["Price"].values

    # --- ① 時間別の固定レンジ ---
    if hours <= 2:
        rng = 200   # ±200円
    elif hours <= 6:
        rng = 400   # ±400円
    else:
        # --- ② 価格データから動的レンジを決定 ---
        p_min = prices.min()
        p_max = prices.max()

        # 最大変動幅
        span = max(abs(p_max - p_min), 1)

        # 200円刻みの倍数に切り上げ
        rng = int(np.ceil(span / 200)) * 200
        rng = min(rng, 1000)
    UP_values = np.arange(-rng, rng + 1, 10)
    result_df = pd.DataFrame(index=T_values, columns=UP_values, dtype=float)
    for T in T_values:
        for UP in UP_values:
            prob = movement_trigger_reach_prob_pd(T, hours, UP)
            result_df.at[T, UP] = prob
            print(f"T={T}, UP={UP} → {prob}")
    # CSV 出力
    result_df_str = result_df.applymap(lambda x: f"{x*100:.2f}%" if pd.notna(x) else "")
    filename = f"movement_trigger_reach_prob_matrix_pd_{session_tag}_{hours}.csv"
    result_df_str.to_csv(filename, encoding="utf-8-sig")
    print(f"CSV 出力完了: {filename}")

#--------------------------------------
# メイン処理開始 
#--------------------------------------
if __name__ == "__main__":
    df_day = load_csv_folder(data_folder_day)
    df_night = load_csv_folder(data_folder_night)

    make_reach_prob_csv(df_day, 1)
    make_reach_prob_csv(df_day, 2)
    make_reach_prob_csv(df_day, 6)
    make_reach_prob_csv(df_day, 12)
    make_reach_prob_csv(df_night, 1, "night")
    make_reach_prob_csv(df_night, 2, "night")
    make_reach_prob_csv(df_night, 6, "night")
    make_reach_prob_csv(df_night, 12, "night")

さらに、その確率表を読み込み、ヒートマップとして可視化します。(ChatGPTに出力してもらいました)

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import mplcursors
import re

# ==== 日本語フォント設定(Windows用) ====
plt.rcParams['font.family'] = 'Yu Gothic'

def parse_path_for_session_hours(filename):
    match = re.search(r"_([a-zA-Z]+)_(\d+)\.csv$", filename)
    if match:
        word = match.group(1)   # "day"
        number = match.group(2) # "6"
        return word, number
    return None, None


def show_heatmap_from_csv(filename):
    word, number = parse_path_for_session_hours(filename)
    if word is None or number is None:
        return
    print(word)
    print(number)
    # ==== CSV 読み込み ====
    df = pd.read_csv(filename, index_col=0)

    # パーセント表記 → float に変換(0〜1)
    df_float = df.replace('%','', regex=True).astype(float) / 100

    # ==== ヒートマップ描画 ====
    plt.figure(figsize=(6, 4))

    ax = plt.gca()
    heat = sns.heatmap(
        df_float,
        ax=ax,
        cmap="coolwarm",
        annot=False,
        vmin=0, vmax=1,
        cbar_kws={"label": "成功確率"}
    )

    # ==== ★ ダミー画像を重ねて mplcursors を使う ====
    # sns.heatmap が作る QuadMesh は mplcursors 非対応なので、
    # 同じサイズの行列を imshow で描いて、それにカーソルを付ける。
    dummy_image = ax.imshow(
        df_float.to_numpy(),
        zorder=-1,      # ヒートマップの「後ろ」に置く
        aspect="auto"
    )

    cursor = mplcursors.cursor(dummy_image, hover=True)

    @cursor.connect("add")
    def on_add(sel):
        # sel.target は (x, y) 座標(0.0, 1.0, 2.0 ... の中心付近)
        x = int(sel.target[0])
        y = int(sel.target[1])

        # 念のため範囲チェック
        if 0 <= x < df_float.shape[1] and 0 <= y < df_float.shape[0]:
            col_label = df_float.columns[x]
            row_label = df_float.index[y]
            value = df_float.iat[y, x]  # 0〜1 の float

            # ツールチップの中身(行・列ラベル+パーセンテージ)
            sel.annotation.set(
                text=f"T={row_label}, UP={col_label}\n{value:.2%}"
            )
            # 背景を少し不透明にして見やすく
            sel.annotation.get_bbox_patch().set(alpha=0.9)

    # ==== 軸ラベル調整 ====

    # T軸(行) 見やすさ優先で全部出す例
    yticks = range(0, len(df_float.index), 1)
    plt.yticks(yticks, df_float.index[yticks], fontsize=6, rotation=0)

    # UP軸(列)は約10個に間引き
    step = max(1, int(len(df_float.columns) / 10))
    xticks = range(0, len(df_float.columns), step)
    plt.xticks(xticks, df_float.columns[xticks], fontsize=6, rotation=45)

    # ==== タイトル(日本語OK) ====
    plt.title(f"T(1分変化)× UP({number}時間後の変化幅)成功確率ヒートマップ:{word}", fontsize=10)
    plt.xlabel(f"UP({number}時間後の変化幅)", fontsize=14)
    plt.ylabel("T(1分間の変化幅)", fontsize=14)

    plt.tight_layout()
    plt.show()


#--------------------------------------
# メイン処理開始 
#--------------------------------------
if __name__ == "__main__":
    show_heatmap_from_csv("movement_trigger_reach_prob_matrix_df_day_2.csv")
    show_heatmap_from_csv("movement_trigger_reach_prob_matrix_df_night_2.csv")

ヒートマップで見えた「癖」

以下が、出力したヒートマップ画像です(数日分)。

左が日中、右が夜間の値動きです。
見事に癖が出ています。

横向きの放物線が、うっすらと見えるでしょうか。

  • 日中は、大きな値動きがないと徐々に価格が上がる
  • 夜間は、徐々に価格が下がる傾向がある

という性質が読み取れます。

日中は、±60円程度までは放物線のように動き、
それを超えると大きく下落しやすい(相場が荒れているとき)。

一方、夜間では、大きく下がった後は戻す確率が高いことが分かります。

どちらも、60円付近をピークに放物線が崩れているのが印象的です。

「1分間に60円」という最初の直感は、
どうやら間違っていなかったようです。(ただの感ですがw)

こちらは6時間のヒートマップですが、放物線がくっきりと出ています。

日中と夜間で逆向きなのが面白いです。

こちらは12時間のヒートマップ。

こちらは1日の取引時間をオーバーしてるため、正常なデータではないかもしれません。(日中と夜間を分離しているため)

ChatGPTに評価してもらう

出力したCSVのテキストデータをそのままChatGPTに貼り付け、
「このデータを評価・分析してください」と依頼すると、
特徴を的確に分析してくれます。

~ですが、分析してください。(あらかじめ表の形式を説明しておく)
[CSVテキストを貼り付け]

このヒートマップを見る限り、

🎯 【+60円〜+70円あたりの急騰がもっとも上昇期待値が高い】
という傾向が明確に確認できます。
以下、あなたのデータを統計的に読み解きます。

🔍 1. 「+60円の急騰後」列の特徴
たとえば T = +60円 の行を見ると:

100.00%, 96.30%, 88.89%, 83.33%, 74.07%, 66.67%, 61.11%, 55.56%, 50.00% ...
UP=0〜120円あたりで最大100% → ゆっくり下がる曲線
これが“最も綺麗に右肩下がり”しており、トレンドが続きやすいことを示します。

🔍 2. T=40円・50円よりも T=60円のほうが強い
T=40, 50円の行も強いですが、T=60円はさらに強い:
UP=20〜100 の上昇成功率が一番高い
0〜100円付近の成功率の“平坦な高さ”が際立つ
上方向のエッジの立ち方が最も鋭い
つまり
👉 +60円の急騰は、最も“突破力”が高い局面
を示唆しています。

🔍 3. T=70〜90円はさらに上がるかと思いきや弱くなる理由
ヒートマップを見ると T=70, 80, 90円は:
成功率が高い部分はあるが
安定していない
サンプル数が少ないため結果がバラつく
つまり
👉 非日常級の急騰(+80〜+100円)は「過激すぎて統計が安定しない」
という状態になっています。
(実際、1分で80円〜100円上昇はほとんど発生しない)

🟧 4. まとめ:最も「買い」で優位性があるのは +60〜70円
このマップを統計的に読むと:
1分急騰幅
継続上昇の期待
理由
+10〜30円
中程度
ノイズ領域
+40〜50円
やや強い
トレンドの初動層
+60〜70円
最も強い(買い優位)
明確なトレンド発生点
+80〜100円
不安定
サンプル不足+過熱感
つまり、
🟩 最適な“順張り買いのトリガー”は +60〜70円
これは裁量トレーダーがよく言う
「一気に60〜70円動いたら、大抵はさらに伸びる」
という経験則とも完全に一致しています。

🧠 なぜ +60〜70円が最適なのか?(理論的理由)
1分でこれだけ動くのはトレンドが発生した瞬間
まだ過熱しすぎておらず“反転リスクが少ない”
価格帯が多く、統計的にもサンプルが多い
アルゴが勢いを検知して追随する確率が高い
伸びやすいが、崩れにくい絶妙なライン

🪤 逆に +80〜100円が弱くなる理由
過熱感による反転も増える
そもそもサンプルが少なすぎる
寄り付き直後の異常値が混じる
アルゴの利確が入りやすい

🎯 結論
+60〜70円の急騰後 → 最も上昇期待値が高い(順張りが強い)
✔ 1分で+40〜50円も強いが少し弱い
✔ +80〜100円は不安定でサンプル不足
✔ 買い戦略を作るなら +60〜70円が最適ゾーン

内容を見てみると、まったく違和感のない分析です。
単なる表面的な要約ではなく、
データの傾向や特徴をきちんと捉えています。

正直なところ、
明らかにLLM(大規模言語モデル)の域を超えて「考えている」
と感じました。

内部で何らかの外部ソフトウェアを使っているのか、
あるいはそれに近い処理をしているのでは……と
疑ってしまうレベルです。

トレードへの応用

急落・急上昇を検知したら、
このヒートマップを見てエントリー価格を選ぶだけです。

当初は、

  • スマホに通知が来る
  • ヒートマップを見る
  • 確率80%付近から価格を決めてエントリー

という流れでしたが、
どうしても反応が遅れるという問題がありました。

そこで、通知メッセージ自体に
想定される価格帯を追記するようにしました。

これは確率60%の価格レンジを示しています。

通知が来る

価格はこの範囲で動く確率が高い

下限付近 → 買い

上限付近 → 売り

というシンプルな取引です。

精度は……体感で7〜8割程度
(MSQ前日の“魔の水曜日”で痛い目を見ましたが)

価格の波が、ドンピシャで反転することも多く
自分でも正直驚いています。
(もちろん、バイアスがかかっている可能性はあります)

メンタル面の変化

これまでは、

  • 買うと下がる
  • 売ると上がる
  • 手の内を読まれているのでは?

と疑心暗鬼になることが多くありました。

しかし、この仕組みを導入してからは、

  • 買うと上がり始める
  • 売ると下がり始める

という値動きの波に乗れている感覚があります。

何より大きいのは、
「データ」という精神的支柱があることで心が安定する点です。

どうでしょう、勝率はなかなかではないでしょうか。
(魔の水曜日に大負けしていますがw)

今はマイクロで遊んでいますが、
安定して勝てるようになったらミニにも挑戦してみようと思います。

numpyを使った効率化

確率分布を計算するプログラムでは、

  • 10秒ごとの株価
  • 数百データ先まで
  • UP円を超えるかどうか

という探索処理を、

行20 × 列40回行います。

このため、Pythonでは計算に非常に時間がかかります。

Pandasを使った実装では、
1つの表を作るのに約80秒かかっていました。

そこでChatGPTに相談し、
numpyを使った行列計算に書き換えたところ、
約3秒で出力できるようになりました。

def movement_trigger_reach_prob_np(price, trigger, hours, target, step=6):
    N = len(price)
    horizon = hours * 60 * step  # 10秒足→6本=1分

    delta_1m = price[step:] - price[:-step]

    trigger_mask = (delta_1m >= trigger) if trigger > 0 else (delta_1m <= trigger)

    idx = np.where(trigger_mask)[0]
    if len(idx) == 0:
        return np.nan

    success = 0
    total = 0

    for i in idx:
        start_price = price[i + step]

        end_idx = i + step + horizon
        if end_idx >= N:
            continue

        window = price[i + step : end_idx + 1]

        if target > 0:
            if window.max() - start_price >= target:
                success += 1
        else:
            if window.min() - start_price <= target:
                success += 1

        total += 1

    return success / total if total > 0 else np.nan

ただし、それでも

  • 日中:1h / 2h / 6h / 12h
  • 夜間:1h / 2h / 6h / 12h

合計8ファイルを出力するのに、
1分近くかかってしまいます。

GPU(CUDA)での爆速化

さらに高速化するため、
**GPUを使った処理(CuPy)**をChatGPTに書いてもらいました。

def prob_matrix_reach_gpu(
    price_array: np.ndarray,
    hours: int,
    T_VALUES: list,
    UP_VALUES: list,
    step: int = 6,
):
    """
    1分変化(T)をトリガーとして、hours時間以内にUPに到達する確率行列をGPUで計算。
    行: T(-T_abs_max..+T_abs_max)
    列: UP(-UP_abs_max..+UP_abs_max)
    戻り: (result_np, T_VALUES, UP_VALUES)
      result_np は numpy.ndarray (float32, 0..1)
    """
    lookahead = hours * 60 * step           # 2時間なら 720
    price = cp.asarray(price_array, dtype=cp.float32)
    N = int(price.size)
    if N <= lookahead + step:
        raise ValueError("データが短すぎます(lookahead+step以上が必要)")

    # ---------------------------------------------------------
    # 未来 hours 以内の max/min を全時点で計算(トリガー時刻 t に対して)
    # future_up_full[t]   = max(price[t+1..t+lookahead]) - price[t]
    # future_down_full[t] = min(price[t+1..t+lookahead]) - price[t]
    # t は 0..N-lookahead-1
    # ---------------------------------------------------------
    windows = cp.lib.stride_tricks.sliding_window_view(price, window_shape=lookahead + 1)
    base = windows[:, 0]
    fut  = windows[:, 1:]  # t自身は除外(t以降の未来のみ)

    future_up_full   = fut.max(axis=1) - base
    future_down_full = fut.min(axis=1) - base

    # ---------------------------------------------------------
    # 1分変化 delta_1m を作る
    # delta_1m_full[k] は時刻 t = k+step の 1分変化(price[t]-price[t-step])
    # t は step..N-1
    # ---------------------------------------------------------
    delta_1m_full = price[step:] - price[:-step]

    # ---------------------------------------------------------
    # ★ここが最重要:トリガー時刻を揃える
    # 有効なトリガー時刻 t は step..N-lookahead-1
    #
    # future_*_full は t=0..N-lookahead-1
    # delta_1m_full は t=step..N-1(indexは t-step)
    #
    # なので
    #  future_*  = future_*_full[step : N-lookahead]
    #  delta_1m  = delta_1m_full[0 : N-lookahead-step]
    # ---------------------------------------------------------
    L = (N - lookahead) - step  # 有効サンプル数
    delta_1m = delta_1m_full[:L]
    future_up = future_up_full[step:step + L]
    future_down = future_down_full[step:step + L]

    # 出力行列
    result = cp.empty((T_VALUES.size, UP_VALUES.size), dtype=cp.float32)

    T_gpu  = cp.asarray(T_VALUES, dtype=cp.float32)
    UP_gpu = cp.asarray(UP_VALUES, dtype=cp.float32)

    for i in range(T_gpu.size):
        T = T_gpu[i]

        # あなたの既存ルール互換(T>0: >=、それ以外: <=)
        trig = (delta_1m >= T) if float(T) > 0 else (delta_1m <= T)
        total = int(trig.sum())

        if total == 0:
            result[i, :] = cp.nan
            continue

        fu = future_up[trig]
        fd = future_down[trig]

        # UP列を一気に処理(ソート+searchsorted)
        fu_sorted = cp.sort(fu)
        fd_sorted = cp.sort(fd)

        ups = UP_gpu

        pos = ups > 0
        neg = ups < 0
        zero = ups == 0

        prob = cp.empty_like(ups, dtype=cp.float32)

        # UP>0 : fu >= UP
        if pos.any():
            idx = cp.searchsorted(fu_sorted, ups[pos], side="left")
            prob[pos] = (total - idx) / total

        # UP<0 : fd <= UP(より下に到達)
        if neg.any():
            idx = cp.searchsorted(fd_sorted, ups[neg], side="right")
            prob[neg] = idx / total

        # UP==0 : fu >= 0(「上方向に0以上」到達の定義。必要なら変更可)
        if zero.any():
            prob[zero] = 1.0

        result[i, :] = prob

    return result.get(), T_VALUES, UP_VALUES

実行してみると、

  • CUDA起動のオーバーヘッド:約5秒
  • 計算開始後:8ファイルを0.5秒未満で出力

という圧巻の結果でした。

numpyまでは理解できたのですが、
cupyのコードは正直さっぱりです。

「行列と行列をぶつけて、行列を得る」
──そんな世界。

読むことは何とかできても、
自分で書くのは無理だと痛感しました。
(ChatGPTに1行1行解説してもらって何とか理解してますw)

GPUを使った行列計算の圧倒的な速度を体験し、
「これはぜひマスターしたいスキルだ」と思いました。

まとめ

今回は、PythonとChatGPTを使った自己流の株価分析を紹介しました。

私が出したのは仕組みのアイデアだけで、ソースコードのほとんどはChatGPTに書いてもらっています。(この記事も校正してもらってますw)

まだデータ量も少なく、
プログラムの信ぴょう性も検証途中ではありますが、
上限・下限でピタリと止まる場面が多く何度も驚かされています。

最初は、

「多数の要素が複雑に絡んでいるから、未来は絶対に読めない」

と思っていました。

しかし今では、

「多数の要素が絡んでいるからこそ、確率には逆らえない」

と考えるようになっています。

相場に「必勝法」はありませんが、このようにデータ分析をすることによって
自分が納得できる判断軸を持つことはできます。

今回の取り組みは、その第一歩でした。
これからもデータと向き合いながら、
自分なりの相場観を磨いていこうと思います。

おすすめ

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です