#!/usr/bin/env python3
"""trend_detector.py — sector anomaly signal detection.

Meant to run alongside market_watch (every 30 minutes). It inspects the
latest snapshot and looks for six kinds of signals:

1. Abnormal capital flow — net inflow/outflow far above the recent average.
2. Breadth reversal — the advancing/declining ratio inside a sector jumps.
3. Leader change — the sector's leading stock is replaced.
4. Trend reversal — consecutive inflows turn into an outflow, or the
   other way round.
5. Price/flow divergence — price up with capital leaving, or price down
   with capital entering.
6. Narrow rally — the sector rises sharply while fewer than 50% of its
   stocks advance.

Detected signals are written to the sector_signals table.
"""

import json
import sqlite3
import sys
from datetime import datetime
from pathlib import Path

DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"

# Number of *prior* snapshots used for the volatility baseline (signal 1).
_HISTORY_WINDOW = 20
# Number of prior snapshots averaged into the shared "recent mean".
_MEAN_WINDOW = 10
# Length of the same-direction run required before a trend reversal.
_TREND_RUN = 3


def get_conn():
    """Open the SQLite database at DB_PATH with rows addressable by name."""
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    return conn


def get_recent_snapshots(conn, n=20):
    """Return the newest ``n`` rows of market_snapshots, newest first."""
    return conn.execute(
        "SELECT id, timestamp, up_ratio, mood FROM market_snapshots "
        "ORDER BY id DESC LIMIT ?",
        (n,),
    ).fetchall()


def get_sectors_for_snapshot(conn, snapshot_id):
    """Return every sector row of one snapshot as a list of plain dicts."""
    rows = conn.execute(
        "SELECT * FROM sector_snapshots WHERE snapshot_id = ?",
        (snapshot_id,),
    ).fetchall()
    return [dict(r) for r in rows]


def get_sector_history(conn, sector_name, n=20):
    """Return the newest ``n`` records of one sector, newest first.

    Each row carries all sector_snapshots columns plus the timestamp of
    the market snapshot it belongs to.
    """
    return conn.execute(
        """
        SELECT ss.*, ms.timestamp
        FROM sector_snapshots ss
        JOIN market_snapshots ms ON ss.snapshot_id = ms.id
        WHERE ss.name = ?
        ORDER BY ms.id DESC
        LIMIT ?
        """,
        (sector_name, n),
    ).fetchall()


def get_holdings(conn):
    """Return (code, name) rows of the active holdings."""
    return conn.execute(
        "SELECT code, name FROM holdings WHERE is_active=1"
    ).fetchall()


def get_watchlist(conn):
    """Return (code, name) rows of the active watchlist stocks."""
    return conn.execute(
        "SELECT code, name FROM watchlist_stocks WHERE is_active=1"
    ).fetchall()


def get_sector_for_stock(conn, code):
    """Return the sector name recorded for a stock code, or None."""
    row = conn.execute(
        "SELECT sector_name FROM stock_sectors WHERE code = ?", (code,)
    ).fetchone()
    return row[0] if row else None


