From 1ff0c356ab2a46c3add655d748443509bd2371ea Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?=
Date: Sun, 21 Jun 2026 02:28:22 +0800
Subject: [PATCH] =?UTF-8?q?xiaoguo=5Fscanner:=20=E5=B0=8F=E6=9E=9C?=
 =?UTF-8?q?=E7=8B=AC=E7=AB=8B=E6=89=AB=E6=8F=8F=E7=BA=BF=EF=BC=8C5?=
 =?UTF-8?q?=E5=88=86=E9=92=9F=E4=B8=80=E8=BD=AE=EF=BC=8C=E4=B8=89=E6=A6=9C?=
 =?UTF-8?q?=E4=BA=A4=E5=8F=89=E5=8F=91=E7=8E=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Review notes (this area is ignored by git am):
- has_substance: the literal braces in the f-string prompt are now doubled;
  written single they were parsed as a replacement field and raised
  ValueError on every call, before the try block.
- get_scanned_codes: the cutoff is now computed in local time, matching how
  mark_scanned writes last_scanned_at, and is driven by SCAN_INTERVAL.
- ROTATING_BOARDS and the design doc: the continuous-rise board is
  stock_rank_lxsz_ths; stock_rank_cxd_ths is the new-low board.
- fetch_hot_board: codes are reduced to digits so main() does not reject
  exchange-prefixed codes.
- search_news / has_substance: failures are logged instead of silently
  swallowed; the HTTP response is closed.
- fetch_rotating_board / main: the connection is closed on every path.
- Line breaks and indentation were restored from a whitespace-collapsed copy;
  the context lines of the mofin_db.py hunk may need
  `git apply --ignore-whitespace`. Blob ids on the index lines were not
  regenerated.

 docs/xiaoguo-scanner-design.md |  78 ++++++++++
 mofin_db.py                    |  15 ++
 xiaoguo_scanner.py             | 275 +++++++++++++++++++++++++++++++++
 3 files changed, 368 insertions(+)
 create mode 100644 docs/xiaoguo-scanner-design.md
 create mode 100644 xiaoguo_scanner.py

diff --git a/docs/xiaoguo-scanner-design.md b/docs/xiaoguo-scanner-design.md
new file mode 100644
index 0000000..50ef97d
--- /dev/null
+++ b/docs/xiaoguo-scanner-design.md
@@ -0,0 +1,78 @@
+# 小果独立扫描线 — 全市场主动发现
+
+## 概述
+
+不依赖趋势信号触发,小果自己盯着各种排行榜,主动发现可能有料的股票,搜新闻判断后喂给知微。
+
+## 时序
+
+```
+每5分钟(独立cron,不碰现有管道)
+小果扫描 → 榜单采样 → 搜索新闻 → LLM判断 → signal_news
+                                              ↓
+知微在下一轮15分钟cron中读到 → 一起分析
+```
+
+## 数据源:三榜交集
+
+每轮同时拉:
+
+| 榜单 | 来源 | 速度 | 内容 |
+|------|------|------|------|
+| 东方财富热榜 | `stock_hot_rank_em()` | <5秒 | 全市场关注度前100,当日 |
+| 同花顺轮流榜 | 以下5个轮流,一轮一个 | ~30秒 | 见下表 |
+
+同花顺轮流拉的榜单(每轮换一个):
+
+| 榜名 | 函数 | 说明 |
+|------|------|------|
+| 创新高 | `stock_rank_cxg_ths()` | 股价突破N日内新高 |
+| 量价齐升 | `stock_rank_ljqs_ths()` | 成交量+价格同步上涨 |
+| 向上突破 | `stock_rank_xstp_ths()` | 技术形态突破关键位 |
+| 连续上涨 | `stock_rank_lxsz_ths()` | 连续N天上涨 |
+| 连续放量 | `stock_rank_cxfl_ths()` | 连续N天放量 |
+
+**为什么这样组合:** 东方财富热榜代表"大家都在看",同花顺榜单代表"技术面有信号"。一只股票同时上两个榜,比只上一个榜更值得关注。
+
+## 去重策略
+
+每搜完一只股票,记录搜索时间到 `xiaoguo_scan_tracker` 表:
+- 同一股票60分钟内不重复搜索
+- 如果该股票今日已有 signal_news(来源='xiaoguo'),也不再重复
+
+## 有料判断
+
+小果LLM对每篇新闻判断两条:
+
+```
+Q1:这条新闻跟这只股票今天上榜有关吗?
+  → 有关(签大单/出业绩/出政策/被监管/有研报)
+  → 无关(大盘行情、行业普涨、泛泛而谈)
+
+Q2:利好还是利空?
+  → 利好 / 利空 / 中性
+```
+
+两条都有明确结果 → 写入signal_news。
+
+## signal_news 表改动
+
+新增 source 字段:
+
+| source | 含义 |
+|--------|------|
+| trend | 现有管道,由 trend_detector 触发 |
+| xiaoguo | 小果扫描,由榜单发现 |
+
+两类信号在 signal_news 里共存。知微判断时可以看到来源,后续可以用来评估哪个渠道更有效。
+
+## 新增表:xiaoguo_scan_tracker
+
+```sql
+CREATE TABLE IF NOT EXISTS xiaoguo_scan_tracker (
+    code TEXT PRIMARY KEY,
+    name TEXT,
+    last_scanned_at TEXT,
+    found_count INTEGER DEFAULT 0
+);
+```
diff --git a/mofin_db.py b/mofin_db.py
index c386319..68e2d00 100644
--- a/mofin_db.py
+++ b/mofin_db.py
@@ -321,9 +321,24 @@ def init_all_tables(conn: sqlite3.Connection):
             created_at TEXT DEFAULT (datetime('now','localtime'))
         );
         CREATE INDEX IF NOT EXISTS idx_signal_news_signal ON signal_news(signal_id);
