From 8f0ee37aa49cd659b4889ab67e9c41a3dcc7c786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Tue, 30 Jun 2026 02:46:55 +0800 Subject: [PATCH] =?UTF-8?q?sync:=20=E6=B8=B8=E7=A6=BB=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=BD=92=E4=BD=8D+node122=E5=9C=B0=E5=9D=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - stale_push_wlin.py/stock_scorer.py/xiaoguo_signal_consumer.py → 同步MoFin(含mo_models更新)到profile - xiaoguo_scanner.py → 从profile收入MoFin(游离归位) - 小果地址从硬编码192.168.1.122改为node122(兼容内外网自动切换) --- scripts/xiaoguo_scanner.py | 350 +++++++++++++++++++++++++++++++++++++ 1 file changed, 350 insertions(+) create mode 100644 scripts/xiaoguo_scanner.py diff --git a/scripts/xiaoguo_scanner.py b/scripts/xiaoguo_scanner.py new file mode 100644 index 0000000..f2297b0 --- /dev/null +++ b/scripts/xiaoguo_scanner.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python3 +"""xiaoguo_scanner.py — 小果独立扫描线 + +每5分钟跑一轮,全市场排行榜主动发现潜在标的。 +不依赖 trend_detector 信号,独立产出到 signal_news。 +""" + +import json, os, re, time, urllib.request +from pathlib import Path +from datetime import datetime + +try: + import akshare as ak + HAS_AKSHARE = True +except ImportError: + HAS_AKSHARE = False + +DATA_DIR = Path(__file__).parent / "data" +DB_PATH = DATA_DIR / "mofin.db" +XIAOGUO_API = "http://node122:18003/v1/chat/completions" +XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed" +SCAN_INTERVAL = 3600 # 同一只股1小时内不重复搜 +MAX_STOCKS_PER_RUN = 15 +ARTICLES_PER_STOCK = 3 + +# 同花顺看多榜(挖掘潜力股) +BULLISH_BOARDS = [ + ("创新高", "stock_rank_cxg_ths"), + ("量价齐升", "stock_rank_ljqs_ths"), + ("向上突破", "stock_rank_xstp_ths"), + ("连续上涨", "stock_rank_lxsz_ths"), + ("持续放量", "stock_rank_cxfl_ths"), + ("险资举牌", "stock_rank_xzjp_ths"), +] + +# 同花顺看空榜(持仓风险预警) +BEARISH_BOARDS = [ + ("创新低", "stock_rank_cxd_ths"), + ("持续缩量", "stock_rank_cxsl_ths"), + ("量价齐跌", "stock_rank_ljqd_ths"), + ("连续下跌", "stock_rank_lxxd_ths"), + ("向下突破", "stock_rank_xxtp_ths"), +] + +ALL_BOARDS = BULLISH_BOARDS + BEARISH_BOARDS +BULLISH_COUNT = len(BULLISH_BOARDS) + +# 行业领涨股扫描(不轮换,每轮都跑) +# 从 market.json 读热门行业领涨股 +SECTOR_HOT_THRESHOLD = 2.5 # 板块涨幅>2.5%时捞它的领涨股 + + +def clean_proxy(): + for k in ['http_proxy','https_proxy','HTTP_PROXY','HTTPS_PROXY']: + os.environ.pop(k, None) + + +def get_conn(): + import sqlite3 + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + return conn + + +def fetch_hot_board(): + """东方财富热榜""" + if not HAS_AKSHARE: + return [] + try: + clean_proxy() + df = ak.stock_hot_rank_em() + if df is None or len(df) == 0: + return [] + # 东方财富热榜列名变化较大,自动检测 + cols = list(df.columns) + code_candidates = [c for c in cols if any(x in c for x in ['代码', 'code', 'CODE'])] + name_candidates = [c for c in cols if any(x in c for x in ['简称', '名称', 'name', 'NAME'])] + code_col = code_candidates[0] if code_candidates else cols[1] + name_col = name_candidates[0] if name_candidates else cols[2] + return [{"code": str(r[code_col]).zfill(6).strip(), "name": str(r[name_col]).strip(), + "rank": i+1, "source": "东方财富热榜"} + for i, (_, r) in enumerate(df.head(30).iterrows())] + except Exception: + pass + return [] + + +def fetch_rotating_board(): + """同花顺轮流榜(每轮一个),返回 (股票列表, 是否看多)""" + if not HAS_AKSHARE: + return [], True + conn = get_conn() + row = conn.execute("SELECT val FROM state_meta WHERE key='xiaoguo_board_round'").fetchone() + round_idx = (int(row[0]) if row else 0) % len(ALL_BOARDS) + conn.execute("INSERT OR REPLACE INTO state_meta (key, val) VALUES ('xiaoguo_board_round', ?)", + (str((round_idx + 1) % len(ALL_BOARDS)),)) + conn.commit() + conn.close() + + board_name, func_name = ALL_BOARDS[round_idx] + is_bullish = round_idx < BULLISH_COUNT + print(f" 同花顺榜: {board_name} {'📈看多' if is_bullish else '📉看空'}", flush=True) + + try: + clean_proxy() + fn = getattr(ak, func_name) + df = fn() + cols = list(df.columns) + code_col = [c for c in cols if '代码' in c][0] + name_col = [c for c in cols if '简称' in c or '名称' in c][0] + return [{"code": str(r[code_col]).zfill(6), "name": str(r[name_col]).strip(), + "source": f"同花顺{board_name}"} + for _, r in df.head(15).iterrows()], is_bullish + except Exception as e: + print(f" {board_name}失败: {e}", flush=True) + return [], is_bullish + + +def fetch_sector_leaders(): + """从 market.json 读热门行业领涨股""" + mkt_path = DATA_DIR / "market.json" + if not mkt_path.exists(): + return [] + try: + mkt = json.loads(mkt_path.read_text()) + sectors = mkt.get("sectors", []) + + # 代码→名称映射(优先用本地缓存,避免每次跑都调akshare) + cache_path = DATA_DIR / "stock_name_code_cache.json" + name_to_code = {} + if cache_path.exists(): + name_to_code = json.loads(cache_path.read_text()) + if not name_to_code: + try: + import akshare as ak + df = ak.stock_info_a_code_name() + for _, r in df.iterrows(): + name_to_code[r["name"].strip()] = r["code"] + cache_path.write_text(json.dumps(name_to_code, ensure_ascii=False)) + print(f" 名称代码映射: {len(name_to_code)}只已缓存", flush=True) + except Exception as e: + print(f" 名称代码映射加载失败: {e}", flush=True) + + leaders = [] + seen = set() + for s in sectors: + chg = s.get("change", 0) or 0 + lead_name = s.get("lead_stock", "") + if chg < SECTOR_HOT_THRESHOLD or not lead_name or lead_name in seen: + continue + seen.add(lead_name) + code = name_to_code.get(lead_name, "") + if not code: + continue + leaders.append({ + "code": code, + "name": lead_name, + "source": f"行业领涨-{s['name']}+{chg:+.1f}%", + }) + return leaders + except Exception as e: + print(f" 行业领涨获取失败: {e}", flush=True) + return [] + + +def get_scanned_codes(conn): + """取1小时内已扫描过的代码""" + rows = conn.execute( + "SELECT code FROM xiaoguo_scan_tracker WHERE datetime(last_scanned_at) > datetime('now', '-1 hour')" + ).fetchall() + return {r[0] for r in rows} + + +def mark_scanned(conn, code, name, found): + conn.execute( + "INSERT OR REPLACE INTO xiaoguo_scan_tracker (code, name, last_scanned_at, found_count) " + "VALUES (?, ?, datetime('now','localtime'), COALESCE((SELECT found_count FROM xiaoguo_scan_tracker WHERE code=?),0)+?)", + (code, name, code, 1 if found else 0) + ) + conn.commit() + + +def search_news(code, max_results=3): + """akshare搜个股新闻""" + articles = [] + if not HAS_AKSHARE: + return articles + try: + clean_proxy() + df = ak.stock_news_em(symbol=code) + for _, r in df.head(max_results).iterrows(): + title = r.get('新闻标题', '') + content = r.get('新闻内容', '') + if title and len(title) > 5: + articles.append({"title": title, "content": content}) + except: + pass + return articles + + +def check_stock(code, name, articles): + """小果LLM判断这只股票是否有料(一次调用判断所有文章)""" + if not articles: + return None, None + + lines = [f"{i+1}. {a['title']}" for i, a in enumerate(articles[:3])] + prompt = f"""以下是最新关于{name}({code})的新闻标题。 +该股今日上了人气热榜/技术榜单。 + +新闻: +{chr(10).join(lines)} + +这只股上榜是否跟这些新闻有关?有关的话是利好还是利空? +回答格式:有关(利好|利空|中性) 或 无关 +回答:""" + + payload = json.dumps({ + "model": XIAOGUO_MODEL, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.1, "max_tokens": 100, + }).encode() + + opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) + req = urllib.request.Request(XIAOGUO_API, data=payload, + headers={"Content-Type": "application/json"}, method="POST") + try: + resp = opener.open(req, timeout=30) + reply = json.loads(resp.read())["choices"][0]["message"]["content"] + if "有关" in reply or "利好" in reply or "利空" in reply: + for s in ["利好", "利空", "中性"]: + if s in reply: + return True, s + return True, "中性" + except Exception as e: + # LLM不可达 → 降级:标记为unknown,不阻塞扫描流程 + print(f" ⚠️ 小果LLM不可达({str(e)[:30]}),降级为unknown", flush=True) + return True, "unknown" + return None, None + + +def main(): + start_time = time.time() + conn = get_conn() + + # 1. 拉板 + hot = fetch_hot_board() + rotating, is_bullish = fetch_rotating_board() + leaders = fetch_sector_leaders() + elapsed = time.time() - start_time + print(f"榜单: 东方财富{len(hot)}只, 同花顺{len(rotating)}只, 行业领涨{len(leaders)}只 ({elapsed:.0f}s)", flush=True) + + if not hot and not rotating and not leaders: + conn.close() + return + + # 加载持仓代码(用于看空榜比对) + holdings = set() + if not is_bullish: + cur = conn.execute("SELECT code FROM holdings WHERE is_active=1") + holdings = {r[0].lstrip("0") for r in cur.fetchall()} + # 也查自选 + cur2 = conn.execute("SELECT code FROM watchlist_stocks") + holdings.update({r[0].lstrip("0") for r in cur2.fetchall()}) + + # 2. 合并去重 + 看空榜只保留持仓股 + all_stocks = {} + # 行业领涨优先(热门板块龙头) + for s in (leaders if is_bullish else []) + hot + rotating: + code = s["code"] + code_stripped = code.lstrip("0") + if not is_bullish: + # 看空榜:只处理持仓/自选中的股票 + if code_stripped not in holdings: + continue + if code not in all_stocks: + all_stocks[code] = {"code": code, "name": s["name"], "sources": []} + all_stocks[code]["sources"].append(s["source"]) + + if not all_stocks: + if is_bullish: + print("榜单为空", flush=True) + else: + print(f"看空榜无持仓股命中", flush=True) + conn.close() + return + + # 3. 排除已搜索过的(看空榜不排除——每次都要检查风险) + scanned = get_scanned_codes(conn) + if is_bullish: + candidates = [s for code, s in all_stocks.items() + if code not in scanned and len(code) == 6 and code.isdigit()][:MAX_STOCKS_PER_RUN] + else: + # 看空榜:不限数量,全检 + candidates = [s for code, s in all_stocks.items() + if len(code) == 6 and code.isdigit()] + + if not candidates: + print(f"无新候选(已有 {len(scanned)} 只已扫描)", flush=True) + conn.close() + return + + print(f"待扫描: {len(candidates)} 只({'看多' if is_bullish else '看空'}榜)", flush=True) + + # 4. 逐只处理 + found_any = False + for stock in candidates: + code, name = stock["code"], stock["name"] + sources = "|".join(stock["sources"]) + + articles = search_news(code, ARTICLES_PER_STOCK) if is_bullish else [] + if not articles and is_bullish: + mark_scanned(conn, code, name, False) + continue + + has_found = False + if is_bullish: + ok, sentiment = check_stock(code, name, articles) + if ok: + has_found = True + found_any = True + conn.execute( + "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) " + "VALUES (NULL, ?, ?, ?, ?, ?, 'xiaoguo')", + (f"扫描-{name}", sentiment, f"[{sources}] {articles[0]['title'][:80]}", + json.dumps([{"title": a["title"], "sentiment": sentiment, "summary": (a.get("content") or "")[:100]} for a in articles[:3]], ensure_ascii=False), + json.dumps([code, name], ensure_ascii=False)) + ) + print(f" ✅ {name}({code}) [{sources}] {sentiment}: {articles[0]['title'][:50]}", flush=True) + mark_scanned(conn, code, name, has_found) + else: + # 看空榜:直接写入风险信号 + has_found = True + found_any = True + conn.execute( + "INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) " + "VALUES (NULL, ?, ?, ?, ?, ?, 'xiaoguo_risk')", + (f"预警-{name}", "偏空", f"[{sources}] {name}登上{sources}榜,需关注持仓风险", + json.dumps([{"title": name, "sentiment": "偏空", "summary": f"上榜{sources}"}], ensure_ascii=False), + json.dumps([code, name], ensure_ascii=False)) + ) + print(f" ⚠️ {name}({code}) [{sources}] 持仓风险信号", flush=True) + mark_scanned(conn, code, name, has_found) + + total_time = time.time() - start_time + print(f"完成: {len(candidates)}只{'看多' if is_bullish else '看空'}扫描, {'有发现' if found_any else '无发现'} ({total_time:.0f}s)", flush=True) + conn.close() + + +if __name__ == "__main__": + main()