def write_signal(conn, signal_type, sector, severity, related_stocks,
                 holdings_list, watchlist_list, trigger_reason, snapshot_id):
    """Insert one signal into sector_signals unless a recent duplicate exists.

    A signal is skipped when the same sector already has a signal of the
    same type whose detected_at lies within the last day.

    Returns:
        True when a row was inserted and committed, False when skipped.
    """
    existing = conn.execute(
        """
        SELECT id FROM sector_signals
        WHERE sector = ? AND signal_type = ?
          AND datetime(detected_at) >= datetime('now', '-1 day')
        LIMIT 1
        """,
        (sector, signal_type),
    ).fetchone()
    if existing:
        return False

    conn.execute(
        """
        INSERT INTO sector_signals
            (signal_type, sector, severity, related_stocks,
             holdings_in_sector, watchlist_in_sector,
             trigger_reason, snapshot_id)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            signal_type,
            sector,
            severity,
            json.dumps(related_stocks, ensure_ascii=False),
            # Empty match lists are stored as NULL rather than "[]".
            json.dumps(holdings_list, ensure_ascii=False) if holdings_list else None,
            json.dumps(watchlist_list, ensure_ascii=False) if watchlist_list else None,
            trigger_reason,
            snapshot_id,
        ),
    )
    conn.commit()
    return True


def check_signals(conn, latest, prev_snapshots):
    """Run the six signal detectors over every sector of the latest snapshot.

    Args:
        conn: SQLite connection whose rows support access by column name.
        latest: mapping describing the latest snapshot; only ``id`` is read.
        prev_snapshots: recent snapshots, newest first; element 1 (when
            present) is used as the previous snapshot for comparisons.
    """
    latest_id = latest["id"]
    sectors = get_sectors_for_snapshot(conn, latest_id)
    if not sectors:
        return

    # code -> name lookups for active holdings and watchlist entries.
    holdings = {r["code"]: r["name"] for r in get_holdings(conn)}
    watchlist = {r["code"]: r["name"] for r in get_watchlist(conn)}

    # Previous snapshot (if any), indexed by sector name.
    prev_id = prev_snapshots[1]["id"] if len(prev_snapshots) >= 2 else None
    prev_sectors = get_sectors_for_snapshot(conn, prev_id) if prev_id is not None else []
    prev_map = {s["name"]: s for s in prev_sectors}

    for sector in sectors:
        prev = prev_map.get(sector["name"]) or {}
        _check_sector(conn, sector, prev, latest_id, holdings, watchlist)


def _check_sector(conn, s, prev, latest_id, holdings, watchlist):
    """Evaluate all six signals for one sector row ``s`` of the latest snapshot."""
    name = s["name"]
    change = s["change_pct"] or 0
    net_inflow = s["net_inflow"] or 0
    up_count = s["up_count"] or 0
    down_count = s["down_count"] or 0
    total = up_count + down_count
    up_ratio = up_count / total if total > 0 else None
    lead_stock = s["lead_stock"] or ""

    def emit(signal_type, severity, reason, log_line):
        """Persist one signal and print its log line if it was not a duplicate."""
        related = _get_related_stocks(conn, name, lead_stock, change)
        written = write_signal(
            conn, signal_type, name, severity, related,
            _match_holdings(related, holdings),
            _match_holdings(related, watchlist),
            reason, latest_id,
        )
        if written:
            print(log_line, flush=True)

    # The history query returns the latest snapshot as its first row.  Every
    # baseline below must describe the past only: a sample that is part of
    # its own baseline inflates the mean/std it is compared against, and a
    # "previous run" that contains the current value can never disagree
    # with it.  Fetch one extra row and drop the current snapshot.
    history = get_sector_history(conn, name, _HISTORY_WINDOW + 1)
    prior = [h for h in history if h["snapshot_id"] < latest_id][:_HISTORY_WINDOW]

    # Recent mean of absolute net inflow, shared by signals 1, 4 and 5.
    mean = 0
    if len(prior) >= 2:
        recent_abs = [abs(h["net_inflow"] or 0) for h in prior[:_MEAN_WINDOW]]
        mean = sum(recent_abs) / len(recent_abs)

    # Signal 1: abnormal capital flow (more than mean + 3 sigma).
    if net_inflow and len(prior) >= 4 and mean > 0:
        window = [abs(h["net_inflow"] or 0) for h in prior]
        window_mean = sum(window) / len(window)
        std = (sum((x - window_mean) ** 2 for x in window) / len(window)) ** 0.5
        if std > 0 and abs(net_inflow) > mean + 3 * std:
            direction = "净流入" if net_inflow > 0 else "净流出"
            sev = "high" if abs(net_inflow) > mean + 5 * std else "medium"
            emit(
                "资金异动", sev,
                f"{direction}{abs(net_inflow):.0f}亿(均值{mean:.0f}亿,超{abs(net_inflow)/max(mean,0.01):.0f}倍)",
                f" ⚠️ 资金异动 [{sev}] {name}: {direction}{abs(net_inflow):.0f}亿",
            )

    # Signal 2: breadth reversal versus the previous snapshot.
    prev = dict(prev) if prev else {}
    if prev and up_ratio is not None:
        # A count of 0 is valid data (e.g. every stock advanced); only a
        # previous snapshot with no counted stocks at all is unusable.
        prev_up = prev["up_count"] or 0
        prev_down = prev["down_count"] or 0
        prev_total = prev_up + prev_down
        if prev_total > 0:
            prev_ratio = prev_up / prev_total
            # Advancing share moved by more than 30 percentage points.
            if abs(up_ratio - prev_ratio) > 0.3:
                direction = "转强" if up_ratio > prev_ratio else "转弱"
                sev = "high" if abs(up_ratio - prev_ratio) > 0.5 else "medium"
                emit(
                    "涨跌比反转", sev,
                    f"上涨占比{prev_ratio*100:.0f}%→{up_ratio*100:.0f}%,{direction}",
                    f" ⚠️ 涨跌比反转 [{sev}] {name}: {prev_ratio*100:.0f}%→{up_ratio*100:.0f}% {direction}",
                )

    # Signal 3: the leading stock changed.
    prev_lead = (prev.get("lead_stock") or "") if prev else ""
    if lead_stock and prev_lead and lead_stock != prev_lead:
        emit(
            "领涨股更替", "medium",
            f"领涨股从「{prev_lead}」换成「{lead_stock}」",
            f" ⚠️ 领涨股更替 [{name}] {prev_lead} → {lead_stock}",
        )

    # Signal 4: trend reversal — the previous three snapshots all flowed one
    # way and the current one flows the other way.
    if net_inflow and len(prior) >= _TREND_RUN:
        run = [h["net_inflow"] or 0 for h in prior[:_TREND_RUN]]
        all_positive = all(r > 0 for r in run)
        all_negative = all(r < 0 for r in run)
        if all_positive and net_inflow < 0 and abs(net_inflow) > mean * 0.5:
            emit(
                "趋势拐点", "high",
                f"连续3次净流入后转流出{abs(net_inflow):.0f}亿",
                f" ⚠️ 趋势拐点 [high] {name}: 连续流入→转流出{abs(net_inflow):.0f}亿",
            )
        elif all_negative and net_inflow > 0 and net_inflow > abs(sum(run)) * 0.5:
            emit(
                "趋势拐点", "high",
                f"连续3次净流出后转入流{net_inflow:.0f}亿",
                f" ⚠️ 趋势拐点 [high] {name}: 连续流出→转入流{net_inflow:.0f}亿",
            )

    # Signal 5: price/flow divergence on a move larger than 2%.
    if net_inflow and change and abs(change) > 2:
        # Without a usable mean, fall back to a threshold of 1.
        flow_threshold = abs(mean or 1)
        if change > 0 and net_inflow < -flow_threshold:
            emit(
                "量价背离", "medium",
                f"板块涨{change:+.2f}%但资金净流出{abs(net_inflow):.0f}亿",
                f" ⚠️ 量价背离 [{name}] 涨{change:+.2f}%但流出{abs(net_inflow):.0f}亿",
            )
        elif change < 0 and net_inflow > flow_threshold:
            emit(
                "量价背离", "medium",
                f"板块跌{change:+.2f}%但资金净流入{net_inflow:.0f}亿(吸筹信号)",
                f" ⚠️ 量价背离 [{name}] 跌{change:+.2f}%但流入{net_inflow:.0f}亿(吸筹)",
            )

    # Signal 6: narrow rally — sector up more than 3% with under half advancing.
    if up_ratio is not None and change > 3 and up_ratio < 0.5:
        emit(
            "普涨背离", "medium",
            f"板块涨{change:+.2f}%但仅{up_count}/{total}家上涨(分化严重)",
            f" ⚠️ 普涨背离 [{name}] 涨{change:+.2f}%但仅{up_count}/{total}家上涨",
        )


def _get_related_stocks(conn, sector_name, lead_stock, change_pct):
    """Return the stocks related to a sector: its leader plus up to 5 members.

    Members come from stocks joined with stock_sectors.  A member that has
    the same name as the leader is not listed twice; instead its code is
    copied onto the leader entry, so the leader can be matched by code
    against holdings and the watchlist.  ``change_pct`` is accepted for
    interface compatibility and is not used.
    """
    stocks = []
    by_name = {}
    if lead_stock:
        leader = {"name": lead_stock, "code": "", "change_pct": 0, "role": "领涨"}
        stocks.append(leader)
        by_name[lead_stock] = leader

    members = conn.execute(
        "SELECT s.code, s.name FROM stocks s "
        "JOIN stock_sectors ss ON s.code = ss.code "
        "WHERE ss.sector_name = ? LIMIT 5",
        (sector_name,),
    ).fetchall()
    for m in members:
        known = by_name.get(m["name"])
        if known is None:
            entry = {"name": m["name"], "code": m["code"], "change_pct": 0, "role": "成分"}
            stocks.append(entry)
            by_name[m["name"]] = entry
        elif not known["code"]:
            # The leader is only known by name; take its code from the member row.
            known["code"] = m["code"]
    return stocks


def _match_holdings(stocks, holding_dict):
    """Return the entries of ``stocks`` whose code is a key of ``holding_dict``.

    Each match is reported as ``{"code": ..., "name": ...}`` using the name
    stored in ``holding_dict``.
    """
    return [
        {"code": s.get("code", ""), "name": holding_dict[s.get("code", "")]}
        for s in stocks
        if s.get("code", "") in holding_dict
    ]


def main():
    """Detect signals for the newest snapshot; needs at least two snapshots."""
    conn = get_conn()
    try:
        snapshots = get_recent_snapshots(conn, 5)
        if len(snapshots) < 2:
            print(f"数据不足: 只有 {len(snapshots)} 次采集,需要至少2次", flush=True)
            return

        latest = dict(snapshots[0])
        print(f"检测最新 snapshot: {latest['timestamp']} (id={latest['id']})", flush=True)
        check_signals(conn, latest, snapshots)
    finally:
        # Release the connection even when detection raises.
        conn.close()
    print("检测完成", flush=True)


if __name__ == "__main__":
    main()