+
+        -- 小果扫描跟踪(去重用)
+        CREATE TABLE IF NOT EXISTS xiaoguo_scan_tracker (
+            code TEXT PRIMARY KEY,
+            name TEXT,
+            last_scanned_at TEXT,
+            found_count INTEGER DEFAULT 0
+        );
     """)
     conn.commit()
 
+    # Migration: add the source column to signal_news (idempotent).
+    try:
+        conn.execute("ALTER TABLE signal_news ADD COLUMN source TEXT DEFAULT 'trend'")
+    except sqlite3.OperationalError:
+        pass
+    conn.commit()
+
 
 # ═══════════════════════════════════════════════════════════
 # 市场快照写入
diff --git a/xiaoguo_scanner.py b/xiaoguo_scanner.py
new file mode 100644
index 0000000..0907236
--- /dev/null
+++ b/xiaoguo_scanner.py
@@ -0,0 +1,275 @@
+#!/usr/bin/env python3
+"""xiaoguo_scanner.py — Xiaoguo standalone scanning line.
+
+Runs one round every 5 minutes and proactively discovers candidate stocks
+from market-wide ranking boards. It does not depend on trend_detector
+signals; its results are written independently to signal_news.
+"""
+
+import json, os, re, time, urllib.request
+from pathlib import Path
+from datetime import datetime
+
+try:
+    import akshare as ak
+    HAS_AKSHARE = True
+except ImportError:
+    HAS_AKSHARE = False
+
+DATA_DIR = Path(__file__).parent / "data"
+DB_PATH = DATA_DIR / "mofin.db"
+XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions"
+XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed"
+SCAN_INTERVAL = 3600  # seconds before the same stock may be searched again
+MAX_STOCKS_PER_RUN = 15
+ARTICLES_PER_STOCK = 3
+
+# Rotating THS boards as (label, akshare function name); one per round.
+ROTATING_BOARDS = [
+    ("创新高", "stock_rank_cxg_ths"),
+    ("量价齐升", "stock_rank_ljqs_ths"),
+    ("向上突破", "stock_rank_xstp_ths"),
+    ("连续上涨", "stock_rank_lxsz_ths"),
+    ("连续放量", "stock_rank_cxfl_ths"),
+]
+
+
+def clean_proxy():
+    """Drop the HTTP(S) proxy variables from this process's environment."""
+    for k in ['http_proxy','https_proxy','HTTP_PROXY','HTTPS_PROXY']:
+        os.environ.pop(k, None)
+
+
+def get_conn():
+    """Open the SQLite database at DB_PATH with sqlite3.Row as row factory."""
+    import sqlite3
+    conn = sqlite3.connect(str(DB_PATH))
+    conn.row_factory = sqlite3.Row
+    return conn
+
+
+def fetch_hot_board():
+    """Return the top 30 rows of the Eastmoney hot-rank board.
+
+    Each entry is a dict with "code", "name", "rank" (1-based) and "source".
+    Returns [] when akshare is missing, the board is empty, or the fetch fails.
+    """
+    if not HAS_AKSHARE:
+        return []
+    try:
+        clean_proxy()
+        df = ak.stock_hot_rank_em()
+        if df is None or len(df) == 0:
+            return []
+        # Column names of this board vary a lot, so detect them.
+        cols = list(df.columns)
+        code_candidates = [c for c in cols if any(x in c for x in ['代码', 'code', 'CODE'])]
+        name_candidates = [c for c in cols if any(x in c for x in ['简称', '名称', 'name', 'NAME'])]
+        code_col = code_candidates[0] if code_candidates else cols[1]
+        name_col = name_candidates[0] if name_candidates else cols[2]
+        # Codes may carry an exchange prefix (e.g. "SZ000001"); keep digits only,
+        # otherwise main() rejects them as not being 6-digit codes.
+        return [{"code": re.sub(r"\D", "", str(r[code_col])).zfill(6), "name": str(r[name_col]).strip(),
+                 "rank": i+1, "source": "东方财富热榜"}
+                for i, (_, r) in enumerate(df.head(30).iterrows())]
+    except Exception as e:
+        print(f" 热榜失败: {e}", flush=True)
+        return []
+
+
+def fetch_rotating_board():
+    """Return the top 15 rows of one THS board, advancing the rotation.
+
+    The board index is kept in state_meta under 'xiaoguo_board_round'.
+    Returns [] when akshare is missing or the board fetch fails.
+    """
+    if not HAS_AKSHARE:
+        return []
+    conn = get_conn()
+    try:
+        row = conn.execute("SELECT val FROM state_meta WHERE key='xiaoguo_board_round'").fetchone()
+        round_idx = (int(row[0]) if row else 0) % len(ROTATING_BOARDS)
+        conn.execute("INSERT OR REPLACE INTO state_meta (key, val) VALUES ('xiaoguo_board_round', ?)",
+                     (str((round_idx + 1) % len(ROTATING_BOARDS)),))
+        conn.commit()
+    finally:
+        # Close even when the state_meta read or write raises.
+        conn.close()
+
+    board_name, func_name = ROTATING_BOARDS[round_idx]
+    print(f" 同花顺榜: {board_name}", flush=True)
+
+    try:
+        clean_proxy()
+        fn = getattr(ak, func_name)
+        df = fn()
+        cols = list(df.columns)
+        code_col = [c for c in cols if '代码' in c][0]
+        name_col = [c for c in cols if '简称' in c or '名称' in c][0]
+        return [{"code": str(r[code_col]).zfill(6), "name": str(r[name_col]).strip(),
+                 "source": f"同花顺{board_name}"}
+                for _, r in df.head(15).iterrows()]
+    except Exception as e:
+        print(f" {board_name}失败: {e}", flush=True)
+        return []
+
+
+def get_scanned_codes(conn):
+    """Return the codes scanned within the last SCAN_INTERVAL seconds."""
+    # last_scanned_at is stored in local time (see mark_scanned), so the cutoff
+    # must be local too; a UTC cutoff stretches the window by the UTC offset.
+    rows = conn.execute(
+        "SELECT code FROM xiaoguo_scan_tracker "
+        "WHERE datetime(last_scanned_at) > datetime('now', 'localtime', ?)",
+        (f"-{SCAN_INTERVAL} seconds",)
+    ).fetchall()
+    return {r[0] for r in rows}
+
+
+def mark_scanned(conn, code, name, found):
+    """Record that code was scanned now; add 1 to found_count when found."""
+    conn.execute(
+        "INSERT OR REPLACE INTO xiaoguo_scan_tracker (code, name, last_scanned_at, found_count) "
+        "VALUES (?, ?, datetime('now','localtime'), COALESCE((SELECT found_count FROM xiaoguo_scan_tracker WHERE code=?),0)+?)",
+        (code, name, code, 1 if found else 0)
+    )
+    conn.commit()
+
+
+def search_news(code, max_results=3):
+    """Return up to max_results news dicts ("title", "content") for code."""
+    articles = []
+    if not HAS_AKSHARE:
+        return articles
+    try:
+        clean_proxy()
+        df = ak.stock_news_em(symbol=code)
+        for _, r in df.head(max_results).iterrows():
+            title = r.get('新闻标题', '')
+            content = r.get('新闻内容', '')
+            # Missing cells are not strings: skip such titles, blank such bodies.
+            if not isinstance(title, str) or len(title) <= 5:
+                continue
+            if not isinstance(content, str):
+                content = ''
+            articles.append({"title": title, "content": content})
+    except Exception as e:
+        print(f" {code}新闻失败: {e}", flush=True)
+    return articles
+
+
+def has_substance(title, content):
+    """Ask the Xiaoguo LLM whether a news item relates to the stock's listing.
+
+    Returns (True, sentiment) with sentiment "利好", "利空" or "中性" when the
+    reply contains "有关"; returns (False, None) otherwise or on any failure.
+    """
+    text = title + (content or '')[:100]
+    # Literal braces must be doubled in an f-string; written single, the sample
+    # answer is parsed as a replacement field and raises ValueError at runtime.
+    prompt = f"""新闻:{text}
+问:这条新闻跟该股今日上榜有关吗?
+回答格式:{{"有关":"利好|利空|中性"}} 或 "无关"
+回答:"""
+
+    payload = json.dumps({
+        "model": XIAOGUO_MODEL,
+        "messages": [{"role": "user", "content": prompt}],
+        "temperature": 0.1, "max_tokens": 100,
+    }).encode()
+
+    # An empty ProxyHandler makes this request bypass any configured proxy.
+    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
+    req = urllib.request.Request(XIAOGUO_API, data=payload,
+        headers={"Content-Type": "application/json"}, method="POST")
+    try:
+        with opener.open(req, timeout=30) as resp:
+            reply = json.loads(resp.read())["choices"][0]["message"]["content"] or ""
+    except Exception as e:
+        print(f" 小果判断失败: {e}", flush=True)
+        return False, None
+    if "有关" not in reply:
+        return False, None
+    for s in ["利好", "利空", "中性"]:
+        if s in reply:
+            return True, s
+    return True, "中性"
+
+
+def _scan_round(conn, start_time):
+    """Fetch boards, scan unscanned candidates and write hits to signal_news."""
+    # 1. Fetch the boards
+    hot = fetch_hot_board()
+    rotating = fetch_rotating_board()
+    elapsed = time.time() - start_time
+    print(f"榜单: 东方财富{len(hot)}只, 同花顺{len(rotating)}只 ({elapsed:.0f}s)", flush=True)
+
+    if not hot and not rotating:
+        return
+
+    # 2. Merge and de-duplicate, remembering every board a stock appeared on
+    all_stocks = {}
+    for s in hot + rotating:
+        code = s["code"]
+        if code not in all_stocks:
+            all_stocks[code] = {"code": code, "name": s["name"], "sources": []}
+        all_stocks[code]["sources"].append(s["source"])
+
+    # 3. Drop recently scanned stocks and anything that is not a 6-digit code
+    scanned = get_scanned_codes(conn)
+    candidates = [s for code, s in all_stocks.items()
+                  if code not in scanned and len(code) == 6 and code.isdigit()][:MAX_STOCKS_PER_RUN]
+
+    if not candidates:
+        print(f"无新候选(已有 {len(scanned)} 只已扫描)", flush=True)
+        return
+
+    print(f"待扫描: {len(candidates)} 只(跳过 {len(all_stocks)-len(candidates)} 只已扫过)", flush=True)
+
+    # 4. Search news per stock and let the LLM judge each article
+    found_any = False
+    for stock in candidates:
+        code, name = stock["code"], stock["name"]
+        sources = "|".join(stock["sources"])
+
+        articles = search_news(code, ARTICLES_PER_STOCK)
+        if not articles:
+            mark_scanned(conn, code, name, False)
+            continue
+
+        has_found = False
+        for art in articles:
+            ok, sentiment = has_substance(art["title"], art.get("content", ""))
+            if ok:
+                # Substantive article: write it to signal_news
+                conn.execute(
+                    "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) "
+                    "VALUES (NULL, ?, ?, ?, ?, ?, 'xiaoguo')",
+                    (f"扫描-{name}", sentiment, f"[{sources}] {art['title'][:80]}",
+                     json.dumps([{"title": art["title"], "sentiment": sentiment, "summary": art.get("content","")[:100]}], ensure_ascii=False),
+                     json.dumps([name], ensure_ascii=False))
+                )
+                has_found = True
+                found_any = True
+                print(f" ✅ {name}({code}) [{sources}] {sentiment}: {art['title'][:50]}", flush=True)
+                break  # one substantive article per stock is enough
+
+        # mark_scanned commits, which also persists the insert above.
+        mark_scanned(conn, code, name, has_found)
+
+    total_time = time.time() - start_time
+    print(f"完成: {len(candidates)}只扫描, {'有发现' if found_any else '无发现'} ({total_time:.0f}s)", flush=True)
+
+
+def main():
+    """Run one scan round and close the database connection on every path."""
+    start_time = time.time()
+    conn = get_conn()
+    try:
+        _scan_round(conn, start_time)
+    finally:
+        conn.close()
+
+
+if __name__ == "__main__":
+    main()