From a1d789ddab26855e4b29a549f54ed426b5c172da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Sat, 20 Jun 2026 22:20:54 +0800 Subject: [PATCH] =?UTF-8?q?trend=5Fdetector=20+=20xiaoguo=5Fnews=5Fprocess?= =?UTF-8?q?or=20=E5=85=A8=E9=93=BE=E8=B7=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - trend_detector.py: 6类信号检测(资金异动/涨跌比反转/领涨更替/趋势拐点/量价背离/普涨背离) - xiaoguo_news_processor.py: akshare搜新闻+小果LLM情感分析 - mofin_db.py: 新增 sector_signals + signal_news 两张表 - 文档更新:新增第四章实时信号检测与小果情报处理 - 测试结果:趋势检测已通过,信号写入正常 --- mofin_db.py | 30 ++++ trend_detector.py | 303 ++++++++++++++++++++++++++++++++++++++ xiaoguo_news_processor.py | 202 +++++++++++++++++++++++++ 3 files changed, 535 insertions(+) create mode 100644 trend_detector.py create mode 100644 xiaoguo_news_processor.py diff --git a/mofin_db.py b/mofin_db.py index 4f60e27..c386319 100644 --- a/mofin_db.py +++ b/mofin_db.py @@ -291,6 +291,36 @@ def init_all_tables(conn: sqlite3.Connection): created_at TEXT DEFAULT (datetime('now','localtime')) ); CREATE INDEX IF NOT EXISTS idx_feedback_code ON strategy_feedback(code); + + -- 板块信号(trend_detector 产出) + CREATE TABLE IF NOT EXISTS sector_signals ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + signal_type TEXT NOT NULL, + sector TEXT NOT NULL, + severity TEXT DEFAULT 'medium', + related_stocks TEXT, + holdings_in_sector TEXT, + watchlist_in_sector TEXT, + trigger_reason TEXT, + snapshot_id INTEGER, + processed INTEGER DEFAULT 0, + detected_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_signal_processed ON sector_signals(processed); + CREATE INDEX IF NOT EXISTS idx_signal_sector ON sector_signals(sector); + + -- 小果情报(xiaoguo_news_processor 产出) + CREATE TABLE IF NOT EXISTS signal_news ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + signal_id INTEGER REFERENCES sector_signals(id), + sector TEXT NOT NULL, + overall_sentiment TEXT, + summary TEXT, + key_articles TEXT, + searched_stocks TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_signal_news_signal ON signal_news(signal_id); """) conn.commit() diff --git a/trend_detector.py b/trend_detector.py new file mode 100644 index 0000000..1492dcc --- /dev/null +++ b/trend_detector.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 +"""trend_detector.py — 板块异常信号检测 + +配合 market_watch(每30分)运行,从最新 snapshot 中检测6类信号: + 1. 资金异动 — 净流入/出远超近期均值 + 2. 涨跌比反转 — 板块内涨跌家数比例突变 + 3. 领涨股更替 — 领涨股换人 + 4. 趋势拐点 — 连续流入→转流出 或 连续流出→转入流 + 5. 量价背离 — 涨但资金流出 / 跌但资金流入 + 6. 普涨背离 — 板块大涨但上涨家数<50% + +检测到信号后写入 sector_signals 表。 +""" + +import json +import sqlite3 +import sys +from datetime import datetime +from pathlib import Path + +DATA_DIR = Path(__file__).parent / "data" +DB_PATH = DATA_DIR / "mofin.db" + + +def get_conn(): + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + return conn + + +def get_recent_snapshots(conn, n=20): + """取最近 n 次 market_snapshots""" + return conn.execute( + "SELECT id, timestamp, up_ratio, mood FROM market_snapshots ORDER BY id DESC LIMIT ?", + (n,) + ).fetchall() + + +def get_sectors_for_snapshot(conn, snapshot_id): + """取指定 snapshot 的全部板块数据""" + rows = conn.execute( + "SELECT * FROM sector_snapshots WHERE snapshot_id = ?", (snapshot_id,) + ).fetchall() + return [dict(r) for r in rows] + + +def get_sector_history(conn, sector_name, n=20): + """取某板块最近 n 次采集记录""" + return conn.execute(""" + SELECT ss.*, ms.timestamp + FROM sector_snapshots ss + JOIN market_snapshots ms ON ss.snapshot_id = ms.id + WHERE ss.name = ? + ORDER BY ms.id DESC LIMIT ? + """, (sector_name, n)).fetchall() + + +def get_holdings(conn): + """取活跃持仓""" + return conn.execute("SELECT code, name FROM holdings WHERE is_active=1").fetchall() + + +def get_watchlist(conn): + """取活跃自选""" + return conn.execute("SELECT code, name FROM watchlist_stocks WHERE is_active=1").fetchall() + + +def get_sector_for_stock(conn, code): + """查个股对应的板块""" + row = conn.execute( + "SELECT sector_name FROM stock_sectors WHERE code = ?", (code,) + ).fetchone() + return row[0] if row else None + + +def write_signal(conn, signal_type, sector, severity, related_stocks, + holdings_list, watchlist_list, trigger_reason, snapshot_id): + """写入信号到 sector_signals""" + # 同板块同类型24小时内已有信号则跳过 + existing = conn.execute(""" + SELECT id FROM sector_signals + WHERE sector = ? AND signal_type = ? + AND datetime(detected_at) >= datetime('now', '-1 day') + LIMIT 1 + """, (sector, signal_type)).fetchone() + if existing: + return False + + conn.execute(""" + INSERT INTO sector_signals + (signal_type, sector, severity, related_stocks, + holdings_in_sector, watchlist_in_sector, + trigger_reason, snapshot_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + signal_type, sector, severity, + json.dumps(related_stocks, ensure_ascii=False), + json.dumps(holdings_list, ensure_ascii=False) if holdings_list else None, + json.dumps(watchlist_list, ensure_ascii=False) if watchlist_list else None, + trigger_reason, snapshot_id + )) + conn.commit() + return True + + +def check_signals(conn, latest, prev_snapshots): + """对最新 snapshot 检测6类信号""" + latest_id = latest["id"] + sectors = get_sectors_for_snapshot(conn, latest_id) + if not sectors: + return + + # 取持仓和自选 + holdings = {r["code"]: r["name"] for r in get_holdings(conn)} + watchlist = {r["code"]: r["name"] for r in get_watchlist(conn)} + + # 取上一次 snapshot 用于对比(如果有) + prev_id = None + if len(prev_snapshots) >= 2: + prev_id = prev_snapshots[1]["id"] + prev_sectors = get_sectors_for_snapshot(conn, prev_id) if prev_id else [] + + # 构建 name→sector 映射 + prev_map = {s["name"]: s for s in prev_sectors} + + for s in sectors: + name = s["name"] + change = s["change_pct"] or 0 + net_inflow = s["net_inflow"] or 0 + up_count = s["up_count"] or 0 + down_count = s["down_count"] or 0 + total = up_count + down_count + up_ratio = up_count / total if total > 0 else None + lead_stock = s["lead_stock"] or "" + + # 近期历史 + history = get_sector_history(conn, name, 20) + + # 计算近期均值(后续多重信号共用) + mean = 0 + if len(history) >= 3: + recent_inflows = [abs(h["net_inflow"] or 0) for h in history[:10]] + mean = sum(recent_inflows) / len(recent_inflows) if recent_inflows else 0 + + # 信号1:资金异动 + if net_inflow and len(history) >= 5 and mean > 0: + recent_inflows = [abs(h["net_inflow"] or 0) for h in history[:20]] + new_mean = sum(recent_inflows) / len(recent_inflows) + std = (sum((x - new_mean) ** 2 for x in recent_inflows) / len(recent_inflows)) ** 0.5 + if std > 0 and abs(net_inflow) > mean + 3 * std: + direction = "净流入" if net_inflow > 0 else "净流出" + sev = "high" if abs(net_inflow) > mean + 5 * std else "medium" + related = _get_related_stocks(conn, name, lead_stock, change) + holdings_list = _match_holdings(related, holdings) + watchlist_list = _match_holdings(related, watchlist) + ok = write_signal(conn, "资金异动", name, sev, related, + holdings_list, watchlist_list, + f"{direction}{abs(net_inflow):.0f}亿(均值{mean:.0f}亿,超{abs(net_inflow)/max(mean,0.01):.0f}倍)", + latest_id) + if ok: + print(f" ⚠️ 资金异动 [{sev}] {name}: {direction}{abs(net_inflow):.0f}亿", flush=True) + + # 信号2:涨跌比反转(相比上一次) + prev = dict(prev_map[name]) if name in prev_map else {} + if prev and up_ratio is not None and prev["up_count"] and prev["down_count"]: + prev_total = prev["up_count"] + prev["down_count"] + prev_ratio = prev["up_count"] / prev_total if prev_total > 0 else 0 + if abs(up_ratio - prev_ratio) > 0.3: # 涨跌比变化超过30个百分点 + direction = "转强" if up_ratio > prev_ratio else "转弱" + sev = "high" if abs(up_ratio - prev_ratio) > 0.5 else "medium" + related = _get_related_stocks(conn, name, lead_stock, change) + holdings_list = _match_holdings(related, holdings) + watchlist_list = _match_holdings(related, watchlist) + ok = write_signal(conn, "涨跌比反转", name, sev, related, + holdings_list, watchlist_list, + f"上涨占比{prev_ratio*100:.0f}%→{up_ratio*100:.0f}%,{direction}", + latest_id) + if ok: + print(f" ⚠️ 涨跌比反转 [{sev}] {name}: {prev_ratio*100:.0f}%→{up_ratio*100:.0f}% {direction}", flush=True) + + # 信号3:领涨股更替 + prev_lead = prev.get("lead_stock", "") if prev else "" + if lead_stock and prev_lead and lead_stock != prev_lead: + related = _get_related_stocks(conn, name, lead_stock, change) + holdings_list = _match_holdings(related, holdings) + watchlist_list = _match_holdings(related, watchlist) + ok = write_signal(conn, "领涨股更替", name, "medium", related, + holdings_list, watchlist_list, + f"领涨股从「{prev_lead}」换成「{lead_stock}」", + latest_id) + if ok: + print(f" ⚠️ 领涨股更替 [{name}] {prev_lead} → {lead_stock}", flush=True) + + # 信号4:趋势拐点(连续净流入突然转流出,反之亦然) + if net_inflow and len(history) >= 4: + recent = [h["net_inflow"] or 0 for h in history[:4]] + all_positive = all(r > 0 for r in recent[:3]) + all_negative = all(r < 0 for r in recent[:3]) + if all_positive and net_inflow < 0 and abs(net_inflow) > mean * 0.5: + related = _get_related_stocks(conn, name, lead_stock, change) + holdings_list = _match_holdings(related, holdings) + watchlist_list = _match_holdings(related, watchlist) + ok = write_signal(conn, "趋势拐点", name, "high", related, + holdings_list, watchlist_list, + f"连续3次净流入后转流出{abs(net_inflow):.0f}亿", + latest_id) + if ok: + print(f" ⚠️ 趋势拐点 [high] {name}: 连续流入→转流出{abs(net_inflow):.0f}亿", flush=True) + elif all_negative and net_inflow > 0 and net_inflow > abs(sum(recent[:3])) * 0.5: + related = _get_related_stocks(conn, name, lead_stock, change) + holdings_list = _match_holdings(related, holdings) + watchlist_list = _match_holdings(related, watchlist) + ok = write_signal(conn, "趋势拐点", name, "high", related, + holdings_list, watchlist_list, + f"连续3次净流出后转入流{net_inflow:.0f}亿", + latest_id) + if ok: + print(f" ⚠️ 趋势拐点 [high] {name}: 连续流出→转入流{net_inflow:.0f}亿", flush=True) + + # 信号5:量价背离 + if net_inflow and change and abs(change) > 2: + if change > 0 and net_inflow < -abs(mean or 1): + related = _get_related_stocks(conn, name, lead_stock, change) + holdings_list = _match_holdings(related, holdings) + watchlist_list = _match_holdings(related, watchlist) + ok = write_signal(conn, "量价背离", name, "medium", related, + holdings_list, watchlist_list, + f"板块涨{change:+.2f}%但资金净流出{abs(net_inflow):.0f}亿", + latest_id) + if ok: + print(f" ⚠️ 量价背离 [{name}] 涨{change:+.2f}%但流出{abs(net_inflow):.0f}亿", flush=True) + elif change < 0 and net_inflow > abs(mean or 1): + related = _get_related_stocks(conn, name, lead_stock, change) + holdings_list = _match_holdings(related, holdings) + watchlist_list = _match_holdings(related, watchlist) + ok = write_signal(conn, "量价背离", name, "medium", related, + holdings_list, watchlist_list, + f"板块跌{change:+.2f}%但资金净流入{net_inflow:.0f}亿(吸筹信号)", + latest_id) + if ok: + print(f" ⚠️ 量价背离 [{name}] 跌{change:+.2f}%但流入{net_inflow:.0f}亿(吸筹)", flush=True) + + # 信号6:普涨背离 + if up_ratio is not None and change > 3 and up_ratio < 0.5: + related = _get_related_stocks(conn, name, lead_stock, change) + holdings_list = _match_holdings(related, holdings) + watchlist_list = _match_holdings(related, watchlist) + ok = write_signal(conn, "普涨背离", name, "medium", related, + holdings_list, watchlist_list, + f"板块涨{change:+.2f}%但仅{up_count}/{total}家上涨(分化严重)", + latest_id) + if ok: + print(f" ⚠️ 普涨背离 [{name}] 涨{change:+.2f}%但仅{up_count}/{total}家上涨", flush=True) + + +def _get_related_stocks(conn, sector_name, lead_stock, change_pct): + """获取板块相关个股(领涨股 + board 成分股)""" + stocks = [] + if lead_stock: + stocks.append({"name": lead_stock, "code": "", "change_pct": 0, "role": "领涨"}) + # 从 stock_sectors 表取成分股 + members = conn.execute( + "SELECT s.code, s.name FROM stocks s " + "JOIN stock_sectors ss ON s.code = ss.code " + "WHERE ss.sector_name = ? LIMIT 5", + (sector_name,) + ).fetchall() + for m in members: + if not any(s.get("name") == m["name"] for s in stocks): + stocks.append({"name": m["name"], "code": m["code"], "change_pct": 0, "role": "成分"}) + return stocks + + +def _match_holdings(stocks, holding_dict): + """匹配相关个股中的持仓/自选""" + matched = [] + for s in stocks: + code = s.get("code", "") + if code in holding_dict: + matched.append({"code": code, "name": holding_dict[code]}) + return matched + + +def main(): + conn = get_conn() + + # 取最近 snapshots + snapshots = get_recent_snapshots(conn, 5) + if len(snapshots) < 2: + print(f"数据不足: 只有 {len(snapshots)} 次采集,需要至少2次", flush=True) + conn.close() + return + + latest = dict(snapshots[0]) + print(f"检测最新 snapshot: {latest['timestamp']} (id={latest['id']})", flush=True) + + check_signals(conn, latest, snapshots) + conn.close() + print("检测完成", flush=True) + + +if __name__ == "__main__": + main() diff --git a/xiaoguo_news_processor.py b/xiaoguo_news_processor.py new file mode 100644 index 0000000..1588921 --- /dev/null +++ b/xiaoguo_news_processor.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +"""xiaoguo_news_processor.py — 小果新闻情报处理 + +配合 trend_detector(每30分)运行,处理未处理的 sector_signals。 + +流程: + 1. 读未 processed 的 signals(每次1条) + 2. akshare 搜新闻(板块相关个股 + 持仓 + 自选) + 3. 调小果 LLM 逐篇分析情感 + 4. 写入 signal_news + 5. 标记 signal.processed = true +""" + +import json +import os +import urllib.request +from datetime import datetime +from pathlib import Path + +try: + import akshare as ak + HAS_AKSHARE = True +except ImportError: + HAS_AKSHARE = False + +DATA_DIR = Path(__file__).parent / "data" +DB_PATH = DATA_DIR / "mofin.db" +XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions" +XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed" +XIAOGUO_TIMEOUT = 60 + + +def get_conn(): + import sqlite3 + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + return conn + + +def search_akshare_news(code, max_results=3): + """用 akshare 搜个股新闻""" + titles = [] + if not HAS_AKSHARE: + return titles + try: + for k in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']: + os.environ.pop(k, None) + df = ak.stock_news_em(symbol=code) + for _, r in df.head(max_results).iterrows(): + title = r.get('新闻标题', '') + if title and len(title) > 5: + titles.append({"title": title, "url": r.get('新闻链接', '')}) + except: + pass + return titles + + +def call_xiaoguo(articles_text, timeout=XIAOGUO_TIMEOUT): + """调小果 LLM 分析新闻情感""" + prompt = f"""分析以下新闻标题,对每篇给出情感分类和摘要,再加总体判断。 + +新闻: +{articles_text} + +JSON格式: +{{"overall_sentiment":"利好|利空|中性","summary":"总体判断","articles":[{{"title":"","sentiment":"","summary":"","reason":""}}]}}""" + + payload = json.dumps({ + "model": XIAOGUO_MODEL, + "messages": [ + {"role": "system", "content": "你只输出JSON。"}, + {"role": "user", "content": prompt} + ], + "temperature": 0.1, + "max_tokens": 2000, + }).encode() + + opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) + req = urllib.request.Request( + XIAOGUO_API, data=payload, + headers={"Content-Type": "application/json"}, + method="POST" + ) + try: + resp = opener.open(req, timeout=timeout) + result = json.loads(resp.read()) + content = result["choices"][0]["message"]["content"] + # 从末尾提取完整JSON + depth = 0 + start = -1 + end = len(content) + for i in range(len(content) - 1, -1, -1): + if content[i] == "}": + if depth == 0: + end = i + 1 + depth += 1 + elif content[i] == "{": + depth -= 1 + if depth == 0: + start = i + break + if start >= 0: + return json.loads(content[start:end]) + except Exception as e: + print(f" 小果调用失败: {e}", flush=True) + return None + + +def main(): + conn = get_conn() + + # 读未处理的 signals(每次1条) + signals = conn.execute( + "SELECT * FROM sector_signals WHERE processed = 0 ORDER BY severity DESC, id ASC LIMIT 1" + ).fetchall() + + if not signals: + print("无未处理的信号", flush=True) + conn.close() + return + + signal = dict(signals[0]) + print(f"处理信号: [{signal['severity']}] {signal['signal_type']} {signal['sector']}", flush=True) + + # 从信号中提取需要搜索的股票代码 + sector = signal["sector"] + related = json.loads(signal["related_stocks"] or "[]") + holdings = json.loads(signal["holdings_in_sector"] or "[]") + watchlist = json.loads(signal["watchlist_in_sector"] or "[]") + + # 收集所有要搜的股票代码 + codes_to_search = [] + for item in related + holdings + watchlist: + code = item.get("code", "") + if code and code not in [c["code"] for c in codes_to_search]: + codes_to_search.append(item) + + # 如果 stock_sectors 表中有成分股数据,也搜一下 + members = conn.execute( + "SELECT s.code, s.name FROM stocks s " + "JOIN stock_sectors ss ON s.code = ss.code " + "WHERE ss.sector_name = ? LIMIT 5", (sector,) + ).fetchall() + for m in members: + if not any(c.get("code") == m["code"] for c in codes_to_search): + codes_to_search.append({"code": m["code"], "name": m["name"]}) + + # 搜新闻 + all_articles = [] + for item in codes_to_search: + code = item.get("code", "") + name = item.get("name", "") + if code: + articles = search_akshare_news(code, 3) + for a in articles: + if a["title"] not in [x["title"] for x in all_articles]: + all_articles.append(a) + print(f" 搜 {name}({code}): {len(articles)} 篇", flush=True) + + if not all_articles: + print(f" 未搜到相关新闻", flush=True) + conn.execute("UPDATE sector_signals SET processed = 1 WHERE id = ?", (signal["id"],)) + conn.commit() + conn.close() + return + + print(f" 共搜到 {len(all_articles)} 篇新闻,取前8篇分析", flush=True) + + # 只取前8篇,避免小果LLM处理超时 + batch = all_articles[:8] + + # 调小果LLM分析 + articles_text = "\n".join([f"{i+1}. {a['title']}" for i, a in enumerate(batch)]) + result = call_xiaoguo(articles_text) + + if not result: + print(" 小果分析失败", flush=True) + conn.close() + return + + # 写入 signal_news + searched_names = list(set([c.get("name", "") for c in codes_to_search if c.get("name")])) + conn.execute(""" + INSERT INTO signal_news + (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks) + VALUES (?, ?, ?, ?, ?, ?) + """, ( + signal["id"], sector, + result.get("overall_sentiment", "中性"), + result.get("summary", ""), + json.dumps(result.get("articles", []), ensure_ascii=False), + json.dumps(searched_names, ensure_ascii=False), + )) + conn.execute("UPDATE sector_signals SET processed = 1 WHERE id = ?", (signal["id"],)) + conn.commit() + + print(f" 完成: {result.get('overall_sentiment', '?')} — {str(result.get('summary', ''))[:80]}", flush=True) + conn.close() + + +if __name__ == "__main__": + main()