Files
MoFin/trend_detector.py
知微 a1d789ddab trend_detector + xiaoguo_news_processor 全链路
- trend_detector.py: 6类信号检测(资金异动/涨跌比反转/领涨更替/趋势拐点/量价背离/普涨背离)
- xiaoguo_news_processor.py: akshare搜新闻+小果LLM情感分析
- mofin_db.py: 新增 sector_signals + signal_news 两张表
- 文档更新:新增第四章实时信号检测与小果情报处理
- 测试结果:趋势检测已通过,信号写入正常
2026-06-20 22:20:54 +08:00

304 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""trend_detector.py — 板块异常信号检测
配合 market_watch(每30分)运行,从最新 snapshot 中检测6类信号:
1. 资金异动 — 净流入/出远超近期均值
2. 涨跌比反转 — 板块内涨跌家数比例突变
3. 领涨股更替 — 领涨股换人
4. 趋势拐点 — 连续流入→转流出 或 连续流出→转入流
5. 量价背离 — 涨但资金流出 / 跌但资金流入
6. 普涨背离 — 板块大涨但上涨家数<50%
检测到信号后写入 sector_signals 表。
"""
import json
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
DATA_DIR = Path(__file__).parent / "data"
DB_PATH = DATA_DIR / "mofin.db"
def get_conn():
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def get_recent_snapshots(conn, n=20):
"""取最近 n 次 market_snapshots"""
return conn.execute(
"SELECT id, timestamp, up_ratio, mood FROM market_snapshots ORDER BY id DESC LIMIT ?",
(n,)
).fetchall()
def get_sectors_for_snapshot(conn, snapshot_id):
"""取指定 snapshot 的全部板块数据"""
rows = conn.execute(
"SELECT * FROM sector_snapshots WHERE snapshot_id = ?", (snapshot_id,)
).fetchall()
return [dict(r) for r in rows]
def get_sector_history(conn, sector_name, n=20):
"""取某板块最近 n 次采集记录"""
return conn.execute("""
SELECT ss.*, ms.timestamp
FROM sector_snapshots ss
JOIN market_snapshots ms ON ss.snapshot_id = ms.id
WHERE ss.name = ?
ORDER BY ms.id DESC LIMIT ?
""", (sector_name, n)).fetchall()
def get_holdings(conn):
"""取活跃持仓"""
return conn.execute("SELECT code, name FROM holdings WHERE is_active=1").fetchall()
def get_watchlist(conn):
"""取活跃自选"""
return conn.execute("SELECT code, name FROM watchlist_stocks WHERE is_active=1").fetchall()
def get_sector_for_stock(conn, code):
"""查个股对应的板块"""
row = conn.execute(
"SELECT sector_name FROM stock_sectors WHERE code = ?", (code,)
).fetchone()
return row[0] if row else None
def write_signal(conn, signal_type, sector, severity, related_stocks,
holdings_list, watchlist_list, trigger_reason, snapshot_id):
"""写入信号到 sector_signals"""
# 同板块同类型24小时内已有信号则跳过
existing = conn.execute("""
SELECT id FROM sector_signals
WHERE sector = ? AND signal_type = ?
AND datetime(detected_at) >= datetime('now', '-1 day')
LIMIT 1
""", (sector, signal_type)).fetchone()
if existing:
return False
conn.execute("""
INSERT INTO sector_signals
(signal_type, sector, severity, related_stocks,
holdings_in_sector, watchlist_in_sector,
trigger_reason, snapshot_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
signal_type, sector, severity,
json.dumps(related_stocks, ensure_ascii=False),
json.dumps(holdings_list, ensure_ascii=False) if holdings_list else None,
json.dumps(watchlist_list, ensure_ascii=False) if watchlist_list else None,
trigger_reason, snapshot_id
))
conn.commit()
return True
def check_signals(conn, latest, prev_snapshots):
"""对最新 snapshot 检测6类信号"""
latest_id = latest["id"]
sectors = get_sectors_for_snapshot(conn, latest_id)
if not sectors:
return
# 取持仓和自选
holdings = {r["code"]: r["name"] for r in get_holdings(conn)}
watchlist = {r["code"]: r["name"] for r in get_watchlist(conn)}
# 取上一次 snapshot 用于对比(如果有)
prev_id = None
if len(prev_snapshots) >= 2:
prev_id = prev_snapshots[1]["id"]
prev_sectors = get_sectors_for_snapshot(conn, prev_id) if prev_id else []
# 构建 name→sector 映射
prev_map = {s["name"]: s for s in prev_sectors}
for s in sectors:
name = s["name"]
change = s["change_pct"] or 0
net_inflow = s["net_inflow"] or 0
up_count = s["up_count"] or 0
down_count = s["down_count"] or 0
total = up_count + down_count
up_ratio = up_count / total if total > 0 else None
lead_stock = s["lead_stock"] or ""
# 近期历史
history = get_sector_history(conn, name, 20)
# 计算近期均值(后续多重信号共用)
mean = 0
if len(history) >= 3:
recent_inflows = [abs(h["net_inflow"] or 0) for h in history[:10]]
mean = sum(recent_inflows) / len(recent_inflows) if recent_inflows else 0
# 信号1:资金异动
if net_inflow and len(history) >= 5 and mean > 0:
recent_inflows = [abs(h["net_inflow"] or 0) for h in history[:20]]
new_mean = sum(recent_inflows) / len(recent_inflows)
std = (sum((x - new_mean) ** 2 for x in recent_inflows) / len(recent_inflows)) ** 0.5
if std > 0 and abs(net_inflow) > mean + 3 * std:
direction = "净流入" if net_inflow > 0 else "净流出"
sev = "high" if abs(net_inflow) > mean + 5 * std else "medium"
related = _get_related_stocks(conn, name, lead_stock, change)
holdings_list = _match_holdings(related, holdings)
watchlist_list = _match_holdings(related, watchlist)
ok = write_signal(conn, "资金异动", name, sev, related,
holdings_list, watchlist_list,
f"{direction}{abs(net_inflow):.0f}亿(均值{mean:.0f}亿,超{abs(net_inflow)/max(mean,0.01):.0f}倍)",
latest_id)
if ok:
print(f" ⚠️ 资金异动 [{sev}] {name}: {direction}{abs(net_inflow):.0f}亿", flush=True)
# 信号2:涨跌比反转(相比上一次)
prev = dict(prev_map[name]) if name in prev_map else {}
if prev and up_ratio is not None and prev["up_count"] and prev["down_count"]:
prev_total = prev["up_count"] + prev["down_count"]
prev_ratio = prev["up_count"] / prev_total if prev_total > 0 else 0
if abs(up_ratio - prev_ratio) > 0.3: # 涨跌比变化超过30个百分点
direction = "转强" if up_ratio > prev_ratio else "转弱"
sev = "high" if abs(up_ratio - prev_ratio) > 0.5 else "medium"
related = _get_related_stocks(conn, name, lead_stock, change)
holdings_list = _match_holdings(related, holdings)
watchlist_list = _match_holdings(related, watchlist)
ok = write_signal(conn, "涨跌比反转", name, sev, related,
holdings_list, watchlist_list,
f"上涨占比{prev_ratio*100:.0f}%→{up_ratio*100:.0f}%{direction}",
latest_id)
if ok:
print(f" ⚠️ 涨跌比反转 [{sev}] {name}: {prev_ratio*100:.0f}%→{up_ratio*100:.0f}% {direction}", flush=True)
# 信号3:领涨股更替
prev_lead = prev.get("lead_stock", "") if prev else ""
if lead_stock and prev_lead and lead_stock != prev_lead:
related = _get_related_stocks(conn, name, lead_stock, change)
holdings_list = _match_holdings(related, holdings)
watchlist_list = _match_holdings(related, watchlist)
ok = write_signal(conn, "领涨股更替", name, "medium", related,
holdings_list, watchlist_list,
f"领涨股从「{prev_lead}」换成「{lead_stock}",
latest_id)
if ok:
print(f" ⚠️ 领涨股更替 [{name}] {prev_lead}{lead_stock}", flush=True)
# 信号4:趋势拐点(连续净流入突然转流出,反之亦然)
if net_inflow and len(history) >= 4:
recent = [h["net_inflow"] or 0 for h in history[:4]]
all_positive = all(r > 0 for r in recent[:3])
all_negative = all(r < 0 for r in recent[:3])
if all_positive and net_inflow < 0 and abs(net_inflow) > mean * 0.5:
related = _get_related_stocks(conn, name, lead_stock, change)
holdings_list = _match_holdings(related, holdings)
watchlist_list = _match_holdings(related, watchlist)
ok = write_signal(conn, "趋势拐点", name, "high", related,
holdings_list, watchlist_list,
f"连续3次净流入后转流出{abs(net_inflow):.0f}亿",
latest_id)
if ok:
print(f" ⚠️ 趋势拐点 [high] {name}: 连续流入→转流出{abs(net_inflow):.0f}亿", flush=True)
elif all_negative and net_inflow > 0 and net_inflow > abs(sum(recent[:3])) * 0.5:
related = _get_related_stocks(conn, name, lead_stock, change)
holdings_list = _match_holdings(related, holdings)
watchlist_list = _match_holdings(related, watchlist)
ok = write_signal(conn, "趋势拐点", name, "high", related,
holdings_list, watchlist_list,
f"连续3次净流出后转入流{net_inflow:.0f}亿",
latest_id)
if ok:
print(f" ⚠️ 趋势拐点 [high] {name}: 连续流出→转入流{net_inflow:.0f}亿", flush=True)
# 信号5:量价背离
if net_inflow and change and abs(change) > 2:
if change > 0 and net_inflow < -abs(mean or 1):
related = _get_related_stocks(conn, name, lead_stock, change)
holdings_list = _match_holdings(related, holdings)
watchlist_list = _match_holdings(related, watchlist)
ok = write_signal(conn, "量价背离", name, "medium", related,
holdings_list, watchlist_list,
f"板块涨{change:+.2f}%但资金净流出{abs(net_inflow):.0f}亿",
latest_id)
if ok:
print(f" ⚠️ 量价背离 [{name}] 涨{change:+.2f}%但流出{abs(net_inflow):.0f}亿", flush=True)
elif change < 0 and net_inflow > abs(mean or 1):
related = _get_related_stocks(conn, name, lead_stock, change)
holdings_list = _match_holdings(related, holdings)
watchlist_list = _match_holdings(related, watchlist)
ok = write_signal(conn, "量价背离", name, "medium", related,
holdings_list, watchlist_list,
f"板块跌{change:+.2f}%但资金净流入{net_inflow:.0f}亿(吸筹信号)",
latest_id)
if ok:
print(f" ⚠️ 量价背离 [{name}] 跌{change:+.2f}%但流入{net_inflow:.0f}亿(吸筹)", flush=True)
# 信号6:普涨背离
if up_ratio is not None and change > 3 and up_ratio < 0.5:
related = _get_related_stocks(conn, name, lead_stock, change)
holdings_list = _match_holdings(related, holdings)
watchlist_list = _match_holdings(related, watchlist)
ok = write_signal(conn, "普涨背离", name, "medium", related,
holdings_list, watchlist_list,
f"板块涨{change:+.2f}%但仅{up_count}/{total}家上涨(分化严重)",
latest_id)
if ok:
print(f" ⚠️ 普涨背离 [{name}] 涨{change:+.2f}%但仅{up_count}/{total}家上涨", flush=True)
def _get_related_stocks(conn, sector_name, lead_stock, change_pct):
"""获取板块相关个股(领涨股 + board 成分股)"""
stocks = []
if lead_stock:
stocks.append({"name": lead_stock, "code": "", "change_pct": 0, "role": "领涨"})
# 从 stock_sectors 表取成分股
members = conn.execute(
"SELECT s.code, s.name FROM stocks s "
"JOIN stock_sectors ss ON s.code = ss.code "
"WHERE ss.sector_name = ? LIMIT 5",
(sector_name,)
).fetchall()
for m in members:
if not any(s.get("name") == m["name"] for s in stocks):
stocks.append({"name": m["name"], "code": m["code"], "change_pct": 0, "role": "成分"})
return stocks
def _match_holdings(stocks, holding_dict):
"""匹配相关个股中的持仓/自选"""
matched = []
for s in stocks:
code = s.get("code", "")
if code in holding_dict:
matched.append({"code": code, "name": holding_dict[code]})
return matched
def main():
conn = get_conn()
# 取最近 snapshots
snapshots = get_recent_snapshots(conn, 5)
if len(snapshots) < 2:
print(f"数据不足: 只有 {len(snapshots)} 次采集,需要至少2次", flush=True)
conn.close()
return
latest = dict(snapshots[0])
print(f"检测最新 snapshot: {latest['timestamp']} (id={latest['id']})", flush=True)
check_signals(conn, latest, snapshots)
conn.close()
print("检测完成", flush=True)
if __name__ == "__main__":
main()